You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/11 22:48:20 UTC
[airflow] 06/06: Add TaskHandlersMovedRule in upgrade check command
(#11265)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 96a6f57f3ff07fbc10517625685ad4716541f8b5
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Sun Oct 11 10:45:13 2020 +0100
Add TaskHandlersMovedRule in upgrade check command (#11265)
---
airflow/upgrade/rules/task_handlers_moved.py | 67 ++++++++++++++++++++++++
tests/upgrade/rules/test_task_handlers_moved.py | 68 +++++++++++++++++++++++++
2 files changed, 135 insertions(+)
diff --git a/airflow/upgrade/rules/task_handlers_moved.py b/airflow/upgrade/rules/task_handlers_moved.py
new file mode 100644
index 0000000..e813221
--- /dev/null
+++ b/airflow/upgrade/rules/task_handlers_moved.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import conf
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.module_loading import import_string
+
+# Each entry is ``(new_path, old_path)``: the handler's import path inside the
+# providers packages, followed by the ``airflow.utils.log`` path it replaces.
+LOGS = [
+    (
+        "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
+        "airflow.utils.log.s3_task_handler.S3TaskHandler",
+    ),
+    (
+        "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
+        "airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler",
+    ),
+    (
+        "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
+        "airflow.utils.log.es_task_handler.ElasticsearchTaskHandler",
+    ),
+    (
+        "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
+        "airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler",
+    ),
+    (
+        "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
+        "airflow.utils.log.gcs_task_handler.GCSTaskHandler",
+    ),
+    (
+        "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
+        "airflow.utils.log.wasb_task_handler.WasbTaskHandler",
+    ),
+]
+
+
+class TaskHandlersMovedRule(BaseRule):
+    """Upgrade check for a custom logging config whose task handler uses a moved import path."""
+
+    title = "Changes in import path of remote task handlers"
+    description = (
+        "The remote log task handlers have been moved to the providers "
+        "directory and into their respective providers packages."
+    )
+
+    def check(self):
+        """
+        Compare the configured ``task`` log handler class against the old paths in ``LOGS``.
+
+        :return: a one-element list holding the migration message when the ``task``
+            handler class equals one of the old paths, otherwise ``None``
+        """
+        logging_class = conf.get("core", "logging_config_class", fallback=None)
+        if not logging_class:
+            # No custom logging config class is set, so there is nothing to inspect.
+            return None
+
+        config = import_string(logging_class)
+        # The imported config may lack a ``task`` handler or its ``class`` key;
+        # a missing level means "nothing to report" rather than a KeyError.
+        handlers = config.get('handlers') or {}
+        task_handler = handlers.get('task') or {}
+        configured_path = task_handler.get('class')
+
+        for new_path, old_path in LOGS:
+            if configured_path == old_path:
+                return [
+                    "This path : `{old}` should be updated to this path: `{new}`".format(old=old_path,
+                                                                                         new=new_path)
+                ]
+        return None
diff --git a/tests/upgrade/rules/test_task_handlers_moved.py b/tests/upgrade/rules/test_task_handlers_moved.py
new file mode 100644
index 0000000..416f4c0
--- /dev/null
+++ b/tests/upgrade/rules/test_task_handlers_moved.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+from tests.compat import mock
+
+from airflow.upgrade.rules.task_handlers_moved import TaskHandlersMovedRule
+from tests.test_utils.config import conf_vars
+
+# Old and new import paths of the GCS task handler used by the tests below.
+OLD_PATH = "airflow.utils.log.gcs_task_handler.GCSTaskHandler"
+NEW_PATH = "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler"
+
+# Logging config whose ``task`` handler class is the old path.
+BAD_LOG_CONFIG_DICT = {
+    "handlers": {
+        "task": {
+            "class": OLD_PATH,
+        },
+    },
+}
+
+# Logging config of the same shape whose ``task`` handler class is the new path.
+GOOD_LOG_CONFIG_DICT = {
+    "handlers": {
+        "task": {
+            "class": NEW_PATH,
+        },
+    },
+}
+
+
+class TestTaskHandlersMovedRule(TestCase):
+    """Runs ``TaskHandlersMovedRule.check`` against a patched ``import_string`` result."""
+
+    @conf_vars({("core", "logging_config_class"): "dummy_log.conf"})
+    @mock.patch("airflow.upgrade.rules.task_handlers_moved.import_string")
+    def test_invalid_check(self, mock_import_str):
+        """A config using the old handler path produces exactly one migration message."""
+        mock_import_str.return_value = BAD_LOG_CONFIG_DICT
+        moved_rule = TaskHandlersMovedRule()
+
+        self.assertIsInstance(moved_rule.description, str)
+        self.assertIsInstance(moved_rule.title, str)
+
+        template = "This path : `{old}` should be updated to this path: `{new}`"
+        expected = [template.format(old=OLD_PATH, new=NEW_PATH)]
+        self.assertEqual(expected, moved_rule.check())
+
+    @conf_vars({("core", "logging_config_class"): "dummy_log.conf"})
+    @mock.patch("airflow.upgrade.rules.task_handlers_moved.import_string")
+    def test_valid_check(self, mock_import_str):
+        """A config already using the new handler path produces no result."""
+        mock_import_str.return_value = GOOD_LOG_CONFIG_DICT
+        moved_rule = TaskHandlersMovedRule()
+
+        self.assertIsInstance(moved_rule.description, str)
+        self.assertIsInstance(moved_rule.title, str)
+
+        self.assertIsNone(moved_rule.check())