You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/03/13 02:49:25 UTC
[21/28] incubator-airflow git commit:
[AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed
when backfilling
[AIRFLOW-932][AIRFLOW-932][AIRFLOW-921][AIRFLOW-910] Do not mark tasks removed when backfilling
In a backfill one can specify a specific task to execute. We
create a subset of the original tasks in a subdag from the
original dag. The subdag has the same name as the original dag.
This breaks the integrity check of a dag_run as tasks are
suddenly not in scope any more.
Closes #2122 from bolkedebruin/AIRFLOW-921
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a8f2c27e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a8f2c27e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a8f2c27e
Branch: refs/heads/v1-8-test
Commit: a8f2c27ed44449e6611c7c4a9ec8cf2371cf0987
Parents: dacc69a
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Mar 11 10:52:07 2017 -0800
Committer: Bolke de Bruin <bo...@Bolkes-MacBook-Pro.local>
Committed: Sun Mar 12 08:34:22 2017 -0700
----------------------------------------------------------------------
airflow/jobs.py | 1 +
airflow/models.py | 12 +++++++++++-
tests/jobs.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 36548c2..c61b229 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1803,6 +1803,7 @@ class BackfillJob(BaseJob):
# explictely mark running as we can fill gaps
run.state = State.RUNNING
+ run.run_id = run_id
run.verify_integrity(session=session)
# check if we have orphaned tasks
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e63da3e..32c52ac 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2681,6 +2681,8 @@ class DAG(BaseDag, LoggingMixin):
self.orientation = orientation
self.catchup = catchup
+ self.partial = False
+
self._comps = {
'dag_id',
'task_ids',
@@ -3186,6 +3188,10 @@ class DAG(BaseDag, LoggingMixin):
tid for tid in t._upstream_task_ids if tid in dag.task_ids]
t._downstream_task_ids = [
tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+
+ if len(dag.tasks) < len(self.tasks):
+ dag.partial = True
+
return dag
def has_task(self, task_id):
@@ -3946,6 +3952,9 @@ class DagRun(Base):
else:
tis = tis.filter(TI.state.in_(state))
+ if self.dag and self.dag.partial:
+ tis = tis.filter(TI.task_id.in_(self.dag.task_ids))
+
return tis.all()
@provide_session
@@ -4006,6 +4015,7 @@ class DagRun(Base):
"""
dag = self.get_dag()
+
tis = self.get_task_instances(session=session)
logging.info("Updating state for {} considering {} task(s)"
@@ -4090,7 +4100,7 @@ class DagRun(Base):
try:
dag.get_task(ti.task_id)
except AirflowException:
- if self.state is not State.RUNNING:
+ if self.state is not State.RUNNING and not dag.partial:
ti.state = State.REMOVED
# check for missing tasks
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a8f2c27e/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1acf269..d208fd4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -42,6 +42,8 @@ from tests.executor.test_executor import TestExecutor
from airflow import configuration
configuration.load_test_config()
+import sqlalchemy
+
try:
from unittest import mock
except ImportError:
@@ -294,6 +296,53 @@ class BackfillJobTest(unittest.TestCase):
self.assertEqual(ti.state, State.SUCCESS)
dag.clear()
+ def test_sub_set_subdag(self):
+ dag = DAG(
+ 'test_sub_set_subdag',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = DummyOperator(task_id='leave1')
+ op2 = DummyOperator(task_id='leave2')
+ op3 = DummyOperator(task_id='upstream_level_1')
+ op4 = DummyOperator(task_id='upstream_level_2')
+ op5 = DummyOperator(task_id='upstream_level_3')
+ # order randomly
+ op2.set_downstream(op3)
+ op1.set_downstream(op3)
+ op4.set_downstream(op5)
+ op3.set_downstream(op4)
+
+ dag.clear()
+ dr = dag.create_dagrun(run_id="test",
+ state=State.SUCCESS,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE)
+
+ executor = TestExecutor(do_update=True)
+ sub_dag = dag.sub_dag(task_regex="leave*",
+ include_downstream=False,
+ include_upstream=False)
+ job = BackfillJob(dag=sub_dag,
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE,
+ executor=executor)
+ job.run()
+
+ self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+ # the run_id should have changed, so a refresh won't work
+ drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+ dr = drs[0]
+
+ self.assertEqual(BackfillJob.ID_FORMAT_PREFIX.format(DEFAULT_DATE.isoformat()),
+ dr.run_id)
+ for ti in dr.get_task_instances():
+ if ti.task_id == 'leave1' or ti.task_id == 'leave2':
+ self.assertEqual(State.SUCCESS, ti.state)
+ else:
+ self.assertEqual(State.NONE, ti.state)
+
class SchedulerJobTest(unittest.TestCase):
# These defaults make the test faster to run