You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:22 UTC

[airflow] 20/42: BugFix: Serialize max_retry_delay as a timedelta (#14436)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 041c9d27e373d8e778ce8f2b08f86b76a845d064
Author: Paul Vickers <pa...@outlook.com>
AuthorDate: Fri Feb 26 11:42:00 2021 +0000

    BugFix: Serialize max_retry_delay as a timedelta (#14436)
    
    closes: #13086, #14212
    
    (cherry picked from commit 59c459fa2a6aafc133db4a89980fb3d3d0d25589)
---
 airflow/models/baseoperator.py                | 9 ++++++++-
 airflow/serialization/schema.json             | 1 +
 airflow/serialization/serialized_objects.py   | 2 +-
 tests/serialization/test_dag_serialization.py | 4 ++++
 4 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 64ed4c5..06094a1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -353,7 +353,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0),
         retry_delay: timedelta = timedelta(seconds=300),
         retry_exponential_backoff: bool = False,
-        max_retry_delay: Optional[datetime] = None,
+        max_retry_delay: Optional[timedelta] = None,
         start_date: Optional[datetime] = None,
         end_date: Optional[datetime] = None,
         depends_on_past: bool = False,
@@ -460,6 +460,13 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
             self.retry_delay = timedelta(seconds=retry_delay)  # noqa
         self.retry_exponential_backoff = retry_exponential_backoff
         self.max_retry_delay = max_retry_delay
+        if max_retry_delay:
+            if isinstance(max_retry_delay, timedelta):
+                self.max_retry_delay = max_retry_delay
+            else:
+                self.log.debug("Max_retry_delay isn't timedelta object, assuming secs")
+                self.max_retry_delay = timedelta(seconds=max_retry_delay)  # noqa
+
         self.params = params or {}  # Available in templates!
         self.priority_weight = priority_weight
         if not WeightRule.is_valid(weight_rule):
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index c831334..0fbe20f 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -145,6 +145,7 @@
         "execution_timeout": { "$ref": "#/definitions/timedelta" },
         "retry_delay": { "$ref": "#/definitions/timedelta" },
         "retry_exponential_backoff": { "type": "boolean" },
+        "max_retry_delay": { "$ref": "#/definitions/timedelta" },
         "params": { "$ref": "#/definitions/dict" },
         "priority_weight": { "type": "number" },
         "weight_rule": { "type": "string" },
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index a5e9f15..d609c09 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -452,7 +452,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
                 v = set(v)
             elif k == "subdag":
                 v = SerializedDAG.deserialize_dag(v)
-            elif k in {"retry_delay", "execution_timeout", "sla"}:
+            elif k in {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}:
                 v = cls._deserialize_timedelta(v)
             elif k in encoded_op["template_fields"]:
                 pass
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 2046e22..a775f5b 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -60,6 +60,7 @@ serialized_simple_dag_ground_truth = {
                 "depends_on_past": False,
                 "retries": 1,
                 "retry_delay": {"__type": "timedelta", "__var": 300.0},
+                "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
                 "sla": {"__type": "timedelta", "__var": 100.0},
             },
         },
@@ -85,6 +86,7 @@ serialized_simple_dag_ground_truth = {
                 "owner": "airflow",
                 "retries": 1,
                 "retry_delay": 300.0,
+                "max_retry_delay": 600.0,
                 "sla": 100.0,
                 "_downstream_task_ids": [],
                 "_inlets": [],
@@ -113,6 +115,7 @@ serialized_simple_dag_ground_truth = {
                 "task_id": "custom_task",
                 "retries": 1,
                 "retry_delay": 300.0,
+                "max_retry_delay": 600.0,
                 "sla": 100.0,
                 "_downstream_task_ids": [],
                 "_inlets": [],
@@ -160,6 +163,7 @@ def make_simple_dag():
         default_args={
             "retries": 1,
             "retry_delay": timedelta(minutes=5),
+            "max_retry_delay": timedelta(minutes=10),
             "depends_on_past": False,
             "sla": timedelta(seconds=100),
         },