You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/09/09 09:36:30 UTC

incubator-airflow git commit: Revert "[AIRFLOW-78] airflow clear leaves dag_runs"

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 32f3c1c5d -> 3a1be4aac


Revert "[AIRFLOW-78] airflow clear leaves dag_runs"

This reverts commit 197c9050ef3a142c18aa97819da48ee8cadbf8d8.

Regressions were observed and tasks were not scheduled in case of
max_dag_runs reached.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3a1be4aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3a1be4aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3a1be4aa

Branch: refs/heads/master
Commit: 3a1be4aacf31ee33d6128e5d5fa563a7625c7c62
Parents: 32f3c1c
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Sep 9 11:34:46 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Sep 9 11:34:46 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py   |  4 ----
 airflow/models.py |  9 ---------
 tests/jobs.py     | 18 ++++--------------
 3 files changed, 4 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3a1be4aa/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 075d2fb..bae1168 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1073,10 +1073,6 @@ class SchedulerJob(BaseJob):
         """
         for dag in dags:
             dag = dagbag.get_dag(dag.dag_id)
-            if dag.reached_max_runs:
-                self.logger.info("Not processing DAG {} since its max runs has been reached"
-                                .format(dag.dag_id))
-                continue
             if dag.is_paused:
                 self.logger.info("Not processing DAG {} since it's paused"
                                  .format(dag.dag_id))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3a1be4aa/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index b352b43..64727d6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2786,15 +2786,6 @@ class DAG(BaseDag, LoggingMixin):
                 l += task.subdag.subdags
         return l
 
-    @property
-    def reached_max_runs(self):
-        active_runs = DagRun.find(
-            dag_id=self.dag_id,
-            state=State.RUNNING,
-            external_trigger=False
-        )
-        return len(active_runs) >= self.max_active_runs
-
     def resolve_template_files(self):
         for t in self.tasks:
             t.resolve_template_files()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3a1be4aa/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ae94d98..af7ad61 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -20,7 +20,6 @@ from __future__ import unicode_literals
 import datetime
 import logging
 import os
-import time
 import unittest
 
 from airflow import AirflowException, settings
@@ -620,23 +619,14 @@ class SchedulerJobTest(unittest.TestCase):
         session.commit()
         session.close()
 
-        scheduler = SchedulerJob(dag.dag_id,
-                                run_duration=1)
+        scheduler = SchedulerJob()
+        dag.clear()
 
         dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
-        dr2 = scheduler.create_dag_run(dag)
-        self.assertIsNone(dr2)
-
-        dag.clear()
-
-        dag.max_active_runs = 0
-        scheduler.run()
-
-        session = settings.Session()
-        self.assertEqual(
-            len(session.query(TI).filter(TI.dag_id == dag.dag_id).all()), 0)
+        dr = scheduler.create_dag_run(dag)
+        self.assertIsNone(dr)
 
     def test_scheduler_fail_dagrun_timeout(self):
         """