Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/01 17:37:17 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #10648: [BEAM-3419] Support iterable on Dataflow runner when using the unified worker.

TheNeuralBit commented on code in PR #10648:
URL: https://github.com/apache/beam/pull/10648#discussion_r934765015


##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py:
##########
@@ -320,24 +320,32 @@ def visit_transform(self, transform_node):
           for ix, side_input in enumerate(transform_node.side_inputs):
             access_pattern = side_input._side_input_data().access_pattern
             if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-              # Add a map to ('', value) as Dataflow currently only handles
-              # keyed side inputs.
-              pipeline = side_input.pvalue.pipeline
-              new_side_input = _DataflowIterableSideInput(side_input)
-              new_side_input.pvalue = beam.pvalue.PCollection(
-                  pipeline,
-                  element_type=typehints.KV[
-                      bytes, side_input.pvalue.element_type],
-                  is_bounded=side_input.pvalue.is_bounded)
-              parent = transform_node.parent or pipeline._root_transform()
-              map_to_void_key = beam.pipeline.AppliedPTransform(
-                  pipeline,
-                  beam.Map(lambda x: (b'', x)),
-                  transform_node.full_label + '/MapToVoidKey%s' % ix,
-                  (side_input.pvalue,))
-              new_side_input.pvalue.producer = map_to_void_key
-              map_to_void_key.add_output(new_side_input.pvalue)
-              parent.add_part(map_to_void_key)
+              if use_unified_worker:
+                # Patch up the access pattern to appease Dataflow when using
+                # the UW and hardcode the output type to be Any since
+                # the Dataflow JSON and pipeline proto can differ in coders
+                # which leads to encoding/decoding issues within the runner.
+                side_input.pvalue.element_type = typehints.Any
+                new_side_input = _DataflowIterableSideInput(side_input)
+              else:
+                # Add a map to ('', value) as Dataflow currently only handles
+                # keyed side inputs when using the JRH.
+                pipeline = side_input.pvalue.pipeline
+                new_side_input = _DataflowIterableAsMultimapSideInput(
+                    side_input)

Review Comment:
   Why does this switch to using `DataflowIterableAsMultimapSideInput` for the non-UW case? It looks like we used `DataflowIterableSideInput` here previously?
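   For reference, a minimal sketch of the two pieces under discussion: an iterable side input as a user writes it (`beam.pvalue.AsIter`, which carries the ITERABLE access pattern handled in the diff), and the `(b'', value)` keying workaround that the removed lines describe for the legacy-worker path. The pipeline, labels, and variable names below are illustrative only, not the runner's internal rewrite.

   ```python
   # Sketch only: the user-facing iterable side input, and separately the
   # keyed form that the non-unified-worker path constructs, since legacy
   # Dataflow only handles keyed (multimap-style) side inputs.
   import apache_beam as beam

   with beam.Pipeline() as p:
       side = p | 'Side' >> beam.Create([1, 2, 3])
       main = p | 'Main' >> beam.Create(['a', 'b'])

       # User-facing iterable side input; its access pattern is
       # common_urns.side_inputs.ITERABLE.urn, the case matched in the diff.
       _ = main | 'UseIterable' >> beam.Map(
           lambda elem, vals: (elem, sorted(vals)),
           vals=beam.pvalue.AsIter(side))

       # Roughly what the removed MapToVoidKey step produced for the JRH path:
       # every element keyed by b'' so Dataflow can read it back as a multimap.
       keyed_side = side | 'MapToVoidKey0' >> beam.Map(lambda x: (b'', x))
   ```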


