You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/12/01 09:10:57 UTC
incubator-airflow git commit: [AIRFLOW-647] Restore
dag.get_active_runs
Repository: incubator-airflow
Updated Branches:
refs/heads/master 293365588 -> 91cd6bf72
[AIRFLOW-647] Restore dag.get_active_runs
Simply added a getter back to dag that returns the
list of active dag run execution dates for the dag
from the DB.
Closes #1899 from
btallman/RestoreActiveRuns_feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91cd6bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91cd6bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91cd6bf7
Branch: refs/heads/master
Commit: 91cd6bf72dd9e0a1dc1a042d2390369eb71c2acc
Parents: 2933655
Author: Benjamin Tallman <bt...@gmail.com>
Authored: Thu Dec 1 10:09:31 2016 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Dec 1 10:09:39 2016 +0100
----------------------------------------------------------------------
airflow/models.py | 21 ++++++++++++++++++++
tests/jobs.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 75 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91cd6bf7/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d2d1e0b..02e4046 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2799,6 +2799,27 @@ class DAG(BaseDag, LoggingMixin):
DagModel.dag_id == self.dag_id)
return qry.value('is_paused')
+ @provide_session
+ def get_active_runs(self, session=None):
+ """
+ Returns a list of "running" tasks
+ :param session:
+ :return: List of execution dates
+ """
+ runs = (
+ session.query(DagRun)
+ .filter(
+ DagRun.dag_id == self.dag_id,
+ DagRun.state == State.RUNNING)
+ .order_by(DagRun.execution_date)
+ .all())
+
+ active_dates = []
+ for run in runs:
+ active_dates.append(run.execution_date)
+
+ return active_dates
+
@property
def latest_execution_date(self):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91cd6bf7/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index bb74709..62e88e5 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -948,3 +948,57 @@ class SchedulerJobTest(unittest.TestCase):
session = settings.Session()
self.assertEqual(
len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+
+ def test_dag_get_active_runs(self):
+ """
+ Test to check that a DAG returns it's active runs
+ """
+
+ now = datetime.datetime.now()
+ six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
+
+ START_DATE = six_hours_ago_to_the_hour
+ DAG_NAME1 = 'get_active_runs_test'
+
+ default_args = {
+ 'owner': 'airflow',
+ 'depends_on_past': False,
+ 'start_date': START_DATE
+
+ }
+ dag1 = DAG(DAG_NAME1,
+ schedule_interval='* * * * *',
+ max_active_runs=1,
+ default_args=default_args
+ )
+
+ run_this_1 = DummyOperator(task_id='run_this_1', dag=dag1)
+ run_this_2 = DummyOperator(task_id='run_this_2', dag=dag1)
+ run_this_2.set_upstream(run_this_1)
+ run_this_3 = DummyOperator(task_id='run_this_3', dag=dag1)
+ run_this_3.set_upstream(run_this_2)
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag1.dag_id)
+ session.merge(orm_dag)
+ session.commit()
+ session.close()
+
+ scheduler = SchedulerJob()
+ dag1.clear()
+
+ dr = scheduler.create_dag_run(dag1)
+
+ # We had better get a dag run
+ self.assertIsNotNone(dr)
+
+ execution_date = dr.execution_date
+
+ running_dates = dag1.get_active_runs()
+
+ try:
+ running_date = running_dates[0]
+ except:
+ running_date = 'Except'
+
+ self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')