Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/19 19:15:04 UTC

[GitHub] [beam] ibzib commented on a change in pull request #15351: Basic projection pushdown in Python/Runner API.

ibzib commented on a change in pull request #15351:
URL: https://github.com/apache/beam/pull/15351#discussion_r692391409



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -277,6 +277,9 @@ def __init__(self,
     self.setup_done = False
     self.step_name = None  # type: Optional[str]
 
+  def actuate_pushdown(self, fields):

Review comment:
       Do you know what the equivalent class is in Java?

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
     payload = proto_utils.parse_Bytes(
         transform_proto.spec.payload, parameter_type)
-    return creator(self, transform_id, transform_proto, payload, consumers)
+    operation = creator(self, transform_id, transform_proto, payload, consumers)
+    if common_urns.actuate_pushdown_annotation in transform_proto.annotations:
+      operation.actuate_pushdown(

Review comment:
       Does this assume the transform is a ParDo?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -197,6 +198,38 @@ def run_pipeline(self,
     return self._latest_run_result
 
   def run_via_runner_api(self, pipeline_proto):
+
+    def pushdown_projections(pipeline_proto):
+      leaf_consumers = collections.defaultdict(list)
+      for transform in pipeline_proto.components.transforms.values():
+        for pc in transform.inputs.values():
+          if not transform.subtransforms:
+            leaf_consumers[pc].append(transform)
+
+      for transform in pipeline_proto.components.transforms.values():
+        if transform.subtransforms or not transform.outputs:
+          continue
+        if not common_urns.support_pushdown_annotation in transform.annotations:
+          continue
+        # The annotations should really be per input and output.

Review comment:
       This is a really good point, and it will require us to be careful with composite sources. Say we have a composite source T:
   
   ptransform T : pcoll A -> pcoll C
   - pardo T.f : pcoll A -> pcoll B
   - ptransform T.g : pcoll B -> pcoll C
   
   If we request pushdown on pcoll C, but the actual read from the source happens in T.f, what do we do? T.f doesn't know anything about pcoll C. The main problem is that we're saying arbitrary PTransforms support pushdown, when really only a single ParDo does.
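   
   To make the mismatch concrete, here is a minimal sketch of the T / T.f / T.g example. It uses plain dataclasses as hypothetical stand-ins for the transform protos (the `Transform` class and `leaf_producer` helper are assumptions for illustration, not Beam APIs). Resolving the output of T to a leaf transform lands on T.g, not on T.f, where the read actually happens:
   
```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Transform:
    # Hypothetical stand-in for a PTransform proto; not the Beam API.
    inputs: List[str]
    outputs: List[str]
    subtransforms: List[str] = field(default_factory=list)


# The composite source T from the example above.
transforms: Dict[str, Transform] = {
    "T": Transform(inputs=["A"], outputs=["C"], subtransforms=["T.f", "T.g"]),
    "T.f": Transform(inputs=["A"], outputs=["B"]),  # the ParDo doing the read
    "T.g": Transform(inputs=["B"], outputs=["C"]),
}


def leaf_producer(pcoll: str) -> Optional[str]:
    """Return the name of the leaf (non-composite) transform that outputs pcoll."""
    for name, transform in transforms.items():
        if not transform.subtransforms and pcoll in transform.outputs:
            return name
    return None


# A pushdown request on pcoll C resolves to T.g, while the read is in T.f.
producer_of_c = leaf_producer("C")
reading_pardo = "T.f"
print(producer_of_c, reading_pardo, producer_of_c == reading_pardo)
```
   
   Under these assumptions, a pushdown request attached to pcoll C never reaches T.f unless something maps the requested fields back through T.g, which is why an annotation on the composite T as a whole is too coarse.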

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1196,7 +1196,12 @@ def create_operation(self,
     creator, parameter_type = self._known_urns[transform_proto.spec.urn]
     payload = proto_utils.parse_Bytes(
         transform_proto.spec.payload, parameter_type)
-    return creator(self, transform_id, transform_proto, payload, consumers)
+    operation = creator(self, transform_id, transform_proto, payload, consumers)
+    if common_urns.actuate_pushdown_annotation in transform_proto.annotations:

Review comment:
       This approach requires the worker to have access to the runner API proto (or at least some derivative of it that also contains the pushdown annotations). Do you know if that's the case for Dataflow's legacy Java worker?
   
   cc @apilloud 



