You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/29 08:56:41 UTC
incubator-airflow git commit: [AIRFLOW-255] Check dagrun timeout when
comparing active runs
Repository: incubator-airflow
Updated Branches:
refs/heads/master 7b382b4e9 -> 5b5c3a116
[AIRFLOW-255] Check dagrun timeout when comparing active runs
Timed-out DagRuns would not be set to failed once max active runs had been reached.
Therefore, concurrency limits could be reached prematurely.
Closes #1604 from reconditesea/klin-dagrun-timeout
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5b5c3a11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5b5c3a11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5b5c3a11
Branch: refs/heads/master
Commit: 5b5c3a1165b0a7241f2c7134bb2b1466e87df66d
Parents: 7b382b4
Author: Kevin Lin <kl...@lyft.com>
Authored: Wed Jun 29 10:56:32 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 29 10:56:32 2016 +0200
----------------------------------------------------------------------
airflow/jobs.py | 7 ++++++-
tests/jobs.py | 39 +++++++++++++++++++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5b5c3a11/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0713bbe..b3270b0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -393,15 +393,20 @@ class SchedulerJob(BaseJob):
external_trigger=False,
session=session
)
- if len(active_runs) >= dag.max_active_runs:
+ # return if already reached maximum active runs and no timeout setting
+ if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
return
+ timedout_runs = 0
for dr in active_runs:
if (
dr.start_date and dag.dagrun_timeout and
dr.start_date < datetime.now() - dag.dagrun_timeout):
dr.state = State.FAILED
dr.end_date = datetime.now()
+ timedout_runs += 1
session.commit()
+ if len(active_runs) - timedout_runs >= dag.max_active_runs:
+ return
# this query should be replaced by find dagrun
qry = (
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5b5c3a11/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 2f53fbc..d981945 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -685,6 +685,45 @@ class SchedulerJobTest(unittest.TestCase):
dr.refresh_from_db(session=session)
self.assertEquals(dr.state, State.FAILED)
+ def test_scheduler_verify_max_active_runs_and_dagrun_timeout(self):
+ """
+ Test if a a dagrun will not be scheduled if max_dag_runs has been reached and dagrun_timeout is not reached
+ Test if a a dagrun will be scheduled if max_dag_runs has been reached but dagrun_timeout is also reached
+ """
+ dag = DAG(
+ dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout',
+ start_date=DEFAULT_DATE)
+ dag.max_active_runs = 1
+ dag.dagrun_timeout = datetime.timedelta(seconds=60)
+
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr)
+
+ # Should not be scheduled as DagRun has not timedout and max_active_runs is reached
+ new_dr = scheduler.schedule_dag(dag)
+ self.assertIsNone(new_dr)
+
+ # Should be scheduled as dagrun_timeout has passed
+ dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+ session.merge(dr)
+ session.commit()
+ new_dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(new_dr)
+
def test_scheduler_auto_align(self):
"""
Test if the schedule_interval will be auto aligned with the start_date