You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/12/01 09:10:57 UTC

incubator-airflow git commit: [AIRFLOW-647] Restore dag.get_active_runs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 293365588 -> 91cd6bf72


[AIRFLOW-647] Restore dag.get_active_runs

Simply added a getter back to DAG that returns,
from the DB, the list of execution dates of the
DAG's active (running) dag runs.

Closes #1899 from
btallman/RestoreActiveRuns_feature


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91cd6bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91cd6bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91cd6bf7

Branch: refs/heads/master
Commit: 91cd6bf72dd9e0a1dc1a042d2390369eb71c2acc
Parents: 2933655
Author: Benjamin Tallman <bt...@gmail.com>
Authored: Thu Dec 1 10:09:31 2016 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Dec 1 10:09:39 2016 +0100

----------------------------------------------------------------------
 airflow/models.py | 21 ++++++++++++++++++++
 tests/jobs.py     | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91cd6bf7/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d2d1e0b..02e4046 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2799,6 +2799,27 @@ class DAG(BaseDag, LoggingMixin):
             DagModel.dag_id == self.dag_id)
         return qry.value('is_paused')
 
+    @provide_session
+    def get_active_runs(self, session=None):
+        """
+        Returns the execution dates of this DAG's dag runs in the RUNNING state
+        :param session: session used to query DagRun
+        :return: List of execution dates, ordered by execution date
+        """
+        runs = (
+           session.query(DagRun)
+           .filter(
+           DagRun.dag_id == self.dag_id,
+           DagRun.state == State.RUNNING)
+           .order_by(DagRun.execution_date)
+           .all())
+
+        active_dates = []
+        for run in runs:
+            active_dates.append(run.execution_date)
+
+        return active_dates
+
     @property
     def latest_execution_date(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91cd6bf7/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index bb74709..62e88e5 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -948,3 +948,57 @@ class SchedulerJobTest(unittest.TestCase):
         session = settings.Session()
         self.assertEqual(
             len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
+
+    def test_dag_get_active_runs(self):
+        """
+        Test to check that a DAG returns its active runs
+        """
+
+        now = datetime.datetime.now()
+        six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0)
+
+        START_DATE = six_hours_ago_to_the_hour
+        DAG_NAME1 = 'get_active_runs_test'
+
+        default_args = {
+            'owner': 'airflow',
+            'depends_on_past': False,
+            'start_date': START_DATE
+
+        }
+        dag1 = DAG(DAG_NAME1,
+                   schedule_interval='* * * * *',
+                   max_active_runs=1,
+                   default_args=default_args
+                   )
+
+        run_this_1 = DummyOperator(task_id='run_this_1', dag=dag1)
+        run_this_2 = DummyOperator(task_id='run_this_2', dag=dag1)
+        run_this_2.set_upstream(run_this_1)
+        run_this_3 = DummyOperator(task_id='run_this_3', dag=dag1)
+        run_this_3.set_upstream(run_this_2)
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag1.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+        session.close()
+
+        scheduler = SchedulerJob()
+        dag1.clear()
+
+        dr = scheduler.create_dag_run(dag1)
+
+        # We had better get a dag run
+        self.assertIsNotNone(dr)
+
+        execution_date = dr.execution_date
+
+        running_dates = dag1.get_active_runs()
+
+        try:
+            running_date = running_dates[0]
+        except:  # NOTE(review): bare except; an empty list (IndexError) is the expected failure here
+            running_date = 'Except'  # sentinel; presumably never equal to a real execution_date
+
+        self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date')