You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ar...@apache.org on 2016/11/16 18:49:30 UTC

[1/2] incubator-airflow git commit: [AIRFLOW-137] Fix max_active_runs on clearing tasks

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 365af16b4 -> 0112f69fa


[AIRFLOW-137] Fix max_active_runs on clearing tasks


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/804421b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/804421b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/804421b5

Branch: refs/heads/master
Commit: 804421b53e8e447fd5c09348415b1f795ce560b5
Parents: fa977b6
Author: gtoonstra <gt...@gmail.com>
Authored: Sun Nov 6 17:25:06 2016 +0100
Committer: gtoonstra <gt...@gmail.com>
Committed: Wed Nov 16 18:57:04 2016 +0100

----------------------------------------------------------------------
 airflow/jobs.py |  8 ++++++--
 tests/jobs.py   | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/804421b5/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 972f597..7eb4b99 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -793,8 +793,12 @@ class SchedulerJob(BaseJob):
             self.logger.info("Examining DAG run {}".format(run))
             # don't consider runs that are executed in the future
             if run.execution_date > datetime.now():
-                self.logging.error("Execution date is in future: {}"
-                                   .format(run.execution_date))
+                self.logger.error("Execution date is in future: {}"
+                                  .format(run.execution_date))
+                continue
+
+            if len(active_dag_runs) >= dag.max_active_runs:
+                self.logger.info("Active dag runs > max_active_run.")
                 continue
 
             # skip backfill dagruns for now as long as they are not really scheduled

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/804421b5/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index af7ad61..5562177 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -701,6 +701,45 @@ class SchedulerJobTest(unittest.TestCase):
         new_dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(new_dr)
 
+    def test_scheduler_max_active_runs_respected_after_clear(self):
+        """
+        Test if _process_task_instances only schedules ti's up to max_active_runs
+        (related to issue AIRFLOW-137)
+        """
+        dag = DAG(
+            dag_id='test_scheduler_max_active_runs_respected_after_clear',
+            start_date=DEFAULT_DATE)
+        dag.max_active_runs = 3
+
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        # First create up to 3 dagruns in RUNNING state.
+        scheduler.create_dag_run(dag)
+
+        # Reduce max_active_runs to 1
+        dag.max_active_runs = 1
+
+        queue = mock.Mock()
+        # and schedule them in, so we can check how many
+        # tasks are put on the queue (should be one, not 3)
+        scheduler._process_task_instances(dag, queue=queue)
+
+        queue.append.assert_called_with(
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE)
+        )
+
     def test_scheduler_auto_align(self):
         """
         Test if the schedule_interval will be auto aligned with the start_date


[2/2] incubator-airflow git commit: Merge pull request #1870 from gtoonstra/maxactiveruns_fix

Posted by ar...@apache.org.
Merge pull request #1870 from gtoonstra/maxactiveruns_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0112f69f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0112f69f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0112f69f

Branch: refs/heads/master
Commit: 0112f69fa5f85271cfc1fd8597b8765a04575bb6
Parents: 365af16 804421b
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Wed Nov 16 10:49:22 2016 -0800
Committer: Arthur Wiedmer <ar...@gmail.com>
Committed: Wed Nov 16 10:49:22 2016 -0800

----------------------------------------------------------------------
 airflow/jobs.py |  8 ++++++--
 tests/jobs.py   | 39 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------