You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/10/04 21:33:06 UTC

[airflow] branch main updated: Don't consider airflow core conf for KPO (#26849)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 53d68049d9 Don't consider airflow core conf for KPO (#26849)
53d68049d9 is described below

commit 53d68049d9bf4cec6b7d57545f15409dab0caed1
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Tue Oct 4 14:32:49 2022 -0700

    Don't consider airflow core conf for KPO (#26849)
    
    KPO previously looked at some of the params in the `[kubernetes]` section in airflow config.  Such consideration was deprecated in provider version 4.1.0 but backcompat was retained.  Now, for major release 5.0 of the provider, we remove the backcompat.  So, core airflow conf params will no longer be considered when creating the k8s client object; users will need to use Airflow connections if they want the client configured in a non-default way.  See k8s provider documentation on k8s  [...]
---
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    |  8 ++++
 .../providers/cncf/kubernetes/hooks/kubernetes.py  | 33 -------------
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 34 -------------
 .../cncf/kubernetes/hooks/test_kubernetes.py       | 55 ----------------------
 .../kubernetes/operators/test_kubernetes_pod.py    | 22 ---------
 5 files changed, 8 insertions(+), 144 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index ba77a8722a..5a6de666f8 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -24,6 +24,14 @@
 Changelog
 ---------
 
+5.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+Previously, KubernetesPodOperator considered some settings from the ``kubernetes`` section of the Airflow config.  This behavior was deprecated in 4.1.0 and is now removed.  If you previously relied on the Airflow config and you want the Kubernetes client to be created with non-default configuration, you will need to define that configuration in an Airflow connection and set KubernetesPodOperator to use that connection.  See the kubernetes provider documentation on defining a kubernetes Airflow connection for details.
+
 4.4.0
 .....
 
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index f0860ceddc..a0ace357f9 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -123,17 +123,8 @@ class KubernetesHook(BaseHook):
         self.in_cluster = in_cluster
         self.disable_verify_ssl = disable_verify_ssl
         self.disable_tcp_keepalive = disable_tcp_keepalive
-
         self._is_in_cluster: bool | None = None
 
-        # these params used for transition in KPO to K8s hook
-        # for a deprecation period we will continue to consider k8s settings from airflow.cfg
-        self._deprecated_core_disable_tcp_keepalive: bool | None = None
-        self._deprecated_core_disable_verify_ssl: bool | None = None
-        self._deprecated_core_in_cluster: bool | None = None
-        self._deprecated_core_cluster_context: str | None = None
-        self._deprecated_core_config_file: str | None = None
-
     @staticmethod
     def _coalesce_param(*params):
         for param in params:
@@ -200,30 +191,6 @@ class KubernetesHook(BaseHook):
             self.disable_tcp_keepalive, _get_bool(self._get_field("disable_tcp_keepalive"))
         )
 
-        # BEGIN apply settings from core kubernetes configuration
-        # this section should be removed in next major release
-        deprecation_warnings: list[tuple[str, Any]] = []
-        if disable_verify_ssl is None and self._deprecated_core_disable_verify_ssl is True:
-            deprecation_warnings.append(('verify_ssl', False))
-            disable_verify_ssl = self._deprecated_core_disable_verify_ssl
-        # by default, hook will try in_cluster first. so we only need to
-        # apply core airflow config and alert when False and in_cluster not otherwise set.
-        if in_cluster is None and self._deprecated_core_in_cluster is False:
-            deprecation_warnings.append(('in_cluster', self._deprecated_core_in_cluster))
-            in_cluster = self._deprecated_core_in_cluster
-        if not cluster_context and self._deprecated_core_cluster_context:
-            deprecation_warnings.append(('cluster_context', self._deprecated_core_cluster_context))
-            cluster_context = self._deprecated_core_cluster_context
-        if not kubeconfig_path and self._deprecated_core_config_file:
-            deprecation_warnings.append(('config_file', self._deprecated_core_config_file))
-            kubeconfig_path = self._deprecated_core_config_file
-        if disable_tcp_keepalive is None and self._deprecated_core_disable_tcp_keepalive is True:
-            deprecation_warnings.append(('enable_tcp_keepalive', False))
-            disable_tcp_keepalive = True
-        if deprecation_warnings:
-            self._deprecation_warning_core_param(deprecation_warnings)
-        # END apply settings from core kubernetes configuration
-
         if disable_verify_ssl is True:
             _disable_verify_ssl()
         if disable_tcp_keepalive is not True:
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index e4fc78b179..e0043816a9 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -27,7 +27,6 @@ from typing import TYPE_CHECKING, Any, Sequence
 from kubernetes.client import CoreV1Api, models as k8s
 
 from airflow.compat.functools import cached_property
