You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/12/10 23:43:30 UTC

[airflow] branch v1-10-stable updated: Add upgrade check rule for unrecognized arguments to Operators (#12660)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-stable
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-stable by this push:
     new ad34838  Add upgrade check rule for unrecognized arguments to Operators (#12660)
ad34838 is described below

commit ad34838c227db2b5bcd9e438689bb8a4af8c2fb0
Author: Dr. Dennis Akpenyi <de...@gmail.com>
AuthorDate: Fri Dec 11 00:42:40 2020 +0100

    Add upgrade check rule for unrecognized arguments to Operators (#12660)
    
    The NoAdditionalArgsInOperatorsRule class flags unrecognized arguments
    passed to operators, which cause an exception in Airflow 2.0.
    
    fixes #11042
---
 .../rules/no_additional_args_in_operators.py       | 74 ++++++++++++++++++++++
 .../rules/test_no_additional_args_operators.py     | 49 ++++++++++++++
 2 files changed, 123 insertions(+)

diff --git a/airflow/upgrade/rules/no_additional_args_in_operators.py b/airflow/upgrade/rules/no_additional_args_in_operators.py
new file mode 100644
index 0000000..0a96f28
--- /dev/null
+++ b/airflow/upgrade/rules/no_additional_args_in_operators.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import os
+import re
+import logging
+import warnings
+from airflow.utils.dag_processing import correct_maybe_zipped, list_py_file_paths
+from airflow import conf
+from airflow.models.dagbag import DagBag
+from airflow.upgrade.rules.base_rule import BaseRule
+
+
+class NoAdditionalArgsInOperatorsRule(BaseRule):
+    """Upgrade check that reports operators created with unrecognized arguments.
+
+    Every Python file found under the DAG folder is parsed on its own, and each
+    captured warning whose text starts with "Invalid arguments were passed" is
+    turned into one human-readable message.
+    """
+
+    title = "No additional argument allowed in BaseOperator."
+
+    description = """\
+Passing unrecognized arguments to operators is not allowed in Airflow 2.0 anymore,
+and will cause an exception.
+                  """
+
+    # Pulls the task_id, the positional args and the keyword args out of the
+    # captured warning text. Compiled once instead of once per warning.
+    _INVALID_ARGS_RE = re.compile(r'''
+        .* \(task_id:\ ([^\)]+)\) .* \n
+        \*args:\ (.*) \n
+        \*\*kwargs:\ (.*)
+        ''', re.VERBOSE)
+
+    def check(self, dags_folder=None):
+        """Yield one message per operator that received unrecognized arguments.
+
+        :param dags_folder: path to scan for DAG files; when empty, the
+            ``[core] dags_folder`` configuration value is used instead.
+        :return: generator of problem descriptions (strings).
+        """
+        if not dags_folder:
+            dags_folder = conf.get("core", "dags_folder")
+
+        logger = logging.root
+        old_level = logger.level
+        try:
+            # Raise the root logger threshold while DAG files are parsed; the
+            # previous level is restored in the ``finally`` clause below.
+            logger.setLevel(logging.ERROR)
+            dagbag = DagBag(dag_folder=os.devnull, include_examples=False, store_serialized_dags=False)
+            dags_folder = correct_maybe_zipped(dags_folder)
+            # Loop-invariant: read the setting once rather than per file.
+            safe_mode = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE')
+
+            # Each file in the DAG folder is parsed individually
+            for filepath in list_py_file_paths(dags_folder,
+                                               safe_mode=safe_mode,
+                                               include_examples=False):
+                captured_warnings = []
+                try:
+                    with warnings.catch_warnings(record=True) as captured_warnings:
+                        # Python's default filters may drop (Pending)DeprecationWarning;
+                        # force recording. catch_warnings restores the filters on exit.
+                        warnings.simplefilter("always")
+                        dagbag.process_file(filepath, safe_mode=safe_mode)
+                except Exception:
+                    # Deliberate best effort: a file that fails to parse must not
+                    # abort the check. Warnings recorded before the failure are
+                    # still inspected below.
+                    pass
+
+                for warning in captured_warnings:
+                    if warning.category not in (DeprecationWarning, PendingDeprecationWarning):
+                        continue
+                    text = str(warning.message)
+                    if not text.startswith("Invalid arguments were passed"):
+                        continue
+
+                    m = self._INVALID_ARGS_RE.match(text)
+                    if m is None:
+                        # The warning layout was not the expected one; report the
+                        # raw text instead of failing with AttributeError.
+                        yield "DAG file `{}` has unrecognized arguments passed to an operator: " \
+                              "{}".format(filepath, text)
+                        continue
+
+                    yield "DAG file `{}` with task_id `{}` has unrecognized positional args `{}` " \
+                          "and keyword args " \
+                          "`{}`".format(filepath, m.group(1), m.group(2), m.group(3))
+        finally:
+            logger.setLevel(old_level)
diff --git a/tests/upgrade/rules/test_no_additional_args_operators.py b/tests/upgrade/rules/test_no_additional_args_operators.py
new file mode 100644
index 0000000..0a51705
--- /dev/null
+++ b/tests/upgrade/rules/test_no_additional_args_operators.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import textwrap
+from tempfile import NamedTemporaryFile
+from unittest import TestCase
+
+import pytest
+
+from airflow.upgrade.rules.no_additional_args_in_operators import NoAdditionalArgsInOperatorsRule
+
+
+class TestNoAdditionalArgsInOperatorsRule(TestCase):
+    """Tests for NoAdditionalArgsInOperatorsRule."""
+
+    @pytest.mark.filterwarnings('always')
+    def test_check(self):
+        """A DAG file passing an unknown keyword to an operator yields one message."""
+        checker = NoAdditionalArgsInOperatorsRule()
+
+        # The rule metadata must be plain strings.
+        for attribute in (checker.title, checker.description):
+            assert isinstance(attribute, str)
+
+        # DAG source whose operator receives the unknown ``extra_param`` keyword.
+        dag_source = textwrap.dedent('''
+            from airflow import DAG
+            from airflow.utils.dates import days_ago
+            from airflow.operators.bash_operator import BashOperator
+
+            with DAG(dag_id="test_no_additional_args_operators", start_date=days_ago(0)):
+                BashOperator(task_id='test', bash_command="true", extra_param=42)
+                ''')
+
+        with NamedTemporaryFile(mode='w', suffix='.py') as tmp_dag:
+            tmp_dag.write(dag_source)
+            # Flush so the rule reads the full content from disk.
+            tmp_dag.flush()
+            reported = list(checker.check(dags_folder=tmp_dag.name))
+            assert len(reported) == 1