You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/05/13 17:34:29 UTC

incubator-airflow git commit: Fix : Don't treat premature tasks as could_not_run tasks

Repository: incubator-airflow
Updated Branches:
  refs/heads/airbnb_rb1.7.1_3 4a5f4a0ba -> 563be1324


Fix : Don't treat premature tasks as could_not_run tasks


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/563be132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/563be132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/563be132

Branch: refs/heads/airbnb_rb1.7.1_3
Commit: 563be1324d982f6033e7d087f41e070805aefa21
Parents: 4a5f4a0
Author: Siddharth Anand <sa...@agari.com>
Authored: Thu May 12 03:37:51 2016 +0000
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri May 13 10:34:21 2016 -0700

----------------------------------------------------------------------
 airflow/jobs.py   |  3 +++
 airflow/models.py | 11 ++++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/563be132/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8a38086..cbd536f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -525,7 +525,10 @@ class SchedulerJob(BaseJob):
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Queuing task: {}'.format(ti))
                 queue.put((ti.key, pickle_id))
+            elif ti.is_premature():
+                continue
             else:
+                self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
                 could_not_run.add(ti)
 
         # this type of deadlock happens when dagruns can't even start and so

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/563be132/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 24ee0f5..3bad273 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -856,7 +856,7 @@ class TaskInstance(Base):
         if self.execution_date > datetime.now():
             return False
         # is the task still in the retry waiting period?
-        elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
+        elif self.is_premature():
             return False
         # does the task have an end_date prior to the execution date?
         elif self.task.end_date and self.execution_date > self.task.end_date:
@@ -878,6 +878,15 @@ class TaskInstance(Base):
         else:
             return False
 
+
+    def is_premature(self):
+        """
+        Returns whether a task is in UP_FOR_RETRY state and its retry interval
+        has elapsed.
+        """
+        # is the task still in the retry waiting period?
+        return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
+
     def is_runnable(
             self,
             include_queued=False,