Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/25 19:05:42 UTC

[GitHub] [beam] robertwb commented on a change in pull request #15351: Basic projection pushdown in Python/Runner API.

robertwb commented on a change in pull request #15351:
URL: https://github.com/apache/beam/pull/15351#discussion_r696029511



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -277,6 +277,9 @@ def __init__(self,
     self.setup_done = False
     self.step_name = None  # type: Optional[str]
 
+  def actuate_pushdown(self, fields):

Review comment:
       I don't off-hand, but it's a stack of DoFnInvokers, ParDoEvaluators, and the like that should be traceable from the bundle evaluation code. There are more layers, but I think most of them will just be pass-throughs.

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
     payload = proto_utils.parse_Bytes(
         transform_proto.spec.payload, parameter_type)
-    return creator(self, transform_id, transform_proto, payload, consumers)
+    operation = creator(self, transform_id, transform_proto, payload, consumers)
+    if common_urns.actuate_pushdown_annotation in transform_proto.annotations:

Review comment:
       This is not the case for the legacy worker, but we should not be developing new features on the legacy worker. 
   
   (We could probably add this information in the v1beta3 translation if we wanted to support legacy.)

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -197,6 +198,38 @@ def run_pipeline(self,
     return self._latest_run_result
 
   def run_via_runner_api(self, pipeline_proto):
+
+    def pushdown_projections(pipeline_proto):
+      leaf_consumers = collections.defaultdict(list)
+      for transform in pipeline_proto.components.transforms.values():
+        for pc in transform.inputs.values():
+          if not transform.subtransforms:
+            leaf_consumers[pc].append(transform)
+
+      for transform in pipeline_proto.components.transforms.values():
+        if transform.subtransforms or not transform.outputs:
+          continue
+        if not common_urns.support_pushdown_annotation in transform.annotations:
+          continue
+        # The annotations should really be per input and output.

Review comment:
       If T declares that it requests or passes through projections, it should be sure to have an implementation that supports these thinned-down inputs. I'm still not sure how to support absorbing pushdowns without forcing it to do so in its primitives. 
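
To make "supports these thinned-down inputs" concrete, here is a minimal sketch that is not taken from the PR. It assumes dict-shaped rows; the field names and the `project` and `summarize` helpers are hypothetical. The point is only that a transform which reads nothing beyond its declared fields produces the same result on a projected row as on the full one.

```python
# Hypothetical illustration only; none of these names come from Beam or the PR.
REQUIRED_FIELDS = ("user_id", "amount")


def project(row, fields):
    """Keep only the requested fields of a dict-shaped row."""
    return {name: row[name] for name in fields}


def summarize(row):
    """Reads only REQUIRED_FIELDS, so thinned-down rows are safe to pass in."""
    return (row["user_id"], row["amount"] * 2)


full_row = {"user_id": 7, "amount": 10.0, "comment": "large unused payload"}
thin_row = project(full_row, REQUIRED_FIELDS)

# The unused field is gone, and the result is unchanged.
same_result = summarize(full_row) == summarize(thin_row)
```

A transform that touched `row["comment"]` without declaring it would fail with a `KeyError` on the thinned row, which is why the declaration and the implementation have to agree.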

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
     payload = proto_utils.parse_Bytes(
         transform_proto.spec.payload, parameter_type)
-    return creator(self, transform_id, transform_proto, payload, consumers)
+    operation = creator(self, transform_id, transform_proto, payload, consumers)
+    if common_urns.actuate_pushdown_annotation in transform_proto.annotations:
+      operation.actuate_pushdown(

Review comment:
       This is only called on transforms that previously set the support_pushdown_annotation bit. (Currently, that's only DoFns, but any transform could work here.) One restriction is that this is only for primitive, execution-time transforms. 
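
The opt-in and actuation steps described in the comment above can be sketched as follows. This is a simplified model and not the PR's implementation: transforms are plain dicts, the annotation keys are made-up strings, and `FakeOperation`, `runner_side_annotate`, and `worker_side_create` are hypothetical names. Only the method name `actuate_pushdown(fields)` mirrors the diff.

```python
# Hypothetical stand-ins; these keys are not Beam's actual annotation URNs.
SUPPORT_PUSHDOWN = "example:support_pushdown"
ACTUATE_PUSHDOWN = "example:actuate_pushdown"


class FakeOperation:
    """Stand-in for a primitive, execution-time operation."""

    def __init__(self, name):
        self.name = name
        self.pushed_fields = None  # None means no pushdown was requested.

    def actuate_pushdown(self, fields):
        # Record the fields that downstream consumers actually need.
        self.pushed_fields = sorted(fields)


def runner_side_annotate(transform, needed_fields):
    """Runner: request actuation only from transforms that opted in."""
    if SUPPORT_PUSHDOWN in transform["annotations"]:
        transform["annotations"][ACTUATE_PUSHDOWN] = sorted(needed_fields)
    return transform


def worker_side_create(transform):
    """Worker: build the operation, then actuate if the runner asked for it."""
    operation = FakeOperation(transform["name"])
    if ACTUATE_PUSHDOWN in transform["annotations"]:
        operation.actuate_pushdown(transform["annotations"][ACTUATE_PUSHDOWN])
    return operation


opted_in = {"name": "ParDo(A)", "annotations": {SUPPORT_PUSHDOWN: True}}
opted_out = {"name": "ParDo(B)", "annotations": {}}

op_a = worker_side_create(runner_side_annotate(opted_in, {"id", "name"}))
op_b = worker_side_create(runner_side_annotate(opted_out, {"id", "name"}))
```

In this model `op_a` receives the field list while `op_b` is never asked to actuate, because it never set the support annotation.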



