You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/08 22:02:03 UTC

[GitHub] [beam] yifanmai commented on a change in pull request #14161: [BEAM-10409] Use annotations to control combiner packing.

yifanmai commented on a change in pull request #14161:
URL: https://github.com/apache/beam/pull/14161#discussion_r589651185



##########
File path: sdks/python/apache_beam/portability/python_urns.py
##########
@@ -55,3 +55,8 @@
 # and artifact fetching code.
 # (Used for testing.)
 SUBPROCESS_SDK = "beam:env:harness_subprocess_python:v1"
+
+# An annotation that indicates combiner packing is OK in all sub-transforms
+# of this transform.  This optimization may result in renamed counters and
+# PCollection element counts.
+APPLY_COMBINER_PACKING = "beam:annotation:apply_combiner_packing:v1"

Review comment:
       nit: add back newline

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1053,10 +1053,34 @@ def get_stage_key(stage):
     yield unpack_stage
 
 
-def pack_combiners(stages, context):
+def pack_combiners(stages, context, can_pack=None):
   # type: (Iterable[Stage], TransformContext) -> Iterator[Stage]
+  if can_pack is None:
+    can_pack_names = {}
+    parents = context.parents_map()
+
+    def can_pack(name):
+      if name in can_pack_names:
+        return can_pack_names[name]
+      else:
+        transform = context.components.transforms[name]
+        if python_urns.APPLY_COMBINER_PACKING in transform.annotations:
+          result = True
+        elif name in parents:
+          result = can_pack(parents[name])

Review comment:
       For my understanding, could you explain why we check `parents` here?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
##########
@@ -278,6 +279,48 @@ def expand(self, pcoll):
       vals = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]
       _ = pipeline | Create(vals) | 'multiple-combines' >> MultipleCombines()
 
+  def test_conditionally_packed_combiners(self):

Review comment:
       Thanks for the test!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org