You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:22 UTC
[airflow] 20/42: BugFix: Serialize max_retry_delay as a timedelta
(#14436)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 041c9d27e373d8e778ce8f2b08f86b76a845d064
Author: Paul Vickers <pa...@outlook.com>
AuthorDate: Fri Feb 26 11:42:00 2021 +0000
BugFix: Serialize max_retry_delay as a timedelta (#14436)
closes: #13086, #14212
(cherry picked from commit 59c459fa2a6aafc133db4a89980fb3d3d0d25589)
---
airflow/models/baseoperator.py | 9 ++++++++-
airflow/serialization/schema.json | 1 +
airflow/serialization/serialized_objects.py | 2 +-
tests/serialization/test_dag_serialization.py | 4 ++++
4 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 64ed4c5..06094a1 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -353,7 +353,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0),
retry_delay: timedelta = timedelta(seconds=300),
retry_exponential_backoff: bool = False,
- max_retry_delay: Optional[datetime] = None,
+ max_retry_delay: Optional[timedelta] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
depends_on_past: bool = False,
@@ -460,6 +460,13 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
self.retry_delay = timedelta(seconds=retry_delay) # noqa
self.retry_exponential_backoff = retry_exponential_backoff
self.max_retry_delay = max_retry_delay
+ if max_retry_delay:
+ if isinstance(max_retry_delay, timedelta):
+ self.max_retry_delay = max_retry_delay
+ else:
+ self.log.debug("Max_retry_delay isn't timedelta object, assuming secs")
+ self.max_retry_delay = timedelta(seconds=max_retry_delay) # noqa
+
self.params = params or {} # Available in templates!
self.priority_weight = priority_weight
if not WeightRule.is_valid(weight_rule):
diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index c831334..0fbe20f 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -145,6 +145,7 @@
"execution_timeout": { "$ref": "#/definitions/timedelta" },
"retry_delay": { "$ref": "#/definitions/timedelta" },
"retry_exponential_backoff": { "type": "boolean" },
+ "max_retry_delay": { "$ref": "#/definitions/timedelta" },
"params": { "$ref": "#/definitions/dict" },
"priority_weight": { "type": "number" },
"weight_rule": { "type": "string" },
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index a5e9f15..d609c09 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -452,7 +452,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
v = set(v)
elif k == "subdag":
v = SerializedDAG.deserialize_dag(v)
- elif k in {"retry_delay", "execution_timeout", "sla"}:
+ elif k in {"retry_delay", "execution_timeout", "sla", "max_retry_delay"}:
v = cls._deserialize_timedelta(v)
elif k in encoded_op["template_fields"]:
pass
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 2046e22..a775f5b 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -60,6 +60,7 @@ serialized_simple_dag_ground_truth = {
"depends_on_past": False,
"retries": 1,
"retry_delay": {"__type": "timedelta", "__var": 300.0},
+ "max_retry_delay": {"__type": "timedelta", "__var": 600.0},
"sla": {"__type": "timedelta", "__var": 100.0},
},
},
@@ -85,6 +86,7 @@ serialized_simple_dag_ground_truth = {
"owner": "airflow",
"retries": 1,
"retry_delay": 300.0,
+ "max_retry_delay": 600.0,
"sla": 100.0,
"_downstream_task_ids": [],
"_inlets": [],
@@ -113,6 +115,7 @@ serialized_simple_dag_ground_truth = {
"task_id": "custom_task",
"retries": 1,
"retry_delay": 300.0,
+ "max_retry_delay": 600.0,
"sla": 100.0,
"_downstream_task_ids": [],
"_inlets": [],
@@ -160,6 +163,7 @@ def make_simple_dag():
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=5),
+ "max_retry_delay": timedelta(minutes=10),
"depends_on_past": False,
"sla": timedelta(seconds=100),
},