You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/09/09 09:36:30 UTC
incubator-airflow git commit: Revert "[AIRFLOW-78] airflow clear
leaves dag_runs"
Repository: incubator-airflow
Updated Branches:
refs/heads/master 32f3c1c5d -> 3a1be4aac
Revert "[AIRFLOW-78] airflow clear leaves dag_runs"
This reverts commit 197c9050ef3a142c18aa97819da48ee8cadbf8d8.
Regressions were observed and tasks were not scheduled in case of
max_dag_runs reached.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3a1be4aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3a1be4aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3a1be4aa
Branch: refs/heads/master
Commit: 3a1be4aacf31ee33d6128e5d5fa563a7625c7c62
Parents: 32f3c1c
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Sep 9 11:34:46 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Sep 9 11:34:46 2016 +0200
----------------------------------------------------------------------
airflow/jobs.py | 4 ----
airflow/models.py | 9 ---------
tests/jobs.py | 18 ++++--------------
3 files changed, 4 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3a1be4aa/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 075d2fb..bae1168 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1073,10 +1073,6 @@ class SchedulerJob(BaseJob):
"""
for dag in dags:
dag = dagbag.get_dag(dag.dag_id)
- if dag.reached_max_runs:
- self.logger.info("Not processing DAG {} since its max runs has been reached"
- .format(dag.dag_id))
- continue
if dag.is_paused:
self.logger.info("Not processing DAG {} since it's paused"
.format(dag.dag_id))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3a1be4aa/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index b352b43..64727d6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2786,15 +2786,6 @@ class DAG(BaseDag, LoggingMixin):
l += task.subdag.subdags
return l
- @property
- def reached_max_runs(self):
- active_runs = DagRun.find(
- dag_id=self.dag_id,
- state=State.RUNNING,
- external_trigger=False
- )
- return len(active_runs) >= self.max_active_runs
-
def resolve_template_files(self):
for t in self.tasks:
t.resolve_template_files()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3a1be4aa/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index ae94d98..af7ad61 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -20,7 +20,6 @@ from __future__ import unicode_literals
import datetime
import logging
import os
-import time
import unittest
from airflow import AirflowException, settings
@@ -620,23 +619,14 @@ class SchedulerJobTest(unittest.TestCase):
session.commit()
session.close()
- scheduler = SchedulerJob(dag.dag_id,
- run_duration=1)
+ scheduler = SchedulerJob()
+ dag.clear()
dr = scheduler.create_dag_run(dag)
self.assertIsNotNone(dr)
- dr2 = scheduler.create_dag_run(dag)
- self.assertIsNone(dr2)
-
- dag.clear()
-
- dag.max_active_runs = 0
- scheduler.run()
-
- session = settings.Session()
- self.assertEqual(
- len(session.query(TI).filter(TI.dag_id == dag.dag_id).all()), 0)
+ dr = scheduler.create_dag_run(dag)
+ self.assertIsNone(dr)
def test_scheduler_fail_dagrun_timeout(self):
"""