You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/10 00:29:39 UTC

[GitHub] [beam] robertwb commented on a change in pull request #15391: [BEAM-12798] Add configurable combiner packing limit

robertwb commented on a change in pull request #15391:
URL: https://github.com/apache/beam/pull/15391#discussion_r705809380



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -818,7 +841,7 @@ def _eliminate_common_key_with_none(stages, context, can_pack=lambda s: True):
   # elimination, and group eligible KeyWithNone stages by parent and
   # environment.
   def get_stage_key(stage):
-    if len(stage.transforms) == 1 and can_pack(stage.name):
+    if len(stage.transforms) == 1 and int(can_pack(stage.name)) > 0:

Review comment:
       `can_pack(stage.name)` is equivalent (assuming no negative values) and clearer.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -765,6 +767,27 @@ def _group_stages_by_key(stages, get_stage_key):
   return (grouped_stages, stages_with_none_key)
 
 
+def _group_stages_with_limit(stages, get_limit):
+  # type: (Iterable[Stage], Callable[[[str], int]]) -> Iterable[Iterable[Stage]]
+  stages_with_limit = [(stage, get_limit(stage.name)) for stage in stages]
+  group = []
+  group_limit = 0
+  for stage, limit in sorted(stages_with_limit, key=operator.itemgetter(1)):
+    if limit < 1:
+      raise Exception(
+          'expected get_limit to return an integer >= 1, '
+          'instead got: %d for stage: %s' % (limit, stage))
+    if not group:
+      group_limit = limit
+    assert len(group) < group_limit
+    group.append(stage)

Review comment:
       This will put stages into groups that are too big, e.g. if we have
   
   `[(big, 10), (s1, 2), (s2, 2)]`
   
   they will all end up in the same group. Instead, it seems you should append when `len(group) < limit`, and otherwise start a new group.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -919,10 +945,17 @@ def _try_fuse_stages(a, b):
     else:
       raise ValueError
 
+  def _get_limit(stage_name):
+    result = can_pack(stage_name)
+    if result is True:
+      return _DEFAULT_PACK_COMBINERS_LIMIT
+    else:
+      return int(result)
+
   # Partition stages by whether they are eligible for CombinePerKey packing
   # and group eligible CombinePerKey stages by parent and environment.
   def get_stage_key(stage):
-    if (len(stage.transforms) == 1 and can_pack(stage.name) and
+    if (len(stage.transforms) == 1 and int(can_pack(stage.name)) > 0 and

Review comment:
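       Same as the earlier comment: `can_pack(stage.name)` is equivalent (assuming no negative values) and clearer than `int(can_pack(stage.name)) > 0`.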




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org