You are viewing a plain-text version of this message; the hyperlink to its canonical version ("here") was not preserved in this copy.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/07/06 15:58:08 UTC
incubator-airflow git commit: [AIRFLOW-297] support exponential backoff option for retry delay
Repository: incubator-airflow
Updated Branches:
refs/heads/master 9a61a5bd5 -> 8b86ee6a7
[AIRFLOW-297] support exponential backoff option for retry delay
Closes #1639 from jgao54/support-retry-backoff
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8b86ee6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8b86ee6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8b86ee6a
Branch: refs/heads/master
Commit: 8b86ee6a727090acd151ed0a312b627412ddd5eb
Parents: 9a61a5b
Author: Joy Gao <jo...@wepay.com>
Authored: Wed Jul 6 08:57:47 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Wed Jul 6 08:57:57 2016 -0700
----------------------------------------------------------------------
airflow/models.py | 31 ++++++++++++++++++++++++++++---
tests/models.py | 32 ++++++++++++++++++++++++++++++++
2 files changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8b86ee6a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e514e9c..60de65f 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1145,13 +1145,26 @@ class TaskInstance(Base):
"{ti.execution_date} [{ti.state}]>"
).format(ti=self)
def next_retry_datetime(self):
    """
    Get datetime of the next retry if the task instance fails.

    Without exponential backoff the next retry is ``end_date`` plus the
    task's ``retry_delay``. With ``retry_exponential_backoff`` enabled,
    ``retry_delay`` (converted to seconds) is the base and the delay doubles
    on every attempt: ``retry_delay * 2 ** (try_number - 1)``, i.e. 1x, 2x,
    4x, ... the base delay. When ``max_retry_delay`` is set, the delay is
    capped at that value. A plain number is read as seconds, the same way
    ``retry_delay`` is accepted.

    :return: ``end_date`` plus the computed delay
    :rtype: datetime
    """
    delay = self.task.retry_delay
    max_delay = self.task.max_retry_delay
    if max_delay and not isinstance(max_delay, timedelta):
        # Accept seconds, mirroring how retry_delay is normalised.
        max_delay = timedelta(seconds=max_delay)

    if self.task.retry_exponential_backoff:
        # Multiply, don't exponentiate: ``seconds ** try_number`` never grows
        # for a 1s base, shrinks for a sub-second base, and turns a 300s base
        # into 25 hours on the second try.
        exponent = max(self.try_number - 1, 0)
        backoff_seconds = delay.total_seconds() * 2 ** exponent
        if max_delay:
            # Cap before building the timedelta so a high try_number cannot
            # overflow timedelta when a maximum is configured.
            backoff_seconds = min(backoff_seconds, max_delay.total_seconds())
        delay = timedelta(seconds=backoff_seconds)

    if max_delay:
        delay = min(max_delay, delay)
    return self.end_date + delay
+
+
def ready_for_retry(self):
    """
    Report whether this task instance may be retried right now.

    The instance must be in the UP_FOR_RETRY state, and the moment returned
    by ``next_retry_datetime()`` must lie strictly before the current time.
    """
    in_retry_state = self.state == State.UP_FOR_RETRY
    if not in_retry_state:
        # Short-circuit exactly like ``a and b``: hand back the falsy result.
        return in_retry_state
    return self.next_retry_datetime() < datetime.now()
@provide_session
def pool_full(self, session):
@@ -1239,7 +1252,7 @@ class TaskInstance(Base):
# todo: move this to the scheduler
self.state == State.UP_FOR_RETRY and
not self.ready_for_retry()):
- next_run = (self.end_date + task.retry_delay).isoformat()
+ next_run = self.next_retry_datetime().isoformat()
logging.info(
"Not ready for retry yet. " +
"Next run after {0}".format(next_run)
@@ -1692,6 +1705,12 @@ class BaseOperator(object):
:type retries: int
:param retry_delay: delay between retries
:type retry_delay: timedelta
+ :param retry_exponential_backoff: allow progressive longer waits between
+ retries by using exponential backoff algorithm on retry delay (delay
+ will be converted into seconds)
+ :type retry_exponential_backoff: bool
+ :param max_retry_delay: maximum delay interval between retries
+ :type max_retry_delay: timedelta
:param start_date: The ``start_date`` for the task, determines
the ``execution_date`` for the first task instance. The best practice
is to have the start_date rounded
@@ -1789,6 +1808,8 @@ class BaseOperator(object):
email_on_failure=True,
retries=0,
retry_delay=timedelta(seconds=300),
+ retry_exponential_backoff=False,
+ max_retry_delay=None,
start_date=None,
end_date=None,
schedule_interval=None, # not hooked as of now
@@ -1864,6 +1885,8 @@ class BaseOperator(object):
else:
logging.debug("retry_delay isn't timedelta object, assuming secs")
self.retry_delay = timedelta(seconds=retry_delay)
+ self.retry_exponential_backoff = retry_exponential_backoff
+ self.max_retry_delay = max_retry_delay
self.params = params or {} # Available in templates!
self.adhoc = adhoc
self.priority_weight = priority_weight
@@ -1884,6 +1907,8 @@ class BaseOperator(object):
'email',
'email_on_retry',
'retry_delay',
+ 'retry_exponential_backoff',
+ 'max_retry_delay',
'start_date',
'schedule_interval',
'depends_on_past',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8b86ee6a/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 3549e8f..49b33d1 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -421,6 +421,38 @@ class TaskInstanceTest(unittest.TestCase):
self.assertEqual(ti.state, State.FAILED)
self.assertEqual(ti.try_number, 4)
def test_next_retry_datetime(self):
    """
    Check next_retry_datetime for a task with exponential backoff and a cap.

    The test asserts the documented contract, not one particular growth
    curve. The first retry waits exactly retry_delay. Later retries wait
    longer, but never more than max_retry_delay. With a 3s base and a 10s
    cap, the cap is reached by the third try.
    """
    delay = datetime.timedelta(seconds=3)
    max_delay = datetime.timedelta(seconds=10)

    dag = models.DAG(dag_id='fail_dag')
    task = BashOperator(
        task_id='task_with_exp_backoff_and_max_delay',
        bash_command='exit 1',
        retries=3,
        retry_delay=delay,
        retry_exponential_backoff=True,
        max_retry_delay=max_delay,
        dag=dag,
        owner='airflow',
        start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
    ti = TI(
        task=task, execution_date=datetime.datetime.now())
    ti.end_date = datetime.datetime.now()

    ti.try_number = 1
    first = ti.next_retry_datetime()
    self.assertEqual(first, ti.end_date + delay)

    ti.try_number = 2
    second = ti.next_retry_datetime()
    # Backoff must grow, but stay within the configured maximum.
    self.assertGreater(second, first)
    self.assertLessEqual(second, ti.end_date + max_delay)

    ti.try_number = 3
    third = ti.next_retry_datetime()
    self.assertEqual(third, ti.end_date + max_delay)
def test_depends_on_past(self):
dagbag = models.DagBag()
dag = dagbag.get_dag('test_depends_on_past')