You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/09 00:33:58 UTC

[GitHub] [beam] robertwb commented on a change in pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

robertwb commented on a change in pull request #13877:
URL: https://github.com/apache/beam/pull/13877#discussion_r572469235



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:
+      # CombinePerKey may contain GroupByKey and Combine subtransforms.
+      if transform.subtransforms:
+        assert len(transform.subtransforms) == 2
+
+        group_by_key_transform = context.components.transforms[
+            transform.subtransforms[0]]

Review comment:
       I don't think there's any requirement that the subtransforms come in a particular order. 
   
   What we can assume, or use as a precondition to grouping in _group_stages_by_key, is that there will be two subtransforms of the expected kind. 

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]

Review comment:
       There is a check for combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN above. 
   
   It is unfortunate, however, to have this Python-specific stuff embedded into the SDK. Ideally, the notion of a packed CombineFn could be in the model, and one could create one from a set of CombineFns. 

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:

Review comment:
       We would still have to insert the multiplexing DoFn as a follow-on. 
   
   I do agree this function is getting a bit unwieldy though. 
   
   Could we perhaps factor this out into a function `create_combine_per_key_transform(combine_fn, input_pcoll, output_pcoll, template)` that also creates the internal structure? 

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2171,6 +2171,7 @@ def __init__(
       runtime_type_check,  # type: bool
   ):
     super(CombineValuesDoFn, self).__init__()
+    self.input_pcoll_type = input_pcoll_type

Review comment:
       Whence this change?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:
+      # CombinePerKey may contain GroupByKey and Combine subtransforms.
+      if transform.subtransforms:
+        assert len(transform.subtransforms) == 2
+
+        group_by_key_transform = context.components.transforms[
+            transform.subtransforms[0]]
+        assert group_by_key_transform.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn
+        original_group_by_key_transforms.append(group_by_key_transform)
+
+        combine_grouped_values_transform = context.components.transforms[
+            transform.subtransforms[1]]
+        assert combine_grouped_values_transform.spec.urn == common_urns.combine_components.COMBINE_GROUPED_VALUES.urn
+        original_combine_grouped_values_transforms.append(
+            combine_grouped_values_transform)
+
+        # Combine may contain a ParDo subtransform.
+        if combine_grouped_values_transform.subtransforms:
+          assert len(combine_grouped_values_transform.subtransforms) == 1
+
+          combine_values_par_do_transform = context.components.transforms[
+              combine_grouped_values_transform.subtransforms[0]]
+          assert combine_values_par_do_transform.spec.urn == common_urns.primitives.PAR_DO.urn
+          original_combine_values_par_do_transforms.append(
+              combine_values_par_do_transform)
+
+    # Pack the subtransforms if and only if the original transform had subtransforms.
+    if original_group_by_key_transforms or original_combine_grouped_values_transforms:
+      assert original_group_by_key_transforms and original_combine_grouped_values_transforms
+      # For each subtransform, reuse an arbitrary original subtransform as a template,
+      # and then rewrite it with the correct input, output and payload.
+      # Also reuse an arbitrary GroupByKey output PCollection.
+
+      grouped_pcoll_id = next(
+          iter(original_group_by_key_transforms[0].outputs.values()))
+
+      packed_group_by_key_transform_name = (
+          pack_transform_name + '/CombinePerKey/GroupByKey')
+      packed_group_by_key_transform_key = unique_name(
+          context.components.transforms, packed_group_by_key_transform_name)
+      context.components.transforms[packed_group_by_key_transform_key].CopyFrom(
+          original_group_by_key_transforms[0])
+      context.components.transforms[
+          packed_group_by_key_transform_key].unique_name = packed_group_by_key_transform_name
+      context.components.transforms[
+          packed_group_by_key_transform_key].outputs.clear()
+      context.components.transforms[packed_group_by_key_transform_key].outputs[
+          'None'] = grouped_pcoll_id
+
+      packed_combine_grouped_values_transform_name = (
+          pack_transform_name + '/CombinePerKey/Combine')
+      packed_combine_grouped_values_transform_key = unique_name(
+          context.components.transforms,
+          packed_combine_grouped_values_transform_name)
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].CopyFrom(
+              original_combine_grouped_values_transforms[0])
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].unique_name = packed_group_by_key_transform_name
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].inputs.clear()
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].inputs[
+              '0'] = grouped_pcoll_id
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].outputs.clear()
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].outputs[
+              '0'] = pack_pcoll_id
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].spec.payload = beam_runner_api_pb2.CombinePayload(
+              combine_fn=pack_combine_fn.to_runner_api(context),
+              accumulator_coder_id=tuple_accumulator_coder_id
+          ).SerializeToString()
+
+      if original_combine_values_par_do_transforms:

Review comment:
       Why this if statement? 

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
##########
@@ -67,6 +67,7 @@ def expand(self, pcoll):
     environment = environments.DockerEnvironment.from_options(
         pipeline_options.PortableOptions(sdk_location='container'))
     pipeline_proto = pipeline.to_runner_api(default_environment=environment)
+    # logging.error('[debug:yifanmai] unoptimized pipeline %s' % pipeline_proto)

Review comment:
       Remove in final PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org