You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/16 16:48:30 UTC

[3/9] incubator-airflow git commit: [AIRFLOW-759] Use previous dag_run to verify depend_on_past

[AIRFLOW-759] Use previous dag_run to verify depend_on_past

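The start_date and the schedule interval can be misaligned. This
is automatically corrected in the scheduler. The dependency checker,
however, did not do this: it treated a task instance as the first one
only when its execution_date was exactly equal to the task's start_date.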


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/89f0ca4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/89f0ca4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/89f0ca4a

Branch: refs/heads/v1-8-test
Commit: 89f0ca4abfa38b66d2e26788e353bfdd17772c52
Parents: 648bd4e
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Jan 14 14:31:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Jan 14 21:10:56 2017 +0100

----------------------------------------------------------------------
 airflow/models.py                       | 46 ++++++++++++++++------------
 airflow/ti_deps/deps/prev_dagrun_dep.py |  4 +--
 2 files changed, 28 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/89f0ca4a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index acb6667..d878457 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1027,19 +1027,22 @@ class TaskInstance(Base):
         dag = self.task.dag
         if dag:
             dr = self.get_dagrun(session=session)
+
+            # LEGACY: most likely running from unit tests
             if not dr:
                 # Means that this TI is NOT being run from a DR, but from a catchup
                 previous_scheduled_date = dag.previous_schedule(self.execution_date)
                 if not previous_scheduled_date:
                     return None
-                else:
-                    return TaskInstance(task=self.task, execution_date=previous_scheduled_date)
 
-            if dag.catchup:
-                last_dagrun = dr.get_previous_scheduled_dagrun(session=session) if dr else None
+                return TaskInstance(task=self.task,
+                                    execution_date=previous_scheduled_date)
 
+            dr.dag = dag
+            if dag.catchup:
+                last_dagrun = dr.get_previous_scheduled_dagrun(session=session)
             else:
-                last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
+                last_dagrun = dr.get_previous_dagrun(session=session)
 
             if last_dagrun:
                 return last_dagrun.get_task_instance(self.task_id, session=session)
@@ -1066,16 +1069,21 @@ class TaskInstance(Base):
         :type verbose: boolean
         """
         dep_context = dep_context or DepContext()
+        failed = False
         for dep_status in self.get_failed_dep_statuses(
                 dep_context=dep_context,
                 session=session):
+            failed = True
             if verbose:
-                logging.warning(
-                    "Dependencies not met for %s, dependency '%s' FAILED: %s",
-                    self, dep_status.dep_name, dep_status.reason)
+                logging.info("Dependencies not met for {}, dependency '{}' FAILED: {}"
+                             .format(self, dep_status.dep_name, dep_status.reason))
+
+        if failed:
             return False
+
         if verbose:
-            logging.info("Dependencies all met for %s", self)
+            logging.info("Dependencies all met for {}".format(self))
+
         return True
 
     @provide_session
@@ -1089,12 +1097,14 @@ class TaskInstance(Base):
                     self,
                     session,
                     dep_context):
-                if dep_status.passed:
-                    logging.debug("%s dependency '%s' PASSED: %s",
-                                  self,
-                                  dep_status.dep_name,
-                                  dep_status.reason)
-                else:
+
+                logging.debug("{} dependency '{}' PASSED: {}, {}"
+                              .format(self,
+                                      dep_status.dep_name,
+                                      dep_status.passed,
+                                      dep_status.reason))
+
+                if not dep_status.passed:
                     yield dep_status
 
     def __repr__(self):
@@ -3882,13 +3892,11 @@ class DagRun(Base):
     @provide_session
     def get_previous_scheduled_dagrun(self, session=None):
         """The previous, SCHEDULED DagRun, if there is one"""
-
-        if not self.dag:
-            return None
+        dag = self.get_dag()
 
         return session.query(DagRun).filter(
             DagRun.dag_id == self.dag_id,
-            DagRun.execution_date == self.dag.previous_schedule(self.execution_date)
+            DagRun.execution_date == dag.previous_schedule(self.execution_date)
         ).first()
 
     @provide_session

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/89f0ca4a/airflow/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/prev_dagrun_dep.py b/airflow/ti_deps/deps/prev_dagrun_dep.py
index 2fce704..7d4baa8 100644
--- a/airflow/ti_deps/deps/prev_dagrun_dep.py
+++ b/airflow/ti_deps/deps/prev_dagrun_dep.py
@@ -41,13 +41,11 @@ class PrevDagrunDep(BaseTIDep):
         # Don't depend on the previous task instance if we are the first task
         dag = ti.task.dag
         if dag.catchup:
-            if ti.execution_date == ti.task.start_date:
+            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
                 yield self._passing_status(
                     reason="This task instance was the first task instance for its task.")
                 raise StopIteration
-
         else:
-
             dr = ti.get_dagrun()
             last_dagrun = dr.get_previous_dagrun() if dr else None