You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/03/16 19:35:13 UTC

[airflow] branch main updated: Remove RefreshConfiguration workaround for K8s token refreshing (#20759)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bd165f  Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
7bd165f is described below

commit 7bd165fbe2cbbfa8208803ec352c5d16ca2bd3ec
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Mar 16 12:33:01 2022 -0700

    Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
    
    A workaround was added (https://github.com/apache/airflow/pull/5731) to handle the refreshing of EKS tokens. It was necessary because of an upstream bug. That bug has since been fixed (https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4), and the fix was released in v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170).
---
 UPDATING.md                                        |   4 +
 airflow/kubernetes/kube_client.py                  |  47 ++------
 airflow/kubernetes/refresh_config.py               | 124 ---------------------
 .../providers/cncf/kubernetes/utils/pod_manager.py |   8 +-
 setup.py                                           |   2 +-
 tests/kubernetes/test_client.py                    |  22 ++--
 tests/kubernetes/test_refresh_config.py            | 106 ------------------
 7 files changed, 26 insertions(+), 287 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index cda775f..c929ece 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -81,6 +81,10 @@ https://developers.google.com/style/inclusive-documentation
 
 -->
 
+### Minimum `kubernetes` Python client version bumped from 3.0.0 to 21.7.0
+
+No change in behavior is expected. The bump was needed to pick up an upstream [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) for refreshing Kubernetes API tokens on EKS, which allowed Airflow to remove its own [workaround code](https://github.com/apache/airflow/pull/20759).
+
 ### Deprecation: `Connection.extra` must be JSON-encoded dict
 
 #### TLDR
diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index 97836be..7e6ba05 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -25,39 +25,10 @@ log = logging.getLogger(__name__)
 try:
     from kubernetes import client, config
     from kubernetes.client import Configuration
-    from kubernetes.client.api_client import ApiClient
     from kubernetes.client.rest import ApiException
 
-    from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config
-
     has_kubernetes = True
 
-    def _get_kube_config(
-        in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
-    ) -> Optional[Configuration]:
-        if in_cluster:
-            # load_incluster_config set default configuration with config populated by k8s
-            config.load_incluster_config()
-            return None
-        else:
-            # this block can be replaced with just config.load_kube_config once
-            # refresh_config module is replaced with upstream fix
-            cfg = RefreshConfiguration()
-            load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
-            return cfg
-
-    def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
-        """
-        This is a workaround for supporting api token refresh in k8s client.
-
-        The function can be replace with `return client.CoreV1Api()` once the
-        upstream client supports token refresh.
-        """
-        if cfg:
-            return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
-        else:
-            return client.CoreV1Api()
-
     def _disable_verify_ssl() -> None:
         configuration = Configuration()
         configuration.verify_ssl = False
@@ -126,17 +97,19 @@ def get_kube_client(
     if not has_kubernetes:
         raise _import_err
 
-    if not in_cluster:
-        if cluster_context is None:
-            cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
-        if config_file is None:
-            config_file = conf.get('kubernetes', 'config_file', fallback=None)
-
     if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
         _enable_tcp_keepalive()
 
     if not conf.getboolean('kubernetes', 'verify_ssl'):
         _disable_verify_ssl()
 
-    client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
-    return _get_client_with_patched_configuration(client_conf)
+    if in_cluster:
+        config.load_incluster_config()
+    else:
+        if cluster_context is None:
+            cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
+        if config_file is None:
+            config_file = conf.get('kubernetes', 'config_file', fallback=None)
+        config.load_kube_config(config_file=config_file, context=cluster_context)
+
+    return client.CoreV1Api()
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
deleted file mode 100644
index 2564951..0000000
--- a/airflow/kubernetes/refresh_config.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-NOTE: this module can be removed once upstream client supports token refresh
-see: https://github.com/kubernetes-client/python/issues/741
-"""
-
-import calendar
-import logging
-import os
-import time
-from typing import Optional, cast
-
-import pendulum
-from kubernetes.client import Configuration
-from kubernetes.config.exec_provider import ExecProvider
-from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader
-
-from airflow.utils import yaml
-
-
-def _parse_timestamp(ts_str: str) -> int:
-    parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
-    return calendar.timegm(parsed_dt.timetuple())
-
-
-class RefreshKubeConfigLoader(KubeConfigLoader):
-    """
-    Patched KubeConfigLoader, this subclass takes expirationTimestamp into
-    account and sets api key refresh callback hook in Configuration object
-    """
-
-    def __init__(self, *args, **kwargs):
-        KubeConfigLoader.__init__(self, *args, **kwargs)
-        self.api_key_expire_ts = None
-
-    def _load_from_exec_plugin(self):
-        """
-        We override _load_from_exec_plugin method to also read and store
-        expiration timestamp for aws-iam-authenticator. It will be later
-        used for api token refresh.
-        """
-        if 'exec' not in self._user:
-            return None
-        try:
-            status = ExecProvider(self._user['exec']).run()
-            if 'token' not in status:
-                logging.error('exec: missing token field in plugin output')
-                return None
-            self.token = f"Bearer {status['token']}"
-            ts_str = status.get('expirationTimestamp')
-            if ts_str:
-                self.api_key_expire_ts = _parse_timestamp(ts_str)
-            return True
-        except Exception as e:
-            logging.error(str(e))
-            return None
-
-    def refresh_api_key(self, client_configuration):
-        """Refresh API key if expired"""
-        if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts:
-            self.load_and_set(client_configuration)
-
-    def load_and_set(self, client_configuration):
-        KubeConfigLoader.load_and_set(self, client_configuration)
-        client_configuration.refresh_api_key = self.refresh_api_key
-
-
-class RefreshConfiguration(Configuration):
-    """
-    Patched Configuration, this subclass takes api key refresh callback hook
-    into account
-    """
-
-    def __init__(self, *args, **kwargs):
-        Configuration.__init__(self, *args, **kwargs)
-        self.refresh_api_key = None
-
-    def get_api_key_with_prefix(self, identifier):
-        if self.refresh_api_key:
-            self.refresh_api_key(self)
-        return Configuration.get_api_key_with_prefix(self, identifier)
-
-
-def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]:
-    """
-    Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed
-    KubeConfigLoader to RefreshKubeConfigLoader
-    """
-    with open(filename) as f:
-        return RefreshKubeConfigLoader(
-            config_dict=yaml.safe_load(f),
-            config_base_path=os.path.abspath(os.path.dirname(filename)),
-            **kwargs,
-        )
-
-
-def load_kube_config(client_configuration, config_file=None, context=None):
-    """
-    Adapted from the upstream load_kube_config function, changes:
-        - removed persist_config argument since it's not being used
-        - remove `client_configuration is None` branch since we always pass
-        in client configuration
-    """
-    if config_file is None:
-        config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
-
-    loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None)
-    loader.load_and_set(client_configuration)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 17c2225..2323ae1 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -38,11 +38,7 @@ from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
-    try:
-        # Kube >= 19
-        from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
-    except ImportError:
-        from kubernetes.client.models.v1_event_list import V1EventList
+    from kubernetes.client.models.core_v1_event_list import CoreV1EventList
 
 
 class PodLaunchFailedException(AirflowException):
@@ -298,7 +294,7 @@ class PodManager(LoggingMixin):
             raise
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod_events(self, pod: V1Pod) -> "V1EventList":
+    def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList":
         """Reads events from the POD"""
         try:
             return self._client.list_namespaced_event(
diff --git a/setup.py b/setup.py
index f31f2cd..3118636 100644
--- a/setup.py
+++ b/setup.py
@@ -414,7 +414,7 @@ kerberos = [
 ]
 kubernetes = [
     'cryptography>=2.0.0',
-    'kubernetes>=3.0.0',
+    'kubernetes>=21.7.0',
 ]
 kylin = ['kylinpy>=2.6']
 ldap = [
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 9228e9b..ce040cf 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -22,25 +22,21 @@ from unittest import mock
 from kubernetes.client import Configuration
 from urllib3.connection import HTTPConnection, HTTPSConnection
 
-from airflow.kubernetes.kube_client import (
-    RefreshConfiguration,
-    _disable_verify_ssl,
-    _enable_tcp_keepalive,
-    get_kube_client,
-)
+from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
 
 
 class TestClient(unittest.TestCase):
     @mock.patch('airflow.kubernetes.kube_client.config')
-    def test_load_cluster_config(self, _):
-        client = get_kube_client(in_cluster=True)
-        assert not isinstance(client.api_client.configuration, RefreshConfiguration)
+    def test_load_cluster_config(self, config):
+        get_kube_client(in_cluster=True)
+        config.load_incluster_config.assert_called_once()
+        config.load_kube_config.assert_not_called()
 
     @mock.patch('airflow.kubernetes.kube_client.config')
-    @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
-    def test_load_file_config(self, _, _2):
-        client = get_kube_client(in_cluster=False)
-        assert isinstance(client.api_client.configuration, RefreshConfiguration)
+    def test_load_file_config(self, config):
+        get_kube_client(in_cluster=False)
+        config.load_incluster_config.assert_not_called()
+        config.load_kube_config.assert_called_once()
 
     def test_enable_tcp_keepalive(self):
         socket_options = [
diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py
deleted file mode 100644
index 61c1b86..0000000
--- a/tests/kubernetes/test_refresh_config.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-from unittest import TestCase, mock
-
-import pytest
-from kubernetes.config.kube_config import ConfigNode
-from pendulum.parsing import ParserError
-
-from airflow.kubernetes.refresh_config import (
-    RefreshConfiguration,
-    RefreshKubeConfigLoader,
-    _get_kube_config_loader_for_yaml_file,
-    _parse_timestamp,
-)
-
-
-class TestRefreshKubeConfigLoader(TestCase):
-    ROOT_PROJECT_DIR = os.path.abspath(
-        os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
-    )
-
-    KUBE_CONFIG_PATH = os.path.join(ROOT_PROJECT_DIR, "tests", "kubernetes", "kube_config")
-
-    def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self):
-        ts = _parse_timestamp("2020-01-13T13:42:20Z")
-        assert 1578922940 == ts
-
-    def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self):
-        ts = _parse_timestamp("2020-01-13T13:42:20+0600")
-        assert 1578922940 == ts
-
-    def test_parse_timestamp_should_throw_exception(self):
-        with pytest.raises(ParserError):
-            _parse_timestamp("foobar")
-
-    def test_get_kube_config_loader_for_yaml_file(self):
-        refresh_kube_config_loader = _get_kube_config_loader_for_yaml_file(self.KUBE_CONFIG_PATH)
-
-        assert refresh_kube_config_loader is not None
-
-        assert refresh_kube_config_loader.current_context['name'] == 'federal-context'
-
-        context = refresh_kube_config_loader.current_context['context']
-        assert context is not None
-        assert context['cluster'] == 'horse-cluster'
-        assert context['namespace'] == 'chisel-ns'
-        assert context['user'] == 'green-user'
-
-    def test_get_api_key_with_prefix(self):
-
-        refresh_config = RefreshConfiguration()
-        refresh_config.api_key['key'] = '1234'
-        assert refresh_config is not None
-
-        api_key = refresh_config.get_api_key_with_prefix("key")
-
-        assert api_key == '1234'
-
-    @mock.patch('kubernetes.config.exec_provider.ExecProvider.__init__', return_value=None)
-    @mock.patch('kubernetes.config.exec_provider.ExecProvider.run', return_value={'token': '1234'})
-    def test_refresh_kube_config_loader(self, exec_provider_run, exec_provider_init):
-        current_context = _get_kube_config_loader_for_yaml_file(self.KUBE_CONFIG_PATH).current_context
-
-        config_dict = {}
-        config_dict['current-context'] = 'federal-context'
-        config_dict['contexts'] = []
-        config_dict['contexts'].append(current_context)
-
-        config_dict['clusters'] = []
-
-        cluster_config = {}
-        cluster_config['api-version'] = 'v1'
-        cluster_config['server'] = 'http://cow.org:8080'
-        cluster_config['name'] = 'horse-cluster'
-        cluster_root_config = {}
-        cluster_root_config['cluster'] = cluster_config
-        cluster_root_config['name'] = 'horse-cluster'
-        config_dict['clusters'].append(cluster_root_config)
-
-        refresh_kube_config_loader = RefreshKubeConfigLoader(config_dict=config_dict)
-        refresh_kube_config_loader._user = {}
-
-        config_node = ConfigNode('command', 'test')
-        config_node.__dict__['apiVersion'] = '2.0'
-        config_node.__dict__['command'] = 'test'
-
-        refresh_kube_config_loader._user['exec'] = config_node
-
-        result = refresh_kube_config_loader._load_from_exec_plugin()
-        assert result is not None