You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:08 UTC
[airflow] 18/38: Validate retries value on init for better errors (#16415)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c3bc645c1322fff970766633ed906891c213bfcc
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Sun Jun 13 08:29:14 2021 +0800
Validate retries value on init for better errors (#16415)
(cherry picked from commit 15ff2388e8a52348afcc923653f85ce15a3c5f71)
---
airflow/models/baseoperator.py | 8 ++++++++
tests/core/test_core.py | 46 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index e243b5e..f74c5f9 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -563,6 +563,14 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
if wait_for_downstream:
self.depends_on_past = True
+ if retries is not None and not isinstance(retries, int):
+ try:
+ parsed_retries = int(retries)
+ except (TypeError, ValueError):
+ raise AirflowException(f"'retries' type must be int, not {type(retries).__name__}")
+ self.log.warning("Implicitly converting 'retries' for %s from %r to int", self, retries)
+ retries = parsed_retries
+
self.retries = retries
self.queue = queue
self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index 7149112..78f2676 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
+import logging
import multiprocessing
import os
import signal
@@ -453,3 +454,48 @@ class TestCore(unittest.TestCase):
assert context1['params'] == {'key_1': 'value_1', 'key_2': 'value_2_new', 'key_3': 'value_3'}
assert context2['params'] == {'key_1': 'value_1', 'key_2': 'value_2_old'}
+
+
+@pytest.fixture()
+def dag():
+ return DAG(TEST_DAG_ID, default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})
+
+
+def test_operator_retries_invalid(dag):
+ with pytest.raises(AirflowException) as ctx:
+ BashOperator(
+ task_id='test_illegal_args',
+ bash_command='echo success',
+ dag=dag,
+ retries='foo',
+ )
+ assert str(ctx.value) == "'retries' type must be int, not str"
+
+
+def test_operator_retries_coerce(caplog, dag):
+ with caplog.at_level(logging.WARNING):
+ BashOperator(
+ task_id='test_illegal_args',
+ bash_command='echo success',
+ dag=dag,
+ retries='1',
+ )
+ assert caplog.record_tuples == [
+ (
+ "airflow.operators.bash.BashOperator",
+ logging.WARNING,
+ "Implicitly converting 'retries' for <Task(BashOperator): test_illegal_args> from '1' to int",
+ ),
+ ]
+
+
+@pytest.mark.parametrize("retries", [None, 5])
+def test_operator_retries(caplog, dag, retries):
+ with caplog.at_level(logging.WARNING):
+ BashOperator(
+ task_id='test_illegal_args',
+ bash_command='echo success',
+ dag=dag,
+ retries=retries,
+ )
+ assert caplog.records == []