You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 06:27:59 UTC
incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs
Repository: incubator-airflow
Updated Branches:
refs/heads/master 75addb4a9 -> 56501e606
[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs
BackfillJob._execute() checks that the next run date is less than or equal
to the end date before creating a DAG run and task instances. For SubDAGs,
the next run date is not relevant: schedule_interval can be anything other
than None or '@once' and should be ignored. However, the current code
calculates the next run date for a SubDAG, so the condition check mentioned
above always fails for a SubDAG triggered manually.
This change adds a simple check to determine whether this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.
Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/56501e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/56501e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/56501e60
Branch: refs/heads/master
Commit: 56501e6062df9456f7ac4efe94e21940734dd5bc
Parents: 75addb4
Author: Joe Schmid <js...@symphonyrm.com>
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 08:27:45 2017 +0200
----------------------------------------------------------------------
airflow/jobs.py | 7 +++++--
airflow/models.py | 1 +
tests/jobs.py | 28 ++++++++++++++++++++++++++++
3 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 006a180..b5c2d5d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):
# consider max_active_runs but ignore when running subdags
# "parent.child" as a dag_id is by convention a subdag
- if self.dag.schedule_interval and "." not in self.dag.dag_id:
+ if self.dag.schedule_interval and not self.dag.is_subdag:
active_runs = DagRun.find(
dag_id=self.dag.dag_id,
state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):
# create dag runs
dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
- next_run_date = self.dag.normalize_schedule(dr_start_date)
end_date = end_date or datetime.now()
+ # next run date for a subdag isn't relevant (schedule_interval for subdags
+ # is ignored) so we use the dag run's start date in the case of a subdag
+ next_run_date = (self.dag.normalize_schedule(dr_start_date)
+ if not self.dag.is_subdag else dr_start_date)
active_dag_runs = []
while next_run_date and next_run_date <= end_date:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 9d560fb..5835578 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2716,6 +2716,7 @@ class DAG(BaseDag, LoggingMixin):
self.default_view = default_view
self.orientation = orientation
self.catchup = catchup
+ self.is_subdag = False # DagBag.bag_dag() will set this to True if appropriate
self.partial = False
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index c1d6790..3eb407b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
else:
self.assertEqual(State.NONE, ti.state)
+ def test_backfill_execute_subdag(self):
+ dag = self.dagbag.get_dag('example_subdag_operator')
+ subdag_op_task = dag.get_task('section-1')
+
+ subdag = subdag_op_task.subdag
+ subdag.schedule_interval = '@daily'
+
+ start_date = datetime.datetime.now()
+ executor = TestExecutor(do_update=True)
+ job = BackfillJob(dag=subdag,
+ start_date=start_date,
+ end_date=start_date,
+ executor=executor,
+ donot_pickle=True)
+ job.run()
+
+ history = executor.history
+ subdag_history = history[0]
+
+ # check that all 5 task instances of the subdag 'section-1' were executed
+ self.assertEqual(5, len(subdag_history))
+ for sdh in subdag_history:
+ ti = sdh[3]
+ self.assertIn('section-1-task-', ti.task_id)
+
+ subdag.clear()
+ dag.clear()
+
class LocalTaskJobTest(unittest.TestCase):
def setUp(self):