You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/29 15:12:05 UTC
incubator-airflow git commit: [AIRFLOW-1036] Randomize exponential backoff
Repository: incubator-airflow
Updated Branches:
refs/heads/master 2fa6905f4 -> 66168efa1
[AIRFLOW-1036] Randomize exponential backoff
This prevents the thundering herd problem. Hashing a combination of
dag_id, task_id, execution_date, and try_number makes the delay random
with respect to task instances, while still being deterministic across
machines. The retry delay falls within a range that doubles in size
with each retry.
Closes #2262 from saguziel/aguziel-random-exponential-backoff
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/66168efa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/66168efa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/66168efa
Branch: refs/heads/master
Commit: 66168efa12de98a9c29b20e5cea28c7e34a2d90a
Parents: 2fa6905
Author: Alex Guziel <al...@airbnb.com>
Authored: Sat Apr 29 17:11:58 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat Apr 29 17:11:58 2017 +0200
----------------------------------------------------------------------
airflow/models.py | 8 +++++++-
tests/models.py | 15 +++++++++++----
2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66168efa/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 51beab8..d2f7894 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1183,13 +1183,19 @@ class TaskInstance(Base):
"""
delay = self.task.retry_delay
if self.task.retry_exponential_backoff:
+ min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
+ # deterministic per task instance
+ hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
+ self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
+ # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
+ modded_hash = min_backoff + hash % min_backoff
# timedelta has a maximum representable value. The exponentiation
# here means this value can be exceeded after a certain number
# of tries (around 50 if the initial delay is 1s, even fewer if
# the delay is larger). Cap the value here before creating a
# timedelta object so the operation doesn't fail.
delay_backoff_in_seconds = min(
- delay.total_seconds() * (2 ** (self.try_number - 1)),
+ modded_hash,
timedelta.max.total_seconds() - 1
)
delay = timedelta(seconds=delay_backoff_in_seconds)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/66168efa/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 2180896..49e5c75 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -838,18 +838,25 @@ class TaskInstanceTest(unittest.TestCase):
owner='airflow',
start_date=datetime.datetime(2016, 2, 1, 0, 0, 0))
ti = TI(
- task=task, execution_date=datetime.datetime.now())
+ task=task, execution_date=DEFAULT_DATE)
ti.end_date = datetime.datetime.now()
ti.try_number = 1
dt = ti.next_retry_datetime()
- self.assertEqual(dt, ti.end_date + delay)
+ # between 30 * 2^-1 and 30 * 2^0 (15 and 30)
+ self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=20.0))
+
+ ti.try_number = 4
+ dt = ti.next_retry_datetime()
+ # between 30 * 2^2 and 30 * 2^3 (120 and 240)
+ self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=181.0))
ti.try_number = 6
dt = ti.next_retry_datetime()
- self.assertEqual(dt, ti.end_date + (2 ** 5) * delay)
+ # between 30 * 2^4 and 30 * 2^5 (480 and 960)
+ self.assertEqual(dt, ti.end_date + datetime.timedelta(seconds=825.0))
- ti.try_number = 8
+ ti.try_number = 9
dt = ti.next_retry_datetime()
self.assertEqual(dt, ti.end_date+max_delay)