-from airflow.configuration import conf
 from airflow.exceptions import AirflowException
 from airflow.kubernetes import pod_generator
 from airflow.kubernetes.pod_generator import PodGenerator
@@ -356,7 +355,6 @@ class KubernetesPodOperator(BaseOperator):
             config_file=self.config_file,
             cluster_context=self.cluster_context,
         )
-        self._patch_deprecated_k8s_settings(hook)
         return hook
 
     @cached_property
@@ -608,38 +606,6 @@ class KubernetesPodOperator(BaseOperator):
         pod = self.build_pod_request_obj()
         print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))
 
-    def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
-        """
-        Here we read config from core Airflow config [kubernetes] section.
-        In a future release we will stop looking at this section and require users
-        to use Airflow connections to configure KPO.
-
-        When we find values there that we need to apply on the hook, we patch special
-        hook attributes here.
-        """
-        # default for enable_tcp_keepalive is True; patch if False
-        if conf.getboolean('kubernetes', 'enable_tcp_keepalive') is False:
-            hook._deprecated_core_disable_tcp_keepalive = True
-
-        # default verify_ssl is True; patch if False.
-        if conf.getboolean('kubernetes', 'verify_ssl') is False:
-            hook._deprecated_core_disable_verify_ssl = True
-
-        # default for in_cluster is True; patch if False and no KPO param.
-        conf_in_cluster = conf.getboolean('kubernetes', 'in_cluster')
-        if self.in_cluster is None and conf_in_cluster is False:
-            hook._deprecated_core_in_cluster = conf_in_cluster
-
-        # there's no default for cluster context; if we get something (and no KPO param) patch it.
-        conf_cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
-        if not self.cluster_context and conf_cluster_context:
-            hook._deprecated_core_cluster_context = conf_cluster_context
-
-        # there's no default for config_file; if we get something (and no KPO param) patch it.
-        conf_config_file = conf.get('kubernetes', 'config_file', fallback=None)
-        if not self.config_file and conf_config_file:
-            hook._deprecated_core_config_file = conf_config_file
-
 
 class _suppress(AbstractContextManager):
     """
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index ed8dde9c09..633fa82c09 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -313,61 +313,6 @@ class TestKubernetesHook:
         assert isinstance(hook.api_client, kubernetes.client.ApiClient)
         assert isinstance(hook.get_conn(), kubernetes.client.ApiClient)
 
-    @patch(f"{HOOK_MODULE}._disable_verify_ssl")
-    @patch(f"{HOOK_MODULE}.KubernetesHook._get_default_client", new=MagicMock)
-    def test_patch_core_settings_verify_ssl(self, mock_disable_verify_ssl):
-        hook = KubernetesHook()
-        hook.get_conn()
-        mock_disable_verify_ssl.assert_not_called()
-        mock_disable_verify_ssl.reset_mock()
-        hook._deprecated_core_disable_verify_ssl = True
-        hook.get_conn()
-        mock_disable_verify_ssl.assert_called()
-
-    @patch(f"{HOOK_MODULE}._enable_tcp_keepalive")
-    @patch(f"{HOOK_MODULE}.KubernetesHook._get_default_client", new=MagicMock)
-    def test_patch_core_settings_tcp_keepalive(self, mock_enable_tcp_keepalive):
-        hook = KubernetesHook()
-        hook.get_conn()
-        mock_enable_tcp_keepalive.assert_called()
-        mock_enable_tcp_keepalive.reset_mock()
-        hook._deprecated_core_disable_tcp_keepalive = True
-        hook.get_conn()
-        mock_enable_tcp_keepalive.assert_not_called()
-
-    @patch("kubernetes.config.kube_config.KubeConfigLoader", new=MagicMock())
-    @patch("kubernetes.config.kube_config.KubeConfigMerger", new=MagicMock())
-    @patch("kubernetes.config.incluster_config.InClusterConfigLoader")
-    @patch(f"{HOOK_MODULE}.KubernetesHook._get_default_client")
-    def test_patch_core_settings_in_cluster(self, mock_get_default_client, mock_in_cluster_loader):
-        hook = KubernetesHook(conn_id=None)
-        hook.get_conn()
-        mock_in_cluster_loader.assert_not_called()
-        mock_in_cluster_loader.reset_mock()
-        hook._deprecated_core_in_cluster = False
-        hook.get_conn()
-        mock_in_cluster_loader.assert_not_called()
-        mock_get_default_client.assert_called()
-
-    @pytest.mark.parametrize(
-        'key, key_val, attr, attr_val',
-        [
-            ('in_cluster', False, '_deprecated_core_in_cluster', False),
-            ('verify_ssl', False, '_deprecated_core_disable_verify_ssl', True),
-            ('cluster_context', 'hi', '_deprecated_core_cluster_context', 'hi'),
-            ('config_file', '/path/to/file.txt', '_deprecated_core_config_file', '/path/to/file.txt'),
-            ('enable_tcp_keepalive', False, '_deprecated_core_disable_tcp_keepalive', True),
-        ],
-    )
-    @patch("kubernetes.config.incluster_config.InClusterConfigLoader", new=MagicMock())
-    @patch("kubernetes.config.kube_config.KubeConfigLoader", new=MagicMock())
-    @patch("kubernetes.config.kube_config.KubeConfigMerger", new=MagicMock())
-    def test_core_settings_warnings(self, key, key_val, attr, attr_val):
-        hook = KubernetesHook(conn_id=None)
-        setattr(hook, attr, attr_val)
-        with pytest.warns(DeprecationWarning, match=rf'.*Airflow settings.*\n.*{key}={key_val!r}.*'):
-            hook.get_conn()
-
 
 class TestKubernetesHookIncorrectConfiguration:
     @pytest.mark.parametrize(
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index cc0efb3fc4..fbcd027aaf 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -31,7 +31,6 @@ from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
 from tests.test_utils import db
-from tests.test_utils.config import conf_vars
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
 KPO_MODULE = "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
@@ -937,27 +936,6 @@ class TestKubernetesPodOperator:
         mock_patch_already_checked.assert_called_once()
         mock_delete_pod.assert_not_called()
 
-    @pytest.mark.parametrize(
-        'key, value, attr, patched_value',
-        [
-            ('verify_ssl', 'False', '_deprecated_core_disable_verify_ssl', True),
-            ('in_cluster', 'False', '_deprecated_core_in_cluster', False),
-            ('cluster_context', 'hi', '_deprecated_core_cluster_context', 'hi'),
-            ('config_file', '/path/to/file.txt', '_deprecated_core_config_file', '/path/to/file.txt'),
-            ('enable_tcp_keepalive', 'False', '_deprecated_core_disable_tcp_keepalive', True),
-        ],
-    )
-    def test_patch_core_settings(self, key, value, attr, patched_value):
-        # first verify the behavior for the default value
-        # the hook attr should be None
-        op = KubernetesPodOperator(task_id='abc', name='hi')
-        self.hook_patch.stop()
-        assert getattr(op.hook, attr) is None
-        # now check behavior with a non-default value
-        with conf_vars({('kubernetes', key): value}):
-            op = KubernetesPodOperator(task_id='abc', name='hi')
-            assert getattr(op.hook, attr) == patched_value
-
 
 def test__suppress():
     with mock.patch('logging.Logger.error') as mock_error: