Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/11 22:48:14 UTC

[airflow] branch v1-10-test updated (f53767c -> 96a6f57)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    omit f53767c  Add TaskHandlersMovedRule in upgrade check command (#11265)
    omit b9e3683  Create AirflowMacroPluginRemoved rule and tests (#11285)
    omit 8372ed3  Add GCP Service Account rule (#11230)
    omit d355a3c  SkipMixin: Handle empty branches (#11120)
    omit d5637bf  Add rules to ensure users are using pod_template_file for KubernetesE… (#11090)
    omit 6430036  [AIRFLOW-5274] dag loading duration metric name too long (#5890)
     new 86877dd  [AIRFLOW-5274] dag loading duration metric name too long (#5890)
     new 2da215b  Add rules to ensure users are using pod_template_file for KubernetesE… (#11090)
     new 6974e62  SkipMixin: Handle empty branches (#11120)
     new 73925a0  Add GCP Service Account rule (#11230)
     new 550d9dd  Create AirflowMacroPluginRemoved rule and tests (#11285)
     new 96a6f57  Add TaskHandlersMovedRule in upgrade check command (#11265)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f53767c)
            \
             N -- N -- N   refs/heads/v1-10-test (96a6f57)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/models/dagbag.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[airflow] 02/06: Add rules to ensure users are using pod_template_file for KubernetesE… (#11090)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2da215b402594ffc3f87ad290df82207094ca11d
Author: Daniel Imberman <da...@gmail.com>
AuthorDate: Thu Oct 8 09:05:38 2020 -0700

    Add rules to ensure users are using pod_template_file for KubernetesE… (#11090)
    
    * Add rules to ensure users are using pod_template_file for KubernetesExecutor
    
    * add invalid configs check
    
    * fix static checks
    
    * fix syntax
    
    * remove unnecessary files
    
    Co-authored-by: Daniel Imberman <da...@astronomer.io>
---
 .../upgrade/rules/invalid_kubernetes_configs.py    | 69 ++++++++++++++++++++++
 airflow/upgrade/rules/pod_template_file_rule.py    | 39 ++++++++++++
 2 files changed, 108 insertions(+)

diff --git a/airflow/upgrade/rules/invalid_kubernetes_configs.py b/airflow/upgrade/rules/invalid_kubernetes_configs.py
new file mode 100644
index 0000000..0850501
--- /dev/null
+++ b/airflow/upgrade/rules/invalid_kubernetes_configs.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.configuration import conf
+
+invalid_keys = {"airflow_configmap",
+                "airflow_local_settings_configmap",
+                "dags_in_image",
+                "dags_volume_subpath",
+                "dags_volume_mount_point", "dags_volume_claim",
+                "logs_volume_subpath", "logs_volume_claim",
+                "dags_volume_host", "logs_volume_host",
+                "env_from_configmap_ref", "env_from_secret_ref", "git_repo",
+                "git_branch", "git_sync_depth", "git_subpath",
+                "git_sync_rev", "git_user", "git_password",
+                "git_sync_root", "git_sync_dest",
+                "git_dags_folder_mount_point", "git_ssh_key_secret_name",
+                "git_ssh_known_hosts_configmap_name", "git_sync_credentials_secret",
+                "git_sync_container_repository",
+                "git_sync_container_tag", "git_sync_init_container_name",
+                "git_sync_run_as_user",
+                "worker_service_account_name", "image_pull_secrets",
+                "gcp_service_account_keys", "affinity",
+                "tolerations", "run_as_user", "fs_group"}
+
+
+class InvalidKubernetesConfigRule(BaseRule):
+
+    title = "Users must delete deprecated configs for KubernetesExecutor"
+
+    description = """\
+In Airflow 2.0, KubernetesExecutor users need to set a pod_template_file as a base
+value for all pods launched by the KubernetesExecutor. Many Kubernetes configs are no longer
+needed once this pod_template_file has been generated.
+"""
+
+    def check(self):
+        conf_dict = conf.as_dict(display_sensitive=True)
+        kube_conf = conf_dict['kubernetes']
+        keys = kube_conf.keys()
+        resp = [k for k in keys if k in invalid_keys]
+        if conf_dict['kubernetes_labels']:
+            resp.append("kubernetes_labels")
+        if conf_dict['kubernetes_secrets']:
+            resp.append("kubernetes_secrets")
+
+        if resp:
+            resp_string = "\n".join(resp)
+            return (
+                "The following invalid keys were found in your airflow.cfg:\n\n{resp_string}\n\n"
+                "Please generate a pod_template_file by running "
+                "`airflow generate_pod_template` and delete these keys.").format(resp_string=resp_string)
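
A minimal sketch of exercising this rule by hand, assuming an Airflow 1.10
environment whose airflow.cfg is already loaded (the upgrade_check command
normally drives rules like this one; that runner is not part of this commit):

    from airflow.upgrade.rules.invalid_kubernetes_configs import InvalidKubernetesConfigRule

    rule = InvalidKubernetesConfigRule()
    message = rule.check()  # a message string when deprecated keys are found, else None
    if message:
        print(message)
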
diff --git a/airflow/upgrade/rules/pod_template_file_rule.py b/airflow/upgrade/rules/pod_template_file_rule.py
new file mode 100644
index 0000000..21363d3
--- /dev/null
+++ b/airflow/upgrade/rules/pod_template_file_rule.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.configuration import conf
+
+
+class PodTemplateFileRule(BaseRule):
+
+    title = "Users must set a kubernetes.pod_template_file value"
+
+    description = """\
+In Airflow 2.0, KubernetesExecutor users need to set a pod_template_file as a base
+value for all pods launched by the KubernetesExecutor
+"""
+
+    def check(self):
+        pod_template_file = conf.get("kubernetes", "pod_template_file", fallback=None)
+        if not pod_template_file:
+            return (
+                "Please create a pod_template_file by running `airflow generate_pod_template`.\n"
+                "This will generate a pod using your aiflow.cfg settings"
+            )
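
The same hand-run pattern applies here, again assuming a loaded Airflow 1.10
configuration; a hedged sketch:

    from airflow.upgrade.rules.pod_template_file_rule import PodTemplateFileRule

    rule = PodTemplateFileRule()
    advice = rule.check()  # advice string when [kubernetes] pod_template_file is unset, else None
    if advice:
        print(advice)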


[airflow] 05/06: Create AirflowMacroPluginRemoved rule and tests (#11285)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 550d9dda97e55de94265d0a1af8bd3e906a8254a
Author: Jacob Hoffman <72...@users.noreply.github.com>
AuthorDate: Sat Oct 10 03:57:02 2020 -0700

    Create AirflowMacroPluginRemoved rule and tests (#11285)
---
 .../upgrade/rules/airflow_macro_plugin_removed.py  | 53 +++++++++++++++++
 .../rules/test_airflow_macro_plugin_removed.py     | 68 ++++++++++++++++++++++
 2 files changed, 121 insertions(+)

diff --git a/airflow/upgrade/rules/airflow_macro_plugin_removed.py b/airflow/upgrade/rules/airflow_macro_plugin_removed.py
new file mode 100644
index 0000000..ca4f546
--- /dev/null
+++ b/airflow/upgrade/rules/airflow_macro_plugin_removed.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from airflow import conf
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.dag_processing import list_py_file_paths
+
+
+class AirflowMacroPluginRemovedRule(BaseRule):
+
+    title = "Remove airflow.AirflowMacroPlugin class"
+
+    description = "The airflow.AirflowMacroPlugin class has been removed."
+
+    MACRO_PLUGIN_CLASS = "airflow.AirflowMacroPlugin"
+
+    def _change_info(self, file_path, line_number):
+        return "{} will be removed. Affected file: {} (line {})".format(
+            self.MACRO_PLUGIN_CLASS, file_path, line_number
+        )
+
+    def _check_file(self, file_path):
+        problems = []
+        class_name_to_check = self.MACRO_PLUGIN_CLASS.split(".")[-1]
+        with open(file_path, "r") as file_pointer:
+            for line_number, line in enumerate(file_pointer, 1):
+                if class_name_to_check in line:
+                    problems.append(self._change_info(file_path, line_number))
+        return problems
+
+    def check(self):
+        dag_folder = conf.get("core", "dags_folder")
+        file_paths = list_py_file_paths(directory=dag_folder, include_examples=False)
+        problems = []
+        for file_path in file_paths:
+            problems.extend(self._check_file(file_path))
+        return problems
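
A hedged sketch of the rule against a single file (the path and contents are
illustrative only; check() normally walks the configured dags_folder via
list_py_file_paths):

    from airflow.upgrade.rules.airflow_macro_plugin_removed import AirflowMacroPluginRemovedRule

    # Illustrative throwaway file that still references the removed class.
    with open("/tmp/example_dag.py", "w") as f:
        f.write("from airflow import AirflowMacroPlugin\n")

    rule = AirflowMacroPluginRemovedRule()
    print(rule._check_file("/tmp/example_dag.py"))
    # ['airflow.AirflowMacroPlugin will be removed. Affected file: /tmp/example_dag.py (line 1)']
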
diff --git a/tests/upgrade/rules/test_airflow_macro_plugin_removed.py b/tests/upgrade/rules/test_airflow_macro_plugin_removed.py
new file mode 100644
index 0000000..2d7b7ba
--- /dev/null
+++ b/tests/upgrade/rules/test_airflow_macro_plugin_removed.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from contextlib import contextmanager
+from unittest import TestCase
+
+from tempfile import NamedTemporaryFile
+from tests.compat import mock
+
+from airflow.upgrade.rules.airflow_macro_plugin_removed import AirflowMacroPluginRemovedRule
+
+
+@contextmanager
+def create_temp_file(mock_list_files, lines):
+    with NamedTemporaryFile("w+") as temp_file:
+        mock_list_files.return_value = [temp_file.name]
+        temp_file.writelines("\n".join(lines))
+        temp_file.flush()
+        yield temp_file
+
+
+@mock.patch("airflow.upgrade.rules.airflow_macro_plugin_removed.list_py_file_paths")
+class TestAirflowMacroPluginRemovedRule(TestCase):
+    def test_valid_check(self, mock_list_files):
+        lines = ["import foo.bar.baz"]
+        with create_temp_file(mock_list_files, lines):
+            rule = AirflowMacroPluginRemovedRule()
+            assert isinstance(rule.description, str)
+            assert isinstance(rule.title, str)
+
+            msgs = rule.check()
+            assert 0 == len(msgs)
+
+    def test_invalid_check(self, mock_list_files):
+        lines = [
+            "import airflow.AirflowMacroPlugin",
+            "from airflow import AirflowMacroPlugin",
+        ]
+        with create_temp_file(mock_list_files, lines) as temp_file:
+
+            rule = AirflowMacroPluginRemovedRule()
+
+            assert isinstance(rule.description, str)
+            assert isinstance(rule.title, str)
+
+            msgs = rule.check()
+            assert 2 == len(msgs)
+
+            base_message = "airflow.AirflowMacroPlugin will be removed. Affected file: {}".format(
+                temp_file.name
+            )
+            expected_messages = [
+                "{} (line {})".format(base_message, line_number) for line_number in [1, 2]
+            ]
+            assert expected_messages == msgs


[airflow] 06/06: Add TaskHandlersMovedRule in upgrade check command (#11265)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 96a6f57f3ff07fbc10517625685ad4716541f8b5
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Sun Oct 11 10:45:13 2020 +0100

    Add TaskHandlersMovedRule in upgrade check command (#11265)
---
 airflow/upgrade/rules/task_handlers_moved.py    | 67 ++++++++++++++++++++++++
 tests/upgrade/rules/test_task_handlers_moved.py | 68 +++++++++++++++++++++++++
 2 files changed, 135 insertions(+)

diff --git a/airflow/upgrade/rules/task_handlers_moved.py b/airflow/upgrade/rules/task_handlers_moved.py
new file mode 100644
index 0000000..e813221
--- /dev/null
+++ b/airflow/upgrade/rules/task_handlers_moved.py
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import conf
+from airflow.upgrade.rules.base_rule import BaseRule
+from airflow.utils.module_loading import import_string
+
+LOGS = [
+    (
+        "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
+        "airflow.utils.log.s3_task_handler.S3TaskHandler"
+    ),
+    (
+        'airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler',
+        'airflow.utils.log.cloudwatch_task_handler.CloudwatchTaskHandler'
+    ),
+    (
+        'airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler',
+        'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler'
+    ),
+    (
+        "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
+        "airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler"
+    ),
+    (
+        "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
+        "airflow.utils.log.gcs_task_handler.GCSTaskHandler"
+    ),
+    (
+        "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
+        "airflow.utils.log.wasb_task_handler.WasbTaskHandler"
+    )
+]
+
+
+class TaskHandlersMovedRule(BaseRule):
+    title = "Changes in import path of remote task handlers"
+    description = (
+        "The remote log task handlers have been moved to the providers "
+        "directory and into their respective providers packages."
+    )
+
+    def check(self):
+        logging_class = conf.get("core", "logging_config_class", fallback=None)
+        if logging_class:
+            config = import_string(logging_class)
+            configured_path = config['handlers']['task']['class']
+            for new_path, old_path in LOGS:
+                if configured_path == old_path:
+                    return [
+                        "This path: `{old}` should be updated to this path: "
+                        "`{new}`".format(old=old_path, new=new_path)
+                    ]
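
To make the configuration this rule inspects concrete: [core]
logging_config_class points at an importable dict, and only the
handlers/task/class entry is read. A hedged, illustrative module
(my_log_config and LOGGING_CONFIG are placeholder names, not part of this
commit):

    # my_log_config.py, referenced from airflow.cfg as
    # [core] logging_config_class = my_log_config.LOGGING_CONFIG
    LOGGING_CONFIG = {
        "handlers": {
            "task": {
                # Pre-2.0 path; the rule reports that it should become
                # airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler
                "class": "airflow.utils.log.gcs_task_handler.GCSTaskHandler",
            },
        },
    }
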
diff --git a/tests/upgrade/rules/test_task_handlers_moved.py b/tests/upgrade/rules/test_task_handlers_moved.py
new file mode 100644
index 0000000..416f4c0
--- /dev/null
+++ b/tests/upgrade/rules/test_task_handlers_moved.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+from tests.compat import mock
+
+from airflow.upgrade.rules.task_handlers_moved import TaskHandlersMovedRule
+from tests.test_utils.config import conf_vars
+
+OLD_PATH = 'airflow.utils.log.gcs_task_handler.GCSTaskHandler'
+NEW_PATH = "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler"
+
+BAD_LOG_CONFIG_DICT = {
+    'handlers': {
+        'task': {
+            'class': OLD_PATH,
+        },
+    }}
+
+GOOD_LOG_CONFIG_DICT = {
+    'handlers': {
+        'task': {
+            'class': NEW_PATH,
+        },
+    }}
+
+
+class TestTaskHandlersMovedRule(TestCase):
+
+    @conf_vars({("core", "logging_config_class"): "dummy_log.conf"})
+    @mock.patch("airflow.upgrade.rules.task_handlers_moved.import_string")
+    def test_invalid_check(self, mock_import_str):
+        mock_import_str.return_value = BAD_LOG_CONFIG_DICT
+        rule = TaskHandlersMovedRule()
+
+        assert isinstance(rule.description, str)
+        assert isinstance(rule.title, str)
+
+        msg = "This path : `{old}` should be updated to this path: `{new}`".format(old=OLD_PATH,
+                                                                                   new=NEW_PATH)
+        response = rule.check()
+        assert response == [msg]
+
+    @conf_vars({("core", "logging_config_class"): "dummy_log.conf"})
+    @mock.patch("airflow.upgrade.rules.task_handlers_moved.import_string")
+    def test_valid_check(self, mock_import_str):
+        mock_import_str.return_value = GOOD_LOG_CONFIG_DICT
+        rule = TaskHandlersMovedRule()
+
+        assert isinstance(rule.description, str)
+        assert isinstance(rule.title, str)
+
+        msg = None
+        response = rule.check()
+        assert response == msg


[airflow] 03/06: SkipMixin: Handle empty branches (#11120)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6974e62408aaf72c254f30f7cb05489e3fccf575
Author: yuqian90 <yu...@gmail.com>
AuthorDate: Fri Oct 9 05:25:07 2020 +0800

    SkipMixin: Handle empty branches (#11120)
---
 airflow/models/skipmixin.py             | 29 +++++++++++++++---------
 tests/operators/test_python_operator.py | 40 +++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index 3b4531f..f45cac6 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -24,7 +24,6 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 import six
-from typing import Set
 
 # The key used by SkipMixin to store XCom data.
 XCOM_SKIPMIXIN_KEY = "skipmixin_key"
@@ -122,7 +121,8 @@ class SkipMixin(LoggingMixin):
         """
         self.log.info("Following branch %s", branch_task_ids)
         if isinstance(branch_task_ids, six.string_types):
-            branch_task_ids = [branch_task_ids]
+            branch_task_ids = {branch_task_ids}
+        branch_task_ids = set(branch_task_ids)
 
         dag_run = ti.get_dagrun()
         task = ti.task
@@ -131,20 +131,29 @@ class SkipMixin(LoggingMixin):
         downstream_tasks = task.downstream_list
 
         if downstream_tasks:
-            # Also check downstream tasks of the branch task. In case the task to skip
-            # is also a downstream task of the branch task, we exclude it from skipping.
-            branch_downstream_task_ids = set()  # type: Set[str]
-            for b in branch_task_ids:
-                branch_downstream_task_ids.update(
-                    dag.get_task(b).get_flat_relative_ids(upstream=False)
+            # For a branching workflow that looks like this, when "branch" does skip_all_except("task1"),
+            # we intuitively expect both "task1" and "join" to execute even though strictly speaking,
+            # "join" is also immediately downstream of "branch" and should have been skipped. Therefore,
+            # we need a special case here for such empty branches: Check downstream tasks of branch_task_ids.
+            # In case the task to skip is also downstream of branch_task_ids, we add it to branch_task_ids and
+            # exclude it from skipping.
+            #
+            # branch  ----->  join
+            #   \            ^
+            #     v        /
+            #       task1
+            #
+            for branch_task_id in list(branch_task_ids):
+                branch_task_ids.update(
+                    dag.get_task(branch_task_id).get_flat_relative_ids(upstream=False)
                 )
 
             skip_tasks = [
                 t
                 for t in downstream_tasks
                 if t.task_id not in branch_task_ids
-                and t.task_id not in branch_downstream_task_ids
             ]
+            follow_task_ids = [t.task_id for t in downstream_tasks if t.task_id in branch_task_ids]
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
             with create_session() as session:
@@ -152,5 +161,5 @@ class SkipMixin(LoggingMixin):
                     dag_run, ti.execution_date, skip_tasks, session=session
                 )
                 ti.xcom_push(
-                    key=XCOM_SKIPMIXIN_KEY, value={XCOM_SKIPMIXIN_FOLLOWED: branch_task_ids}
+                    key=XCOM_SKIPMIXIN_KEY, value={XCOM_SKIPMIXIN_FOLLOWED: follow_task_ids}
                 )
diff --git a/tests/operators/test_python_operator.py b/tests/operators/test_python_operator.py
index 13a33b2..81eaa60 100644
--- a/tests/operators/test_python_operator.py
+++ b/tests/operators/test_python_operator.py
@@ -22,6 +22,7 @@ from __future__ import print_function, unicode_literals
 import copy
 import logging
 import os
+import pytest
 
 import unittest
 
@@ -846,3 +847,42 @@ class ShortCircuitOperatorTest(unittest.TestCase):
                 self.assertEqual(ti.state, State.SKIPPED)
             else:
                 raise
+
+
+@pytest.mark.parametrize(
+    "choice,expected_states",
+    [
+        ("task1", [State.SUCCESS, State.SUCCESS, State.SUCCESS]),
+        ("join", [State.SUCCESS, State.SKIPPED, State.SUCCESS]),
+    ]
+)
+def test_empty_branch(choice, expected_states):
+    """
+    Tests that BranchPythonOperator handles empty branches properly.
+    """
+    with DAG(
+        'test_empty_branch',
+        start_date=DEFAULT_DATE,
+    ) as dag:
+        branch = BranchPythonOperator(task_id='branch', python_callable=lambda: choice)
+        task1 = DummyOperator(task_id='task1')
+        join = DummyOperator(task_id='join', trigger_rule="none_failed_or_skipped")
+
+        branch >> [task1, join]
+        task1 >> join
+
+    dag.clear(start_date=DEFAULT_DATE)
+
+    task_ids = ["branch", "task1", "join"]
+
+    tis = {}
+    for task_id in task_ids:
+        task_instance = TI(dag.get_task(task_id), execution_date=DEFAULT_DATE)
+        tis[task_id] = task_instance
+        task_instance.run()
+
+    def get_state(ti):
+        ti.refresh_from_db()
+        return ti.state
+
+    assert [get_state(tis[task_id]) for task_id in task_ids] == expected_states


[airflow] 04/06: Add GCP Service Account rule (#11230)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 73925a0e9d11974c434a7c436302a38984c88ab3
Author: Patrick Cando <32...@users.noreply.github.com>
AuthorDate: Fri Oct 9 23:46:20 2020 +0100

    Add GCP Service Account rule (#11230)
---
 .../upgrade/rules/gcp_service_account_keys_rule.py | 39 ++++++++++++++++++
 .../rules/test_gcp_service_account_key_rule.py     | 48 ++++++++++++++++++++++
 2 files changed, 87 insertions(+)

diff --git a/airflow/upgrade/rules/gcp_service_account_keys_rule.py b/airflow/upgrade/rules/gcp_service_account_keys_rule.py
new file mode 100644
index 0000000..be462f1
--- /dev/null
+++ b/airflow/upgrade/rules/gcp_service_account_keys_rule.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+from airflow.configuration import conf
+from airflow.upgrade.rules.base_rule import BaseRule
+
+
+class GCPServiceAccountKeyRule(BaseRule):
+    title = "GCP service account key deprecation"
+
+    description = """Option has been removed because it is no longer \
+supported by the Google Kubernetes Engine."""
+
+    def check(self):
+        gcp_option = conf.get(section="kubernetes", key="gcp_service_account_keys")
+        if gcp_option:
+            msg = """This option has been removed because it is no longer \
+supported by the Google Kubernetes Engine. The new recommended \
+method for managing service account keys on Google Cloud is \
+Workload Identity."""
+            return [msg]
+        else:
+            return None
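
Both outcomes can be observed without editing airflow.cfg by borrowing
conf_vars from the project's test utilities, as the tests below do; a hedged
sketch:

    from tests.test_utils.config import conf_vars
    from airflow.upgrade.rules.gcp_service_account_keys_rule import GCPServiceAccountKeyRule

    # Deprecated option set: check() returns a one-item list of advice.
    with conf_vars({("kubernetes", "gcp_service_account_keys"): "key_name:key_path"}):
        print(GCPServiceAccountKeyRule().check())

    # Option empty: check() returns None and the rule passes.
    with conf_vars({("kubernetes", "gcp_service_account_keys"): ""}):
        print(GCPServiceAccountKeyRule().check())
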
diff --git a/tests/upgrade/rules/test_gcp_service_account_key_rule.py b/tests/upgrade/rules/test_gcp_service_account_key_rule.py
new file mode 100644
index 0000000..e9d5209
--- /dev/null
+++ b/tests/upgrade/rules/test_gcp_service_account_key_rule.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from unittest import TestCase
+
+from airflow.upgrade.rules.gcp_service_account_keys_rule import GCPServiceAccountKeyRule
+from tests.test_utils.config import conf_vars
+
+
+class TestGCPServiceAccountKeyRule(TestCase):
+
+    @conf_vars({("kubernetes", "gcp_service_account_keys"): "key_name:key_path"})
+    def test_invalid_check(self):
+        rule = GCPServiceAccountKeyRule()
+
+        assert isinstance(rule.description, str)
+        assert isinstance(rule.title, str)
+
+        msg = """This option has been removed because it is no longer \
+supported by the Google Kubernetes Engine. The new recommended \
+method for managing service account keys on Google Cloud is \
+Workload Identity."""
+        response = rule.check()
+        assert response == [msg]
+
+    @conf_vars({("kubernetes", "gcp_service_account_keys"): ""})
+    def test_valid_check(self):
+        rule = GCPServiceAccountKeyRule()
+
+        assert isinstance(rule.description, str)
+        assert isinstance(rule.title, str)
+
+        msg = None
+        response = rule.check()
+        assert response == msg


[airflow] 01/06: [AIRFLOW-5274] dag loading duration metric name too long (#5890)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 86877dd93a9412ca37b16549118ee7ccb84c2dba
Author: Tao Feng <fe...@gmail.com>
AuthorDate: Mon Aug 26 13:29:09 2019 -0700

    [AIRFLOW-5274] dag loading duration metric name too long (#5890)
    
    (cherry picked from commit 45176c8d76322bc7f74c94293e5c4b9c205e7a29)
---
 airflow/models/dagbag.py | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 106dff0..88be05d 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -423,8 +423,6 @@ class DagBag(BaseDagBag, LoggingMixin):
 
         dag_folder = correct_maybe_zipped(dag_folder)
 
-        dags_by_name = {}
-
         for filepath in list_py_file_paths(dag_folder, safe_mode=safe_mode,
                                            include_examples=include_examples):
             try:
@@ -438,7 +436,6 @@ class DagBag(BaseDagBag, LoggingMixin):
                 td = timezone.utcnow() - ts
                 td = td.total_seconds() + (
                     float(td.microseconds) / 1000000)
-                dags_by_name[dag_id_names] = dag_ids
                 stats.append(FileLoadStat(
                     filepath.replace(settings.DAGS_FOLDER, ''),
                     td,
@@ -451,13 +448,9 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.dagbag_stats = sorted(
             stats, key=lambda x: x.duration, reverse=True)
         for file_stat in self.dagbag_stats:
-            dag_ids = dags_by_name[file_stat.dags]
-            if file_stat.dag_num >= 1:
-                # if we found multiple dags per file, the stat is 'dag_id1 _ dag_id2'
-                dag_names = '_'.join(dag_ids)
-                Stats.timing('dag.loading-duration.{}'.
-                             format(dag_names),
-                             file_stat.duration)
+            # file_stat.file has a format like: /subdir/dag_name.py
+            filename = file_stat.file.split('/')[-1].replace('.py', '')
+            Stats.timing('dag.loading-duration.{}'.format(filename), file_stat.duration)
 
     def collect_dags_from_db(self):
         """Collects DAGs from database."""