You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/10 23:43:30 UTC
[airflow] branch v1-10-stable updated: Add upgrade check rule for
unrecognized arguments to Operators (#12660)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-stable by this push:
new ad34838 Add upgrade check rule for unrecognized arguments to Operators (#12660)
ad34838 is described below
commit ad34838c227db2b5bcd9e438689bb8a4af8c2fb0
Author: Dr. Dennis Akpenyi <de...@gmail.com>
AuthorDate: Fri Dec 11 00:42:40 2020 +0100
Add upgrade check rule for unrecognized arguments to Operators (#12660)
The NoAdditionalArgsInOperatorsRule class ensures that passing an
unrecognized arguments to operators causes an exception.
fixes #11042
---
.../rules/no_additional_args_in_operators.py | 74 ++++++++++++++++++++++
.../rules/test_no_additional_args_operators.py | 49 ++++++++++++++
2 files changed, 123 insertions(+)
diff --git a/airflow/upgrade/rules/no_additional_args_in_operators.py b/airflow/upgrade/rules/no_additional_args_in_operators.py
new file mode 100644
index 0000000..0a96f28
--- /dev/null
+++ b/airflow/upgrade/rules/no_additional_args_in_operators.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import os
+import re
+import logging
+import warnings
+from airflow.utils.dag_processing import correct_maybe_zipped, list_py_file_paths
+from airflow import conf
+from airflow.models.dagbag import DagBag
+from airflow.upgrade.rules.base_rule import BaseRule
+
+
+class NoAdditionalArgsInOperatorsRule(BaseRule):
+ title = "No additional argument allowed in BaseOperator."
+
+ description = """\
+Passing unrecognized arguments to operators is not allowed in Airflow 2.0 anymore,
+and will cause an exception.
+ """
+
+ def check(self, dags_folder=None):
+ if not dags_folder:
+ dags_folder = conf.get("core", "dags_folder")
+
+ logger = logging.root
+ old_level = logger.level
+ try:
+ logger.setLevel(logging.ERROR)
+ dagbag = DagBag(dag_folder=os.devnull, include_examples=False, store_serialized_dags=False)
+ dags_folder = correct_maybe_zipped(dags_folder)
+
+ # Each file in the DAG folder is parsed individually
+ for filepath in list_py_file_paths(dags_folder,
+ safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
+ include_examples=False):
+ try:
+ with warnings.catch_warnings(record=True) as captured_warnings:
+ _ = dagbag.process_file(
+ filepath,
+ safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))
+ except Exception:
+ pass
+
+ for warning in captured_warnings:
+ if warning.category in (DeprecationWarning, PendingDeprecationWarning) \
+ and str(warning.message).startswith("Invalid arguments were passed"):
+ m = re.match(r'''
+ .* \(task_id:\ ([^\)]+)\) .* \n
+ \*args:\ (.*) \n
+ \*\*kwargs:\ (.*)
+ ''', str(warning.message), re.VERBOSE)
+
+ yield "DAG file `{}` with task_id `{}` has unrecognized positional args `{}`" \
+ "and keyword args " \
+ "`{}`".format(filepath, m.group(1), m.group(2), m.group(3))
+ finally:
+ logger.setLevel(old_level)
diff --git a/tests/upgrade/rules/test_no_additional_args_operators.py b/tests/upgrade/rules/test_no_additional_args_operators.py
new file mode 100644
index 0000000..0a51705
--- /dev/null
+++ b/tests/upgrade/rules/test_no_additional_args_operators.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import textwrap
+from tempfile import NamedTemporaryFile
+from unittest import TestCase
+
+import pytest
+
+from airflow.upgrade.rules.no_additional_args_in_operators import NoAdditionalArgsInOperatorsRule
+
+
+class TestNoAdditionalArgsInOperatorsRule(TestCase):
+
+ @pytest.mark.filterwarnings('always')
+ def test_check(self):
+ rule = NoAdditionalArgsInOperatorsRule()
+
+ assert isinstance(rule.title, str)
+ assert isinstance(rule.description, str)
+
+ with NamedTemporaryFile(mode='w', suffix='.py') as dag_file:
+ dag_file.write(
+ textwrap.dedent('''
+ from airflow import DAG
+ from airflow.utils.dates import days_ago
+ from airflow.operators.bash_operator import BashOperator
+
+ with DAG(dag_id="test_no_additional_args_operators", start_date=days_ago(0)):
+ BashOperator(task_id='test', bash_command="true", extra_param=42)
+ '''))
+ dag_file.flush()
+ msgs = list(rule.check(dags_folder=dag_file.name))
+ assert len(msgs) == 1