You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 06:27:59 UTC

incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 75addb4a9 -> 56501e606


[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances.
For SubDAGs, the next run date is not relevant, i.e. schedule_interval
can be anything other than None or '@once' and should be ignored.
However, the current code calculates the next run date for a SubDAG,
and the condition check mentioned above always fails for a SubDAG
triggered manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/56501e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/56501e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/56501e60

Branch: refs/heads/master
Commit: 56501e6062df9456f7ac4efe94e21940734dd5bc
Parents: 75addb4
Author: Joe Schmid <js...@symphonyrm.com>
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:27:45 2017 +0200

----------------------------------------------------------------------
 airflow/jobs.py   |  7 +++++--
 airflow/models.py |  1 +
 tests/jobs.py     | 28 ++++++++++++++++++++++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 006a180..b5c2d5d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):
 
         # consider max_active_runs but ignore when running subdags
         # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and "." not in self.dag.dag_id:
+        if self.dag.schedule_interval and not self.dag.is_subdag:
             active_runs = DagRun.find(
                 dag_id=self.dag.dag_id,
                 state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):
 
         # create dag runs
         dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-        next_run_date = self.dag.normalize_schedule(dr_start_date)
         end_date = end_date or datetime.now()
+        # next run date for a subdag isn't relevant (schedule_interval for subdags
+        # is ignored) so we use the dag run's start date in the case of a subdag
+        next_run_date = (self.dag.normalize_schedule(dr_start_date)
+                         if not self.dag.is_subdag else dr_start_date)
 
         active_dag_runs = []
         while next_run_date and next_run_date <= end_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 9d560fb..5835578 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2716,6 +2716,7 @@ class DAG(BaseDag, LoggingMixin):
         self.default_view = default_view
         self.orientation = orientation
         self.catchup = catchup
+        self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
 
         self.partial = False
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index c1d6790..3eb407b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
             else:
                 self.assertEqual(State.NONE, ti.state)
 
+    def test_backfill_execute_subdag(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        start_date = datetime.datetime.now()
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=start_date,
+                          end_date=start_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        history = executor.history
+        subdag_history = history[0]
+
+        # check that all 5 task instances of the subdag 'section-1' were executed
+        self.assertEqual(5, len(subdag_history))
+        for sdh in subdag_history:
+            ti = sdh[3]
+            self.assertIn('section-1-task-', ti.task_id)
+
+        subdag.clear()
+        dag.clear()
+
 
 class LocalTaskJobTest(unittest.TestCase):
     def setUp(self):