Posted to commits@airflow.apache.org by "Oduig (via GitHub)" <gi...@apache.org> on 2023/02/10 10:33:38 UTC

[GitHub] [airflow] Oduig opened a new issue, #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Oduig opened a new issue, #29461:
URL: https://github.com/apache/airflow/issues/29461

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   The `on_failure_callback` is invoked twice when a DAG fails due to a timeout. This leads to duplicate failure alerts in our Slack channels. It happens reliably and the invocations are roughly 5 seconds apart. Context and full stack trace in both invocations are identical.
   
   ### What you think should happen instead
   
   The `on_failure_callback` should be invoked only once.
   
   ### How to reproduce
   
   Here is the DAG that triggers the effect:
   
   ```
   import pendulum
   from datetime import timedelta
   from airflow import DAG
   from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
   import logging
   import requests
   
   dag_settings = {
       "dag_id": "INT_tableau_others_recommendation_classifications",
       "max_active_runs": 1,
       "dagrun_timeout": timedelta(minutes=1),
       "start_date": pendulum.today("UTC"),
       "default_args": {
           "owner": "airflow",
           "catchup": False
       },
       "tags": ["env:testing"],
       "on_failure_callback": (
           lambda context: [
               logging.info(f"Minimal example - failure callback with context: {context}"),
               # Since logging.info does not work for us, we add a request to RequestBin
               # See https://github.com/apache/airflow/issues/29442
               requests.post(
                   "https://<removed-for-publication>.m.pipedream.net",
                   json={"payload": f"Minimal example - failure callback with context: {context}"}
               )
           ]
       )
   }
   dag = DAG(**dag_settings)
   logging.info(f"Minimal example - Created DAG {dag.dag_id}.")
   
   params = [
       "--class", "com.example.Launcher",
       "dbfs:/libraries/scala/example-fat-jar.jar"
   ]
   
   DatabricksSubmitRunOperator(
       task_id="update_dataset",
       dag=dag,
       databricks_conn_id="databricks_default",
       spark_submit_task={
           "parameters": params
       },
       new_cluster={
           "spark_version": "10.4.x-cpu-ml-scala2.12",
           "spark_env_vars": {
               "JNAME": "zulu11-ca-amd64"  # Use JDK 11
           },
           "spark_conf": {
               "spark.sql.session.timeZone": "UTC"
           },
           "aws_attributes": {
               "instance_profile_arn": "arn:aws:iam::765<removed-for-publication>:instance-profile/DatabricksExecution"
           },
           "instance_pool_id": "1011-<removed-for-publication>",
           "driver_instance_pool_id": "1011-<removed-for-publication>",
           "num_workers": 1
       }
   )
   ```
   
   ### Operating System
   
   composer-2.0.32
   
   ### Versions of Apache Airflow Providers
   
   Here is the full `requirements.txt`
   
   ```
   apache-airflow-providers-databricks==4.0.0
   databricks-sql-connector==2.1.0
   apache-beam~=2.43.0
   sqlalchemy-bigquery==1.5.0
   requests~=2.28.1
   apache-airflow-providers-tableau==4.0.0
   apache-airflow-providers-sendgrid==3.1.0
   python-dotenv==0.21.0
   urllib3~=1.26.8
   tableauserverclient==0.23
   apache-airflow-providers-http==4.1.0
   # time library in airflow
   pendulum==2.1.2
   ```
   
   ### Deployment
   
   Composer
   
   ### Deployment details
   
   We are running a Cloud Composer environment with image `composer-2.0.32-airflow-2.3.4`.
   
   ### Anything else
   
   Since logging inside the callback is not working for us (#29442), here is a screenshot from RequestBin. Both invocations share the same log line, and the context and stack trace are also the same.
   
   ![image](https://user-images.githubusercontent.com/3661031/218069601-dada3f0e-4494-4cf3-a274-f5d3b8270ac8.png)
   
   Here is the complete stack trace for either invocation.
   
   ```
   File "/opt/python3.8/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/__main__.py", line 38, in main
       args.func(args)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
       return func(*args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 101, in wrapper
       return f(*args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 76, in scheduler
       _run_scheduler_job(args=args)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 748, in _execute
       self.processor_agent.start()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 158, in start
       process.start()
     File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
       self._popen = self._Popen(self)
     File "/opt/python3.8/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
       return Popen(process_obj)
     File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
       self._launch(process_obj)
     File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
       code = process_obj._bootstrap(parent_sentinel=child_r)
     File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
       self.run()
     File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
       self._target(*self._args, **self._kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 255, in _run_processor_manager
       processor_manager.start()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 486, in start
       return self._run_parsing_loop()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 600, in _run_parsing_loop
       self.start_new_processes()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 1003, in start_new_processes
       processor.start()
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 194, in start
       process.start()
     File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
       self._popen = self._Popen(self)
     File "/opt/python3.8/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
       return Popen(process_obj)
     File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
       self._launch(process_obj)
     File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
       code = process_obj._bootstrap(parent_sentinel=child_r)
     File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
       self.run()
     File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
       self._target(*self._args, **self._kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 155, in _run_file_processor
       result: Tuple[int, int] = dag_file_processor.process_file(
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 656, in process_file
       self.execute_callbacks(dagbag, callback_requests)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 581, in execute_callbacks
       self._execute_dag_callbacks(dagbag, request, session)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 595, in _execute_dag_callbacks
       dag.handle_callback(
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
       return func(*args, **kwargs)
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/dag.py", line 1178, in handle_callback
       callback(context)
     File "/home/airflow/gcs/dags/integration_test/example.py"
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] github-actions[bot] commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1558237961

   This issue has been closed because it has not received a response from the issue author.


[GitHub] [airflow] github-actions[bot] commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1548782446

   This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.


[GitHub] [airflow] Oduig commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "Oduig (via GitHub)" <gi...@apache.org>.
Oduig commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1427514402

   IMO it is fair that at-least-once delivery applies here: sometimes two processes may call a callback simultaneously, or a notification may be delivered successfully but appear to the sender as a failure and be delivered again.
   
   However, a callback that is always called twice does not fit at-least-once semantics. In addition, with a `BashOperator` timeout, the callback is executed only once, so it seems like a solvable problem. Finally, when there is a timeout, the running task is marked skipped rather than failed; only the DAG itself fails. In my eyes this means the task level shouldn't even attempt to call the callback, because the task itself didn't fail.
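
One way to make repeated invocations harmless under the at-least-once behaviour discussed above is to make the callback idempotent. The following is only a sketch under stated assumptions: `InMemoryClaims` and `dedupe_callback` are hypothetical names (not Airflow APIs), and the context is assumed to carry a `dag_run` object exposing `dag_id` and `run_id`. An in-memory store deduplicates only within a single process; if the two invocations come from different DAG file processor processes, the claim store would have to be shared (a database row, for example).

```python
import threading
from typing import Any, Callable, Dict, Hashable, Set


class InMemoryClaims:
    """Hypothetical claim store: remembers keys seen in this process only."""

    def __init__(self) -> None:
        self._seen: Set[Hashable] = set()
        self._lock = threading.Lock()

    def claim(self, key: Hashable) -> bool:
        """Return True the first time `key` is claimed, False afterwards."""
        with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True


def dedupe_callback(
    callback: Callable[[Dict[str, Any]], Any],
    claims: InMemoryClaims,
) -> Callable[[Dict[str, Any]], None]:
    """Wrap `callback` so it runs at most once per (dag_id, run_id)."""

    def wrapper(context: Dict[str, Any]) -> None:
        # Assumption: the callback context exposes the DAG run under "dag_run".
        dag_run = context["dag_run"]
        if claims.claim((dag_run.dag_id, dag_run.run_id)):
            callback(context)

    return wrapper
```

With this in place, the DAG setting would become something like `"on_failure_callback": dedupe_callback(notify, InMemoryClaims())`, where `notify` stands for whatever function sends the alert.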


[GitHub] [airflow] hussein-awala commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1427138045

   Duplicate of #27614
   
   Please check @potiuk's explanation in [this comment](https://github.com/apache/airflow/issues/27614#issuecomment-1336132339) to understand why it's currently executed twice (and maybe more in some cases).


[GitHub] [airflow] potiuk commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1437372043

   @Oduig Does the error occur only for `DatabricksSubmitRunOperator`, or for other operators as well? Can you please double-check and report back?
   
   Also, can you please explain in which processes the callbacks originate: is it the same scheduler/DAG file processor producing both stack traces, or two different schedulers? We are missing those details about your deployment, and we have no idea how Composer internals work. When callbacks are always duplicated, it seems that it might be related to some duplication resulting from the deployment (but this is a wild guess).
   
   It is quite likely that this is related to some deployment-specific issue in 2.3.4, so I also advise you to move to 2.4.3 (currently the latest in Composer) and report back to us if it happens there. Also note that if we find and release a fix here, it will only be released in the latest Airflow (2.5.2, currently in progress), so you will have to upgrade anyway to apply the fix. More likely, though, this is somehow a deployment issue and you will have to go through the regular Composer issue process (upgrading to 2.4.3 now to check whether the problem persists is likely a much faster route for you, so I recommend you do it as a first step).
   
   If you upgrade, reproduce, and provide more information on the deployment and the origin of those callbacks (if they happen again after the upgrade), then we can likely help you identify what problem you should report to Composer (as long as we don't find during the process that this is something regular Airflow also has).


[GitHub] [airflow] Oduig commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "Oduig (via GitHub)" <gi...@apache.org>.
Oduig commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1438575233

   If I use a `BashOperator` instead, the failure callback is invoked only once. I checked the stack traces by using `traceback.extract_stack()`. It could be related to Composer; we can try to upgrade it. Let me get back to you on this.
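
For reference, a check along these lines might look like the sketch below. It is an illustration, not the code used in this report: `describe_invocation` is a hypothetical helper, and reading `run_id` from the context is an assumption. It gathers the call stack via `traceback.extract_stack()` together with the host name and process IDs, which is the kind of information needed to tell whether two invocations come from the same process.

```python
import os
import socket
import traceback
from typing import Any, Dict


def describe_invocation(context: Dict[str, Any]) -> Dict[str, Any]:
    """Collect details identifying which process invoked a callback."""
    # extract_stack() ends with this helper's own frame; drop it.
    stack = traceback.extract_stack()[:-1]
    return {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "parent_pid": os.getppid(),
        # Assumption: the callback context carries a "run_id" entry.
        "run_id": str(context.get("run_id")),
        "stack": [f"{f.filename}:{f.lineno} in {f.name}" for f in stack],
    }
```

The returned dictionary could be posted as the JSON payload from inside the failure callback; two payloads with different `pid` or `hostname` values would point to separate processes issuing the callback.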


[GitHub] [airflow] github-actions[bot] closed issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
URL: https://github.com/apache/airflow/issues/29461

