You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/04/21 15:36:07 UTC

[airflow] branch main updated: KubernetesHook should try incluster first when not otherwise configured (#23126)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c7399c7190 KubernetesHook should try incluster first when not otherwise configured (#23126)
c7399c7190 is described below

commit c7399c7190750ba705b8255b7a92de2554e6eef3
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Apr 21 08:35:57 2022 -0700

    KubernetesHook should try incluster first when not otherwise configured (#23126)
    
    Currently, when the K8s hook receives no configuration (i.e. none of in_cluster, config file content, or config file path is supplied), the default client generation process tries to load the kube config from the default location.  This is inconsistent with Airflow core's behavior in the Kubernetes executor and the Kubernetes pod operator (in_cluster=True is the default for those).
    
    To make the K8s hook's behavior consistent, we can first try in-cluster, and if that fails, try the default kubeconfig.  This should be safe to do.  The kubernetes client will check for 2 environment variables that an in-cluster environment should have, and if it doesn't find them, it will raise ConfigException (see here: https://github.com/kubernetes-client/python/blob/1271465acdb80bf174c50564a384fd6898635ea6/kubernetes/base/config/incluster_config.py#L60-L62).  If ConfigException is raised, the hook falls back to loading the kubeconfig file from the default location.
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  | 22 +++++++++++----
 .../cncf/kubernetes/hooks/test_kubernetes.py       | 33 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 1107655585..b6115b87f8 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -18,6 +18,8 @@ import sys
 import tempfile
 from typing import Any, Dict, Generator, Optional, Tuple, Union
 
+from kubernetes.config import ConfigException
+
 if sys.version_info >= (3, 8):
     from functools import cached_property
 else:
@@ -171,11 +173,21 @@ class KubernetesHook(BaseHook):
                 )
             return client.ApiClient()
 
-        self.log.debug("loading kube_config from: default file")
-        config.load_kube_config(
-            client_configuration=self.client_configuration,
-            context=cluster_context,
-        )
+        return self._get_default_client(cluster_context=cluster_context)
+
+    def _get_default_client(self, *, cluster_context=None):
+        # if we get here, then no configuration has been supplied
+        # we should try in_cluster since that's most likely
+        # but failing that just load assuming a kubeconfig file
+        # in the default location
+        try:
+            config.load_incluster_config(client_configuration=self.client_configuration)
+        except ConfigException:
+            self.log.debug("loading kube_config from: default file")
+            config.load_kube_config(
+                client_configuration=self.client_configuration,
+                context=cluster_context,
+            )
         return client.ApiClient()
 
     @cached_property
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index 256974ebd4..5194061e7a 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -25,6 +25,7 @@ from unittest.mock import patch
 
 import kubernetes
 import pytest
+from kubernetes.config import ConfigException
 
 from airflow import AirflowException
 from airflow.models import Connection
@@ -75,8 +76,10 @@ class TestKubernetesHook:
     @patch("kubernetes.config.kube_config.KubeConfigLoader")
     @patch("kubernetes.config.kube_config.KubeConfigMerger")
     @patch("kubernetes.config.incluster_config.InClusterConfigLoader")
+    @patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook._get_default_client")
     def test_in_cluster_connection(
         self,
+        mock_get_default_client,
         mock_in_cluster_loader,
         mock_merger,
         mock_loader,
@@ -89,15 +92,43 @@ class TestKubernetesHook:
         Hook param should beat extra.
         """
         kubernetes_hook = KubernetesHook(conn_id=conn_id, in_cluster=in_cluster_param)
+        mock_get_default_client.return_value = kubernetes.client.api_client.ApiClient()
         api_conn = kubernetes_hook.get_conn()
         if in_cluster_called:
             mock_in_cluster_loader.assert_called_once()
             mock_merger.assert_not_called()
             mock_loader.assert_not_called()
         else:
-            mock_in_cluster_loader.assert_not_called()
+            mock_get_default_client.assert_called()
+        assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)
+
+    @pytest.mark.parametrize('in_cluster_fails', [True, False])
+    @patch("kubernetes.config.kube_config.KubeConfigLoader")
+    @patch("kubernetes.config.kube_config.KubeConfigMerger")
+    @patch("kubernetes.config.incluster_config.InClusterConfigLoader")
+    def test_get_default_client(
+        self,
+        mock_incluster,
+        mock_merger,
+        mock_loader,
+        in_cluster_fails,
+    ):
+        """
+        Verifies the behavior of the ``_get_default_client`` function.  It should try the "in cluster"
+        loader first but if that fails, try to use the default kubeconfig file.
+        """
+        if in_cluster_fails:
+            mock_incluster.side_effect = ConfigException('any')
+        kubernetes_hook = KubernetesHook()
+        api_conn = kubernetes_hook._get_default_client()
+        if in_cluster_fails:
+            mock_incluster.assert_called_once()
             mock_merger.assert_called_once_with(KUBE_CONFIG_PATH)
             mock_loader.assert_called_once()
+        else:
+            mock_incluster.assert_called_once()
+            mock_merger.assert_not_called()
+            mock_loader.assert_not_called()
         assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)
 
     @pytest.mark.parametrize(