You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:25 UTC

[21/28] incubator-airflow git commit: [AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

[AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling

In a backfill one can specify a specific task to execute. To do so, we
create a subdag from the original dag that contains only a subset of
the original tasks. The subdag has the same name as the original dag.
This breaks the integrity check of a dag_run, because the tasks that
were left out of the subdag are suddenly not in scope any more and
would be marked as removed.

Closes #2122 from bolkedebruin/AIRFLOW-921


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a8f2c27e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a8f2c27e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a8f2c27e

Branch: refs/heads/v1-8-test
Commit: a8f2c27ed44449e6611c7c4a9ec8cf2371cf0987
Parents: dacc69a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 10:52:07 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:22 2017 -0700

----------------------------------------------------------------------
 airflow/jobs.py   |  1 +
 airflow/models.py | 12 +++++++++++-
 tests/jobs.py     | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 36548c2..c61b229 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1803,6 +1803,7 @@ class BackfillJob(BaseJob):
 
             # explictely mark running as we can fill gaps
             run.state = State.RUNNING
+            run.run_id = run_id
             run.verify_integrity(session=session)
 
             # check if we have orphaned tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e63da3e..32c52ac 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2681,6 +2681,8 @@ class DAG(BaseDag, LoggingMixin):
         self.orientation = orientation
         self.catchup = catchup
 
+        self.partial = False
+
         self._comps = {
             'dag_id',
             'task_ids',
@@ -3186,6 +3188,10 @@ class DAG(BaseDag, LoggingMixin):
                 tid for tid in t._upstream_task_ids if tid in dag.task_ids]
             t._downstream_task_ids = [
                 tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+
+        if len(dag.tasks) < len(self.tasks):
+            dag.partial = True
+
         return dag
 
     def has_task(self, task_id):
@@ -3946,6 +3952,9 @@ class DagRun(Base):
                 else:
                     tis = tis.filter(TI.state.in_(state))
 
+        if self.dag and self.dag.partial:
+            tis = tis.filter(TI.task_id.in_(self.dag.task_ids))
+
         return tis.all()
 
     @provide_session
@@ -4006,6 +4015,7 @@ class DagRun(Base):
         """
 
         dag = self.get_dag()
+
         tis = self.get_task_instances(session=session)
 
         logging.info("Updating state for {} considering {} task(s)"
@@ -4090,7 +4100,7 @@ class DagRun(Base):
             try:
                 dag.get_task(ti.task_id)
             except AirflowException:
-                if self.state is not State.RUNNING:
+                if self.state is not State.RUNNING and not dag.partial:
                     ti.state = State.REMOVED
 
         # check for missing tasks

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1acf269..d208fd4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -42,6 +42,8 @@ from tests.executor.test_executor import TestExecutor
 from airflow import configuration
 configuration.load_test_config()
 
+import sqlalchemy
+
 try:
     from unittest import mock
 except ImportError:
@@ -294,6 +296,53 @@ class BackfillJobTest(unittest.TestCase):
         self.assertEqual(ti.state, State.SUCCESS)
         dag.clear()
 
+    def test_sub_set_subdag(self):
+        dag = DAG(
+            'test_sub_set_subdag',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='leave1')
+            op2 = DummyOperator(task_id='leave2')
+            op3 = DummyOperator(task_id='upstream_level_1')
+            op4 = DummyOperator(task_id='upstream_level_2')
+            op5 = DummyOperator(task_id='upstream_level_3')
+            # order randomly
+            op2.set_downstream(op3)
+            op1.set_downstream(op3)
+            op4.set_downstream(op5)
+            op3.set_downstream(op4)
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+
+        executor = TestExecutor(do_update=True)
+        sub_dag = dag.sub_dag(task_regex="leave*",
+                              include_downstream=False,
+                              include_upstream=False)
+        job = BackfillJob(dag=sub_dag,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE,
+                          executor=executor)
+        job.run()
+
+        self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+        # the run_id should have changed, so a refresh won't work
+        drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+        dr = drs[0]
+
+        self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
+                         dr.run_id)
+        for ti in dr.get_task_instances():
+            if ti.task_id == 'leave1' or ti.task_id == 'leave2':
+                self.assertEqual(State.SUCCESS, ti.state)
+            else:
+                self.assertEqual(State.NONE, ti.state)
+
 
 class SchedulerJobTest(unittest.TestCase):
     # These defaults make the test faster to run