You are viewing a plain-text version of this content. The link to the canonical version is not included in this copy.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/07/06 15:58:08 UTC

incubator-airflow git commit: [AIRFLOW-297] support exponential backoff option for retry delay

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9a61a5bd5 -> 8b86ee6a7


[AIRFLOW-297] support exponential backoff option for retry delay

Closes #1639 from jgao54/support-retry-backoff


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8b86ee6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8b86ee6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8b86ee6a

Branch: refs/heads/master
Commit: 8b86ee6a727090acd151ed0a312b627412ddd5eb
Parents: 9a61a5b
Author: Joy Gao <jo...@wepay.com>
Authored: Wed Jul 6 08:57:47 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Wed Jul 6 08:57:57 2016 -0700

----------------------------------------------------------------------
 airflow/models.py | 31 ++++++++++++++++++++++++++++---
 tests/models.py   | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8b86ee6a/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e514e9c..60de65f 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1145,13 +1145,26 @@ class TaskInstance(Base):
             "{ti.execution_date} [{ti.state}]>"
         ).format(ti=self)
 
+    def next_retry_datetime(self):
+        """
+        Get datetime of the next retry if the task instance fails. For exponential
+        backoff, retry_delay is used as base and will be converted to seconds.
+        """
+        delay = self.task.retry_delay
+        if self.task.retry_exponential_backoff:
+            delay_backoff_in_seconds = delay.total_seconds() ** self.try_number
+            delay = timedelta(seconds=delay_backoff_in_seconds)
+            if self.task.max_retry_delay:
+                delay = min(self.task.max_retry_delay, delay)
+        return self.end_date + delay
+
+
     def ready_for_retry(self):
         """
         Checks on whether the task instance is in the right state and timeframe
         to be retried.
         """
-        return self.state == State.UP_FOR_RETRY and \
-            self.end_date + self.task.retry_delay < datetime.now()
+        return self.state == State.UP_FOR_RETRY and self.next_retry_datetime() < datetime.now()
 
     @provide_session
     def pool_full(self, session):
@@ -1239,7 +1252,7 @@ class TaskInstance(Base):
             # todo: move this to the scheduler
                 self.state == State.UP_FOR_RETRY and
                 not self.ready_for_retry()):
-            next_run = (self.end_date + task.retry_delay).isoformat()
+            next_run = self.next_retry_datetime().isoformat()
             logging.info(
                 "Not ready for retry yet. " +
                 "Next run after {0}".format(next_run)
@@ -1692,6 +1705,12 @@ class BaseOperator(object):
     :type retries: int
     :param retry_delay: delay between retries
     :type retry_delay: timedelta
+    :param retry_exponential_backoff: allow progressive longer waits between
+        retries by using exponential backoff algorithm on retry delay (delay
+        will be converted into seconds)
+    :type retry_exponential_backoff: bool
+    :param max_retry_delay: maximum delay interval between retries
+    :type max_retry_delay: timedelta
     :param start_date: The ``start_date`` for the task, determines
         the ``execution_date`` for the first task instance. The best practice
         is to have the start_date rounded
@@ -1789,6 +1808,8 @@ class BaseOperator(object):
             email_on_failure=True,
             retries=0,
             retry_delay=timedelta(seconds=300),
+            retry_exponential_backoff=False,
+            max_retry_delay=None,
             start_date=None,
             end_date=None,
             schedule_interval=None,  # not hooked as of now
@@ -1864,6 +1885,8 @@ class BaseOperator(object):
         else:
             logging.debug("retry_delay isn't timedelta object, assuming secs")
             self.retry_delay = timedelta(seconds=retry_delay)
+        self.retry_exponential_backoff = retry_exponential_backoff
+        self.max_retry_delay = max_retry_delay
         self.params = params or {}  # Available in templates!
         self.adhoc = adhoc
         self.priority_weight = priority_weight
@@ -1884,6 +1907,8 @@ class BaseOperator(object):
             'email',
             'email_on_retry',
             'retry_delay',
+            'retry_exponential_backoff',
+            'max_retry_delay',
             'start_date',
             'schedule_interval',
             'depends_on_past',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8b86ee6a/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 3549e8f..49b33d1 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -421,6 +421,38 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(ti.state, State.FAILED)
         self.assertEqual(ti.try_number, 4)
 
+    def test_next_retry_datetime(self):
+        delay = datetime.timedelta(seconds=3)
+        delay_squared = datetime.timedelta(seconds=9)
+        max_delay = datetime.timedelta(seconds=10)
+
+        dag = models.DAG(dag_id='fail_dag')
+        task = BashOperator(
+            task_id='task_with_exp_backoff_and_max_delay',
+            bash_command='exit 1',
+            retries=3,
+            retry_delay=delay,
+            retry_exponential_backoff=True,
+            max_retry_delay=max_delay,
+            dag=dag,
+            owner='airflow',
+            start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
+        ti = TI(
+            task=task, execution_date=datetime.datetime.now())
+        ti.end_date = datetime.datetime.now()
+
+        ti.try_number = 1
+        dt = ti.next_retry_datetime()
+        self.assertEqual(dt, ti.end_date+delay)
+
+        ti.try_number = 2
+        dt = ti.next_retry_datetime()
+        self.assertEqual(dt, ti.end_date+delay_squared)
+
+        ti.try_number = 3
+        dt = ti.next_retry_datetime()
+        self.assertEqual(dt, ti.end_date+max_delay)
+
     def test_depends_on_past(self):
         dagbag = models.DagBag()
         dag = dagbag.get_dag('test_depends_on_past')