Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/11 18:52:16 UTC

incubator-airflow git commit: [AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9ad144657 -> d7d9f883e


[AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

In a backfill one can specify a specific task to execute. We then
create a subset of the original tasks in a subdag derived from the
original dag. The subdag keeps the same name as the original dag.
This breaks the integrity check of a dag_run, as tasks are suddenly
no longer in scope.

Closes #2122 from bolkedebruin/AIRFLOW-921
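
A rough sketch of the scenario the message describes (the dag, task names
and dates here are illustrative, and an initialized Airflow metadata
database is assumed; the new test_sub_set_subdag test in the diff below is
the authoritative reproduction):

    from datetime import datetime

    from airflow import DAG
    from airflow.jobs import BackfillJob
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('example_etl',
              start_date=datetime(2017, 3, 1),
              default_args={'owner': 'airflow'})
    with dag:
        extract = DummyOperator(task_id='extract')
        load = DummyOperator(task_id='load')
        extract.set_downstream(load)

    # Backfilling only 'extract' builds a subset of the original tasks in a
    # subdag that keeps the original dag_id; before this fix the dag_run
    # integrity check could then see 'load' as out of scope and mark it
    # removed.
    sub_dag = dag.sub_dag(task_regex='extract',
                          include_upstream=False,
                          include_downstream=False)
    BackfillJob(dag=sub_dag,
                start_date=datetime(2017, 3, 1),
                end_date=datetime(2017, 3, 1)).run()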


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d7d9f883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d7d9f883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d7d9f883

Branch: refs/heads/master
Commit: d7d9f883e4db3d864eb88ce510988456e8d468c4
Parents: 9ad1446
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 10:52:07 2017 -0800
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Mar 11 10:52:07 2017 -0800

----------------------------------------------------------------------
 airflow/jobs.py   |  1 +
 airflow/models.py | 12 +++++++++++-
 tests/jobs.py     | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d7d9f883/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f05104a..c7db99f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1803,6 +1803,7 @@ class BackfillJob(BaseJob):
 
             # explictely mark running as we can fill gaps
             run.state = State.RUNNING
+            run.run_id = run_id
             run.verify_integrity(session=session)
 
             # check if we have orphaned tasks
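
The jobs.py change above makes the backfill stamp its own run_id onto the
DagRun it picks up before re-verifying integrity, so an existing run for the
same execution date is taken over by the backfill. The new test below checks
for exactly this; a condensed view (names taken from that test):

    # the pre-existing run created with run_id="test" is re-used by the
    # backfill under the backfill-generated run_id
    drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
    assert drs[0].run_id == BackfillJob.ID_FORMAT_PREFIX.format(
        DEFAULT_DATE.isoformat())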

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d7d9f883/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 1f1667e..ed483f5 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2696,6 +2696,8 @@ class DAG(BaseDag, LoggingMixin):
         self.orientation = orientation
         self.catchup = catchup
 
+        self.partial = False
+
         self._comps = {
             'dag_id',
             'task_ids',
@@ -3201,6 +3203,10 @@ class DAG(BaseDag, LoggingMixin):
                 tid for tid in t._upstream_task_ids if tid in dag.task_ids]
             t._downstream_task_ids = [
                 tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+
+        if len(dag.tasks) < len(self.tasks):
+            dag.partial = True
+
         return dag
 
     def has_task(self, task_id):
@@ -3969,6 +3975,9 @@ class DagRun(Base):
                 else:
                     tis = tis.filter(TI.state.in_(state))
 
+        if self.dag and self.dag.partial:
+            tis = tis.filter(TI.task_id.in_(self.dag.task_ids))
+
         return tis.all()
 
     @provide_session
@@ -4029,6 +4038,7 @@ class DagRun(Base):
         """
 
         dag = self.get_dag()
+
         tis = self.get_task_instances(session=session)
 
         logging.info("Updating state for {} considering {} task(s)"
@@ -4113,7 +4123,7 @@ class DagRun(Base):
             try:
                 dag.get_task(ti.task_id)
             except AirflowException:
-                if self.state is not State.RUNNING:
+                if self.state is not State.RUNNING and not dag.partial:
                     ti.state = State.REMOVED
 
         # check for missing tasks
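
Taken together, the models.py hunks mean a DAG produced by sub_dag() is
flagged as partial whenever it ends up with fewer tasks than the original,
and a DagRun backed by a partial DAG restricts itself to the tasks still in
scope instead of marking the rest removed. A rough illustration (task names
mirror the new test below, not the patch itself):

    sub_dag = dag.sub_dag(task_regex="leave*",
                          include_upstream=False,
                          include_downstream=False)
    assert sub_dag.partial  # only 'leave1' and 'leave2' survive the filter
    # get_task_instances() on a run whose dag is this partial subset now only
    # returns the two 'leave*' task instances, and verify_integrity() leaves
    # the other tasks' states untouched instead of setting State.REMOVED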

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d7d9f883/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 00d7829..2dc9108 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -42,6 +42,8 @@ from tests.executors.test_executor import TestExecutor
 from airflow import configuration
 configuration.load_test_config()
 
+import sqlalchemy
+
 try:
     from unittest import mock
 except ImportError:
@@ -294,6 +296,53 @@ class BackfillJobTest(unittest.TestCase):
         self.assertEqual(ti.state, State.SUCCESS)
         dag.clear()
 
+    def test_sub_set_subdag(self):
+        dag = DAG(
+            'test_sub_set_subdag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3')
+            # order randomly
+            op2.set_downstream(op3)
+            op1.set_downstream(op3)
+            op4.set_downstream(op5)
+            op3.set_downstream(op4)
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+
+        executor = TestExecutor(do_update=True)
+        sub_dag = dag.sub_dag(task_regex="leave*",
+                              include_downstream=False,
+                              include_upstream=False)
+        job = BackfillJob(dag=sub_dag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor)
+        job.run()
+
+        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+        # the run_id should have changed, so a refresh won't work
+        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+        dr = drs[0]
+
+        self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
+                         dr.run_id)
+        for ti in dr.get_task_instances():
+            if ti.task_id == 'leave1' or ti.task_id == 'leave2':
+                self.assertEqual(State.SUCCESS, ti.state)
+            else:
+                self.assertEqual(State.NONE, ti.state)
+
 
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run