You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/11 22:48:20 UTC

[airflow] 06/06: Add TaskHandlersMovedRule in upgrade check command (#11265)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 96a6f57f3ff07fbc10517625685ad4716541f8b5
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Sun Oct 11 10:45:13 2020 +0100

    Add TaskHandlersMovedRule in upgrade check command (#11265)
---
 airflow/upgrade/rules/task_handlers_moved.py    | 67 ++++++++++++++++++++++++
 tests/upgrade/rules/test_task_handlers_moved.py | 68 +++++++++++++++++++++++++
 2 files changed, 135 insertions(+)

diff --git a/airflow/upgrade/rules/task_handlers_moved.py b/airflow/upgrade/rules/task_handlers_moved.py
new file mode 100644
index 0000000..e813221
--- /dev/null
+++ b/airflow/upgrade/rules/task_handlers_moved.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import conf
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.module_loading import import_string
+
+# Remote task log handlers whose import path changed.
+#
+# Every entry is a ``(new_path, old_path)`` tuple: the first element is the
+# handler's location under ``airflow.providers``, the second element is the
+# ``airflow.utils.log`` location that ``TaskHandlersMovedRule.check`` compares
+# the configured task handler class against.
+LOGS = [
+    # S3 (amazon provider)
+    ("airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
+     "airflow.utils.log.s3_task_handler.S3TaskHandler"),
+    # CloudWatch (amazon provider)
+    ("airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
+     "airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler"),
+    # Elasticsearch (elasticsearch provider)
+    ("airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+     "airflow.utils.log.es_task_handler.ElasticsearchTaskHandler"),
+    # Stackdriver (google provider)
+    ("airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
+     "airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler"),
+    # GCS (google provider)
+    ("airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
+     "airflow.utils.log.gcs_task_handler.GCSTaskHandler"),
+    # WASB (microsoft.azure provider)
+    ("airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
+     "airflow.utils.log.wasb_task_handler.WasbTaskHandler"),
+]
+
+
+class TaskHandlersMovedRule(BaseRule):
+    """Flag a task log handler that is still configured with its pre-providers import path."""
+    title = "Changes in import path of remote task handlers"
+    description = ("The remote log task handlers have been moved to the providers "
+                   "directory and into their respective providers packages.")
+
+    def check(self):
+        """Return a one-message list if the task handler uses a moved path, otherwise None."""
+        logging_class = conf.get("core", "logging_config_class", fallback=None)
+        if not logging_class:
+            return None
+        # A custom logging config may lack a 'task' handler or its 'class'; then nothing is reported.
+        task_handler = import_string(logging_class).get('handlers', {}).get('task', {})
+        configured_path = task_handler.get('class')
+        for new_path, old_path in LOGS:
+            if configured_path == old_path:
+                return ["This path : `{old}` should be updated to this path: `{new}`".format(
+                    old=old_path, new=new_path)]
diff --git a/tests/upgrade/rules/test_task_handlers_moved.py b/tests/upgrade/rules/test_task_handlers_moved.py
new file mode 100644
index 0000000..416f4c0
--- /dev/null
+++ b/tests/upgrade/rules/test_task_handlers_moved.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+from tests.compat import mock
+
+from airflow.upgrade.rules.task_handlers_moved import TaskHandlersMovedRule
+from tests.test_utils.config import conf_vars
+
+# The same GCS task handler class at its ``airflow.utils.log`` path and at its
+# ``airflow.providers.google`` path.
+OLD_PATH = "airflow.utils.log.gcs_task_handler.GCSTaskHandler"
+NEW_PATH = "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler"
+
+# Stand-in for the imported logging config: it holds only the
+# handlers -> task -> class entry, and that entry names the old path.
+BAD_LOG_CONFIG_DICT = {
+    "handlers": {"task": {"class": OLD_PATH}},
+}
+
+# Stand-in for the imported logging config: it holds only the
+# handlers -> task -> class entry, and that entry names the new path.
+GOOD_LOG_CONFIG_DICT = {
+    "handlers": {"task": {"class": NEW_PATH}},
+}
+
+
+class TestTaskHandlersMovedRule(TestCase):
+    """Checks the result of TaskHandlersMovedRule.check() with import_string patched."""
+
+    @conf_vars({("core", "logging_config_class"): "dummy_log.conf"})
+    @mock.patch("airflow.upgrade.rules.task_handlers_moved.import_string")
+    def test_invalid_check(self, mock_import_str):
+        # The patched import hands back a config whose task handler uses the old path.
+        mock_import_str.return_value = BAD_LOG_CONFIG_DICT
+        rule = TaskHandlersMovedRule()
+        assert isinstance(rule.title, str)
+        assert isinstance(rule.description, str)
+
+        expected = [
+            "This path : `{old}` should be updated to this path: `{new}`".format(old=OLD_PATH, new=NEW_PATH)
+        ]
+        assert rule.check() == expected
+
+    @conf_vars({("core", "logging_config_class"): "dummy_log.conf"})
+    @mock.patch("airflow.upgrade.rules.task_handlers_moved.import_string")
+    def test_valid_check(self, mock_import_str):
+        # The patched import hands back a config whose task handler uses the new path.
+        mock_import_str.return_value = GOOD_LOG_CONFIG_DICT
+        rule = TaskHandlersMovedRule()
+        assert isinstance(rule.title, str)
+        assert isinstance(rule.description, str)
+
+        # With the new path configured, the check is expected to return None.
+        assert rule.check() is None