You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/03 13:24:02 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-209] Add scheduler tests
and improve lineage handling
Repository: incubator-airflow
Updated Branches:
refs/heads/master f72b0d33b -> 89edb6fff
[AIRFLOW-209] Add scheduler tests and improve lineage handling
This patch adds schedule_dag and process_dag unittests. It also
fixes some minor bugs that were caught by these tests, and makes a few
small changes for readability.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb5a3b3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb5a3b3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb5a3b3a
Branch: refs/heads/master
Commit: fb5a3b3a5d7c2ac40b3907f9e5fdcf333a8e0892
Parents: c2384cb
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Jun 2 22:09:31 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jun 3 14:58:12 2016 +0200
----------------------------------------------------------------------
.codecov.yml | 2 +-
airflow/jobs.py | 11 +--
airflow/models.py | 13 +++-
tests/jobs.py | 193 +++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 210 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/.codecov.yml
----------------------------------------------------------------------
diff --git a/.codecov.yml b/.codecov.yml
index 69cb760..571b528 100644
--- a/.codecov.yml
+++ b/.codecov.yml
@@ -1 +1 @@
-comment: false
+# keep default
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 18a852c..005871f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -390,7 +390,8 @@ class SchedulerJob(BaseJob):
active_runs = DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
- external_trigger=False
+ external_trigger=False,
+ session=session
)
if len(active_runs) >= dag.max_active_runs:
return
@@ -402,7 +403,7 @@ class SchedulerJob(BaseJob):
dr.end_date = datetime.now()
session.commit()
- # this query should be replace by find dagrun
+ # this query should be replaced by find dagrun
qry = (
session.query(func.max(DagRun.execution_date))
.filter_by(dag_id=dag.dag_id)
@@ -434,9 +435,9 @@ class SchedulerJob(BaseJob):
# this structure is necessary to avoid a TypeError from concatenating
# NoneType
if dag.schedule_interval == '@once':
- schedule_end = next_run_date
+ period_end = next_run_date
elif next_run_date:
- schedule_end = dag.following_schedule(next_run_date)
+ period_end = dag.following_schedule(next_run_date)
# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
@@ -451,7 +452,7 @@ class SchedulerJob(BaseJob):
if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
return
- if next_run_date and schedule_end and schedule_end <= datetime.now():
+ if next_run_date and period_end and period_end <= datetime.now():
next_run = dag.create_dagrun(
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 3da5d72..fa3f6ca 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3444,7 +3444,11 @@ class DagRun(Base):
.format(self, len(tis)))
for ti in tis:
- ti.task = dag.get_task(ti.task_id)
+ # skip in db?
+ if ti.state == State.REMOVED:
+ tis.remove(ti)
+ else:
+ ti.task = dag.get_task(ti.task_id)
# pre-calculate
# db is faster
@@ -3511,8 +3515,11 @@ class DagRun(Base):
task_ids = []
for ti in tis:
task_ids.append(ti.task_id)
- if not dag.get_task(ti.task_id) and self.state not in State.unfinished():
- ti.state = State.REMOVED
+ try:
+ dag.get_task(ti.task_id)
+ except AirflowException:
+ if self.state is not State.RUNNING:
+ ti.state = State.REMOVED
# check for missing tasks
for task in dag.tasks:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb5a3b3a/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 8d6e80a..4214e47 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -491,3 +491,196 @@ class SchedulerJobTest(unittest.TestCase):
scheduler.process_dag(dag, queue=queue)
queue.put.assert_not_called()
+
+ def test_scheduler_do_not_schedule_removed_task(self):
+ dag = DAG(
+ dag_id='test_scheduler_do_not_schedule_removed_task',
+ start_date=DEFAULT_DATE)
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr)
+
+ dag = DAG(
+ dag_id='test_scheduler_do_not_schedule_removed_task',
+ start_date=DEFAULT_DATE)
+
+ queue = mock.Mock()
+ scheduler.process_dag(dag, queue=queue)
+
+ queue.put.assert_not_called()
+
+ def test_scheduler_do_not_schedule_too_early(self):
+ dag = DAG(
+ dag_id='test_scheduler_do_not_schedule_too_early',
+ start_date=datetime.datetime(2200, 1, 1))
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNone(dr)
+
+ queue = mock.Mock()
+ scheduler.process_dag(dag, queue=queue)
+
+ queue.put.assert_not_called()
+
+ def test_scheduler_do_not_run_finished(self):
+ dag = DAG(
+ dag_id='test_scheduler_do_not_run_finished',
+ start_date=DEFAULT_DATE)
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr)
+
+ tis = dr.get_task_instances(session=session)
+ for ti in tis:
+ ti.state = State.SUCCESS
+
+ session.commit()
+ session.close()
+
+ queue = mock.Mock()
+ scheduler.process_dag(dag, queue=queue)
+
+ queue.put.assert_not_called()
+
+ def test_scheduler_add_new_task(self):
+ """
+ Test if a task instance will be added if the dag is updated
+ """
+ dag = DAG(
+ dag_id='test_scheduler_add_new_task',
+ start_date=DEFAULT_DATE)
+
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr)
+
+ tis = dr.get_task_instances()
+ self.assertEquals(len(tis), 1)
+
+ dag_task2 = DummyOperator(
+ task_id='dummy2',
+ dag=dag,
+ owner='airflow')
+
+ queue = mock.Mock()
+ scheduler.process_dag(dag, queue=queue)
+
+ tis = dr.get_task_instances()
+ self.assertEquals(len(tis), 2)
+
+ def test_scheduler_verify_max_active_runs(self):
+ """
+ Test if a dagrun will not be scheduled if max_active_runs has been reached
+ """
+ dag = DAG(
+ dag_id='test_scheduler_verify_max_active_runs',
+ start_date=DEFAULT_DATE)
+ dag.max_active_runs = 1
+
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr)
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNone(dr)
+
+ def test_scheduler_fail_dagrun_timeout(self):
+ """
+ Test if a dagrun will be set to failed if it exceeds dagrun_timeout
+ """
+ dag = DAG(
+ dag_id='test_scheduler_fail_dagrun_timeout',
+ start_date=DEFAULT_DATE)
+ dag.dagrun_timeout = datetime.timedelta(seconds=60)
+
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+
+ scheduler = SchedulerJob()
+ dag.clear()
+
+ dr = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr)
+ print(dr.start_date)
+ dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
+ print(dr.start_date)
+ session.merge(dr)
+ session.commit()
+
+ dr2 = scheduler.schedule_dag(dag)
+ self.assertIsNotNone(dr2)
+
+ dr.refresh_from_db(session=session)
+ self.assertEquals(dr.state, State.FAILED)
+
[2/2] incubator-airflow git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-airflow
Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/89edb6ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/89edb6ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/89edb6ff
Branch: refs/heads/master
Commit: 89edb6fff4957cd3baf4faee965f4f6f50871de0
Parents: fb5a3b3 f72b0d3
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Jun 3 15:23:51 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jun 3 15:23:51 2016 +0200
----------------------------------------------------------------------
README.md | 1 +
airflow/contrib/operators/qubole_operator.py | 144 +++++------
dev/README.md | 22 +-
dev/airflow-pr | 289 ++++++++++++++++------
docs/code.rst | 1 +
5 files changed, 301 insertions(+), 156 deletions(-)
----------------------------------------------------------------------