Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/01 15:37:38 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #21901: Add `max_active_runs_reached` column to DagModel

ephraimbuddy opened a new pull request #21901:
URL: https://github.com/apache/airflow/pull/21901


   Currently, we nullify next_dagrun_create_after when a DAG reaches
   max_active_runs. This causes deadlocks when running multiple
   schedulers (3 and above). The deadlock happens at the point of updating
   next_dagrun_create_after (dag_model.calculate_dagrun_date_fields).
   I think this operation of nullifying and recalculating next_dagrun_create_after
   is expensive, so I have added a boolean column on DagModel to track whether
   max_active_runs has been reached.
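
   Conceptually, the proposed column looks something like the sketch below.
   This is a hypothetical, self-contained illustration rather than the actual
   PR diff; `Base` stands in for Airflow's declarative base, and only the new
   column plus a primary key are shown.

   ```python
    from sqlalchemy import Boolean, Column, String
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()


    class DagModel(Base):
        __tablename__ = "dag"

        dag_id = Column(String(250), primary_key=True)
        # True once the DAG has as many active runs as max_active_runs allows;
        # the scheduler can check this flag instead of nullifying and later
        # recalculating next_dagrun_create_after.
        max_active_runs_reached = Column(Boolean, default=False)
   ```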
   
   Here's the error I got from running three schedulers with 100 DAGs, each with one datetime sensor.
   
   ```
   [2022-03-01 09:48:05,479] {scheduler_job.py:707} INFO - Exited execute loop
   Traceback (most recent call last):
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   psycopg2.errors.DeadlockDetected: deadlock detected
   DETAIL:  Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
   Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1,59) in relation "dag"
   
   
   The above exception was the direct cause of the following exception:
   
   Traceback (most recent call last):
     File "/usr/local/bin/airflow", line 33, in <module>
       sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
     File "/opt/airflow/airflow/__main__.py", line 48, in main
       args.func(args)
     File "/opt/airflow/airflow/cli/cli_parser.py", line 49, in command
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/utils/cli.py", line 99, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
       _run_scheduler_job(args=args)
     File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
       job.run()
     File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
       self._execute()
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 680, in _execute
       self._run_scheduler_loop()
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 766, in _run_scheduler_loop
       num_queued_tis = self._do_scheduling(session)
     File "/opt/airflow/airflow/jobs/scheduler_job.py", line 852, in _do_scheduling
       guard.commit()
     File "/opt/airflow/airflow/utils/sqlalchemy.py", line 281, in commit
       self.session.commit()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
       self._transaction.commit(_to_root=self.future)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
       self._prepare_impl()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
       self.session.flush()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
       self._flush(objects)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
       transaction.rollback(_capture_exception=True)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
       compat.raise_(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
       flush_context.execute()
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 450, in execute
       n.execute_aggregate(self, set_)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 744, in execute_aggregate
       persistence.save_obj(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 234, in save_obj
       _emit_update_statements(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 998, in _emit_update_statements
       c = connection._execute_20(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
       return meth(self, args_10style, kwargs_10style, execution_options)
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
       return connection._execute_clauseelement(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
       ret = self._execute_context(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
       self._handle_dbapi_exception(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1929, in _handle_dbapi_exception
       util.raise_(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
       raise exception
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
       self.dialect.do_execute(
     File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
       cursor.execute(statement, parameters)
   sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
   DETAIL:  Process 11782 waits for ShareLock on transaction 129251; blocked by process 6989.
   Process 6989 waits for ShareLock on transaction 128879; blocked by process 11782.
   HINT:  See server log for query details.
   CONTEXT:  while updating tuple (1,59) in relation "dag"
   
   [SQL: UPDATE dag SET next_dagrun=%(next_dagrun)s, next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s, next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s, next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s]
   [parameters: {'next_dagrun': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_start': DateTime(2021, 1, 10, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_data_interval_end': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'next_dagrun_create_after': DateTime(2021, 1, 11, 0, 0, 0, tzinfo=Timezone('UTC')), 'dag_dag_id': 'sensor_75'}]
   (Background on this error at: http://sqlalche.me/e/14/e3q8)
   ```
   
   
   





[GitHub] [airflow] potiuk commented on pull request #21901: Add `max_active_runs_reached` column to DagModel

potiuk commented on pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#issuecomment-1060053449


   Hey @ephraimbuddy - any more context on this one? I'd love to understand it better but I am afraid I am missing something :).





[GitHub] [airflow] potiuk commented on a change in pull request #21901: Add `max_active_runs_reached` column to DagModel

potiuk commented on a change in pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#discussion_r821709950



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1072,8 +1073,7 @@ def _schedule_dag_run(
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
             active_runs = dag.get_num_active_runs(only_running=False, session=session)
             # Work out if we should allow creating a new DagRun now?
-            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
-                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
+            self._max_active_runs_reached(dag, dag_model, active_runs)

Review comment:
       I suggest renaming it to `_update_max_active_runs_reached` - this function has two behaviours, updating and returning the updated value. I think "update" is the more important one, and returning the value is really an optimisation of the value retrieval.
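
       In code, the shape being described is roughly the following hypothetical
       sketch (the names, signature, and SimpleNamespace stand-ins are
       illustrative, not the PR's actual code):

       ```python
        from types import SimpleNamespace


        def _update_max_active_runs_reached(dag, dag_model, active_runs: int) -> bool:
            """Persist whether the run cap has been hit, then return the new value."""
            # Primary behaviour: the update ...
            dag_model.max_active_runs_reached = active_runs >= dag.max_active_runs
            # ... while returning the value is just an optimisation of the retrieval.
            return dag_model.max_active_runs_reached


        # Duck-typed stand-ins for the real DAG / DagModel objects:
        dag = SimpleNamespace(max_active_runs=16)
        dag_model = SimpleNamespace(max_active_runs_reached=False)
        assert _update_max_active_runs_reached(dag, dag_model, active_runs=16)
       ```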







[GitHub] [airflow] ephraimbuddy commented on pull request #21901: Add `max_active_runs_reached` column to DagModel

ephraimbuddy commented on pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#issuecomment-1061966077


   > Thanks for the context! Perfect explanation. Now I understand it fully.
   > 
   > Just one worry. Are you sure there is no situation where `max_active_runs_reached` will lead to stale DAGs? This is - effectively - caching of whether the max DAG runs have been reached. The value is re-calculated regularly and updated when it changes, but are we absolutely sure this re-calculation always happens for all the DAGs? (For example, I can imagine a situation where we reach max active dag_runs and, for whatever reason, we never enter the `_max_active_runs_reached` function to recalculate it.)
   > 
   > I am not saying it is the case (I could not see any path that leads to it) but it's worth some careful review of the logic.
   > 
   > Small NIT for the name of the method.
   
   I will do more testing on this, but I don't expect stale DAGs because I'm still doing the same nullification, just without recalculating next_dagrun after the update. Thanks for pointing this out!





[GitHub] [airflow] ephraimbuddy commented on pull request #21901: Add `max_active_runs_reached` column to DagModel

ephraimbuddy commented on pull request #21901:
URL: https://github.com/apache/airflow/pull/21901#issuecomment-1060284341


   > Hey @ephraimbuddy - any more context on this one? I'd love to understand it better but I am afraid I am missing something :).
   
   Ok. The issue is that the scheduler crashes when running more than 2 schedulers. Judging by where this error comes from, it may also occur with a single scheduler, but I saw it while running 3 schedulers.
   
   What we do currently is to nullify `next_dagrun_create_after` in the scheduler to prevent creating new dagruns when we have reached max_active_runs.  https://github.com/apache/airflow/blob/4e05bbc92557f5eafaa37509387b2fb2b0ab3d4a/airflow/jobs/scheduler_job.py#L988
   
   We calculate this field once the dagrun has completed https://github.com/apache/airflow/blob/4e05bbc92557f5eafaa37509387b2fb2b0ab3d4a/airflow/jobs/scheduler_job.py#L1101-L1102
   
   I'm proposing that instead of nullifying and recalculating this field, we add a new column on DagModel to track max_active_runs, because the operation of recalculating this field looks expensive to me.
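
   For illustration only, the current behaviour amounts to something like this
   self-contained sketch (the real logic at the links above operates on the ORM
   models inside the scheduler; the dataclass and function here are stand-ins):

   ```python
    from dataclasses import dataclass
    from datetime import datetime
    from typing import Optional


    @dataclass
    class FakeDagModel:
        """Stand-in for airflow.models.DagModel; illustration only."""

        max_active_runs: int
        next_dagrun_create_after: Optional[datetime]


    def update_run_creation_gate(dag_model: FakeDagModel, active_runs: int) -> None:
        if active_runs >= dag_model.max_active_runs:
            # Today: nullify the field so no new dagruns are created ...
            dag_model.next_dagrun_create_after = None
        else:
            # ... and recalculate it (calculate_dagrun_date_fields) once a run
            # completes. That UPDATE on the "dag" table is the write that
            # deadlocks when several schedulers race on the same rows.
            dag_model.next_dagrun_create_after = datetime(2021, 1, 11)  # placeholder
   ```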
   
   Here's a DAG I used to reproduce this behaviour:
   ```python
    from pendulum import datetime

    from airflow import DAG
    from airflow.sensors.date_time import DateTimeSensor


    def create_dag(dag_name):
        dag = DAG(
            dag_name,
            schedule_interval='@daily',
            start_date=datetime(2022, 1, 1),
        )
        # A sensor in reschedule mode keeps each run active for a long time,
        # so every DAG quickly accumulates runs up to max_active_runs.
        DateTimeSensor(
            task_id='check_time',
            target_time=datetime(2022, 3, 8, 12, 0, 0),
            poke_interval=30,
            mode='reschedule',
            dag=dag,
        )
        return dag


    # Register 100 copies at module level so the scheduler picks them all up.
    for i in range(100):
        globals()[f'dag_{i}'] = create_dag(f'dag_{i}')
   ```
   First, you need to have the schedulers running before triggering all the DAGs.

