You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this copy.)
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/20 20:47:49 UTC
[1/2] beam git commit: Port fn_api_runner to be able to use runner protos.
Repository: beam
Updated Branches:
refs/heads/master f51fdd960 -> e4ef23e16
Port fn_api_runner to be able to use runner protos.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/08ec0d4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/08ec0d4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/08ec0d4d
Branch: refs/heads/master
Commit: 08ec0d4dbff330ecd48c806cd764ab5a96835bd9
Parents: f51fdd9
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jun 20 11:01:03 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jun 20 13:47:30 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/pipeline_context.py | 17 +-
.../runners/portability/fn_api_runner.py | 166 ++++++++++++-
.../runners/portability/fn_api_runner_test.py | 20 +-
.../apache_beam/runners/worker/sdk_worker.py | 243 ++++++++++++++++++-
4 files changed, 420 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index e212abf..c2ae3f3 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -24,6 +24,7 @@ For internal use only; no backwards-compatibility guarantees.
from apache_beam import pipeline
from apache_beam import pvalue
from apache_beam import coders
+from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import core
@@ -42,9 +43,10 @@ class _PipelineContextMap(object):
self._id_to_proto = proto_map if proto_map else {}
self._counter = 0
- def _unique_ref(self):
+ def _unique_ref(self, obj=None):  # Returns a fresh id "ref_<component type>_<type of obj>_<counter>".
self._counter += 1
- return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+ return "ref_%s_%s_%s" % (  # The per-map counter alone makes ids unique.
+ self._obj_type.__name__, type(obj).__name__, self._counter)  # obj=None yields "NoneType" in the id.
def populate_map(self, proto_map):
for id, proto in self._id_to_proto.items():
@@ -52,7 +54,7 @@ class _PipelineContextMap(object):
def get_id(self, obj):
if obj not in self._obj_to_id:
- id = self._unique_ref()
+ id = self._unique_ref(obj)
self._id_to_obj[id] = obj
self._obj_to_id[obj] = id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
@@ -79,11 +81,16 @@ class PipelineContext(object):
# TODO: environment
}
- def __init__(self, context_proto=None):
+ def __init__(self, proto=None):  # proto: a Components-style proto, a ProcessBundleDescriptor, or None. NOTE(review): keyword renamed from context_proto; keyword callers would break.
+ if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):  # Re-wrap the descriptor's component maps as a Components proto.
+ proto = beam_runner_api_pb2.Components(
+ coders=dict(proto.codersyyy.items()),  # NOTE(review): the descriptor's coder map is the field named 'codersyyy' here; confirm against the proto.
+ windowing_strategies=dict(proto.windowing_strategies.items()),
+ environments=dict(proto.environments.items()))
for name, cls in self._COMPONENT_TYPES.items():
setattr(
self, name, _PipelineContextMap(
- self, cls, getattr(context_proto, name, None)))
+ self, cls, getattr(proto, name, None)))  # Missing components (e.g. proto=None) start as empty maps.
@staticmethod
def from_runner_api(proto):
http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index d792131..dabb7d6 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -24,9 +24,10 @@ import Queue as queue
import threading
from concurrent import futures
+from google.protobuf import wrappers_pb2
import grpc
-import apache_beam as beam
+import apache_beam as beam # pylint: disable=ungrouped-imports
from apache_beam.coders import WindowedValueCoder
from apache_beam.coders.coder_impl import create_InputStream
from apache_beam.coders.coder_impl import create_OutputStream
@@ -34,10 +35,13 @@ from apache_beam.internal import pickler
from apache_beam.io import iobase
from apache_beam.transforms.window import GlobalWindows
from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners import pipeline_context
from apache_beam.runners.portability import maptask_executor_runner
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import sdk_worker
+from apache_beam.utils import proto_utils
# This module is experimental. No backwards-compatibility guarantees.
@@ -110,9 +114,13 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
- def __init__(self):
+ def __init__(self, use_runner_protos=False):  # use_runner_protos selects which descriptor format is sent to the worker.
super(FnApiRunner, self).__init__()
self._last_uid = -1
+ if use_runner_protos:  # Both converters return (input_data, side_input_data, runner_sinks, descriptor).
+ self._map_task_to_protos = self._map_task_to_runner_protos
+ else:
+ self._map_task_to_protos = self._map_task_to_fn_protos
def has_metrics_support(self):
return False
@@ -123,7 +131,140 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
def _map_task_registration(self, map_task, state_handler,
data_operation_spec):
+ input_data, side_input_data, runner_sinks, process_bundle_descriptor = (  # Returns (register InstructionRequest, runner_sinks, input_data).
+ self._map_task_to_protos(map_task, data_operation_spec))
+ # Side inputs will be accessed over the state API.
+ for key, elements_data in side_input_data.items():
+ state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=key)
+ state_handler.Clear(state_key)  # Drop any previous contents stored under this key.
+ state_handler.Append(state_key, [elements_data])  # NOTE(review): wraps the bytes in a list; the removed fn-path code passed the bytes directly - confirm Append's expected type.
+ return beam_fn_api_pb2.InstructionRequest(
+ instruction_id=self._next_uid(),
+ register=beam_fn_api_pb2.RegisterRequest(
+ process_bundle_descriptor=[process_bundle_descriptor])
+ ), runner_sinks, input_data
+
+ def _map_task_to_runner_protos(self, map_task, data_operation_spec):  # Runner-API variant; returns (input_data, side_input_data, runner_sinks, descriptor).
+ input_data = {}
+ side_input_data = {}
+ runner_sinks = {}
+
+ context = pipeline_context.PipelineContext()
+ transform_protos = {}
+ used_pcollections = {}
+
+ def uniquify(*names):
+ # An injective mapping from string* to string.
+ return ':'.join("%s:%d" % (name, len(name)) for name in names)
+
+ def pcollection_id(op_ix, out_ix):  # Stable id for output out_ix of map_task[op_ix]; also records it as used.
+ if (op_ix, out_ix) not in used_pcollections:
+ used_pcollections[op_ix, out_ix] = uniquify(
+ map_task[op_ix][0], 'out', str(out_ix))
+ return used_pcollections[op_ix, out_ix]
+
+ def get_inputs(op):  # Maps 'in<ix>' to producer PCollection ids; op inputs are (op_ix, out_ix) pairs.
+ if hasattr(op, 'inputs'):
+ inputs = op.inputs
+ elif hasattr(op, 'input'):
+ inputs = [op.input]
+ else:
+ inputs = []
+ return {'in%s' % ix: pcollection_id(*input)
+ for ix, input in enumerate(inputs)}  # NOTE(review): 'input' shadows the builtin inside this comprehension.
+
+ def get_outputs(op_ix):  # Maps each output tag (default ['out']) to its PCollection id.
+ op = map_task[op_ix][1]
+ return {tag: pcollection_id(op_ix, out_ix)
+ for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
+
+ for op_ix, (stage_name, operation) in enumerate(map_task):
+ transform_id = uniquify(stage_name)
+
+ if isinstance(operation, operation_specs.WorkerInMemoryWrite):
+ # Write this data back to the runner.
+ runner_sinks[(transform_id, 'out')] = operation  # Name 'out' matches the target name of the worker's data-output op.
+ transform_spec = beam_runner_api_pb2.FunctionSpec(
+ urn=sdk_worker.DATA_OUTPUT_URN,
+ parameter=proto_utils.pack_Any(data_operation_spec))
+
+ elif isinstance(operation, operation_specs.WorkerRead):
+ # A Read from an in-memory source is done over the data plane.
+ if (isinstance(operation.source.source,
+ maptask_executor_runner.InMemorySource)
+ and isinstance(operation.source.source.default_output_coder(),
+ WindowedValueCoder)):
+ input_data[(transform_id, 'input')] = self._reencode_elements(  # NOTE(review): keyed with name 'input', while the worker's input target is named after the op's output tag ('out') - confirm these agree.
+ operation.source.source.read(None),
+ operation.source.source.default_output_coder())
+ transform_spec = beam_runner_api_pb2.FunctionSpec(
+ urn=sdk_worker.DATA_INPUT_URN,
+ parameter=proto_utils.pack_Any(data_operation_spec))
+
+ else:
+ # Otherwise serialize the source and execute it there.
+ # TODO: Use SDFs with an initial impulse.
+ transform_spec = beam_runner_api_pb2.FunctionSpec(
+ urn=sdk_worker.PYTHON_SOURCE_URN,
+ parameter=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(
+ value=pickler.dumps(operation.source.source))))
+
+ elif isinstance(operation, operation_specs.WorkerDoFn):
+ # Record the contents of each side input for access via the state api.
+ side_input_extras = []
+ for si in operation.side_inputs:
+ assert isinstance(si.source, iobase.BoundedSource)
+ element_coder = si.source.default_output_coder()
+ # TODO(robertwb): Actually flesh out the ViewFn API.
+ side_input_extras.append((si.tag, element_coder))
+ side_input_data[sdk_worker.side_input_tag(transform_id, si.tag)] = (  # Key must equal the state key the worker builds with side_input_tag().
+ self._reencode_elements(
+ si.source.read(si.source.get_range_tracker(None, None)),
+ element_coder))
+ augmented_serialized_fn = pickler.dumps(  # The worker unpacks this as (serialized_fn, [(tag, coder), ...]).
+ (operation.serialized_fn, side_input_extras))
+ transform_spec = beam_runner_api_pb2.FunctionSpec(
+ urn=sdk_worker.PYTHON_DOFN_URN,
+ parameter=proto_utils.pack_Any(
+ wrappers_pb2.BytesValue(value=augmented_serialized_fn)))
+
+ elif isinstance(operation, operation_specs.WorkerFlatten):
+ # Flatten is nice and simple.
+ transform_spec = beam_runner_api_pb2.FunctionSpec(
+ urn=sdk_worker.IDENTITY_DOFN_URN)
+
+ else:
+ raise NotImplementedError(operation)
+
+ transform_protos[transform_id] = beam_runner_api_pb2.PTransform(
+ unique_name=stage_name,
+ spec=transform_spec,
+ inputs=get_inputs(operation),
+ outputs=get_outputs(op_ix))
+
+ pcollection_protos = {  # Only PCollections referenced through pcollection_id() get protos.
+ name: beam_runner_api_pb2.PCollection(
+ unique_name=name,
+ coder_id=context.coders.get_id(
+ map_task[op_id][1].output_coders[out_id]))
+ for (op_id, out_id), name in used_pcollections.items()
+ }
+ # Must follow creation of pcollection_protos to capture used coders.
+ context_proto = context.to_runner_api()
+ process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
+ id=self._next_uid(),
+ transforms=transform_protos,
+ pcollections=pcollection_protos,
+ codersyyy=dict(context_proto.coders.items()),  # NOTE(review): the coder map field is 'codersyyy' on this descriptor; confirm against the proto.
+ windowing_strategies=dict(context_proto.windowing_strategies.items()),
+ environments=dict(context_proto.environments.items()))
+ return input_data, side_input_data, runner_sinks, process_bundle_descriptor
+
+ def _map_task_to_fn_protos(self, map_task, data_operation_spec):
+
input_data = {}
+ side_input_data = {}
runner_sinks = {}
transforms = []
transform_index_to_id = {}
@@ -264,9 +405,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
element_coder.get_impl().encode_to_stream(
element, output_stream, True)
elements_data = output_stream.get()
- state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=view_id)
- state_handler.Clear(state_key)
- state_handler.Append(state_key, elements_data)
+ side_input_data[view_id] = elements_data
elif isinstance(operation, operation_specs.WorkerFlatten):
fn = sdk_worker.pack_function_spec_data(
@@ -299,13 +438,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
transforms.append(ptransform)
process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
- id=self._next_uid(), coders=coders.values(),
+ id=self._next_uid(),
+ coders=coders.values(),
primitive_transform=transforms)
- return beam_fn_api_pb2.InstructionRequest(
- instruction_id=self._next_uid(),
- register=beam_fn_api_pb2.RegisterRequest(
- process_bundle_descriptor=[process_bundle_descriptor
- ])), runner_sinks, input_data
+
+ return input_data, side_input_data, runner_sinks, process_bundle_descriptor
def _run_map_task(
self, map_task, control_handler, state_handler, data_plane_handler,
@@ -467,3 +604,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
self.data_plane_handler.close()
self.control_server.stop(5).wait()
self.data_server.stop(5).wait()
+
+ @staticmethod
+ def _reencode_elements(elements, element_coder):  # Encodes every element into one shared stream and returns its bytes.
+ output_stream = create_OutputStream()
+ for element in elements:
+ element_coder.get_impl().encode_to_stream(element, output_stream, True)  # NOTE(review): True presumably selects nested encoding so elements can be split apart again; confirm.
+ return output_stream.get()
http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 66d985a..e2eae26 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,10 +23,26 @@ from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.portability import maptask_executor_runner_test
-class FnApiRunnerTest(maptask_executor_runner_test.MapTaskExecutorRunnerTest):
+class FnApiRunnerTestWithRunnerProtos(  # Runs the inherited suite with use_runner_protos=True.
+ maptask_executor_runner_test.MapTaskExecutorRunnerTest):
def create_pipeline(self):
- return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
+ return beam.Pipeline(
+ runner=fn_api_runner.FnApiRunner(use_runner_protos=True))
+
+ def test_combine_per_key(self):  # Overridden as a no-op until PGBKCV is supported.
+ # TODO(robertwb): Implement PGBKCV operation.
+ pass
+
+ # Inherits all other tests from maptask_executor_runner_test.MapTaskExecutorRunnerTest.
+
+
+class FnApiRunnerTestWithFnProtos(
+ maptask_executor_runner_test.MapTaskExecutorRunnerTest):
+
+ def create_pipeline(self):
+ return beam.Pipeline(
+ runner=fn_api_runner.FnApiRunner(use_runner_protos=False))
def test_combine_per_key(self):
# TODO(robertwb): Implement PGBKCV operation.
http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d08b179..fd7ecc4 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -36,11 +36,13 @@ from apache_beam.coders import coder_impl
from apache_beam.coders import WindowedValueCoder
from apache_beam.internal import pickler
from apache_beam.io import iobase
-from apache_beam.runners.dataflow.native_io import iobase as native_iobase
-from apache_beam.utils import counters
from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.runners.dataflow.native_io import iobase as native_iobase
+from apache_beam.runners import pipeline_context
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import operations
+from apache_beam.utils import counters
+from apache_beam.utils import proto_utils
# This module is experimental. No backwards-compatibility guarantees.
@@ -62,6 +64,10 @@ PYTHON_DOFN_URN = 'urn:org.apache.beam:dofn:java:0.1'
PYTHON_SOURCE_URN = 'urn:org.apache.beam:source:java:0.1'
+def side_input_tag(transform_id, tag):  # State key for a side input; the length prefix keeps distinct (transform_id, tag) pairs distinct.
+ return str("%d[%s][%s]" % (len(transform_id), transform_id, tag))
+
+
class RunnerIOOperation(operations.Operation):
"""Common baseclass for runner harness IO operations."""
@@ -208,6 +214,23 @@ def load_compressed(compressed_data):
dill.dill._trace(False) # pylint: disable=protected-access
+def memoize(func):  # Caches func(*args) keyed by the positional-args tuple; the cache is unbounded and args must be hashable.
+ cache = {}
+ missing = object()  # Sentinel, so a cached None result still counts as a hit.
+
+ def wrapper(*args):
+ result = cache.get(args, missing)
+ if result is missing:
+ result = cache[args] = func(*args)
+ return result
+ return wrapper  # NOTE: wrapper does not copy func's __name__/__doc__.
+
+
+def only_element(iterable):  # Returns the single element; the unpacking raises ValueError for zero or several.
+ element, = iterable
+ return element
+
+
class SdkHarness(object):
def __init__(self, control_channel):
@@ -296,6 +319,51 @@ class SdkWorker(object):
return response
def create_execution_tree(self, descriptor):
+ if descriptor.primitive_transform:  # Fn-API descriptors list primitive_transform; runner-API ones fill the transforms map instead.
+ return self.create_execution_tree_from_fn_api(descriptor)
+ else:
+ return self.create_execution_tree_from_runner_api(descriptor)
+
+ def create_execution_tree_from_runner_api(self, descriptor):  # Returns the operations ordered so that producers precede their consumers.
+ # TODO(robertwb): Figure out the correct prefix to use for output counters
+ # from StateSampler.
+ counter_factory = counters.CounterFactory()
+ state_sampler = statesampler.StateSampler(
+ 'fnapi-step%s-' % descriptor.id, counter_factory)
+
+ transform_factory = BeamTransformFactory(
+ descriptor, self.data_channel_factory, counter_factory, state_sampler,
+ self.state_handler)
+
+ pcoll_consumers = collections.defaultdict(list)  # PCollection id -> ids of the transforms that read it.
+ for transform_id, transform_proto in descriptor.transforms.items():
+ for pcoll_id in transform_proto.inputs.values():
+ pcoll_consumers[pcoll_id].append(transform_id)
+
+ @memoize
+ def get_operation(transform_id):  # Memoized: one operation per transform, shared by every producer feeding it.
+ transform_consumers = {
+ tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
+ for tag, pcoll_id
+ in descriptor.transforms[transform_id].outputs.items()
+ }
+ return transform_factory.create_operation(
+ transform_id, transform_consumers)
+
+ # Operations must be started (hence returned) in order.
+ @memoize
+ def topological_height(transform_id):  # 1 + the longest chain of downstream consumers; recursion assumes an acyclic graph.
+ return 1 + max(
+ [0] +
+ [topological_height(consumer)
+ for pcoll in descriptor.transforms[transform_id].outputs.values()
+ for consumer in pcoll_consumers[pcoll]])
+
+ return [get_operation(transform_id)
+ for transform_id in sorted(
+ descriptor.transforms, key=topological_height, reverse=True)]  # Tallest first, so each producer precedes its consumers.
+
+ def create_execution_tree_from_fn_api(self, descriptor):
# TODO(vikasrk): Add an id field to Coder proto and use that instead.
coders = {coder.function_spec.id: operation_specs.get_coder_from_spec(
json.loads(unpack_function_spec_data(coder.function_spec)))
@@ -418,14 +486,14 @@ class SdkWorker(object):
reversed_ops.append(op)
ops_by_id[transform.id] = op
- return list(reversed(reversed_ops)), ops_by_id
+ return list(reversed(reversed_ops))
def process_bundle(self, request, instruction_id):
- ops, ops_by_id = self.create_execution_tree(
+ ops = self.create_execution_tree(
self.fns[request.process_bundle_descriptor_reference])
expected_inputs = []
- for _, op in ops_by_id.items():
+ for op in ops:
if isinstance(op, DataOutputOperation):
# TODO(robertwb): Is there a better way to pass the instruction id to
# the operation?
@@ -445,9 +513,7 @@ class SdkWorker(object):
for data in input_op.data_channel.input_elements(
instruction_id, [input_op.target]):
# ignores input name
- target_op = ops_by_id[data.target.primitive_transform_reference]
- # lacks coder for non-input ops
- target_op.process_encoded(data.data)
+ input_op.process_encoded(data.data)
# Finish all operations.
for op in ops:
@@ -455,3 +521,164 @@ class SdkWorker(object):
op.finish()
return beam_fn_api_pb2.ProcessBundleResponse()
+
+
+class BeamTransformFactory(object):
+ """Factory for turning transform_protos into executable operations."""
+ def __init__(self, descriptor, data_channel_factory, counter_factory,
+ state_sampler, state_handler):
+ self.descriptor = descriptor
+ self.data_channel_factory = data_channel_factory
+ self.counter_factory = counter_factory
+ self.state_sampler = state_sampler
+ self.state_handler = state_handler
+ self.context = pipeline_context.PipelineContext(descriptor)  # Resolves coders (etc.) from the descriptor's component maps.
+
+ _known_urns = {}  # urn -> (creator function, parameter proto type); class-level, filled by register_urn.
+
+ @classmethod
+ def register_urn(cls, urn, parameter_type):  # Decorator factory: registers func as the creator for urn and returns func unchanged.
+ def wrapper(func):
+ cls._known_urns[urn] = func, parameter_type
+ return func
+ return wrapper
+
+ def create_operation(self, transform_id, consumers):
+ transform_proto = self.descriptor.transforms[transform_id]
+ creator, parameter_type = self._known_urns[transform_proto.spec.urn]  # KeyError if the urn was never registered.
+ parameter = proto_utils.unpack_Any(  # NOTE(review): IDENTITY_DOFN_URN registers parameter_type None - confirm unpack_Any accepts it.
+ transform_proto.spec.parameter, parameter_type)
+ return creator(self, transform_id, transform_proto, parameter, consumers)
+
+ def get_coder(self, coder_id):
+ return self.context.coders.get_by_id(coder_id)
+
+ def get_output_coders(self, transform_proto):
+ return {
+ tag: self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
+ for tag, pcoll_id in transform_proto.outputs.items()
+ }
+
+ def get_only_output_coder(self, transform_proto):  # only_element raises ValueError unless there is exactly one output.
+ return only_element(self.get_output_coders(transform_proto).values())
+
+ def get_input_coders(self, transform_proto):
+ return {
+ tag: self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
+ for tag, pcoll_id in transform_proto.inputs.items()
+ }
+
+ def get_only_input_coder(self, transform_proto):  # only_element raises ValueError unless there is exactly one input.
+ return only_element(self.get_input_coders(transform_proto).values())
+
+ # TODO(robertwb): Update all operations to take these in the constructor.
+ @staticmethod
+ def augment_oldstyle_op(op, step_name, consumers, tag_list=None):  # Sets step_name and adds receivers; output index is tag_list.index(tag), or 0 without tag_list.
+ op.step_name = step_name
+ for tag, op_consumers in consumers.items():
+ for consumer in op_consumers:
+ op.add_receiver(consumer, tag_list.index(tag) if tag_list else 0)
+ return op
+
+
+@BeamTransformFactory.register_urn(
+ DATA_INPUT_URN, beam_fn_api_pb2.RemoteGrpcPort)
+def create(factory, transform_id, transform_proto, grpc_port, consumers):  # Data-plane source op. 'create' is rebound by later definitions; the registry keeps this one.
+ target = beam_fn_api_pb2.Target(
+ primitive_transform_reference=transform_id,
+ name=only_element(transform_proto.outputs.keys()))  # Target name is the transform's single output tag.
+ return DataInputOperation(
+ transform_proto.unique_name,
+ transform_proto.unique_name,
+ consumers,
+ factory.counter_factory,
+ factory.state_sampler,
+ factory.get_only_output_coder(transform_proto),
+ input_target=target,
+ data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
+
+
+@BeamTransformFactory.register_urn(
+ DATA_OUTPUT_URN, beam_fn_api_pb2.RemoteGrpcPort)
+def create(factory, transform_id, transform_proto, grpc_port, consumers):  # Data-plane sink op for this transform's single input.
+ target = beam_fn_api_pb2.Target(
+ primitive_transform_reference=transform_id,
+ name='out')  # Hard-coded; the runner keys its sinks by (transform_id, 'out').
+ return DataOutputOperation(
+ transform_proto.unique_name,
+ transform_proto.unique_name,
+ consumers,
+ factory.counter_factory,
+ factory.state_sampler,
+ # TODO(robertwb): Perhaps this could be distinct from the input coder?
+ factory.get_only_input_coder(transform_proto),
+ target=target,
+ data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
+
+
+@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue)
+def create(factory, transform_id, transform_proto, parameter, consumers):  # Unpickles a source and wraps it in an old-style ReadOperation.
+ source = pickler.loads(parameter.value)
+ spec = operation_specs.WorkerRead(
+ iobase.SourceBundle(1.0, source, None, None),  # NOTE(review): None start/stop presumably means the whole source; confirm.
+ [WindowedValueCoder(source.default_output_coder())])
+ return factory.augment_oldstyle_op(
+ operations.ReadOperation(
+ transform_proto.unique_name,
+ spec,
+ factory.counter_factory,
+ factory.state_sampler),
+ transform_proto.unique_name,
+ consumers)
+
+
+@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, wrappers_pb2.BytesValue)
+def create(factory, transform_id, transform_proto, parameter, consumers):  # Builds a DoOperation; parameter normally holds the pickled (serialized_fn, side-input specs) pair.
+ dofn_data = pickler.loads(parameter.value)
+ if len(dofn_data) == 2:  # NOTE(review): any other unpickled value of length 2 would be misread as that pair.
+ # Has side input data.
+ serialized_fn, side_input_data = dofn_data
+ else:
+ # No side input data.
+ serialized_fn, side_input_data = parameter.value, []  # Fall back to the raw pickled bytes as the fn.
+
+ def create_side_input(tag, coder):
+ # TODO(robertwb): Extract windows (and keys) out of element data.
+ # TODO(robertwb): Extract state key from ParDoPayload.
+ return operation_specs.WorkerSideInputSource(
+ tag=tag,
+ source=SideInputSource(
+ factory.state_handler,
+ beam_fn_api_pb2.StateKey.MultimapSideInput(
+ key=side_input_tag(transform_id, tag)),  # Same key the runner used when storing this side input.
+ coder=coder))
+ output_tags = list(transform_proto.outputs.keys())  # This order defines the receiver indices in augment_oldstyle_op.
+ output_coders = factory.get_output_coders(transform_proto)
+ spec = operation_specs.WorkerDoFn(
+ serialized_fn=serialized_fn,
+ output_tags=output_tags,
+ input=None,
+ side_inputs=[
+ create_side_input(tag, coder) for tag, coder in side_input_data],
+ output_coders=[output_coders[tag] for tag in output_tags])
+ return factory.augment_oldstyle_op(
+ operations.DoOperation(
+ transform_proto.unique_name,
+ spec,
+ factory.counter_factory,
+ factory.state_sampler),
+ transform_proto.unique_name,
+ consumers,
+ output_tags)
+
+
+@BeamTransformFactory.register_urn(IDENTITY_DOFN_URN, None)
+def create(factory, transform_id, transform_proto, unused_parameter, consumers):  # Identity URN: an old-style FlattenOperation (spec None) wired to the consumers.
+ return factory.augment_oldstyle_op(
+ operations.FlattenOperation(
+ transform_proto.unique_name,
+ None,
+ factory.counter_factory,
+ factory.state_sampler),
+ transform_proto.unique_name,
+ consumers)
[2/2] beam git commit: Closes #3361
Posted by ro...@apache.org.
Closes #3361
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4ef23e1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4ef23e1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4ef23e1
Branch: refs/heads/master
Commit: e4ef23e16859e31e09e5fe6cf861d6f3db816b22
Parents: f51fdd9 08ec0d4
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jun 20 13:47:31 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jun 20 13:47:31 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/pipeline_context.py | 17 +-
.../runners/portability/fn_api_runner.py | 166 ++++++++++++-
.../runners/portability/fn_api_runner_test.py | 20 +-
.../apache_beam/runners/worker/sdk_worker.py | 243 ++++++++++++++++++-
4 files changed, 420 insertions(+), 26 deletions(-)
----------------------------------------------------------------------