Posted to commits@airflow.apache.org by "CYarros10 (via GitHub)" <gi...@apache.org> on 2023/03/09 22:00:01 UTC

[GitHub] [airflow] CYarros10 opened a new issue, #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

CYarros10 opened a new issue, #30007:
URL: https://github.com/apache/airflow/issues/30007

   ### Apache Airflow version
   
   2.5.1
   
   ### What happened
   
   [BeamRunPythonPipelineOperator](https://github.com/apache/airflow/blob/main/airflow/providers/apache/beam/operators/beam.py#L343) does not push any values to XCom when the pipeline starts. Dataflow sensors, however, expect the job ID to be available from XCom while the job is still running:
   
   ```
           discover_cancelling_jobs = DataflowJobStatusSensor(
               task_id="discover_cancelling_jobs",
               job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
               expected_statuses={DataflowJobStatus.JOB_STATE_CANCELLING},
               location="{{region}}",
               mode="poke"
           )
   ```
   
   The only way to retrieve the Dataflow job ID from a BeamRunPythonPipelineOperator is through XCom, and the operator does not push that XCom until the pipeline ends, so the sensor can't "sense" anything. It can only read jobs that are already done.
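
To make the timing problem concrete, here is a minimal pure-Python sketch that does not import Airflow. It assumes, as this report describes, that the job ID only reaches XCom through the operator's return value, which is stored after `execute()` returns. `FakeOperator`, `run_task`, the `xcom` dict, the `job_id` key, and the job ID string are all hypothetical stand-ins, not Airflow APIs or real data.

```python
from typing import Callable, Dict, List, Optional

xcom: Dict[str, dict] = {}  # hypothetical stand-in for the XCom store


class FakeOperator:
    """Hypothetical operator: learns the job ID early, returns it only at the end."""

    def __init__(self, task_id: str, while_running: Callable[[], None]) -> None:
        self.task_id = task_id
        self.while_running = while_running

    def execute(self) -> dict:
        job_id = "2023-03-09_12_00_00-1234567890"  # made-up ID, known at submission
        self.while_running()  # the pipeline is running; a sensor would poke here
        return {"job_id": job_id}  # illustrative key name


def run_task(op: FakeOperator) -> None:
    """Mimic the task runner: the return value is stored only after execute() returns."""
    xcom[op.task_id] = op.execute()


observed: List[Optional[dict]] = []
task_id = "start_python_job_async"
op = FakeOperator(task_id, lambda: observed.append(xcom.get(task_id)))
run_task(op)
observed.append(xcom.get(task_id))
print(observed)
```

The first observation, taken while the simulated pipeline is running, is empty; the ID only becomes visible after the task has finished, which is the situation the sensors are stuck in.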
   
   
   
   
   ### What you think should happen instead
   
   The Dataflow job ID should be pushed to XCom when the pipeline starts, not when it ends.
   
   ### How to reproduce
   
   Sample Code
   
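   ```
   # Imports this excerpt relies on. The enclosing `with DAG(...) as dag:` block is omitted.
   from typing import Callable

   from airflow.exceptions import AirflowException
   from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
   from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
   from airflow.providers.google.cloud.operators.dataflow import (
       DataflowStopJobOperator,
       DataflowTemplatedJobStartOperator,
   )
   from airflow.providers.google.cloud.sensors.dataflow import (
       DataflowJobAutoScalingEventsSensor,
       DataflowJobMetricsSensor,
       DataflowJobStatusSensor,
   )
   from airflow.utils.task_group import TaskGroup

       # -------------------------------------------------------------------------
       # Dataflow
       # -------------------------------------------------------------------------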
   
       with TaskGroup(group_id="dataflow_tg1") as dataflow_tg1:
   
   
           start_python_job = BeamRunPythonPipelineOperator(
               task_id="start_python_job",
               runner="DataflowRunner",
               py_file="gs://{{gcs_download_bucket}}/{{df_python_script}}",
               py_options=[],
               pipeline_options={
                   "output": "gs://{{gcs_download_bucket}}/dataflow_output",
               },
               py_requirements=["apache-beam[gcp]==2.36.0"],
               py_interpreter="python3",
               py_system_site_packages=False,
               dataflow_config={
                   "job_name": "{{df_job}}-python",
                   "wait_until_finished": False,
               },
           )
   
           start_python_job_async = BeamRunPythonPipelineOperator(
               task_id="start_python_job_async",
               runner="DataflowRunner",
               py_file="gs://{{gcs_download_bucket}}/{{df_python_script}}",
               py_options=[],
               pipeline_options={
                   "output": "gs://{{gcs_download_bucket}}/dataflow_output",
               },
               py_requirements=["apache-beam[gcp]==2.36.0"],
               py_interpreter="python3",
               py_system_site_packages=False,
               dataflow_config={
                   "job_name": "{{df_job}}-async",
                   "wait_until_finished": False,
               },
           )
   
           start_template_job = DataflowTemplatedJobStartOperator(
               task_id="start_template_job",
               job_name="{{df_job}}-template",
               project_id="{{ project_id }}",
               template="gs://dataflow-templates/latest/Word_Count",
               parameters={"inputFile": "gs://{{gcs_download_bucket}}/{{gcs_download_obj}}", "output": "gs://{{gcs_download_bucket}}/dataflow_output"},
               location="{{region}}",
           )
   
   
           wait_for_python_job_async_done = DataflowJobStatusSensor(
               task_id="wait_for_python_job_async_done",
               job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
               expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
               location="{{region}}",
               mode="reschedule",
               poke_interval=60
           )
   
           def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
               """Check whether the metric is greater than or equal to the given value."""
   
               def callback(metrics) -> bool:
                   dag.log.info("Looking for '%s' >= %d", metric_name, value)
                   for metric in metrics:
                       context = metric.get("name", {}).get("context", {})
                       original_name = context.get("original_name", "")
                       tentative = context.get("tentative", "")
                       # Compare against the requested metric rather than a hard-coded name.
                       if original_name == metric_name and not tentative:
                           return metric["scalar"] >= value
                   raise AirflowException(f"Metric '{metric_name}' not found in metrics")
   
               return callback
   
           wait_for_python_job_async_metric = DataflowJobMetricsSensor(
               task_id="wait_for_python_job_async_metric",
               job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",  # this doesn't work
               location="{{region}}",
               callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
               fail_on_terminal_state=False,
               mode="reschedule",
               poke_interval=60
           )
   
   
           def check_autoscaling_event(autoscaling_events) -> bool:
               """Check autoscaling event"""
               for autoscaling_event in autoscaling_events:
                   if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
                       return True
               return False
   
           wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
               task_id="wait_for_python_job_async_autoscaling_event",
               job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",  # this doesn't work
               location="{{region}}",
               callback=check_autoscaling_event,
               fail_on_terminal_state=False,
               mode="reschedule",
               poke_interval=60
           )
   
           stop_python_job = DataflowStopJobOperator(
               task_id="stop_python_dataflow_job",
               location="{{region}}",
               job_name_prefix="{{task_instance.xcom_pull('start_python_job')['dataflow_job_config']['job_id']}}",
           )
   
           stop_template_job = DataflowStopJobOperator(
               task_id="stop_dataflow_job",
               location="{{region}}",
               job_name_prefix="{{df_job}}-template",
           )
   
           stop_async_job = DataflowStopJobOperator(
               task_id="stop_async_job",
               location="{{region}}",
               job_name_prefix="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
           )
   
           start_python_job >> stop_python_job
       
           start_template_job >> stop_template_job
   
           start_python_job_async >> stop_async_job
   
           wait_for_python_job_async_metric
   
           wait_for_python_job_async_autoscaling_event
           
           wait_for_python_job_async_done
   ```
   
   ### Operating System
   
   composer-2.1.5-airflow-2.4.3
   
   ### Versions of Apache Airflow Providers
   
   2.4.3
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   Occurs every time
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] CYarros10 commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "CYarros10 (via GitHub)" <gi...@apache.org>.
CYarros10 commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1549832192

   Have you tested this? The documentation is inconsistent and not reliable enough to go off of on its own. For example, the documentation you referenced states:
   
   `    wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
           task_id="wait-for-python-job-async-done",
           job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
           expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
           project_id=GCP_PROJECT_ID,
           location='us-central1',
       )`
       
   and `dataflow_job_id` is not actually in the XCom.




[GitHub] [airflow] boring-cyborg[bot] commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1462883086

   Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
   




[GitHub] [airflow] josh-fell commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1734233580

   @zeotuan All yours!




[GitHub] [airflow] zeotuan commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "zeotuan (via GitHub)" <gi...@apache.org>.
zeotuan commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1728580476

   I can take on this issue




[GitHub] [airflow] githubwua commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "githubwua (via GitHub)" <gi...@apache.org>.
githubwua commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1549749020

   job_id is stored in:
               job_id="{{task_instance.xcom_pull('start_python_job_async')['job_id']}}",
   
   It is not stored in:
               job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
   
   Code Reference:
   
   https://github.com/apache/airflow/blob/94cad11b439e0ab102268e9e7221b0ab9d98e0df/airflow/providers/apache/beam/operators/beam.py#L115
   
   If you change your code to read the Dataflow job_id from the correct key, you will be able to retrieve it.
   
   To illustrate, here is sample code showing how to retrieve the Dataflow job ID:
   
   https://airflow.apache.org/docs/apache-airflow-providers-apache-beam/1.0.0/_modules/airflow/providers/apache/beam/example_dags/example_beam.html
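
Since this thread mentions three different key paths for the job ID (`['job_id']`, `['dataflow_job_config']['job_id']`, and `['dataflow_job_id']`), a defensive lookup can help when checking what a given provider version actually pushes. This is only a debugging sketch: `find_job_id` and `CANDIDATE_PATHS` are hypothetical names, and the sample dictionaries are made up rather than taken from a real run.

```python
from typing import Any, Optional, Sequence, Tuple

# Key paths mentioned in this thread; which one is populated may depend on the provider version.
CANDIDATE_PATHS: Sequence[Tuple[str, ...]] = (
    ("dataflow_job_id",),
    ("job_id",),
    ("dataflow_job_config", "job_id"),
)


def find_job_id(xcom_value: Any) -> Optional[str]:
    """Return the first non-empty string found at any candidate path, else None."""
    for path in CANDIDATE_PATHS:
        node = xcom_value
        for key in path:
            if not isinstance(node, dict) or key not in node:
                node = None
                break
            node = node[key]
        if isinstance(node, str) and node:
            return node
    return None


print(find_job_id({"job_id": "job-a"}))
print(find_job_id({"dataflow_job_config": {"job_id": "job-b"}}))
print(find_job_id(None))
```

A helper like this does not solve the timing problem: while the pipeline is still running the XCom value is missing entirely, so the lookup returns `None` regardless of the key path.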
   




[GitHub] [airflow] hubert-pietron commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "hubert-pietron (via GitHub)" <gi...@apache.org>.
hubert-pietron commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1491338495

   Can I take care of this issue?




[GitHub] [airflow] josh-fell commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1473875502

   Agreed. `dataflow_job_id` should be pushed to XCom as soon as it's known, not only in BeamRunPythonPipelineOperator but also in the Java and Go versions of the operator.
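
One way to push the ID as soon as it is known is a per-line callback on the pipeline's log output that records the job ID the moment it appears. The sketch below is pure Python and rests on assumptions: the log line format `Created job with id: [...]`, the `JOB_ID_RE` pattern, `make_line_callback`, the `xcom` dict, and the sample log lines are all illustrative, not the provider's actual implementation.

```python
import re
from typing import Callable, Dict, Optional

# Assumed log format; the real runner output may differ.
JOB_ID_RE = re.compile(r"Created job with id: \[(?P<job_id>[^\]]+)\]")


def make_line_callback(on_job_id: Callable[[str], None]) -> Callable[[str], None]:
    """Return a callback that calls on_job_id once, for the first job ID seen."""
    seen: Dict[str, str] = {}

    def process_line(line: str) -> None:
        match = JOB_ID_RE.search(line)
        if match and "job_id" not in seen:
            seen["job_id"] = match.group("job_id")
            on_job_id(seen["job_id"])

    return process_line


xcom: Dict[str, str] = {}  # hypothetical stand-in for an early XCom push
callback = make_line_callback(lambda job_id: xcom.update(dataflow_job_id=job_id))

log_lines = [
    "INFO: Starting pipeline submission",
    "INFO: Created job with id: [2023-03-09_12_00_00-1234567890]",
    "INFO: Job is still running...",
]
pushed_after: Optional[int] = None  # index of the log line after which the ID was available
for index, line in enumerate(log_lines):
    callback(line)
    if pushed_after is None and "dataflow_job_id" in xcom:
        pushed_after = index

print(xcom, pushed_after)
```

In this simulation the ID is available after the second log line, while the job is still reported as running, which is the point at which a sensor would need it.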




[GitHub] [airflow] githubwua commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "githubwua (via GitHub)" <gi...@apache.org>.
githubwua commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1552569028

   You are right. I was asked to take a look at this issue and didn't have a chance to read the description in detail.
   I was only checking successful runs, which is why I was able to get the Dataflow job IDs.
   
   The Dataflow job ID is indeed only available after a Dataflow job finishes successfully.
   It is not available when a Dataflow job starts or while it is running.
   
   In a perfect world where nothing goes wrong, this is fine. In the real world, when a Dataflow job gets cancelled, there is no job ID with which to track the cancelled job.




[GitHub] [airflow] josh-fell commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "josh-fell (via GitHub)" <gi...@apache.org>.
josh-fell commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1491759200

   @hubert-pietron Sure thing, all yours!




[GitHub] [airflow] hubert-pietron commented on issue #30007: BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless

Posted by "hubert-pietron (via GitHub)" <gi...@apache.org>.
hubert-pietron commented on issue #30007:
URL: https://github.com/apache/airflow/issues/30007#issuecomment-1546846063

   I need to unassign myself. Because of a change of work, I currently do not have time to look into the problem :/

