You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/03 13:24:02 UTC

[1/2] incubator-airflow git commit: [AIRFLOW-209] Add scheduler tests and improve lineage handling

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f72b0d33b -> 89edb6fff


[AIRFLOW-209] Add scheduler tests and improve lineage handling

This patch adds schedule_dag and process_dag unittests. It also
fixes some minor bugs that were caught by these tests. Some
small changes for readability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb5a3b3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb5a3b3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb5a3b3a

Branch: refs/heads/master
Commit: fb5a3b3a5d7c2ac40b3907f9e5fdcf333a8e0892
Parents: c2384cb
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Jun 2 22:09:31 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jun 3 14:58:12 2016 +0200

----------------------------------------------------------------------
 .codecov.yml      |   2 +-
 airflow/jobs.py   |  11 +--
 airflow/models.py |  13 +++-
 tests/jobs.py     | 193 +++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 210 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/.codecov.yml
----------------------------------------------------------------------
diff --git a/.codecov.yml b/.codecov.yml
index 69cb760..571b528 100644
--- a/.codecov.yml
+++ b/.codecov.yml
@@ -1 +1 @@
-comment: false
+# keep default

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 18a852c..005871f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -390,7 +390,8 @@ class SchedulerJob(BaseJob):
             active_runs = DagRun.find(
                 dag_id=dag.dag_id,
                 state=State.RUNNING,
-                external_trigger=False
+                external_trigger=False,
+                session=session
             )
             if len(active_runs) >= dag.max_active_runs:
                 return
@@ -402,7 +403,7 @@ class SchedulerJob(BaseJob):
                     dr.end_date = datetime.now()
             session.commit()
 
-            # this query should be replace by find dagrun
+            # this query should be replaced by find dagrun
             qry = (
                 session.query(func.max(DagRun.execution_date))
                 .filter_by(dag_id=dag.dag_id)
@@ -434,9 +435,9 @@ class SchedulerJob(BaseJob):
             # this structure is necessary to avoid a TypeError from concatenating
             # NoneType
             if dag.schedule_interval == '@once':
-                schedule_end = next_run_date
+                period_end = next_run_date
             elif next_run_date:
-                schedule_end = dag.following_schedule(next_run_date)
+                period_end = dag.following_schedule(next_run_date)
 
             # Don't schedule a dag beyond its end_date (as specified by the dag param)
             if next_run_date and dag.end_date and next_run_date > dag.end_date:
@@ -451,7 +452,7 @@ class SchedulerJob(BaseJob):
             if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
                 return
 
-            if next_run_date and schedule_end and schedule_end <= datetime.now():
+            if next_run_date and period_end and period_end <= datetime.now():
                 next_run = dag.create_dagrun(
                     run_id='scheduled__' + next_run_date.isoformat(),
                     execution_date=next_run_date,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 3da5d72..fa3f6ca 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3444,7 +3444,11 @@ class DagRun(Base):
                      .format(self, len(tis)))
 
         for ti in tis:
-            ti.task = dag.get_task(ti.task_id)
+            # skip in db?
+            if ti.state == State.REMOVED:
+                tis.remove(ti)
+            else:
+                ti.task = dag.get_task(ti.task_id)
 
         # pre-calculate
         # db is faster
@@ -3511,8 +3515,11 @@ class DagRun(Base):
         task_ids = []
         for ti in tis:
             task_ids.append(ti.task_id)
-            if not dag.get_task(ti.task_id) and self.state not in State.unfinished():
-                ti.state = State.REMOVED
+            try:
+                dag.get_task(ti.task_id)
+            except AirflowException:
+                if self.state is not State.RUNNING:
+                    ti.state = State.REMOVED
 
         # check for missing tasks
         for task in dag.tasks:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 8d6e80a..4214e47 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -491,3 +491,196 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler.process_dag(dag, queue=queue)
 
         queue.put.assert_not_called()
+
+    def test_scheduler_do_not_schedule_removed_task(self):
+        dag = DAG(
+            dag_id='test_scheduler_do_not_schedule_removed_task',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+
+        dag = DAG(
+            dag_id='test_scheduler_do_not_schedule_removed_task',
+            start_date=DEFAULT_DATE)
+
+        queue = mock.Mock()
+        scheduler.process_dag(dag, queue=queue)
+
+        queue.put.assert_not_called()
+
+    def test_scheduler_do_not_schedule_too_early(self):
+        dag = DAG(
+            dag_id='test_scheduler_do_not_schedule_too_early',
+            start_date=datetime.datetime(2200, 1, 1))
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNone(dr)
+
+        queue = mock.Mock()
+        scheduler.process_dag(dag, queue=queue)
+
+        queue.put.assert_not_called()
+
+    def test_scheduler_do_not_run_finished(self):
+        dag = DAG(
+            dag_id='test_scheduler_do_not_run_finished',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+
+        tis = dr.get_task_instances(session=session)
+        for ti in tis:
+            ti.state = State.SUCCESS
+
+        session.commit()
+        session.close()
+
+        queue = mock.Mock()
+        scheduler.process_dag(dag, queue=queue)
+
+        queue.put.assert_not_called()
+
+    def test_scheduler_add_new_task(self):
+        """
+        Test if a task instance will be added if the dag is updated
+        """
+        dag = DAG(
+            dag_id='test_scheduler_add_new_task',
+            start_date=DEFAULT_DATE)
+
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+
+        tis = dr.get_task_instances()
+        self.assertEquals(len(tis), 1)
+
+        dag_task2 = DummyOperator(
+            task_id='dummy2',
+            dag=dag,
+            owner='airflow')
+
+        queue = mock.Mock()
+        scheduler.process_dag(dag, queue=queue)
+
+        tis = dr.get_task_instances()
+        self.assertEquals(len(tis), 2)
+
+    def test_scheduler_verify_max_active_runs(self):
+        """
+        Test that a dagrun will not be scheduled if max_active_runs has been reached
+        """
+        dag = DAG(
+            dag_id='test_scheduler_verify_max_active_runs',
+            start_date=DEFAULT_DATE)
+        dag.max_active_runs = 1
+
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNone(dr)
+
+    def test_scheduler_fail_dagrun_timeout(self):
+        """
+        Test that a dagrun will be set to failed if it times out
+        """
+        dag = DAG(
+            dag_id='test_scheduler_fail_dagrun_timeout',
+            start_date=DEFAULT_DATE)
+        dag.dagrun_timeout = datetime.timedelta(seconds=60)
+
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+        print(dr.start_date)
+        dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+        print(dr.start_date)
+        session.merge(dr)
+        session.commit()
+
+        dr2 = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr2)
+
+        dr.refresh_from_db(session=session)
+        self.assertEquals(dr.state, State.FAILED)
+


[2/2] incubator-airflow git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow

Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/89edb6ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/89edb6ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/89edb6ff

Branch: refs/heads/master
Commit: 89edb6fff4957cd3baf4faee965f4f6f50871de0
Parents: fb5a3b3 f72b0d3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Jun 3 15:23:51 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jun 3 15:23:51 2016 +0200

----------------------------------------------------------------------
 README.md                                    |   1 +
 airflow/contrib/operators/qubole_operator.py | 144 +++++------
 dev/README.md                                |  22 +-
 dev/airflow-pr                               | 289 ++++++++++++++++------
 docs/code.rst                                |   1 +
 5 files changed, 301 insertions(+), 156 deletions(-)
----------------------------------------------------------------------