Posted to commits@airflow.apache.org by "Oduig (via GitHub)" <gi...@apache.org> on 2023/02/10 10:33:38 UTC
[GitHub] [airflow] Oduig opened a new issue, #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Oduig opened a new issue, #29461:
URL: https://github.com/apache/airflow/issues/29461
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
The `on_failure_callback` is invoked twice when a DAG fails due to a timeout. This leads to duplicate failure alerts in our Slack channels. It happens reliably and the invocations are roughly 5 seconds apart. Context and full stack trace in both invocations are identical.
### What you think should happen instead
The `on_failure_callback` should be invoked only once.
### How to reproduce
Here is the DAG that triggers the effect:
```
import pendulum
from datetime import timedelta
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
import logging
import requests

dag_settings = {
    "dag_id": "INT_tableau_others_recommendation_classifications",
    "max_active_runs": 1,
    "dagrun_timeout": timedelta(minutes=1),
    "start_date": pendulum.today("UTC"),
    "default_args": {
        "owner": "airflow",
        "catchup": False
    },
    "tags": ["env:testing"],
    "on_failure_callback": (
        lambda context: [
            logging.info(f"Minimal example - failure callback with context: {context}"),
            # Since logging.info does not work for us, we add a request to RequestBin
            # See https://github.com/apache/airflow/issues/29442
            requests.post(
                "https://<removed-for-publication>.m.pipedream.net",
                json={"payload": f"Minimal example - failure callback with context: {context}"}
            )
        ]
    )
}

dag = DAG(**dag_settings)
logging.info(f"Minimal example - Created DAG {dag.dag_id}.")

params = [
    "--class", "com.example.Launcher",
    "dbfs:/libraries/scala/example-fat-jar.jar"
]

DatabricksSubmitRunOperator(
    task_id="update_dataset",
    dag=dag,
    databricks_conn_id="databricks_default",
    spark_submit_task={
        "parameters": params
    },
    new_cluster={
        "spark_version": "10.4.x-cpu-ml-scala2.12",
        "spark_env_vars": {
            "JNAME": "zulu11-ca-amd64"  # Use JDK 11
        },
        "spark_conf": {
            "spark.sql.session.timeZone": "UTC"
        },
        "aws_attributes": {
            "instance_profile_arn": "arn:aws:iam::765<removed-for-publication>:instance-profile/DatabricksExecution"
        },
        "instance_pool_id": "1011-<removed-for-publication>",
        "driver_instance_pool_id": "1011-<removed-for-publication>",
        "num_workers": 1
    }
)
```
### Operating System
composer-2.0.32
### Versions of Apache Airflow Providers
Here is the full `requirements.txt`
```
apache-airflow-providers-databricks==4.0.0
databricks-sql-connector==2.1.0
apache-beam~=2.43.0
sqlalchemy-bigquery==1.5.0
requests~=2.28.1
apache-airflow-providers-tableau==4.0.0
apache-airflow-providers-sendgrid==3.1.0
python-dotenv==0.21.0
urllib3~=1.26.8
tableauserverclient==0.23
apache-airflow-providers-http==4.1.0
# time library in airflow
pendulum==2.1.2
```
### Deployment
Composer
### Deployment details
We are running a Cloud Composer environment with image `composer-2.0.32-airflow-2.3.4`.
### Anything else
Since logging inside the callback is not working for us (#29442), here is a screenshot from RequestBin. Both invocations share the same log line, and the context and stack trace are also the same.
![image](https://user-images.githubusercontent.com/3661031/218069601-dada3f0e-4494-4cf3-a274-f5d3b8270ac8.png)
Here is the complete stack trace for either invocation.
```
File "/opt/python3.8/bin/airflow", line 8, in <module>
sys.exit(main())
File "/opt/python3.8/lib/python3.8/site-packages/airflow/__main__.py", line 38, in main
args.func(args)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 101, in wrapper
return f(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 76, in scheduler
_run_scheduler_job(args=args)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
job.run()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 244, in run
self._execute()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 748, in _execute
self.processor_agent.start()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 158, in start
process.start()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/opt/python3.8/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
return Popen(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
code = process_obj._bootstrap(parent_sentinel=child_r)
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 255, in _run_processor_manager
processor_manager.start()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 486, in start
return self._run_parsing_loop()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 600, in _run_parsing_loop
self.start_new_processes()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 1003, in start_new_processes
processor.start()
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 194, in start
process.start()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/opt/python3.8/lib/python3.8/multiprocessing/context.py", line 277, in _Popen
return Popen(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/opt/python3.8/lib/python3.8/multiprocessing/popen_fork.py", line 75, in _launch
code = process_obj._bootstrap(parent_sentinel=child_r)
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/opt/python3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 155, in _run_file_processor
result: Tuple[int, int] = dag_file_processor.process_file(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 656, in process_file
self.execute_callbacks(dagbag, callback_requests)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 581, in execute_callbacks
self._execute_dag_callbacks(dagbag, request, session)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 595, in _execute_dag_callbacks
dag.handle_callback(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/dag.py", line 1178, in handle_callback
callback(context)
File "/home/airflow/gcs/dags/integration_test/example.py"
```
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] github-actions[bot] commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1558237961
This issue has been closed because it has not received a response from the issue author.
[GitHub] [airflow] github-actions[bot] commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1548782446
This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.
[GitHub] [airflow] Oduig commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "Oduig (via GitHub)" <gi...@apache.org>.
Oduig commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1427514402
IMO it is fair that at-least-once QoS applies here: occasionally two processes may call a callback simultaneously, or a notification may be delivered successfully but appear to the sender to have failed, in which case it would be delivered again.
However, a callback that is always called twice does not fit at-least-once semantics. In addition, with a BashOperator timeout the callback is executed only once, so this seems like a solvable problem. Finally, when there is a timeout, the running task is marked skipped rather than failed; only the DAG itself fails. In my eyes this means the task level shouldn't even attempt to call the callback, because the task itself didn't fail.
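Under at-least-once delivery, the usual mitigation on the receiving side is to make the callback idempotent. The following is only a minimal sketch of that idea, not a fix for the underlying behaviour. The names `run_once`, `on_failure_once`, `send`, and the marker directory are hypothetical, and it assumes the callback context exposes `dag` and `run_id`. Marker files only suppress duplicates when both invocations see the same filesystem, which may not hold on Composer; a shared store would be needed otherwise.

```python
import hashlib
import os
import tempfile
import time


def run_once(key, action, marker_dir=None, ttl_seconds=300):
    """Run action() unless the same key was already handled within ttl_seconds.

    Returns True if action ran, False if it was suppressed as a duplicate.
    """
    marker_dir = marker_dir or os.path.join(tempfile.gettempdir(), "callback_markers")
    os.makedirs(marker_dir, exist_ok=True)
    marker = os.path.join(marker_dir, hashlib.sha256(key.encode("utf-8")).hexdigest())

    # Drop a stale marker so that a later, genuinely new failure is reported again.
    try:
        if time.time() - os.path.getmtime(marker) > ttl_seconds:
            os.remove(marker)
    except FileNotFoundError:
        pass

    try:
        # O_CREAT | O_EXCL is atomic: only one process can create the marker.
        fd = os.open(marker, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    action()
    return True


def on_failure_once(context, send, marker_dir=None):
    # Assumption: the callback context exposes "dag" and "run_id".
    key = f"{context['dag'].dag_id}:{context['run_id']}"
    return run_once(key, lambda: send(context), marker_dir=marker_dir)
```

Note the trade-off: the marker is written before `action()` runs, so if the alert itself fails it is not retried. That turns at-least-once into at-most-once for the alert, which may or may not be acceptable.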
[GitHub] [airflow] hussein-awala commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "hussein-awala (via GitHub)" <gi...@apache.org>.
hussein-awala commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1427138045
Duplicate of #27614
Please check @potiuk's explanation in [this comment](https://github.com/apache/airflow/issues/27614#issuecomment-1336132339) to understand why it's currently executed twice (and maybe more in some cases).
[GitHub] [airflow] potiuk commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1437372043
@Oduig Does the error occur only for Databricks Submit Run or for other operators as well? Can you please double-check and report back?
Also, can you please explain in which processes the callbacks originate? Is it the same scheduler/DAG file processor that produces the same stack trace, or two different schedulers? Your report is missing those details about your deployment, and we have no idea how Composer internals work. When the callbacks are always duplicated, it seems that it might be related to some duplication resulting from the deployment (but this is a wild guess).
It is quite likely that this is related to some specific deployment issue in 2.3.4, so I also advise you to move to 2.4.3 (currently the latest in Composer) and report back to us if it happens there. Also note that if we find and release a fix here, it will only be released in the latest Airflow (2.5.2, currently in progress), so you will have to upgrade anyway to apply the fix. More likely, though, this is somehow a deployment issue and you will have to go through the regular Composer issue process. Upgrading to 2.4.3 now to check whether the problem persists is likely a much faster route for you, so I recommend doing it as a first step.
If you upgrade, reproduce, and provide more information on the deployment and the origin of those callbacks (if they happen again after the upgrade), then we can likely help you identify what problem you should report to Composer (as long as we don't find during the process that this is something regular Airflow also has).
[GitHub] [airflow] Oduig commented on issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "Oduig (via GitHub)" <gi...@apache.org>.
Oduig commented on issue #29461:
URL: https://github.com/apache/airflow/issues/29461#issuecomment-1438575233
If I use a `BashOperator` instead, the failure callback is only invoked once. I checked the stack traces by using `traceback.extract_stack()`. It could be related to Composer; we can try to upgrade it. Let me get back to you on this.
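For reference, a check along these lines can be sketched as follows. This is not the exact code used; `describe_invocation` is a hypothetical helper that uses only the standard library. It records the host and process ids next to the stack, which is one way to tell whether two invocations come from the same process or from different ones.

```python
import os
import socket
import traceback


def describe_invocation():
    """Collect where the caller is running: host, process ids, and call stack."""
    return {
        "hostname": socket.gethostname(),
        "pid": os.getpid(),
        "ppid": os.getppid(),
        # Drop the last frame (this helper) so the stack ends at the caller.
        "stack": traceback.format_list(traceback.extract_stack()[:-1]),
    }
```

The returned dictionary is JSON-serializable, so it could be sent as part of the `json` payload of the `requests.post` call in the reproduction DAG.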
[GitHub] [airflow] github-actions[bot] closed issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed issue #29461: Timeout in DatabricksSubmitRunOperator causes callbacks to be invoked twice
URL: https://github.com/apache/airflow/issues/29461