You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/05/13 17:34:29 UTC
incubator-airflow git commit: Fix : Don't treat premature tasks as
could_not_run tasks
Repository: incubator-airflow
Updated Branches:
refs/heads/airbnb_rb1.7.1_3 4a5f4a0ba -> 563be1324
Fix : Don't treat premature tasks as could_not_run tasks
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/563be132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/563be132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/563be132
Branch: refs/heads/airbnb_rb1.7.1_3
Commit: 563be1324d982f6033e7d087f41e070805aefa21
Parents: 4a5f4a0
Author: Siddharth Anand <sa...@agari.com>
Authored: Thu May 12 03:37:51 2016 +0000
Committer: Dan Davydov <da...@airbnb.com>
Committed: Fri May 13 10:34:21 2016 -0700
----------------------------------------------------------------------
airflow/jobs.py | 3 +++
airflow/models.py | 11 ++++++++++-
2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/563be132/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 8a38086..cbd536f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -525,7 +525,10 @@ class SchedulerJob(BaseJob):
elif ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))
queue.put((ti.key, pickle_id))
+ elif ti.is_premature():
+ continue
else:
+ self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
could_not_run.add(ti)
# this type of deadlock happens when dagruns can't even start and so
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/563be132/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 24ee0f5..3bad273 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -856,7 +856,7 @@ class TaskInstance(Base):
if self.execution_date > datetime.now():
return False
# is the task still in the retry waiting period?
- elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
+ elif self.is_premature():
return False
# does the task have an end_date prior to the execution date?
elif self.task.end_date and self.execution_date > self.task.end_date:
@@ -878,6 +878,15 @@ class TaskInstance(Base):
else:
return False
+
+ def is_premature(self):
+ """
+ Returns whether a task is in UP_FOR_RETRY state and its retry interval
+ has elapsed.
+ """
+ # is the task still in the retry waiting period?
+ return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
+
def is_runnable(
self,
include_queued=False,