Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/01 15:37:38 UTC
[GitHub] [airflow] ephraimbuddy opened a new pull request #21901: Add `max_active_runs_reached` column to DagModel
ephraimbuddy opened a new pull request #21901:
URL: https://github.com/apache/airflow/pull/21901
Currently, we set next_dagrun_create_after to NULL when a dag's active runs
reach max_active_runs. This causes a deadlock when running multiple
schedulers (three or more). The deadlock happens at the point of updating
next_dagrun_create_after (dag.calculate_dagrun_date_fields).
I think nullifying and later recalculating next_dagrun_create_after is expensive,
so I have created a boolean column on DagModel to track whether max_active_runs has been reached.
Here's the error I got from running three schedulers with 100 dags, each with one datetime sensor.
```
[2022-03-01 09:48:05,479] {scheduler_job.py:707} INFO - Exited execute loop
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
HINT: See server log for query details.
CONTEXT: while updating tuple (1,59) in relation "dag"
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 33, in <module>
sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
File "/opt/airflow/airflow/__main__.py", line 48, in main
args.func(args)
File "/opt/airflow/airflow/cli/cli_parser.py", line 49, in command
return func(*args, **kwargs)
File "/opt/airflow/airflow/utils/cli.py", line 99, in wrapper
return f(*args, **kwargs)
File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
_run_scheduler_job(args=args)
File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
job.run()
File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
self._execute()
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 680, in _execute
self._run_scheduler_loop()
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 766, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/opt/airflow/airflow/jobs/scheduler_job.py", line 852, in _do_scheduling
guard.commit()
File "/opt/airflow/airflow/utils/sqlalchemy.py", line 281, in commit
self.session.commit()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
self._transaction.commit(_to_root=self.future)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
self._prepare_impl()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
self._flush(objects)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
flush_context.execute()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 450, in execute
n.execute_aggregate(self, set_)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 744, in execute_aggregate
persistence.save_obj(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 234, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 998, in _emit_update_statements
c = connection._execute_20(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
return connection._execute_clauseelement(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
HINT: See server log for query details.
CONTEXT: while updating tuple (1,59) in relation "dag"
[SQL: UPDATE dag SET next_dagrun=%(next_dagrun)s, next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s, next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s, next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s]
[parameters: {'next_dagrun': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_start': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_end': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_create_after': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'dag_dag_id': 'sensor_75'}]
(Background on this error at: http://sqlalche.me/e/14/e3q8)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk commented on pull request #21901: Add `max_active_runs_reached` column to DagModel
Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#issuecomment-1060053449
Hey @ephraimbuddy - any more context on this one? I'd love to understand more context but I am afraid I am missing it :).
[GitHub] [airflow] potiuk commented on a change in pull request #21901: Add `max_active_runs_reached` column to DagModel
Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#discussion_r821709950
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1072,8 +1073,7 @@ def _schedule_dag_run(
                 self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
             active_runs = dag.get_num_active_runs(only_running=False, session=session)
             # Work out if we should allow creating a new DagRun now?
-            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
-                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
+            self._max_active_runs_reached(dag, dag_model, active_runs)
Review comment:
I suggest renaming it to `_update_max_active_runs_reached`. This function has two behaviours: it updates the value and it returns the updated value. I think "update" is the more important of the two; returning the value is really an optimisation that saves a separate retrieval.
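To make the naming point concrete, here is a minimal sketch of an update-and-return helper. It does not reproduce the code in the PR: `DagModelSketch` and the free function `update_max_active_runs_reached` are hypothetical stand-ins, and the comparison `active_runs >= max_active_runs` is an assumption about how the flag would be computed.

```python
from dataclasses import dataclass


@dataclass
class DagModelSketch:
    """Hypothetical stand-in for DagModel; only the fields this sketch needs."""

    dag_id: str
    max_active_runs: int
    max_active_runs_reached: bool = False


def update_max_active_runs_reached(dag_model: DagModelSketch, active_runs: int) -> bool:
    """Store whether the limit is hit on the model, then return the stored value."""
    reached = active_runs >= dag_model.max_active_runs
    # Primary behaviour: update the stored flag, skipping the write if unchanged.
    if dag_model.max_active_runs_reached != reached:
        dag_model.max_active_runs_reached = reached
    # Secondary behaviour: hand the value back so the caller need not re-read it.
    return dag_model.max_active_runs_reached
```

With a name that starts with "update", a call site that ignores the return value (as in the hunk above) still reads as intentional.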
[GitHub] [airflow] ephraimbuddy commented on pull request #21901: Add `max_active_runs_reached` column to DagModel
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#issuecomment-1061966077
> Thanks for the context! Perfect explanation. Now I understand it fully.
>
> Just one worry. Are you sure there will be no situation where `max_active_runs_reached` leads to stale DAGs? This is, effectively, caching whether the max dag runs have been reached. The value is re-calculated regularly and updated when it changes, but are we absolutely sure this re-calculation always happens for all the dags? For example, I can imagine a situation where we reach max active dag_runs and (for whatever reason) we never enter the `_max_active_runs_reached` function to recalculate it.
>
> I am not saying it is the case (I could not see any path that leads to it) but it's worth some careful review of the logic.
>
> Small NIT for the name of the method.
I will do more testing on this, but I don't expect it to leave stale dags, because I'm still doing the same nullification, just without recalculating next_dagrun after the update. Thanks for pointing this out!
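The staleness worry can be shown with a small simulation. This is only a sketch under stated assumptions: `DagState`, `refresh_flag` and `is_stale` are hypothetical names, not Airflow code, and the flag is assumed to mean `active_runs >= max_active_runs`.

```python
from dataclasses import dataclass


@dataclass
class DagState:
    """Hypothetical per-DAG state: the live run count and the cached flag."""

    max_active_runs: int
    active_runs: int = 0
    max_active_runs_reached: bool = False


def refresh_flag(state: DagState) -> None:
    # Recompute the cached flag from the live run count.
    state.max_active_runs_reached = state.active_runs >= state.max_active_runs


def is_stale(state: DagState) -> bool:
    # The cache is stale when it disagrees with the live run count.
    return state.max_active_runs_reached != (state.active_runs >= state.max_active_runs)


state = DagState(max_active_runs=1)
state.active_runs = 1           # a run starts
refresh_flag(state)             # flag is set; no new runs would be created
state.active_runs = 0           # the run finishes
stale_before = is_stale(state)  # flag still set although the limit is no longer hit
refresh_flag(state)             # some code path must do this for the DAG to resume
stale_after = is_stale(state)
```

The sketch suggests what a test for the PR could assert: after every path on which a run finishes, the flag agrees with the live count.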
[GitHub] [airflow] ephraimbuddy commented on pull request #21901: Add `max_active_runs_reached` column to DagModel
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#issuecomment-1060284341
> Hey @ephraimbuddy - any more context on this one? I'd love to understand more context but I am afraid I am missing it :).
Ok. The issue is that the scheduler crashes when more than two schedulers are running. Judging by where the error comes from, it may also occur with a single scheduler, but I saw it while running three.
What we currently do is nullify `next_dagrun_create_after` in the scheduler to prevent creating new dagruns once max_active_runs has been reached: https://github.com/apache/airflow/blob/4e05bbc92557f5eafaa37509387b2fb2b0ab3d4a/airflow/jobs/scheduler_job.py#L988
We recalculate this field once the dagrun has completed: https://github.com/apache/airflow/blob/4e05bbc92557f5eafaa37509387b2fb2b0ab3d4a/airflow/jobs/scheduler_job.py#L1101-L1102
I'm proposing that, instead of nullifying and recalculating this field, we add a new column on the DagModel to track whether max_active_runs has been reached, because recalculating the field looks expensive to me.
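As a rough illustration of the two gating schemes described above, the following sketch filters plain Python objects instead of querying the database. `DagRow`, `needing_runs_by_null` and `needing_runs_by_flag` are hypothetical names, and the flag-based filter is one reading of the proposal, not the query in the PR.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class DagRow:
    """Hypothetical stand-in for a row of the dag table."""

    dag_id: str
    next_dagrun_create_after: Optional[datetime]
    max_active_runs_reached: bool = False


def needing_runs_by_null(rows: List[DagRow], now: datetime) -> List[str]:
    # Current scheme: a NULL next_dagrun_create_after hides the DAG, so the
    # date fields must be recalculated and written back when a run completes.
    return [
        r.dag_id
        for r in rows
        if r.next_dagrun_create_after is not None and r.next_dagrun_create_after <= now
    ]


def needing_runs_by_flag(rows: List[DagRow], now: datetime) -> List[str]:
    # Proposed scheme as read here: the date field stays populated and the
    # boolean alone gates run creation.
    return [
        r.dag_id
        for r in rows
        if not r.max_active_runs_reached
        and r.next_dagrun_create_after is not None
        and r.next_dagrun_create_after <= now
    ]


now = datetime(2022, 3, 1, tzinfo=timezone.utc)
due = now - timedelta(hours=1)

# The same two DAGs under each scheme; dag_b has hit max_active_runs.
rows_null = [DagRow("dag_a", due), DagRow("dag_b", None)]
rows_flag = [DagRow("dag_a", due), DagRow("dag_b", due, max_active_runs_reached=True)]

selected_null = needing_runs_by_null(rows_null, now)
selected_flag = needing_runs_by_flag(rows_flag, now)
```

Both filters select only `dag_a` here. The difference is in what has to be written when `dag_b` drops below its limit: a single boolean under the flag scheme, versus the recalculated date fields under the current one.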
Here's a dag I used to reproduce this behaviour:
```python
from airflow import DAG
from airflow.sensors.date_time import DateTimeSensor
from pendulum import datetime


def create_dag(dag_name):
    dag = DAG(
        dag_name,
        schedule_interval='@daily',
        start_date=datetime(2022, 1, 1),
    )
    # One sensor per DAG; reschedule mode frees the worker slot between pokes.
    check_time = DateTimeSensor(
        task_id='check_time',
        target_time=datetime(2022, 3, 8, 12, 0, 0),
        poke_interval=30,
        mode='reschedule',
        dag=dag
    )
    check_time
    return dag


# Register 100 DAGs at module level so the DAG parser discovers them.
for i in range(100):
    globals()[f'dag_{i}'] = create_dag(f'dag_{i}')
```
To reproduce, start the schedulers first, then trigger all the dags.