You are viewing a plain text version of this content; the canonical link was removed during plain-text extraction.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/01/23 00:13:41 UTC
[beam] branch master updated: [BEAM-3419] Support iterable side inputs on
Dataflow runner when using the unified worker.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 95ac61e [BEAM-3419] Support iterable on Dataflow runner when using the unified worker.
new dd16657 Merge pull request #10648 from lukecwik/beam3419
95ac61e is described below
commit 95ac61e854bf9d3f3d4085a94dd7ce8b19b91a0e
Author: Luke Cwik <lc...@google.com>
AuthorDate: Tue Jan 21 15:22:13 2020 -0800
[BEAM-3419] Support iterable on Dataflow runner when using the unified worker.
Note that all other portable runners are using iterable side inputs.
---
.../runners/dataflow/dataflow_runner.py | 69 ++++++++++++++--------
1 file changed, 46 insertions(+), 23 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ada700c..762b2a2 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -302,7 +302,7 @@ class DataflowRunner(PipelineRunner):
return SetPDoneVisitor(pipeline)
@staticmethod
- def side_input_visitor():
+ def side_input_visitor(use_unified_worker=False):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.pipeline import PipelineVisitor
@@ -320,24 +320,32 @@ class DataflowRunner(PipelineRunner):
for ix, side_input in enumerate(transform_node.side_inputs):
access_pattern = side_input._side_input_data().access_pattern
if access_pattern == common_urns.side_inputs.ITERABLE.urn:
- # Add a map to ('', value) as Dataflow currently only handles
- # keyed side inputs.
- pipeline = side_input.pvalue.pipeline
- new_side_input = _DataflowIterableSideInput(side_input)
- new_side_input.pvalue = beam.pvalue.PCollection(
- pipeline,
- element_type=typehints.KV[
- bytes, side_input.pvalue.element_type],
- is_bounded=side_input.pvalue.is_bounded)
- parent = transform_node.parent or pipeline._root_transform()
- map_to_void_key = beam.pipeline.AppliedPTransform(
- pipeline,
- beam.Map(lambda x: (b'', x)),
- transform_node.full_label + '/MapToVoidKey%s' % ix,
- (side_input.pvalue,))
- new_side_input.pvalue.producer = map_to_void_key
- map_to_void_key.add_output(new_side_input.pvalue)
- parent.add_part(map_to_void_key)
+ if use_unified_worker:
+ # Patch up the access pattern to appease Dataflow when using
+ # the UW and hardcode the output type to be Any since
+ # the Dataflow JSON and pipeline proto can differ in coders
+ # which leads to encoding/decoding issues within the runner.
+ side_input.pvalue.element_type = typehints.Any
+ new_side_input = _DataflowIterableSideInput(side_input)
+ else:
+ # Add a map to ('', value) as Dataflow currently only handles
+ # keyed side inputs when using the JRH.
+ pipeline = side_input.pvalue.pipeline
+ new_side_input = _DataflowIterableAsMultimapSideInput(
+ side_input)
+ new_side_input.pvalue = beam.pvalue.PCollection(
+ pipeline,
+ element_type=typehints.KV[bytes,
+ side_input.pvalue.element_type],
+ is_bounded=side_input.pvalue.is_bounded)
+ parent = transform_node.parent or pipeline._root_transform()
+ map_to_void_key = beam.pipeline.AppliedPTransform(
+ pipeline, beam.Map(lambda x: (b'', x)),
+ transform_node.full_label + '/MapToVoidKey%s' % ix,
+ (side_input.pvalue,))
+ new_side_input.pvalue.producer = map_to_void_key
+ map_to_void_key.add_output(new_side_input.pvalue)
+ parent.add_part(map_to_void_key)
elif access_pattern == common_urns.side_inputs.MULTIMAP.urn:
# Ensure the input coder is a KV coder and patch up the
# access pattern to appease Dataflow.
@@ -397,7 +405,8 @@ class DataflowRunner(PipelineRunner):
# Convert all side inputs into a form acceptable to Dataflow.
if apiclient._use_fnapi(options):
- pipeline.visit(self.side_input_visitor())
+ pipeline.visit(
+ self.side_input_visitor(apiclient._use_unified_worker(options)))
# Performing configured PTransform overrides. Note that this is currently
# done before Runner API serialization, since the new proto needs to contain
@@ -1320,12 +1329,12 @@ class _DataflowSideInput(beam.pvalue.AsSideInput):
return self._data
-class _DataflowIterableSideInput(_DataflowSideInput):
+class _DataflowIterableAsMultimapSideInput(_DataflowSideInput):
"""Wraps an iterable side input as dataflow-compatible side input."""
- def __init__(self, iterable_side_input):
+ def __init__(self, side_input):
# pylint: disable=protected-access
- side_input_data = iterable_side_input._side_input_data()
+ side_input_data = side_input._side_input_data()
assert (
side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
iterable_view_fn = side_input_data.view_fn
@@ -1335,6 +1344,20 @@ class _DataflowIterableSideInput(_DataflowSideInput):
lambda multimap: iterable_view_fn(multimap[b'']))
+class _DataflowIterableSideInput(_DataflowSideInput):
+ """Wraps an iterable side input as dataflow-compatible side input."""
+
+ def __init__(self, side_input):
+ # pylint: disable=protected-access
+ self.pvalue = side_input.pvalue
+ side_input_data = side_input._side_input_data()
+ assert (
+ side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn)
+ self._data = beam.pvalue.SideInputData(common_urns.side_inputs.ITERABLE.urn,
+ side_input_data.window_mapping_fn,
+ side_input_data.view_fn)
+
+
class _DataflowMultimapSideInput(_DataflowSideInput):
"""Wraps a multimap side input as dataflow-compatible side input."""