Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/06 21:38:52 UTC

[GitHub] [beam] yifanmai opened a new pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

yifanmai opened a new pull request #12185:
URL: https://github.com/apache/beam/pull/12185


   Adds `eliminate_common_siblings` and `pack_combiners` graph optimization phases to the translations module. The `pack_combiners` phase packs compatible CombinePerKey / CombineGlobally stages that share a common input into a single stage. This improves performance for Beam use cases that create thousands of Combine stages over a common input.
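
   For context, here is a minimal sketch (a hypothetical pipeline, using only the standard `apache_beam` API) of the shape of pipeline this optimization targets, with many sibling combiners consuming a common input:

   ```python
   import apache_beam as beam

   with beam.Pipeline() as p:
       kvs = p | beam.Create([('a', 1), ('a', 2), ('b', 3)])
       # Sibling CombinePerKey stages over a common input: candidates for
       # packing into a single combine stage by pack_combiners.
       sums = kvs | 'Sum' >> beam.CombinePerKey(sum)
       maxes = kvs | 'Max' >> beam.CombinePerKey(max)
       mins = kvs | 'Min' >> beam.CombinePerKey(min)
   ```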
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | --- | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r470309922



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(

Review comment:
       OK. Keeping the check, but moving it further up.







[GitHub] [beam] robertwb commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r468102004



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(

Review comment:
       Can't we pull this off of the fused_stage?
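
   For instance, a one-line sketch of that suggestion (assuming `Stage.environment` carries the merged environment id after `fuse`):

   ```python
   merged_transform_environment_id = fused_stage.environment
   ```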

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(

Review comment:
       We need to group and filter on the environment of the transform as well, lest we try to pack Java or Go combiners here. 
   
   The easiest way to do that would be to add a "beam:combinefn:packed_python:v1" URN at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/portability/python_urns.py , inject it into the list at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/environments.py#L587 , and then here you can look up the environment attached to the transform and check whether that URN shows up in its capabilities, to see if this is OK.
   
   I'd be glad to explain more if this doesn't make sense. 
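
   Concretely, a hedged sketch of that gate (the URN literal is the value proposed above; in the SDK it would presumably live behind a `python_urns` constant):

   ```python
   # Sketch: only consider a stage for packing when its environment
   # advertises the packed-combiner capability.
   env = context.components.environments[transform.environment_id]
   if 'beam:combinefn:packed_python:v1' in env.capabilities:
       ...  # eligible for packing
   ```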

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          Stage._merge_environments,
+          [transform.environment_id or None for transform in transforms])
+      # Combiner packing only supports Python CombineFns.
+      for combine_payload in combine_payloads:
+        if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
+          raise ValueError('Combiner packing only supports Python CombineFns')
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       KV?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          Stage._merge_environments,
+          [transform.environment_id or None for transform in transforms])
+      # Combiner packing only supports Python CombineFns.
+      for combine_payload in combine_payloads:
+        if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
+          raise ValueError('Combiner packing only supports Python CombineFns')
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue

Review comment:
       OK, so this continue breaks out and goes back to the next set of packable stages. Just a suggestion: it might be easier to follow if you did all the checking of whether it's OK to pack (e.g. can_fuse, the payload type) up above when building these lists, rather than here.
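
   For illustration, a condensed sketch of that restructuring, where a hypothetical `_is_packable_combine` helper bundles the URN, arity, and payload checks at grouping time:

   ```python
   for stage in stages:
       if _is_packable_combine(stage):  # hypothetical: all eligibility checks up front
           transform = only_transform(stage.transforms)
           input_pcoll_id = only_element(transform.inputs.values())
           combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
       else:
           yield stage  # ineligible stages pass through unchanged
   ```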

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):
+    if env1 is None:
+      return env2
+    elif env2 is None:
+      return env1
+    else:
+      if env1 != env2:
+        raise ValueError
+      return env1
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          _try_merge_environments,
+          [transform.environment_id or None for transform in transforms])
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),
+            component_coder_ids=accumulator_coder_ids))
+
+    # Build packed output coder for (key, (out1, out2, ...))
+    input_kv_coder_id = context.components.pcollections[input_pcoll_id].coder_id
+    key_coder_id = _get_key_coder_id_from_kv_coder(
+        context.components.coders[input_kv_coder_id])
+    output_kv_coder_ids = [
+        context.components.pcollections[output_pcoll_id].coder_id
+        for output_pcoll_id in output_pcoll_ids
+    ]
+    output_value_coder_ids = [
+        _get_value_coder_id_from_kv_coder(
+            context.components.coders[output_kv_coder_id])
+        for output_kv_coder_id in output_kv_coder_ids
+    ]
+    pack_output_value_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=output_value_coder_ids)
+    pack_output_value_coder_id = context.add_or_get_coder_id(
+        pack_output_value_coder)
+    pack_output_kv_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=[key_coder_id, pack_output_value_coder_id])
+    pack_output_kv_coder_id = context.add_or_get_coder_id(pack_output_kv_coder)
+
+    # Set up packed PCollection
+    pack_combine_name = fused_stage.name
+    pack_pcoll_id = unique_name(context.components.pcollections, 'pcollection')
+    input_pcoll = context.components.pcollections[input_pcoll_id]
+    context.components.pcollections[pack_pcoll_id].CopyFrom(
+        beam_runner_api_pb2.PCollection(
+            unique_name=pack_combine_name + '.out',
+            coder_id=pack_output_kv_coder_id,
+            windowing_strategy_id=input_pcoll.windowing_strategy_id,
+            is_bounded=input_pcoll.is_bounded))
+
+    # Set up Pack stage.
+    pack_combine_fn = combiners.SingleInputTupleCombineFn(*[
+        core.CombineFn.from_runner_api(combine_payload.combine_fn, context)
+        for combine_payload in combine_payloads
+    ]).to_runner_api(context)
+    pack_transform = beam_runner_api_pb2.PTransform(
+        unique_name=pack_combine_name + '/Pack',
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=common_urns.composites.COMBINE_PER_KEY.urn,
+            payload=beam_runner_api_pb2.CombinePayload(
+                combine_fn=pack_combine_fn,
+                accumulator_coder_id=tuple_accumulator_coder_id)
+            .SerializeToString()),
+        inputs={'in': input_pcoll_id},
+        outputs={'out': pack_pcoll_id},
+        environment_id=merged_transform_environment_id)
+    pack_stage = Stage(
+        pack_combine_name + '/Pack', [pack_transform],
+        downstream_side_inputs=fused_stage.downstream_side_inputs,
+        must_follow=fused_stage.must_follow,
+        parent=fused_stage,
+        environment=fused_stage.environment)
+    yield pack_stage
+
+    # Set up Unpack stage
+    tags = [str(i) for i in range(len(output_pcoll_ids))]
+    pickled_do_fn_data = pickler.dumps((_UnpackFn(tags), (), {}, [], None))
+    unpack_transform = beam_runner_api_pb2.PTransform(
+        unique_name=pack_combine_name + '/Unpack',
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=common_urns.primitives.PAR_DO.urn,
+            payload=beam_runner_api_pb2.ParDoPayload(
+                do_fn=beam_runner_api_pb2.FunctionSpec(
+                    urn=python_urns.PICKLED_DOFN_INFO,
+                    payload=pickled_do_fn_data)).SerializeToString()),
+        inputs={'in': pack_pcoll_id},
+        outputs=dict(zip(tags, output_pcoll_ids)),
+        environment_id=merged_transform_environment_id)

Review comment:
       Commented above. 

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(

Review comment:
       Oh, I see you are checking for PICKLED_COMBINE_FN below. We should keep this, but the reason it may be insufficient is that one might use PICKLED_COMBINE_FN across different environments. (People using Python 2 transforms from an old version of Beam in Python 3 pipelines is an actual scenario that may come up soon, as we're dropping Python 2 support.)
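
   A sketch of grouping that sidesteps this, mirroring the environment-keyed grouping adopted later in this review:

   ```python
   # Group by (input PCollection, environment) so that CombineFns pickled
   # under different environments are never packed together.
   stage_key = (input_pcoll_id, stage.environment)
   combine_stages_by_input_pcoll_id[stage_key].append(stage)
   ```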







[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465915603



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):

Review comment:
       Yup, replaced with `Stage._merge_environments` instead.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681052485


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@1e969b4`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12185/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #12185   +/-   ##
   =========================================
     Coverage          ?   34.30%           
   =========================================
     Files             ?      695           
     Lines             ?    82362           
     Branches          ?     9323           
   =========================================
     Hits              ?    28251           
     Misses            ?    53688           
     Partials          ?      423           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=footer). Last update [1e969b4...a2ab49c](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r477506036



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -818,7 +819,7 @@ def _try_fuse_stages(a, b):
         for output_kv_coder_id in output_kv_coder_ids
     ]
     pack_output_value_coder = beam_runner_api_pb2.Coder(
-        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        spec=beam_runner_api_pb2.FunctionSpec(urn=python_urns.tuple.KV.urn),

Review comment:
       Thanks for catching that. Fixed.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,200 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent and environment, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    is_packable_combine = False
+
+    if (len(stage.transforms) == 1 and
+        stage.environment is not None and
+        python_urns.PACKED_COMBINE_FN in
+        context.components.environments[stage.environment].capabilities):
+      transform = only_transform(stage.transforms)
+      if (transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and
+          len(transform.inputs) == 1 and
+          len(transform.outputs) == 1):
+        combine_payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+        if combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN:
+          is_packable_combine = True
+
+    if is_packable_combine:
+      input_pcoll_id = only_element(transform.inputs.values())
+      stage_key = (input_pcoll_id, stage.environment)
+      combine_stages_by_input_pcoll_id[stage_key].append(stage)
+    else:
+      yield stage
+
+  for stage_key, packable_stages in combine_stages_by_input_pcoll_id.items():
+    input_pcoll_id, _ = stage_key
+    try:
+      if not len(packable_stages) > 1:
+        raise ValueError('Only one stage in this group: Skipping stage packing')
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+    except ValueError:
+      # Skip packing stages in this group.
+      # Yield the stages unmodified, and then continue to the next group.
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       Fixed.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681052485


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@1e969b4`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12185/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #12185   +/-   ##
   =========================================
     Coverage          ?   40.26%           
   =========================================
     Files             ?      451           
     Lines             ?    53247           
     Branches          ?        0           
   =========================================
     Hits              ?    21439           
     Misses            ?    31808           
     Partials          ?        0           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=footer). Last update [1e969b4...e02e09b](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r470310207



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(

Review comment:
       Also added a new test to make sure that we don't run combiner packing without the capability.
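    
    In code, the gate that test exercises amounts to roughly the sketch below. This is a simplification reusing the names from the diff above (`python_urns.PACKED_COMBINE_FN`, `stage.environment`, `context.components.environments`), not the exact merged code:
    
    ```python
    from apache_beam.portability import python_urns
    
    def _environment_supports_packing(stage, context):
      # A stage is eligible for combiner packing only if it has an environment
      # and that environment declares the packed-combine capability.
      if stage.environment is None:
        return False
      environment = context.components.environments[stage.environment]
      return python_urns.PACKED_COMBINE_FN in environment.capabilities
    ```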







[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r457799457



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -289,6 +289,8 @@ def create_stages(
         phases=[
             translations.annotate_downstream_side_inputs,
             translations.fix_side_input_pcoll_coders,
+            translations.eliminate_common_siblings,

Review comment:
    As we discussed in a separate conversation, we would probably need a design or RFC for annotating DoFns as idempotent. As such, I've removed eliminate_common_siblings from this PR.







[GitHub] [beam] codecov[bot] commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681052485


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@1e969b4`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   





[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465913553



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)

Review comment:
       Noted. Deleted code due to removal of `eliminate_common_siblings`.







[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465912431



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):
+    if env1 is None:
+      return env2
+    elif env2 is None:
+      return env1
+    else:
+      if env1 != env2:
+        raise ValueError
+      return env1
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          _try_merge_environments,
+          [transform.environment_id or None for transform in transforms])
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),
+            component_coder_ids=accumulator_coder_ids))
+
+    # Build packed output coder for (key, (out1, out2, ...))
+    input_kv_coder_id = context.components.pcollections[input_pcoll_id].coder_id
+    key_coder_id = _get_key_coder_id_from_kv_coder(
+        context.components.coders[input_kv_coder_id])
+    output_kv_coder_ids = [
+        context.components.pcollections[output_pcoll_id].coder_id
+        for output_pcoll_id in output_pcoll_ids
+    ]
+    output_value_coder_ids = [
+        _get_value_coder_id_from_kv_coder(
+            context.components.coders[output_kv_coder_id])
+        for output_kv_coder_id in output_kv_coder_ids
+    ]
+    pack_output_value_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=output_value_coder_ids)
+    pack_output_value_coder_id = context.add_or_get_coder_id(
+        pack_output_value_coder)
+    pack_output_kv_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=[key_coder_id, pack_output_value_coder_id])
+    pack_output_kv_coder_id = context.add_or_get_coder_id(pack_output_kv_coder)
+
+    # Set up packed PCollection
+    pack_combine_name = fused_stage.name
+    pack_pcoll_id = unique_name(context.components.pcollections, 'pcollection')
+    input_pcoll = context.components.pcollections[input_pcoll_id]
+    context.components.pcollections[pack_pcoll_id].CopyFrom(
+        beam_runner_api_pb2.PCollection(
+            unique_name=pack_combine_name + '.out',
+            coder_id=pack_output_kv_coder_id,
+            windowing_strategy_id=input_pcoll.windowing_strategy_id,
+            is_bounded=input_pcoll.is_bounded))
+
+    # Set up Pack stage.
+    pack_combine_fn = combiners.SingleInputTupleCombineFn(*[
+        core.CombineFn.from_runner_api(combine_payload.combine_fn, context)

Review comment:
       Added check that `combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN` before attempting to pack.
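    
    In code, the added guard amounts to something like the following sketch, mirroring the names in the diff above (the exact placement in the merged change may differ):
    
    ```python
    from apache_beam.portability import python_urns
    from apache_beam.portability.api import beam_runner_api_pb2
    from apache_beam.utils import proto_utils
    
    def _check_packable_combine_fn(transform):
      combine_payload = proto_utils.parse_Bytes(
          transform.spec.payload, beam_runner_api_pb2.CombinePayload)
      # Only pickled Python CombineFns can be deserialized and re-packed by
      # the Python SDK; any other CombineFn leaves its stages unpacked.
      if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
        raise ValueError('Combiner packing only supports Python CombineFns')
    ```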







[GitHub] [beam] robertwb merged pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #12185:
URL: https://github.com/apache/beam/pull/12185


   





[GitHub] [beam] robertwb commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r470936066



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,199 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    is_packable_combine = False
+
+    if (len(stage.transforms) == 1 and 
+        stage.environment is not None and
+        python_urns.PACKED_COMBINE_FN in
+        context.components.environments[stage.environment].capabilities):
+      transform = only_transform(stage.transforms)
+      if (transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and
+          len(transform.inputs) == 1 and
+          len(transform.outputs) == 1):
+        combine_payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+        if combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN:
+          is_packable_combine = True
+
+    if is_packable_combine:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)

Review comment:
       Perhaps key by environment and input pcollection (to not even attempt to fuse across environments). 
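    
    One way to realize that suggestion is to make the environment part of the grouping key, as in this sketch (not the code that was ultimately merged):
    
    ```python
    import collections
    
    def group_by_env_and_input(packable):
      """Groups (stage, input_pcoll_id) pairs by (environment, input pcoll).
    
      Including the environment in the key means stages that live in different
      environments never even become candidates for fusion with each other.
      """
      grouped = collections.defaultdict(list)
      for stage, input_pcoll_id in packable:
        grouped[(stage.environment, input_pcoll_id)].append(stage)
      return grouped
    ```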

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          Stage._merge_environments,
+          [transform.environment_id or None for transform in transforms])
+      # Combiner packing only supports Python CombineFns.
+      for combine_payload in combine_payloads:
+        if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
+          raise ValueError('Combiner packing only supports Python CombineFns')
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       Technically, a KV coder can only have two components. You may have to make an actual Tuple coder here with a new (again, feel free to do Python-only for now) URN. 
   
   You should be able to add another Coder.register_urn decorator to https://github.com/apache/beam/blob/release-2.23.0/sdks/python/apache_beam/coders/coders.py#L1029 and it'll just work. 
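    
    A very rough sketch of what such a registration might look like, following the pattern of the existing `Coder.register_urn` decorators near the linked line. The URN string below is made up (the suggestion is Python-only), and the decorator signature should be checked against coders.py before relying on it:
    
    ```python
    from apache_beam.coders.coders import Coder, TupleCoder
    
    # Hypothetical Python-only URN for an n-component tuple coder.
    PACKED_TUPLE_URN = 'beam:coder:python_packed_tuple:v1'
    
    @Coder.register_urn(PACKED_TUPLE_URN, None)
    def _packed_tuple_from_runner_api_parameter(
        unused_payload, components, unused_context):
      # Rebuild a TupleCoder from its component coders; unlike a KV coder,
      # a tuple coder may carry any number of components.
      return TupleCoder(components)
    ```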







[GitHub] [beam] robertwb commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r451894237



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -289,6 +289,8 @@ def create_stages(
         phases=[
             translations.annotate_downstream_side_inputs,
             translations.fix_side_input_pcoll_coders,
+            translations.eliminate_common_siblings,

Review comment:
    We might be able to annotate/recognize certain DoFns (e.g. by URN) for which this deduplication would be safe to apply.
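    
    As a sketch of that idea (the safelist and its URN below are hypothetical; no such annotation exists in this PR):
    
    ```python
    # Hypothetical set of DoFn URNs known to be deterministic and free of side
    # effects, for which sibling deduplication would be safe to apply.
    _DEDUP_SAFE_DOFN_URNS = frozenset([
        'beam:dofn:deterministic_example:v1',  # made-up URN
    ])
    
    def is_dedup_safe(do_fn_spec):
      return do_fn_spec.urn in _DEDUP_SAFE_DOFN_URNS
    ```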







[GitHub] [beam] yifanmai commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-669367028


   Removed sibling deduplication. @robertwb and @aaltay could you take another look?
   
   Also, filed [BEAM-10641](https://issues.apache.org/jira/browse/BEAM-10641) for adding this to Dataflow.





[GitHub] [beam] aaltay commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-655767371


   @yifanmai - (a note from our earlier conversation). This elimination would not be safe if the pardo has side effects or is not deterministic. For example, if the pardo is written to randomly sample 10% of its input, this change would result in all siblings producing the exact same sample.
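   
   To make the hazard concrete, here is a hypothetical pipeline (not code from this PR): the two sampling branches below are byte-for-byte identical transforms, so sibling elimination would collapse them and `sample_a` and `sample_b` would silently become the same sample.
   
   ```python
   import random
   
   import apache_beam as beam
   
   class SampleTenPercent(beam.DoFn):
     """Nondeterministically keeps roughly 10% of its input."""
     def process(self, element):
       if random.random() < 0.1:
         yield element
   
   with beam.Pipeline() as p:
     numbers = p | beam.Create(range(1000))
     # The user expects two independent samples, but deduplicating these
     # byte-identical ParDo siblings would produce the exact same sample twice.
     sample_a = numbers | 'SampleA' >> beam.ParDo(SampleTenPercent())
     sample_b = numbers | 'SampleB' >> beam.ParDo(SampleTenPercent())
   ```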





[GitHub] [beam] robertwb commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r451758742



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):

Review comment:
       Was this copied from above? If needed, perhaps refactor? (Similarly for try_fuse_stages.)

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}

Review comment:
    (Just a thought) I wonder if this should be a global mapping tracked in the context, so that composites can be properly reconstructed.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -289,6 +289,8 @@ def create_stages(
         phases=[
             translations.annotate_downstream_side_inputs,
             translations.fix_side_input_pcoll_coders,
+            translations.eliminate_common_siblings,

Review comment:
       This transform is not safe to do ubiquitously (due to the possibility of side effects).  

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)

Review comment:
       One can omit the environment here--if two specs and payloads are identical, they are the same operation.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):
+    if env1 is None:
+      return env2
+    elif env2 is None:
+      return env1
+    else:
+      if env1 != env2:
+        raise ValueError
+      return env1
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          _try_merge_environments,
+          [transform.environment_id or None for transform in transforms])
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),
+            component_coder_ids=accumulator_coder_ids))
+
+    # Build packed output coder for (key, (out1, out2, ...))
+    input_kv_coder_id = context.components.pcollections[input_pcoll_id].coder_id
+    key_coder_id = _get_key_coder_id_from_kv_coder(
+        context.components.coders[input_kv_coder_id])
+    output_kv_coder_ids = [
+        context.components.pcollections[output_pcoll_id].coder_id
+        for output_pcoll_id in output_pcoll_ids
+    ]
+    output_value_coder_ids = [
+        _get_value_coder_id_from_kv_coder(
+            context.components.coders[output_kv_coder_id])
+        for output_kv_coder_id in output_kv_coder_ids
+    ]
+    pack_output_value_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=output_value_coder_ids)
+    pack_output_value_coder_id = context.add_or_get_coder_id(
+        pack_output_value_coder)
+    pack_output_kv_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=[key_coder_id, pack_output_value_coder_id])
+    pack_output_kv_coder_id = context.add_or_get_coder_id(pack_output_kv_coder)
+
+    # Set up packed PCollection
+    pack_combine_name = fused_stage.name
+    pack_pcoll_id = unique_name(context.components.pcollections, 'pcollection')
+    input_pcoll = context.components.pcollections[input_pcoll_id]
+    context.components.pcollections[pack_pcoll_id].CopyFrom(
+        beam_runner_api_pb2.PCollection(
+            unique_name=pack_combine_name + '.out',
+            coder_id=pack_output_kv_coder_id,
+            windowing_strategy_id=input_pcoll.windowing_strategy_id,
+            is_bounded=input_pcoll.is_bounded))
+
+    # Set up Pack stage.
+    pack_combine_fn = combiners.SingleInputTupleCombineFn(*[
+        core.CombineFn.from_runner_api(combine_payload.combine_fn, context)
+        for combine_payload in combine_payloads
+    ]).to_runner_api(context)
+    pack_transform = beam_runner_api_pb2.PTransform(
+        unique_name=pack_combine_name + '/Pack',
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=common_urns.composites.COMBINE_PER_KEY.urn,
+            payload=beam_runner_api_pb2.CombinePayload(
+                combine_fn=pack_combine_fn,
+                accumulator_coder_id=tuple_accumulator_coder_id)
+            .SerializeToString()),
+        inputs={'in': input_pcoll_id},
+        outputs={'out': pack_pcoll_id},
+        environment_id=merged_transform_environment_id)
+    pack_stage = Stage(
+        pack_combine_name + '/Pack', [pack_transform],
+        downstream_side_inputs=fused_stage.downstream_side_inputs,
+        must_follow=fused_stage.must_follow,
+        parent=fused_stage,
+        environment=fused_stage.environment)
+    yield pack_stage
+
+    # Set up Unpack stage
+    tags = [str(i) for i in range(len(output_pcoll_ids))]
+    pickled_do_fn_data = pickler.dumps((_UnpackFn(tags), (), {}, [], None))
+    unpack_transform = beam_runner_api_pb2.PTransform(
+        unique_name=pack_combine_name + '/Unpack',
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=common_urns.primitives.PAR_DO.urn,
+            payload=beam_runner_api_pb2.ParDoPayload(
+                do_fn=beam_runner_api_pb2.FunctionSpec(
+                    urn=python_urns.PICKLED_DOFN_INFO,
+                    payload=pickled_do_fn_data)).SerializeToString()),
+        inputs={'in': pack_pcoll_id},
+        outputs=dict(zip(tags, output_pcoll_ids)),
+        environment_id=merged_transform_environment_id)

Review comment:
       Similarly, this environment must be a Python environment for this to work.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):
+    if env1 is None:
+      return env2
+    elif env2 is None:
+      return env1
+    else:
+      if env1 != env2:
+        raise ValueError
+      return env1
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          _try_merge_environments,
+          [transform.environment_id or None for transform in transforms])
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),
+            component_coder_ids=accumulator_coder_ids))
+
+    # Build packed output coder for (key, (out1, out2, ...))
+    input_kv_coder_id = context.components.pcollections[input_pcoll_id].coder_id
+    key_coder_id = _get_key_coder_id_from_kv_coder(
+        context.components.coders[input_kv_coder_id])
+    output_kv_coder_ids = [
+        context.components.pcollections[output_pcoll_id].coder_id
+        for output_pcoll_id in output_pcoll_ids
+    ]
+    output_value_coder_ids = [
+        _get_value_coder_id_from_kv_coder(
+            context.components.coders[output_kv_coder_id])
+        for output_kv_coder_id in output_kv_coder_ids
+    ]
+    pack_output_value_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=output_value_coder_ids)
+    pack_output_value_coder_id = context.add_or_get_coder_id(
+        pack_output_value_coder)
+    pack_output_kv_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=[key_coder_id, pack_output_value_coder_id])
+    pack_output_kv_coder_id = context.add_or_get_coder_id(pack_output_kv_coder)
+
+    # Set up packed PCollection
+    pack_combine_name = fused_stage.name
+    pack_pcoll_id = unique_name(context.components.pcollections, 'pcollection')
+    input_pcoll = context.components.pcollections[input_pcoll_id]
+    context.components.pcollections[pack_pcoll_id].CopyFrom(
+        beam_runner_api_pb2.PCollection(
+            unique_name=pack_combine_name + '.out',
+            coder_id=pack_output_kv_coder_id,
+            windowing_strategy_id=input_pcoll.windowing_strategy_id,
+            is_bounded=input_pcoll.is_bounded))
+
+    # Set up Pack stage.
+    pack_combine_fn = combiners.SingleInputTupleCombineFn(*[
+        core.CombineFn.from_runner_api(combine_payload.combine_fn, context)

Review comment:
    This will only work for Python combine fns (and should be guarded accordingly).
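    
    For context on the building block used here: `SingleInputTupleCombineFn` feeds the same input value to each wrapped CombineFn and emits a tuple of their results, which is what lets one packed CombinePerKey stand in for several siblings. A small usage sketch (a hypothetical pipeline, not code from this PR):
    
    ```python
    import apache_beam as beam
    from apache_beam.transforms import combiners
    
    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([('a', 1), ('a', 2), ('b', 3)])
          # One CombinePerKey computing (sum, mean) per key, instead of two
          # sibling CombinePerKey stages reading the same input.
          | beam.CombinePerKey(
              combiners.SingleInputTupleCombineFn(
                  sum, combiners.MeanCombineFn()))
          | beam.Map(print))  # e.g. ('a', (3, 1.5)) and ('b', (3, 3.0))
    ```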







[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465917488



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):
+    if env1 is None:
+      return env2
+    elif env2 is None:
+      return env1
+    else:
+      if env1 != env2:
+        raise ValueError
+      return env1
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          _try_merge_environments,
+          [transform.environment_id or None for transform in transforms])
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),
+            component_coder_ids=accumulator_coder_ids))
+
+    # Build packed output coder for (key, (out1, out2, ...))
+    input_kv_coder_id = context.components.pcollections[input_pcoll_id].coder_id
+    key_coder_id = _get_key_coder_id_from_kv_coder(
+        context.components.coders[input_kv_coder_id])
+    output_kv_coder_ids = [
+        context.components.pcollections[output_pcoll_id].coder_id
+        for output_pcoll_id in output_pcoll_ids
+    ]
+    output_value_coder_ids = [
+        _get_value_coder_id_from_kv_coder(
+            context.components.coders[output_kv_coder_id])
+        for output_kv_coder_id in output_kv_coder_ids
+    ]
+    pack_output_value_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=output_value_coder_ids)
+    pack_output_value_coder_id = context.add_or_get_coder_id(
+        pack_output_value_coder)
+    pack_output_kv_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=[key_coder_id, pack_output_value_coder_id])
+    pack_output_kv_coder_id = context.add_or_get_coder_id(pack_output_kv_coder)
+
+    # Set up packed PCollection
+    pack_combine_name = fused_stage.name
+    pack_pcoll_id = unique_name(context.components.pcollections, 'pcollection')
+    input_pcoll = context.components.pcollections[input_pcoll_id]
+    context.components.pcollections[pack_pcoll_id].CopyFrom(
+        beam_runner_api_pb2.PCollection(
+            unique_name=pack_combine_name + '.out',
+            coder_id=pack_output_kv_coder_id,
+            windowing_strategy_id=input_pcoll.windowing_strategy_id,
+            is_bounded=input_pcoll.is_bounded))
+
+    # Set up Pack stage.
+    pack_combine_fn = combiners.SingleInputTupleCombineFn(*[
+        core.CombineFn.from_runner_api(combine_payload.combine_fn, context)
+        for combine_payload in combine_payloads
+    ]).to_runner_api(context)
+    pack_transform = beam_runner_api_pb2.PTransform(
+        unique_name=pack_combine_name + '/Pack',
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=common_urns.composites.COMBINE_PER_KEY.urn,
+            payload=beam_runner_api_pb2.CombinePayload(
+                combine_fn=pack_combine_fn,
+                accumulator_coder_id=tuple_accumulator_coder_id)
+            .SerializeToString()),
+        inputs={'in': input_pcoll_id},
+        outputs={'out': pack_pcoll_id},
+        environment_id=merged_transform_environment_id)
+    pack_stage = Stage(
+        pack_combine_name + '/Pack', [pack_transform],
+        downstream_side_inputs=fused_stage.downstream_side_inputs,
+        must_follow=fused_stage.must_follow,
+        parent=fused_stage,
+        environment=fused_stage.environment)
+    yield pack_stage
+
+    # Set up Unpack stage
+    tags = [str(i) for i in range(len(output_pcoll_ids))]
+    pickled_do_fn_data = pickler.dumps((_UnpackFn(tags), (), {}, [], None))
+    unpack_transform = beam_runner_api_pb2.PTransform(
+        unique_name=pack_combine_name + '/Unpack',
+        spec=beam_runner_api_pb2.FunctionSpec(
+            urn=common_urns.primitives.PAR_DO.urn,
+            payload=beam_runner_api_pb2.ParDoPayload(
+                do_fn=beam_runner_api_pb2.FunctionSpec(
+                    urn=python_urns.PICKLED_DOFN_INFO,
+                    payload=pickled_do_fn_data)).SerializeToString()),
+        inputs={'in': pack_pcoll_id},
+        outputs=dict(zip(tags, output_pcoll_ids)),
+        environment_id=merged_transform_environment_id)

Review comment:
       How do I do this? Should I just check that `environment_id` is `None` in the original Transforms and Stages?
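
       One possible shape for that check, mirroring the `_try_merge_environments` helper earlier in this diff (a sketch, not the final API): treat an unset environment id as a wildcard and refuse to pack when two transforms disagree:

       ```python
       def _try_merge_environments(env1, env2):
         # An empty environment id acts as a wildcard; otherwise both
         # transforms must agree, or the group is not packed at all.
         if not env1:
           return env2
         if not env2:
           return env1
         if env1 != env2:
           raise ValueError('Cannot pack transforms from different environments')
         return env1
       ```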




[GitHub] [beam] aaltay commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-655766565


    I initially thought this change would only impact fnapi runner and local execution. I assume the intention is to apply the optimization to all portable runners. I noticed that portable runner depends on fnapi_runner (https://github.com/apache/beam/blob/2dddc0c9d60315fd212a90bc2ac39fd2bcc8bf63/sdks/python/apache_beam/runners/portability/portable_runner.py#L51). So maybe this optimization applies to all portable executions.
   
   @robertwb - Do you know why portable_runner depends on fnapi_runner?


[GitHub] [beam] robertwb commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r476702785



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,200 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent and environment, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    is_packable_combine = False
+
+    if (len(stage.transforms) == 1 and
+        stage.environment is not None and
+        python_urns.PACKED_COMBINE_FN in
+        context.components.environments[stage.environment].capabilities):
+      transform = only_transform(stage.transforms)
+      if (transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and
+          len(transform.inputs) == 1 and
+          len(transform.outputs) == 1):
+        combine_payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+        if combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN:
+          is_packable_combine = True
+
+    if is_packable_combine:
+      input_pcoll_id = only_element(transform.inputs.values())
+      stage_key = (input_pcoll_id, stage.environment)
+      combine_stages_by_input_pcoll_id[stage_key].append(stage)
+    else:
+      yield stage
+
+  for stage_key, packable_stages in combine_stages_by_input_pcoll_id.items():
+    input_pcoll_id, _ = stage_key
+    try:
+      if not len(packable_stages) > 1:
+        raise ValueError('Only one stage in this group: Skipping stage packing')
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+    except ValueError:
+      # Skip packing stages in this group.
+      # Yield the stages unmodified, and then continue to the next group.
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       Now use `python_urns.TUPLE_CODER` here rather than the KV one.
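
       With that change, the accumulator coder construction would look roughly as follows (a sketch; `python_urns.TUPLE_CODER` is the tuple coder urn referred to above):

       ```python
       tuple_accumulator_coder_id = context.add_or_get_coder_id(
           beam_runner_api_pb2.Coder(
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=python_urns.TUPLE_CODER),
               component_coder_ids=accumulator_coder_ids))
       ```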

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -818,7 +819,7 @@ def _try_fuse_stages(a, b):
         for output_kv_coder_id in output_kv_coder_ids
     ]
     pack_output_value_coder = beam_runner_api_pb2.Coder(
-        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        spec=beam_runner_api_pb2.FunctionSpec(urn=python_urns.tuple.KV.urn),

Review comment:
       Shouldn't this be `python_urns.TUPLE_CODER`?




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r470309725



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          Stage._merge_environments,
+          [transform.environment_id or None for transform in transforms])
+      # Combiner packing only supports Python CombineFns.
+      for combine_payload in combine_payloads:
+        if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
+          raise ValueError('Combiner packing only supports Python CombineFns')
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue

Review comment:
       Moved most of the checks to the earlier part, and cleaned up the logic for this part (which we still need, because we need to make sure the whole group is fuseable).
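
       Concretely, the check that remains is the pairwise fusion above: reducing `_try_fuse_stages` over the group succeeds only if every stage fuses with the running result, i.e. only if the whole group is fuseable. A condensed sketch (`_fuse_group_or_none` is a hypothetical helper name):

       ```python
       import functools

       def _fuse_group_or_none(packable_stages):
         # Pairwise fusion over the whole group: any unfuseable pair
         # raises ValueError, signalling the caller to yield the stages
         # unmodified instead of packing them.
         try:
           return functools.reduce(_try_fuse_stages, packable_stages)
         except ValueError:
           return None
       ```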




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r470309826



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(

Review comment:
       OK, I'm now using the fused_stage environment everywhere.
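
       In other words, a sketch of the simplification (using the variable names from the diff above):

       ```python
       # The fused template stage's environment is authoritative for both
       # generated transforms; no separate per-transform merge is needed.
       pack_transform.environment_id = fused_stage.environment
       unpack_transform.environment_id = fused_stage.environment
       ```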




[GitHub] [beam] robertwb commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-668132171


   @yifanmai could we get the combine fn packing stuff in this PR, and we can defer the sibling deduplication to a subsequent one? 


[GitHub] [beam] robertwb commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-655823773


   @aaltay: portable_runner depends on these optimizations (when --pre_optimize=all is set) to get better fusion.
   
   @yifanmai could you file a jira to run Dataflow through this optimization as well? (It probably doesn't make sense to have that as part of this PR.)
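
   For readers following along, a hypothetical way to opt a portable pipeline into these pre-optimizations (the exact spelling is version-dependent; in some SDK versions `pre_optimize` is read from `--experiments`):

   ```python
   from apache_beam.options.pipeline_options import PipelineOptions

   # Hypothetical invocation: ask the portable runner to apply all
   # pre-optimization phases, including combiner packing.
   options = PipelineOptions([
       '--runner=PortableRunner',
       '--experiments=pre_optimize=all',
   ])
   ```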


[GitHub] [beam] codecov[bot] edited a comment on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681052485


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@1e969b4`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12185/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #12185   +/-   ##
   =========================================
     Coverage          ?   34.30%           
   =========================================
     Files             ?      695           
     Lines             ?    82362           
     Branches          ?     9323           
   =========================================
     Hits              ?    28251           
     Misses            ?    53688           
     Partials          ?      423           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=footer). Last update [1e969b4...a2ab49c](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r457799457



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -289,6 +289,8 @@ def create_stages(
         phases=[
             translations.annotate_downstream_side_inputs,
             translations.fix_side_input_pcoll_coders,
+            translations.eliminate_common_siblings,

Review comment:
       As we discussed in a separate conversation, we would probably need a design or RFC for annotating DoFns as idempotent. As such, I've removed `eliminate_common_siblings` from this PR.
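
       To make the concern concrete, a sketch (with a hypothetical `AssignRandomId` DoFn) of siblings that can look identical to `get_sibling_key` yet must not be deduplicated without a determinism/idempotence annotation:

       ```python
       import random

       import apache_beam as beam

       class AssignRandomId(beam.DoFn):
         # Non-deterministic: two sibling applications of this DoFn over the
         # same input may legitimately produce different outputs, so
         # eliminating one as a "common subexpression" would silently change
         # the pipeline's semantics.
         def process(self, element):
           yield element, random.random()
       ```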




[GitHub] [beam] yifanmai commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-670789207


   @robertwb and @aaltay, gentle ping.


[GitHub] [beam] yifanmai commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-654479012


   R: @aaltay 
   R: @robertwb 


[GitHub] [beam] codecov[bot] edited a comment on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681052485


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@1e969b4`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12185/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #12185   +/-   ##
   =========================================
     Coverage          ?   40.26%           
   =========================================
     Files             ?      451           
     Lines             ?    53247           
     Branches          ?        0           
   =========================================
     Hits              ?    21439           
     Misses            ?    31808           
     Partials          ?        0           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=footer). Last update [1e969b4...e02e09b](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r457797941



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    if sibling_key is None or len(sibling_stages) == 1:
+      continue
+    output_pcoll_ids = [
+        only_element(stage.transforms[0].outputs.values())
+        for stage in sibling_stages
+    ]
+    to_delete_pcoll_ids = output_pcoll_ids[1:]
+    for to_delete_pcoll_id in to_delete_pcoll_ids:
+      pcoll_id_remap[to_delete_pcoll_id] = output_pcoll_ids[0]
+      del context.components.pcollections[to_delete_pcoll_id]
+    del sibling_stages[1:]
+
+  # Yield stages while remapping output PCollections if needed.
+  for sibling_key, sibling_stages in stages_by_sibling_key.items():
+    for stage in sibling_stages:
+      input_keys_to_remap = []
+      for input_key, input_pcoll_id in stage.transforms[0].inputs.items():
+        if input_pcoll_id in pcoll_id_remap:
+          input_keys_to_remap.append(input_key)
+      for input_key_to_remap in input_keys_to_remap:
+        stage.transforms[0].inputs[input_key_to_remap] = pcoll_id_remap[
+            stage.transforms[0].inputs[input_key_to_remap]]
+      yield stage
+
+
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  def _try_merge_environments(env1, env2):
+    if env1 is None:
+      return env2
+    elif env2 is None:
+      return env1
+    else:
+      if env1 != env2:
+        raise ValueError
+      return env1
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          _try_merge_environments,
+          [transform.environment_id or None for transform in transforms])
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),
+            component_coder_ids=accumulator_coder_ids))
+
+    # Build packed output coder for (key, (out1, out2, ...))
+    input_kv_coder_id = context.components.pcollections[input_pcoll_id].coder_id
+    key_coder_id = _get_key_coder_id_from_kv_coder(
+        context.components.coders[input_kv_coder_id])
+    output_kv_coder_ids = [
+        context.components.pcollections[output_pcoll_id].coder_id
+        for output_pcoll_id in output_pcoll_ids
+    ]
+    output_value_coder_ids = [
+        _get_value_coder_id_from_kv_coder(
+            context.components.coders[output_kv_coder_id])
+        for output_kv_coder_id in output_kv_coder_ids
+    ]
+    pack_output_value_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=output_value_coder_ids)
+    pack_output_value_coder_id = context.add_or_get_coder_id(
+        pack_output_value_coder)
+    pack_output_kv_coder = beam_runner_api_pb2.Coder(
+        spec=beam_runner_api_pb2.FunctionSpec(urn=common_urns.coders.KV.urn),
+        component_coder_ids=[key_coder_id, pack_output_value_coder_id])
+    pack_output_kv_coder_id = context.add_or_get_coder_id(pack_output_kv_coder)
+
+    # Set up packed PCollection
+    pack_combine_name = fused_stage.name
+    pack_pcoll_id = unique_name(context.components.pcollections, 'pcollection')
+    input_pcoll = context.components.pcollections[input_pcoll_id]
+    context.components.pcollections[pack_pcoll_id].CopyFrom(
+        beam_runner_api_pb2.PCollection(
+            unique_name=pack_combine_name + '.out',
+            coder_id=pack_output_kv_coder_id,
+            windowing_strategy_id=input_pcoll.windowing_strategy_id,
+            is_bounded=input_pcoll.is_bounded))
+
+    # Set up Pack stage.
+    pack_combine_fn = combiners.SingleInputTupleCombineFn(*[
+        core.CombineFn.from_runner_api(combine_payload.combine_fn, context)

Review comment:
       How do I check that we're in a Python environment?
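
       The later revision in this thread answers this by checking the environment's declared capabilities rather than inspecting the payload alone; roughly (`_is_packable_environment` is a hypothetical wrapper around the check shown in the revised diff):

       ```python
       def _is_packable_environment(stage, context):
         # Eligible only if the stage's environment advertises support for
         # packed CombineFns among its capabilities.
         return (
             stage.environment is not None and
             python_urns.PACKED_COMBINE_FN in
             context.components.environments[stage.environment].capabilities)
       ```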




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r474256063



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,199 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    is_packable_combine = False
+
+    if (len(stage.transforms) == 1 and 
+        stage.environment is not None and
+        python_urns.PACKED_COMBINE_FN in
+        context.components.environments[stage.environment].capabilities):
+      transform = only_transform(stage.transforms)
+      if (transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and
+          len(transform.inputs) == 1 and
+          len(transform.outputs) == 1):
+        combine_payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+        if combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN:
+          is_packable_combine = True
+
+    if is_packable_combine:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)

Review comment:
       OK, we key by environment now.
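
       That is, the grouping key becomes the (input PCollection, environment) pair, as in the revised diff earlier in this thread:

       ```python
       # Stages are grouped for packing only when they share both the
       # input PCollection and the environment.
       stage_key = (input_pcoll_id, stage.environment)
       combine_stages_by_input_pcoll_id[stage_key].append(stage)
       ```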




[GitHub] [beam] robertwb commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681169251


   I created https://github.com/yifanmai/beam/pull/2 to fix the lint issues. 


[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465914422



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have a common input, an identical transform, and one output each,
+  then all but one stage will be eliminated, and the output of the remaining
+  stage will be connected to the original output PCollections of the
+  eliminated stages. This elimination runs only once, not recursively, and
+  will only eliminate the first stage after a common input, rather than a
+  chain of stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}

Review comment:
       Noted. I'm not currently aware of any requirement to keep this around beyond this optimizer.




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r474252169



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V1)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group stages by parent, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # Fused stage is used as template and is not yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          Stage._merge_environments,
+          [transform.environment_id or None for transform in transforms])
+      # Combiner packing only supports Python CombineFns.
+      for combine_payload in combine_payloads:
+        if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
+          raise ValueError('Combiner packing only supports Python CombineFns')
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       Done.




[GitHub] [beam] yifanmai commented on a change in pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r470310041



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -690,6 +692,196 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def pack_combiners(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  """Packs sibling CombinePerKey stages into a single CombinePerKey.
+
+  If CombinePerKey stages have a common input, one input each, and one output
+  each, pack the stages into a single stage that runs all CombinePerKeys and
+  outputs resulting tuples to a new PCollection. A subsequent stage unpacks
+  tuples from this PCollection and sends them to the original output
+  PCollections.
+  """
+
+  class _UnpackFn(core.DoFn):
+    """A DoFn that unpacks a packed to multiple tagged outputs.
+
+    Example:
+      tags = (T1, T2, ...)
+      input = (K, (V1, V2, ...))
+      output = TaggedOutput(T1, (K, V1)), TaggedOutput(T2, (K, V2)), ...
+    """
+
+    def __init__(self, tags):
+      self._tags = tags
+
+    def process(self, element):
+      key, values = element
+      return [
+          core.pvalue.TaggedOutput(tag, (key, value))
+          for tag, value in zip(self._tags, values)
+      ]
+
+  def _get_fallback_coder_id():
+    return context.add_or_get_coder_id(
+        coders.registry.get_coder(object).to_runner_api(None))
+
+  def _get_component_coder_id_from_kv_coder(coder, index):
+    assert index < 2
+    if coder.spec.urn == common_urns.coders.KV.urn and len(
+        coder.component_coder_ids) == 2:
+      return coder.component_coder_ids[index]
+    return _get_fallback_coder_id()
+
+  def _get_key_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 0)
+
+  def _get_value_coder_id_from_kv_coder(coder):
+    return _get_component_coder_id_from_kv_coder(coder, 1)
+
+  def _try_fuse_stages(a, b):
+    if a.can_fuse(b, context):
+      return a.fuse(b)
+    else:
+      raise ValueError
+
+  # Group eligible stages by input PCollection id, yielding ineligible stages.
+  combine_stages_by_input_pcoll_id = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn and len(
+        transform.inputs) == 1 and len(transform.outputs) == 1:
+      input_pcoll_id = only_element(transform.inputs.values())
+      combine_stages_by_input_pcoll_id[input_pcoll_id].append(stage)
+    else:
+      yield stage
+
+  for input_pcoll_id, packable_stages in combine_stages_by_input_pcoll_id.items(
+  ):
+    # Yield stage and continue if it has no siblings.
+    if len(packable_stages) == 1:
+      yield packable_stages[0]
+      continue
+
+    transforms = [only_transform(stage.transforms) for stage in packable_stages]
+    combine_payloads = [
+        proto_utils.parse_Bytes(transform.spec.payload,
+                                beam_runner_api_pb2.CombinePayload)
+        for transform in transforms
+    ]
+
+    # Yield stages and continue if they cannot be packed.
+    try:
+      # The fused stage is used as a template and is not itself yielded.
+      fused_stage = functools.reduce(_try_fuse_stages, packable_stages)
+      merged_transform_environment_id = functools.reduce(
+          Stage._merge_environments,
+          [transform.environment_id or None for transform in transforms])
+      # Combiner packing only supports Python CombineFns.
+      for combine_payload in combine_payloads:
+        if combine_payload.combine_fn.urn != python_urns.PICKLED_COMBINE_FN:
+          raise ValueError('Combiner packing only supports Python CombineFns')
+    except ValueError:
+      for stage in packable_stages:
+        yield stage
+      continue
+
+    output_pcoll_ids = [
+        only_element(transform.outputs.values()) for transform in transforms
+    ]
+
+    # Build accumulator coder for (acc1, acc2, ...)
+    accumulator_coder_ids = [
+        combine_payload.accumulator_coder_id
+        for combine_payload in combine_payloads
+    ]
+    tuple_accumulator_coder_id = context.add_or_get_coder_id(
+        beam_runner_api_pb2.Coder(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=common_urns.coders.KV.urn),

Review comment:
       I didn't understand this comment; could you clarify?
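
       For illustration, the `_UnpackFn` defined in the diff above can be exercised with tagged outputs roughly as follows; the standalone `UnpackFn` class, the tags, and the sample pipeline are invented for this sketch and are not part of the PR.

       ```python
       # Illustration only: a standalone DoFn mirroring the diff's _UnpackFn,
       # run with tagged outputs. The tags ('min', 'max') and sample data are
       # made up for this sketch.
       import apache_beam as beam

       class UnpackFn(beam.DoFn):
         def __init__(self, tags):
           self._tags = tags

         def process(self, element):
           key, values = element
           return [
               beam.pvalue.TaggedOutput(tag, (key, value))
               for tag, value in zip(self._tags, values)
           ]

       with beam.Pipeline() as p:
         packed = p | beam.Create([('k', (1, 2))])
         results = packed | beam.ParDo(UnpackFn(('min', 'max'))).with_outputs(
             'min', 'max')
         _ = results.min | 'PrintMin' >> beam.Map(print)  # emits ('k', 1)
         _ = results.max | 'PrintMax' >> beam.Map(print)  # emits ('k', 2)
       ```

       Each tagged output then feeds the original output PCollection of the corresponding unpacked CombinePerKey.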







[GitHub] [beam] codecov[bot] edited a comment on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-681052485


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request base (`master@1e969b4`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12185/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master   #12185   +/-   ##
   =========================================
     Coverage          ?   40.26%           
   =========================================
     Files             ?      451           
     Lines             ?    53247           
     Branches          ?        0           
   =========================================
     Hits              ?    21439           
     Misses            ?    31808           
     Partials          ?        0           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=footer). Last update [1e969b4...e02e09b](https://codecov.io/gh/apache/beam/pull/12185?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] robertwb commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-682389686


   Run Portable_Python PreCommit





[GitHub] [beam] robertwb commented on pull request #12185: [BEAM-10409] Add combiner packing to graph optimizer phases

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12185:
URL: https://github.com/apache/beam/pull/12185#issuecomment-682213413


   If you could merge https://github.com/yifanmai/beam/pull/2 that should fix the lint issues and then we can get this merged. 

