You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/23 00:07:11 UTC
[1/3] beam git commit: Closes #3424
Repository: beam
Updated Branches:
refs/heads/master 7645c44b9 -> 7a9f762e4
Closes #3424
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7a9f762e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7a9f762e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7a9f762e
Branch: refs/heads/master
Commit: 7a9f762e440168d48e3b3b2930dd83885402e51a
Parents: 7645c44 a882e8f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 17:05:32 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 192 +-----------------
.../runners/portability/fn_api_runner_test.py | 18 +-
.../apache_beam/runners/worker/sdk_worker.py | 201 -------------------
.../runners/worker/sdk_worker_test.py | 77 -------
4 files changed, 4 insertions(+), 484 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Remove unused (and untested) initial
splitting logic.
Posted by ro...@apache.org.
Remove unused (and untested) initial splitting logic.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a882e8f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a882e8f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a882e8f3
Branch: refs/heads/master
Commit: a882e8f3a33c4a430f55d53b65285123c5a4f50d
Parents: 5d6ad19
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:46:13 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 1 -
.../apache_beam/runners/worker/sdk_worker.py | 51 -------------
.../runners/worker/sdk_worker_test.py | 77 --------------------
3 files changed, 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index b45ff76..a8e2eb4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -19,7 +19,6 @@
"""
import base64
import collections
-import json
import logging
import Queue as queue
import threading
http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d135984..6a366eb 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -28,9 +28,7 @@ import logging
import Queue as queue
import threading
import traceback
-import zlib
-import dill
from google.protobuf import wrappers_pb2
from apache_beam.coders import coder_impl
@@ -165,37 +163,6 @@ class SideInputSource(native_iobase.NativeSource,
yield self._coder.get_impl().decode_from_stream(input_stream, True)
-def unpack_and_deserialize_py_fn(function_spec):
- """Returns unpacked and deserialized object from function spec proto."""
- return pickler.loads(unpack_function_spec_data(function_spec))
-
-
-def unpack_function_spec_data(function_spec):
- """Returns unpacked data from function spec proto."""
- data = wrappers_pb2.BytesValue()
- function_spec.data.Unpack(data)
- return data.value
-
-
-# pylint: disable=redefined-builtin
-def serialize_and_pack_py_fn(fn, urn, id=None):
- """Returns serialized and packed function in a function spec proto."""
- return pack_function_spec_data(pickler.dumps(fn), urn, id)
-# pylint: enable=redefined-builtin
-
-
-# pylint: disable=redefined-builtin
-def pack_function_spec_data(value, urn, id=None):
- """Returns packed data in a function spec proto."""
- data = wrappers_pb2.BytesValue(value=value)
- fn_proto = beam_fn_api_pb2.FunctionSpec(urn=urn)
- fn_proto.data.Pack(data)
- if id:
- fn_proto.id = id
- return fn_proto
-# pylint: enable=redefined-builtin
-
-
def memoize(func):
cache = {}
missing = object()
@@ -286,24 +253,6 @@ class SdkWorker(object):
self.fns[p_transform.function_spec.id] = p_transform.function_spec
return beam_fn_api_pb2.RegisterResponse()
- def initial_source_split(self, request, unused_instruction_id=None):
- source_spec = self.fns[request.source_reference]
- assert source_spec.urn == PYTHON_SOURCE_URN
- source_bundle = unpack_and_deserialize_py_fn(
- self.fns[request.source_reference])
- splits = source_bundle.source.split(request.desired_bundle_size_bytes,
- source_bundle.start_position,
- source_bundle.stop_position)
- response = beam_fn_api_pb2.InitialSourceSplitResponse()
- response.splits.extend([
- beam_fn_api_pb2.SourceSplit(
- source=serialize_and_pack_py_fn(split, PYTHON_SOURCE_URN),
- relative_size=split.weight,
- )
- for split in splits
- ])
- return response
-
def create_execution_tree(self, descriptor):
# TODO(robertwb): Figure out the correct prefix to use for output counters
# from StateSampler.
http://git-wip-us.apache.org/repos/asf/beam/blob/a882e8f3/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index c431bcd..553d5b8 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -27,10 +27,7 @@ import unittest
from concurrent import futures
import grpc
-from apache_beam.io.concat_source_test import RangeSource
-from apache_beam.io.iobase import SourceBundle
from apache_beam.portability.api import beam_fn_api_pb2
-from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import sdk_worker
@@ -88,80 +85,6 @@ class SdkWorkerTest(unittest.TestCase):
harness.worker.fns,
{item.id: item for item in fns + process_bundle_descriptors})
- @unittest.skip("initial splitting not in proto")
- def test_source_split(self):
- source = RangeSource(0, 100)
- expected_splits = list(source.split(30))
-
- worker = sdk_harness.SdkWorker(
- None, data_plane.GrpcClientDataChannelFactory())
- worker.register(
- beam_fn_api_pb2.RegisterRequest(
- process_bundle_descriptor=[beam_fn_api_pb2.ProcessBundleDescriptor(
- primitive_transform=[beam_fn_api_pb2.PrimitiveTransform(
- function_spec=sdk_harness.serialize_and_pack_py_fn(
- SourceBundle(1.0, source, None, None),
- sdk_harness.PYTHON_SOURCE_URN,
- id="src"))])]))
- split_response = worker.initial_source_split(
- beam_fn_api_pb2.InitialSourceSplitRequest(
- desired_bundle_size_bytes=30,
- source_reference="src"))
-
- self.assertEqual(
- expected_splits,
- [sdk_harness.unpack_and_deserialize_py_fn(s.source)
- for s in split_response.splits])
-
- self.assertEqual(
- [s.weight for s in expected_splits],
- [s.relative_size for s in split_response.splits])
-
- @unittest.skip("initial splitting not in proto")
- def test_source_split_via_instruction(self):
-
- source = RangeSource(0, 100)
- expected_splits = list(source.split(30))
-
- test_controller = BeamFnControlServicer([
- beam_fn_api_pb2.InstructionRequest(
- instruction_id="register_request",
- register=beam_fn_api_pb2.RegisterRequest(
- process_bundle_descriptor=[
- beam_fn_api_pb2.ProcessBundleDescriptor(
- primitive_transform=[beam_fn_api_pb2.PrimitiveTransform(
- function_spec=sdk_harness.serialize_and_pack_py_fn(
- SourceBundle(1.0, source, None, None),
- sdk_harness.PYTHON_SOURCE_URN,
- id="src"))])])),
- beam_fn_api_pb2.InstructionRequest(
- instruction_id="split_request",
- initial_source_split=beam_fn_api_pb2.InitialSourceSplitRequest(
- desired_bundle_size_bytes=30,
- source_reference="src"))
- ])
-
- server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
- beam_fn_api_pb2.add_BeamFnControlServicer_to_server(test_controller, server)
- test_port = server.add_insecure_port("[::]:0")
- server.start()
-
- channel = grpc.insecure_channel("localhost:%s" % test_port)
- harness = sdk_harness.SdkHarness(channel)
- harness.run()
-
- split_response = test_controller.responses[
- "split_request"].initial_source_split
-
- self.assertEqual(
- expected_splits,
- [sdk_harness.unpack_and_deserialize_py_fn(s.source)
- for s in split_response.splits])
-
- self.assertEqual(
- [s.weight for s in expected_splits],
- [s.relative_size for s in split_response.splits])
-
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
[3/3] beam git commit: Remove fn api bundle descriptor translation.
Posted by ro...@apache.org.
Remove fn api bundle descriptor translation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5d6ad199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5d6ad199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5d6ad199
Branch: refs/heads/master
Commit: 5d6ad19958d0a2394f9e33720a04cc954279a7e7
Parents: 7645c44
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Thu Jun 22 12:44:23 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Thu Jun 22 17:05:32 2017 -0700
----------------------------------------------------------------------
.../runners/portability/fn_api_runner.py | 191 +------------------
.../runners/portability/fn_api_runner_test.py | 18 +-
.../apache_beam/runners/worker/sdk_worker.py | 150 ---------------
3 files changed, 4 insertions(+), 355 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index a27e293..b45ff76 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -115,13 +115,9 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
- def __init__(self, use_runner_protos=False):
+ def __init__(self):
super(FnApiRunner, self).__init__()
self._last_uid = -1
- if use_runner_protos:
- self._map_task_to_protos = self._map_task_to_runner_protos
- else:
- self._map_task_to_protos = self._map_task_to_fn_protos
def has_metrics_support(self):
return False
@@ -145,7 +141,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
process_bundle_descriptor=[process_bundle_descriptor])
), runner_sinks, input_data
- def _map_task_to_runner_protos(self, map_task, data_operation_spec):
+ def _map_task_to_protos(self, map_task, data_operation_spec):
input_data = {}
side_input_data = {}
runner_sinks = {}
@@ -265,189 +261,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
environments=dict(context_proto.environments.items()))
return input_data, side_input_data, runner_sinks, process_bundle_descriptor
- def _map_task_to_fn_protos(self, map_task, data_operation_spec):
-
- input_data = {}
- side_input_data = {}
- runner_sinks = {}
- transforms = []
- transform_index_to_id = {}
-
- # Maps coders to new coder objects and references.
- coders = {}
-
- def coder_id(coder):
- if coder not in coders:
- coders[coder] = beam_fn_api_pb2.Coder(
- function_spec=sdk_worker.pack_function_spec_data(
- json.dumps(coder.as_cloud_object()),
- sdk_worker.PYTHON_CODER_URN, id=self._next_uid()))
-
- return coders[coder].function_spec.id
-
- def output_tags(op):
- return getattr(op, 'output_tags', ['out'])
-
- def as_target(op_input):
- input_op_index, input_output_index = op_input
- input_op = map_task[input_op_index][1]
- return {
- 'ignored_input_tag':
- beam_fn_api_pb2.Target.List(target=[
- beam_fn_api_pb2.Target(
- primitive_transform_reference=transform_index_to_id[
- input_op_index],
- name=output_tags(input_op)[input_output_index])
- ])
- }
-
- def outputs(op):
- return {
- tag: beam_fn_api_pb2.PCollection(coder_reference=coder_id(coder))
- for tag, coder in zip(output_tags(op), op.output_coders)
- }
-
- for op_ix, (stage_name, operation) in enumerate(map_task):
- transform_id = transform_index_to_id[op_ix] = self._next_uid()
- if isinstance(operation, operation_specs.WorkerInMemoryWrite):
- # Write this data back to the runner.
- fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_OUTPUT_URN,
- id=self._next_uid())
- if data_operation_spec:
- fn.data.Pack(data_operation_spec)
- inputs = as_target(operation.input)
- side_inputs = {}
- runner_sinks[(transform_id, 'out')] = operation
-
- elif isinstance(operation, operation_specs.WorkerRead):
- # A Read is either translated to a direct injection of windowed values
- # into the sdk worker, or an injection of the source object into the
- # sdk worker as data followed by an SDF that reads that source.
- if (isinstance(operation.source.source,
- maptask_executor_runner.InMemorySource)
- and isinstance(operation.source.source.default_output_coder(),
- WindowedValueCoder)):
- output_stream = create_OutputStream()
- element_coder = (
- operation.source.source.default_output_coder().get_impl())
- # Re-encode the elements in the nested context and
- # concatenate them together
- for element in operation.source.source.read(None):
- element_coder.encode_to_stream(element, output_stream, True)
- target_name = self._next_uid()
- input_data[(transform_id, target_name)] = output_stream.get()
- fn = beam_fn_api_pb2.FunctionSpec(urn=sdk_worker.DATA_INPUT_URN,
- id=self._next_uid())
- if data_operation_spec:
- fn.data.Pack(data_operation_spec)
- inputs = {target_name: beam_fn_api_pb2.Target.List()}
- side_inputs = {}
- else:
- # Read the source object from the runner.
- source_coder = beam.coders.DillCoder()
- input_transform_id = self._next_uid()
- output_stream = create_OutputStream()
- source_coder.get_impl().encode_to_stream(
- GlobalWindows.windowed_value(operation.source),
- output_stream,
- True)
- target_name = self._next_uid()
- input_data[(input_transform_id, target_name)] = output_stream.get()
- input_ptransform = beam_fn_api_pb2.PrimitiveTransform(
- id=input_transform_id,
- function_spec=beam_fn_api_pb2.FunctionSpec(
- urn=sdk_worker.DATA_INPUT_URN,
- id=self._next_uid()),
- # TODO(robertwb): Possible name collision.
- step_name=stage_name + '/inject_source',
- inputs={target_name: beam_fn_api_pb2.Target.List()},
- outputs={
- 'out':
- beam_fn_api_pb2.PCollection(
- coder_reference=coder_id(source_coder))
- })
- if data_operation_spec:
- input_ptransform.function_spec.data.Pack(data_operation_spec)
- transforms.append(input_ptransform)
-
- # Read the elements out of the source.
- fn = sdk_worker.pack_function_spec_data(
- OLDE_SOURCE_SPLITTABLE_DOFN_DATA,
- sdk_worker.PYTHON_DOFN_URN,
- id=self._next_uid())
- inputs = {
- 'ignored_input_tag':
- beam_fn_api_pb2.Target.List(target=[
- beam_fn_api_pb2.Target(
- primitive_transform_reference=input_transform_id,
- name='out')
- ])
- }
- side_inputs = {}
-
- elif isinstance(operation, operation_specs.WorkerDoFn):
- fn = sdk_worker.pack_function_spec_data(
- operation.serialized_fn,
- sdk_worker.PYTHON_DOFN_URN,
- id=self._next_uid())
- inputs = as_target(operation.input)
- # Store the contents of each side input for state access.
- for si in operation.side_inputs:
- assert isinstance(si.source, iobase.BoundedSource)
- element_coder = si.source.default_output_coder()
- view_id = self._next_uid()
- # TODO(robertwb): Actually flesh out the ViewFn API.
- side_inputs[si.tag] = beam_fn_api_pb2.SideInput(
- view_fn=sdk_worker.serialize_and_pack_py_fn(
- element_coder, urn=sdk_worker.PYTHON_ITERABLE_VIEWFN_URN,
- id=view_id))
- # Re-encode the elements in the nested context and
- # concatenate them together
- output_stream = create_OutputStream()
- for element in si.source.read(
- si.source.get_range_tracker(None, None)):
- element_coder.get_impl().encode_to_stream(
- element, output_stream, True)
- elements_data = output_stream.get()
- side_input_data[view_id] = elements_data
-
- elif isinstance(operation, operation_specs.WorkerFlatten):
- fn = sdk_worker.pack_function_spec_data(
- operation.serialized_fn,
- sdk_worker.IDENTITY_DOFN_URN,
- id=self._next_uid())
- inputs = {
- 'ignored_input_tag':
- beam_fn_api_pb2.Target.List(target=[
- beam_fn_api_pb2.Target(
- primitive_transform_reference=transform_index_to_id[
- input_op_index],
- name=output_tags(map_task[input_op_index][1])[
- input_output_index])
- for input_op_index, input_output_index in operation.inputs
- ])
- }
- side_inputs = {}
-
- else:
- raise TypeError(operation)
-
- ptransform = beam_fn_api_pb2.PrimitiveTransform(
- id=transform_id,
- function_spec=fn,
- step_name=stage_name,
- inputs=inputs,
- side_inputs=side_inputs,
- outputs=outputs(operation))
- transforms.append(ptransform)
-
- process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
- id=self._next_uid(),
- coders=coders.values(),
- primitive_transform=transforms)
-
- return input_data, side_input_data, runner_sinks, process_bundle_descriptor
-
def _run_map_task(
self, map_task, control_handler, state_handler, data_plane_handler,
data_operation_spec):
http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index e2eae26..9159035 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,26 +23,12 @@ from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.portability import maptask_executor_runner_test
-class FnApiRunnerTestWithRunnerProtos(
+class FnApiRunnerTest(
maptask_executor_runner_test.MapTaskExecutorRunnerTest):
def create_pipeline(self):
return beam.Pipeline(
- runner=fn_api_runner.FnApiRunner(use_runner_protos=True))
-
- def test_combine_per_key(self):
- # TODO(robertwb): Implement PGBKCV operation.
- pass
-
- # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
-
-
-class FnApiRunnerTestWithFnProtos(
- maptask_executor_runner_test.MapTaskExecutorRunnerTest):
-
- def create_pipeline(self):
- return beam.Pipeline(
- runner=fn_api_runner.FnApiRunner(use_runner_protos=False))
+ runner=fn_api_runner.FnApiRunner())
def test_combine_per_key(self):
# TODO(robertwb): Implement PGBKCV operation.
http://git-wip-us.apache.org/repos/asf/beam/blob/5d6ad199/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index a2c9f42..d135984 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -196,25 +196,6 @@ def pack_function_spec_data(value, urn, id=None):
# pylint: enable=redefined-builtin
-# TODO(vikasrk): Consistently use same format everywhere.
-def load_compressed(compressed_data):
- """Returns a decompressed and deserialized python object."""
- # Note: SDK uses ``pickler.dumps`` to serialize certain python objects
- # (like sources), which involves serialization, compression and base64
- # encoding. We cannot directly use ``pickler.loads`` for
- # deserialization, as the runner would have already base64 decoded the
- # data. So we only need to decompress and deserialize.
-
- data = zlib.decompress(compressed_data)
- try:
- return dill.loads(data)
- except Exception: # pylint: disable=broad-except
- dill.dill._trace(True) # pylint: disable=protected-access
- return dill.loads(data)
- finally:
- dill.dill._trace(False) # pylint: disable=protected-access
-
-
def memoize(func):
cache = {}
missing = object()
@@ -324,12 +305,6 @@ class SdkWorker(object):
return response
def create_execution_tree(self, descriptor):
- if descriptor.transforms:
- return self.create_execution_tree_from_runner_api(descriptor)
- else:
- return self.create_execution_tree_from_fn_api(descriptor)
-
- def create_execution_tree_from_runner_api(self, descriptor):
# TODO(robertwb): Figure out the correct prefix to use for output counters
# from StateSampler.
counter_factory = counters.CounterFactory()
@@ -368,131 +343,6 @@ class SdkWorker(object):
for transform_id in sorted(
descriptor.transforms, key=topological_height, reverse=True)]
- def create_execution_tree_from_fn_api(self, descriptor):
- # TODO(vikasrk): Add an id field to Coder proto and use that instead.
- coders = {coder.function_spec.id: operation_specs.get_coder_from_spec(
- json.loads(unpack_function_spec_data(coder.function_spec)))
- for coder in descriptor.coders}
-
- counter_factory = counters.CounterFactory()
- # TODO(robertwb): Figure out the correct prefix to use for output counters
- # from StateSampler.
- state_sampler = statesampler.StateSampler(
- 'fnapi-step%s-' % descriptor.id, counter_factory)
- consumers = collections.defaultdict(lambda: collections.defaultdict(list))
- ops_by_id = {}
- reversed_ops = []
-
- for transform in reversed(descriptor.primitive_transform):
- # TODO(robertwb): Figure out how to plumb through the operation name (e.g.
- # "s3") from the service through the FnAPI so that msec counters can be
- # reported and correctly plumbed through the service and the UI.
- operation_name = 'fnapis%s' % transform.id
-
- def only_element(iterable):
- element, = iterable
- return element
-
- if transform.function_spec.urn == DATA_OUTPUT_URN:
- target = beam_fn_api_pb2.Target(
- primitive_transform_reference=transform.id,
- name=only_element(transform.outputs.keys()))
-
- op = DataOutputOperation(
- operation_name,
- transform.step_name,
- consumers[transform.id],
- counter_factory,
- state_sampler,
- coders[only_element(transform.outputs.values()).coder_reference],
- target,
- self.data_channel_factory.create_data_channel(
- transform.function_spec))
-
- elif transform.function_spec.urn == DATA_INPUT_URN:
- target = beam_fn_api_pb2.Target(
- primitive_transform_reference=transform.id,
- name=only_element(transform.inputs.keys()))
- op = DataInputOperation(
- operation_name,
- transform.step_name,
- consumers[transform.id],
- counter_factory,
- state_sampler,
- coders[only_element(transform.outputs.values()).coder_reference],
- target,
- self.data_channel_factory.create_data_channel(
- transform.function_spec))
-
- elif transform.function_spec.urn == PYTHON_DOFN_URN:
- def create_side_input(tag, si):
- # TODO(robertwb): Extract windows (and keys) out of element data.
- return operation_specs.WorkerSideInputSource(
- tag=tag,
- source=SideInputSource(
- self.state_handler,
- beam_fn_api_pb2.StateKey.MultimapSideInput(
- key=si.view_fn.id.encode('utf-8')),
- coder=unpack_and_deserialize_py_fn(si.view_fn)))
- output_tags = list(transform.outputs.keys())
- spec = operation_specs.WorkerDoFn(
- serialized_fn=unpack_function_spec_data(transform.function_spec),
- output_tags=output_tags,
- input=None,
- side_inputs=[create_side_input(tag, si)
- for tag, si in transform.side_inputs.items()],
- output_coders=[coders[transform.outputs[out].coder_reference]
- for out in output_tags])
-
- op = operations.DoOperation(operation_name, spec, counter_factory,
- state_sampler)
- # TODO(robertwb): Move these to the constructor.
- op.step_name = transform.step_name
- for tag, op_consumers in consumers[transform.id].items():
- for consumer in op_consumers:
- op.add_receiver(
- consumer, output_tags.index(tag))
-
- elif transform.function_spec.urn == IDENTITY_DOFN_URN:
- op = operations.FlattenOperation(operation_name, None, counter_factory,
- state_sampler)
- # TODO(robertwb): Move these to the constructor.
- op.step_name = transform.step_name
- for tag, op_consumers in consumers[transform.id].items():
- for consumer in op_consumers:
- op.add_receiver(consumer, 0)
-
- elif transform.function_spec.urn == PYTHON_SOURCE_URN:
- source = load_compressed(unpack_function_spec_data(
- transform.function_spec))
- # TODO(vikasrk): Remove this once custom source is implemented with
- # splittable dofn via the data plane.
- spec = operation_specs.WorkerRead(
- iobase.SourceBundle(1.0, source, None, None),
- [WindowedValueCoder(source.default_output_coder())])
- op = operations.ReadOperation(operation_name, spec, counter_factory,
- state_sampler)
- op.step_name = transform.step_name
- output_tags = list(transform.outputs.keys())
- for tag, op_consumers in consumers[transform.id].items():
- for consumer in op_consumers:
- op.add_receiver(
- consumer, output_tags.index(tag))
-
- else:
- raise NotImplementedError
-
- # Record consumers.
- for _, inputs in transform.inputs.items():
- for target in inputs.target:
- consumers[target.primitive_transform_reference][target.name].append(
- op)
-
- reversed_ops.append(op)
- ops_by_id[transform.id] = op
-
- return list(reversed(reversed_ops))
-
def process_bundle(self, request, instruction_id):
ops = self.create_execution_tree(
self.fns[request.process_bundle_descriptor_reference])