You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/16 16:48:30 UTC
[3/9] incubator-airflow git commit: [AIRFLOW-759] Use previous dag_run to verify depends_on_past
[AIRFLOW-759] Use previous dag_run to verify depends_on_past
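The start_date and the schedule interval can be misaligned; that is, the start_date
does not necessarily fall on a schedule boundary. The scheduler already corrects
for this automatically, but the dependency checker did not. It treated a task
instance as the first one only when its execution_date was exactly equal to the
task's start_date. It now treats a task instance as the first one when the
previous scheduled date falls before the start_date.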
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/89f0ca4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/89f0ca4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/89f0ca4a
Branch: refs/heads/v1-8-test
Commit: 89f0ca4abfa38b66d2e26788e353bfdd17772c52
Parents: 648bd4e
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Jan 14 14:31:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 14 21:10:56 2017 +0100
----------------------------------------------------------------------
airflow/models.py | 46 ++++++++++++++++------------
airflow/ti_deps/deps/prev_dagrun_dep.py | 4 +--
2 files changed, 28 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/89f0ca4a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index acb6667..d878457 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1027,19 +1027,22 @@ class TaskInstance(Base):
dag = self.task.dag
if dag:
dr = self.get_dagrun(session=session)
+
+ # LEGACY: most likely running from unit tests
if not dr:
# Means that this TI is NOT being run from a DR, but from a catchup
previous_scheduled_date = dag.previous_schedule(self.execution_date)
if not previous_scheduled_date:
return None
- else:
- return TaskInstance(task=self.task, execution_date=previous_scheduled_date)
- if dag.catchup:
- last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None
+ return TaskInstance(task=self.task,
+ execution_date=previous_scheduled_date)
+ dr.dag = dag
+ if dag.catchup:
+ last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
else:
- last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
+ last_dagrun = dr.get_previous_dagrun(session=session)
if last_dagrun:
return last_dagrun.get_task_instance(self.task_id, session=session)
@@ -1066,16 +1069,21 @@ class TaskInstance(Base):
:type verbose: boolean
"""
dep_context = dep_context or DepContext()
+ failed = False
for dep_status in self.get_failed_dep_statuses(
dep_context=dep_context,
session=session):
+ failed = True
if verbose:
- logging.warning(
- "Dependencies not met for %s, dependency '%s' FAILED: %s",
- self, dep_status.dep_name, dep_status.reason)
+ logging.info("Dependencies not met for {}, dependency '{}' FAILED: {}"
+ .format(self, dep_status.dep_name, dep_status.reason))
+
+ if failed:
return False
+
if verbose:
- logging.info("Dependencies all met for %s", self)
+ logging.info("Dependencies all met for {}".format(self))
+
return True
@provide_session
@@ -1089,12 +1097,14 @@ class TaskInstance(Base):
self,
session,
dep_context):
- if dep_status.passed:
- logging.debug("%s dependency '%s' PASSED: %s",
- self,
- dep_status.dep_name,
- dep_status.reason)
- else:
+
+ logging.debug("{} dependency '{}' PASSED: {}, {}"
+ .format(self,
+ dep_status.dep_name,
+ dep_status.passed,
+ dep_status.reason))
+
+ if not dep_status.passed:
yield dep_status
def __repr__(self):
@@ -3882,13 +3892,11 @@ class DagRun(Base):
@provide_session
def get_previous_scheduled_dagrun(self, session=None):
"""The previous, SCHEDULED DagRun, if there is one"""
-
- if not self.dag:
- return None
+ dag = self.get_dag()
return session.query(DagRun).filter(
DagRun.dag_id == self.dag_id,
- DagRun.execution_date == self.dag.previous_schedule(self.execution_date)
+ DagRun.execution_date == dag.previous_schedule(self.execution_date)
).first()
@provide_session
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/89f0ca4a/airflow/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 2fce704..7d4baa8 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -41,13 +41,11 @@ class PrevDagrunDep(BaseTIDep):
# Don't depend on the previous task instance if we are the first task
dag = ti.task.dag
if dag.catchup:
- if ti.execution_date == ti.task.start_date:
+ if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
yield self._passing_status(
reason="This task instance was the first task instance for its task.")
raise StopIteration
-
else:
-
dr = ti.get_dagrun()
last_dagrun = dr.get_previous_dagrun() if dr else None