You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/29 15:12:05 UTC

incubator-airflow git commit: [AIRFLOW-1036] Randomize exponential backoff

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2fa6905f4 -> 66168efa1


[AIRFLOW-1036] Randomize exponential backoff

This prevents the thundering herd problem. Using a
combination of
dag_run, task_id, and execution_date makes this
random with respect to
task instances, while still being deterministic
across machines. The
retry delay is within a range that doubles in
size.

Closes #2262 from saguziel/aguziel-random-
exponential-backoff


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66168efa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66168efa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66168efa

Branch: refs/heads/master
Commit: 66168efa12de98a9c29b20e5cea28c7e34a2d90a
Parents: 2fa6905
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Apr 29 17:11:58 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Apr 29 17:11:58 2017 +0200

----------------------------------------------------------------------
 airflow/models.py |  8 +++++++-
 tests/models.py   | 15 +++++++++++----
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66168efa/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 51beab8..d2f7894 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1183,13 +1183,19 @@ class TaskInstance(Base):
         """
         delay = self.task.retry_delay
         if self.task.retry_exponential_backoff:
+            min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
+            # deterministic per task instance
+            hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
+                self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
+            # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
+            modded_hash = min_backoff + hash % min_backoff
             # timedelta has a maximum representable value. The exponentiation
             # here means this value can be exceeded after a certain number
             # of tries (around 50 if the initial delay is 1s, even fewer if
             # the delay is larger). Cap the value here before creating a
             # timedelta object so the operation doesn't fail.
             delay_backoff_in_seconds = min(
-                delay.total_seconds() * (2 ** (self.try_number - 1)),
+                modded_hash,
                 timedelta.max.total_seconds() - 1
             )
             delay = timedelta(seconds=delay_backoff_in_seconds)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66168efa/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 2180896..49e5c75 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -838,18 +838,25 @@ class TaskInstanceTest(unittest.TestCase):
             owner='airflow',
             start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
         ti = TI(
-            task=task, execution_date=datetime.datetime.now())
+            task=task, execution_date=DEFAULT_DATE)
         ti.end_date = datetime.datetime.now()
 
         ti.try_number = 1
         dt = ti.next_retry_datetime()
-        self.assertEqual(dt, ti.end_date + delay)
+        # between 30 * 2^0.5 and 30 * 2^1 (15 and 30)
+        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
+
+        ti.try_number = 4
+        dt = ti.next_retry_datetime()
+        # between 30 * 2^2 and 30 * 2^3 (120 and 240)
+        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
 
         ti.try_number = 6
         dt = ti.next_retry_datetime()
-        self.assertEqual(dt, ti.end_date + (2 ** 5) * delay)
+        # between 30 * 2^4 and 30 * 2^5 (480 and 960)
+        self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
 
-        ti.try_number = 8
+        ti.try_number = 9
         dt = ti.next_retry_datetime()
         self.assertEqual(dt, ti.end_date+max_delay)