Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/24 09:44:15 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #17787: [AIRFLOW-9300] Add DatafusionPipelineStateSensor and async option to the CloudDataFusionStartPipelineOperator

potiuk commented on a change in pull request #17787:
URL: https://github.com/apache/airflow/pull/17787#discussion_r694691387



##########
File path: airflow/providers/google/cloud/example_dags/example_datafusion.py
##########
@@ -234,6 +254,14 @@
 
     create_instance >> get_instance >> restart_instance >> update_instance >> sleep
     sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> delete_pipeline
+    (

Review comment:
      There should be no repeated create/start/stop/delete tasks here with dependencies between them.
   
   It is quite confusing what the example is trying to do, how it will behave, and what the DAG will look like.
   
   The DAG structure below seems to repeat the same dependencies again for sleep --> create and stop --> delete, and this is totally needless. Plus, I am not sure what happens if the async and sync runs execute in parallel against the same pipeline at the same time (which looks like what is going to happen here).
   
   Trying to run both a sync and an async run of the same pipeline at the same time sounds like a recipe for disaster (but maybe it will work, who knows - I do not know DataFusion that well :D).
   
   To avoid confusion about the DAG definition, it should look more like:
   
   ```
   sleep >> create_pipeline >> list_pipelines >> start_pipeline >> stop_pipeline >> start_pipeline_async >> start_pipeline_sensor >> delete_pipeline
   ```
   
   This shows the intent much better, and allows the synchronous version of the pipeline to run first and then the asynchronous one.
   
   Or maybe you should even separate it out and prepare a separate "example_datafusion_async.py" - as I am not sure whether the same pipeline can be run twice in succession (probably yes, and if that is the case then a single example with both the sync and async versions is fine). See the sketch below for the single-chain layout.
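   
   Rendered as Python, the suggested layout could look like the minimal sketch below. This is only an illustration of the suggestion, not the merged example: the DataFusion tasks are stood in by `DummyOperator` so the snippet runs on its own, and the DAG id and dates are placeholders.
   
   ```
   # Hedged sketch of the suggested single-chain ordering (not the merged
   # example). The real example DAG would use the DataFusion operators and
   # the sensor shown in the diff; DummyOperator stands in for them here so
   # the snippet is self-contained and runnable on Airflow 2.x.
   from datetime import datetime
   
   from airflow import models
   from airflow.operators.dummy import DummyOperator
   
   with models.DAG(
       dag_id="example_datafusion_chain_sketch",  # placeholder id
       start_date=datetime(2021, 8, 1),
       schedule_interval=None,
   ) as dag:
       tasks = {
           name: DummyOperator(task_id=name)
           for name in (
               "sleep", "create_pipeline", "list_pipelines", "start_pipeline",
               "stop_pipeline", "start_pipeline_async", "start_pipeline_sensor",
               "delete_pipeline",
           )
       }
   
       # One linear chain: the synchronous run first, then the asynchronous
       # run gated by its sensor, and only then the pipeline teardown.
       (
           tasks["sleep"]
           >> tasks["create_pipeline"]
           >> tasks["list_pipelines"]
           >> tasks["start_pipeline"]
           >> tasks["stop_pipeline"]
           >> tasks["start_pipeline_async"]
           >> tasks["start_pipeline_sensor"]
           >> tasks["delete_pipeline"]
       )
   ```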

##########
File path: airflow/providers/google/cloud/example_dags/example_datafusion.py
##########
@@ -205,6 +206,25 @@
     )
     # [END howto_cloud_data_fusion_start_pipeline]
 
+    # [START howto_cloud_data_fusion_start_pipeline_async]
+    start_pipeline_async = CloudDataFusionStartPipelineOperator(
+        location=LOCATION,
+        pipeline_name=PIPELINE_NAME,
+        instance_name=INSTANCE_NAME,
+        asynchronous=True,
+        task_id="start_pipeline_async",
+    )
+
+    start_pipeline_sensor = DatafusionPipelineStateSensor(
+        task_id="pipeline_state_sensor",
+        pipeline_name=PIPELINE_NAME,
+        pipeline_id=start_pipeline_async.output,
+        expected_statuses=["COMPLETED"],
+        instance_name=INSTANCE_NAME,
+        location=LOCATION,
+    )
+    # [END howto_cloud_data_fusion_start_pipeline_async]

Review comment:
      Those START/END markers are here for a reason - they are used as entries in https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers-google/operators/cloud/datafusion.rst to automatically extract pieces of the example DAG into the "Howto" documentation. The Howto documentation now needs separate sync and async sections.
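   
   For context, the Howto page pulls these snippets in with Airflow's `exampleinclude` Sphinx directive keyed to the START/END markers, so a new async section in datafusion.rst would need an entry roughly like the following (the exact path and `:dedent:` value are assumptions modeled on the existing entries in that file):
   
   ```
   .. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_datafusion.py
       :language: python
       :dedent: 4
       :start-after: [START howto_cloud_data_fusion_start_pipeline_async]
       :end-before: [END howto_cloud_data_fusion_start_pipeline_async]
   ```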



