You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/29 08:56:41 UTC

incubator-airflow git commit: [AIRFLOW-255] Check dagrun timeout when comparing active runs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7b382b4e9 -> 5b5c3a116


[AIRFLOW-255] Check dagrun timeout when comparing active runs

Timed-out DagRuns were not set to failed once max active runs had been reached.
Therefore, concurrency limits could be reached prematurely.

Closes #1604 from reconditesea/klin-dagrun-timeout


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5b5c3a11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5b5c3a11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5b5c3a11

Branch: refs/heads/master
Commit: 5b5c3a1165b0a7241f2c7134bb2b1466e87df66d
Parents: 7b382b4
Author: Kevin Lin <kl...@lyft.com>
Authored: Wed Jun 29 10:56:32 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 29 10:56:32 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py |  7 ++++++-
 tests/jobs.py   | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5b5c3a11/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0713bbe..b3270b0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -393,15 +393,20 @@ class SchedulerJob(BaseJob):
                 external_trigger=False,
                 session=session
             )
-            if len(active_runs) >= dag.max_active_runs:
+            # return if already reached maximum active runs and no timeout setting
+            if len(active_runs) >= dag.max_active_runs and not dag.dagrun_timeout:
                 return
+            timedout_runs = 0
             for dr in active_runs:
                 if (
                         dr.start_date and dag.dagrun_timeout and
                         dr.start_date < datetime.now() - dag.dagrun_timeout):
                     dr.state = State.FAILED
                     dr.end_date = datetime.now()
+                    timedout_runs += 1
             session.commit()
+            if len(active_runs) - timedout_runs >= dag.max_active_runs:
+                return
 
             # this query should be replaced by find dagrun
             qry = (

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5b5c3a11/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 2f53fbc..d981945 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -685,6 +685,45 @@ class SchedulerJobTest(unittest.TestCase):
         dr.refresh_from_db(session=session)
         self.assertEquals(dr.state, State.FAILED)
 
+    def test_scheduler_verify_max_active_runs_and_dagrun_timeout(self):
+        """
+        Test if a dagrun will not be scheduled if max_active_runs has been reached and dagrun_timeout is not reached
+        Test if a dagrun will be scheduled if max_active_runs has been reached but dagrun_timeout is also reached
+        """
+        dag = DAG(
+            dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout',
+            start_date=DEFAULT_DATE)
+        dag.max_active_runs = 1
+        dag.dagrun_timeout = datetime.timedelta(seconds=60)
+
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+
+        # Should not be scheduled as DagRun has not timed out and max_active_runs is reached
+        new_dr = scheduler.schedule_dag(dag)
+        self.assertIsNone(new_dr)
+
+        # Should be scheduled as dagrun_timeout has passed
+        dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+        session.merge(dr)
+        session.commit()
+        new_dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(new_dr)
+
     def test_scheduler_auto_align(self):
         """
         Test if the schedule_interval will be auto aligned with the start_date