You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2022/03/16 19:35:13 UTC
[airflow] branch main updated: Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7bd165f Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
7bd165f is described below
commit 7bd165fbe2cbbfa8208803ec352c5d16ca2bd3ec
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Wed Mar 16 12:33:01 2022 -0700
Remove RefreshConfiguration workaround for K8s token refreshing (#20759)
A workaround was added (https://github.com/apache/airflow/pull/5731) to handle the refreshing of EKS tokens. It was necessary because of a bug in the upstream kubernetes Python client. That bug has since been fixed (https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4), and the fix was released in client v21.7.0 (https://github.com/kubernetes-client/python/blob/master/CHANGELOG.md#v2170). This commit therefore removes the workaround and raises the minimum `kubernetes` client version to 21.7.0.
---
UPDATING.md | 4 +
airflow/kubernetes/kube_client.py | 47 ++------
airflow/kubernetes/refresh_config.py | 124 ---------------------
.../providers/cncf/kubernetes/utils/pod_manager.py | 8 +-
setup.py | 2 +-
tests/kubernetes/test_client.py | 22 ++--
tests/kubernetes/test_refresh_config.py | 106 ------------------
7 files changed, 26 insertions(+), 287 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index cda775f..c929ece 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -81,6 +81,10 @@ https://developers.google.com/style/inclusive-documentation
-->
+### Minimum `kubernetes` Python client version bumped from 3.0.0 to 21.7.0
+
+No change in behavior is expected. The bump was needed to pick up an upstream [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) for refreshing Kubernetes API tokens on EKS, which made it possible to remove Airflow's own [workaround code](https://github.com/apache/airflow/pull/20759).
+
### Deprecation: `Connection.extra` must be JSON-encoded dict
#### TLDR
diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index 97836be..7e6ba05 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -25,39 +25,10 @@ log = logging.getLogger(__name__)
try:
from kubernetes import client, config
from kubernetes.client import Configuration
- from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
- from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config
-
has_kubernetes = True
- def _get_kube_config(
- in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
- ) -> Optional[Configuration]:
- if in_cluster:
- # load_incluster_config set default configuration with config populated by k8s
- config.load_incluster_config()
- return None
- else:
- # this block can be replaced with just config.load_kube_config once
- # refresh_config module is replaced with upstream fix
- cfg = RefreshConfiguration()
- load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
- return cfg
-
- def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
- """
- This is a workaround for supporting api token refresh in k8s client.
-
- The function can be replace with `return client.CoreV1Api()` once the
- upstream client supports token refresh.
- """
- if cfg:
- return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
- else:
- return client.CoreV1Api()
-
def _disable_verify_ssl() -> None:
configuration = Configuration()
configuration.verify_ssl = False
@@ -126,17 +97,19 @@ def get_kube_client(
if not has_kubernetes:
raise _import_err
- if not in_cluster:
- if cluster_context is None:
- cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
- if config_file is None:
- config_file = conf.get('kubernetes', 'config_file', fallback=None)
-
if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
_enable_tcp_keepalive()
if not conf.getboolean('kubernetes', 'verify_ssl'):
_disable_verify_ssl()
- client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
- return _get_client_with_patched_configuration(client_conf)
+ if in_cluster:
+ config.load_incluster_config()
+ else:
+ if cluster_context is None:
+ cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
+ if config_file is None:
+ config_file = conf.get('kubernetes', 'config_file', fallback=None)
+ config.load_kube_config(config_file=config_file, context=cluster_context)
+
+ return client.CoreV1Api()
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
deleted file mode 100644
index 2564951..0000000
--- a/airflow/kubernetes/refresh_config.py
+++ /dev/null
@@ -1,124 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-NOTE: this module can be removed once upstream client supports token refresh
-see: https://github.com/kubernetes-client/python/issues/741
-"""
-
-import calendar
-import logging
-import os
-import time
-from typing import Optional, cast
-
-import pendulum
-from kubernetes.client import Configuration
-from kubernetes.config.exec_provider import ExecProvider
-from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader
-
-from airflow.utils import yaml
-
-
-def _parse_timestamp(ts_str: str) -> int:
- parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
- return calendar.timegm(parsed_dt.timetuple())
-
-
-class RefreshKubeConfigLoader(KubeConfigLoader):
- """
- Patched KubeConfigLoader, this subclass takes expirationTimestamp into
- account and sets api key refresh callback hook in Configuration object
- """
-
- def __init__(self, *args, **kwargs):
- KubeConfigLoader.__init__(self, *args, **kwargs)
- self.api_key_expire_ts = None
-
- def _load_from_exec_plugin(self):
- """
- We override _load_from_exec_plugin method to also read and store
- expiration timestamp for aws-iam-authenticator. It will be later
- used for api token refresh.
- """
- if 'exec' not in self._user:
- return None
- try:
- status = ExecProvider(self._user['exec']).run()
- if 'token' not in status:
- logging.error('exec: missing token field in plugin output')
- return None
- self.token = f"Bearer {status['token']}"
- ts_str = status.get('expirationTimestamp')
- if ts_str:
- self.api_key_expire_ts = _parse_timestamp(ts_str)
- return True
- except Exception as e:
- logging.error(str(e))
- return None
-
- def refresh_api_key(self, client_configuration):
- """Refresh API key if expired"""
- if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts:
- self.load_and_set(client_configuration)
-
- def load_and_set(self, client_configuration):
- KubeConfigLoader.load_and_set(self, client_configuration)
- client_configuration.refresh_api_key = self.refresh_api_key
-
-
-class RefreshConfiguration(Configuration):
- """
- Patched Configuration, this subclass takes api key refresh callback hook
- into account
- """
-
- def __init__(self, *args, **kwargs):
- Configuration.__init__(self, *args, **kwargs)
- self.refresh_api_key = None
-
- def get_api_key_with_prefix(self, identifier):
- if self.refresh_api_key:
- self.refresh_api_key(self)
- return Configuration.get_api_key_with_prefix(self, identifier)
-
-
-def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]:
- """
- Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed
- KubeConfigLoader to RefreshKubeConfigLoader
- """
- with open(filename) as f:
- return RefreshKubeConfigLoader(
- config_dict=yaml.safe_load(f),
- config_base_path=os.path.abspath(os.path.dirname(filename)),
- **kwargs,
- )
-
-
-def load_kube_config(client_configuration, config_file=None, context=None):
- """
- Adapted from the upstream load_kube_config function, changes:
- - removed persist_config argument since it's not being used
- - remove `client_configuration is None` branch since we always pass
- in client configuration
- """
- if config_file is None:
- config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
-
- loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None)
- loader.load_and_set(client_configuration)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 17c2225..2323ae1 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -38,11 +38,7 @@ from airflow.kubernetes.pod_generator import PodDefaults
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
- try:
- # Kube >= 19
- from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
- except ImportError:
- from kubernetes.client.models.v1_event_list import V1EventList
+ from kubernetes.client.models.core_v1_event_list import CoreV1EventList
class PodLaunchFailedException(AirflowException):
@@ -298,7 +294,7 @@ class PodManager(LoggingMixin):
raise
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
- def read_pod_events(self, pod: V1Pod) -> "V1EventList":
+ def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList":
"""Reads events from the POD"""
try:
return self._client.list_namespaced_event(
diff --git a/setup.py b/setup.py
index f31f2cd..3118636 100644
--- a/setup.py
+++ b/setup.py
@@ -414,7 +414,7 @@ kerberos = [
]
kubernetes = [
'cryptography>=2.0.0',
- 'kubernetes>=3.0.0',
+ 'kubernetes>=21.7.0',
]
kylin = ['kylinpy>=2.6']
ldap = [
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 9228e9b..ce040cf 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -22,25 +22,21 @@ from unittest import mock
from kubernetes.client import Configuration
from urllib3.connection import HTTPConnection, HTTPSConnection
-from airflow.kubernetes.kube_client import (
- RefreshConfiguration,
- _disable_verify_ssl,
- _enable_tcp_keepalive,
- get_kube_client,
-)
+from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
class TestClient(unittest.TestCase):
@mock.patch('airflow.kubernetes.kube_client.config')
- def test_load_cluster_config(self, _):
- client = get_kube_client(in_cluster=True)
- assert not isinstance(client.api_client.configuration, RefreshConfiguration)
+ def test_load_cluster_config(self, config):
+ get_kube_client(in_cluster=True)
+ assert config.load_incluster_config.called
+ assert config.load_kube_config.not_called
@mock.patch('airflow.kubernetes.kube_client.config')
- @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
- def test_load_file_config(self, _, _2):
- client = get_kube_client(in_cluster=False)
- assert isinstance(client.api_client.configuration, RefreshConfiguration)
+ def test_load_file_config(self, config):
+ get_kube_client(in_cluster=False)
+ assert config.load_incluster_config.not_called
+ assert config.load_kube_config.called
def test_enable_tcp_keepalive(self):
socket_options = [
diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py
deleted file mode 100644
index 61c1b86..0000000
--- a/tests/kubernetes/test_refresh_config.py
+++ /dev/null
@@ -1,106 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-from unittest import TestCase, mock
-
-import pytest
-from kubernetes.config.kube_config import ConfigNode
-from pendulum.parsing import ParserError
-
-from airflow.kubernetes.refresh_config import (
- RefreshConfiguration,
- RefreshKubeConfigLoader,
- _get_kube_config_loader_for_yaml_file,
- _parse_timestamp,
-)
-
-
-class TestRefreshKubeConfigLoader(TestCase):
- ROOT_PROJECT_DIR = os.path.abspath(
- os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
- )
-
- KUBE_CONFIG_PATH = os.path.join(ROOT_PROJECT_DIR, "tests", "kubernetes", "kube_config")
-
- def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self):
- ts = _parse_timestamp("2020-01-13T13:42:20Z")
- assert 1578922940 == ts
-
- def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self):
- ts = _parse_timestamp("2020-01-13T13:42:20+0600")
- assert 1578922940 == ts
-
- def test_parse_timestamp_should_throw_exception(self):
- with pytest.raises(ParserError):
- _parse_timestamp("foobar")
-
- def test_get_kube_config_loader_for_yaml_file(self):
- refresh_kube_config_loader = _get_kube_config_loader_for_yaml_file(self.KUBE_CONFIG_PATH)
-
- assert refresh_kube_config_loader is not None
-
- assert refresh_kube_config_loader.current_context['name'] == 'federal-context'
-
- context = refresh_kube_config_loader.current_context['context']
- assert context is not None
- assert context['cluster'] == 'horse-cluster'
- assert context['namespace'] == 'chisel-ns'
- assert context['user'] == 'green-user'
-
- def test_get_api_key_with_prefix(self):
-
- refresh_config = RefreshConfiguration()
- refresh_config.api_key['key'] = '1234'
- assert refresh_config is not None
-
- api_key = refresh_config.get_api_key_with_prefix("key")
-
- assert api_key == '1234'
-
- @mock.patch('kubernetes.config.exec_provider.ExecProvider.__init__', return_value=None)
- @mock.patch('kubernetes.config.exec_provider.ExecProvider.run', return_value={'token': '1234'})
- def test_refresh_kube_config_loader(self, exec_provider_run, exec_provider_init):
- current_context = _get_kube_config_loader_for_yaml_file(self.KUBE_CONFIG_PATH).current_context
-
- config_dict = {}
- config_dict['current-context'] = 'federal-context'
- config_dict['contexts'] = []
- config_dict['contexts'].append(current_context)
-
- config_dict['clusters'] = []
-
- cluster_config = {}
- cluster_config['api-version'] = 'v1'
- cluster_config['server'] = 'http://cow.org:8080'
- cluster_config['name'] = 'horse-cluster'
- cluster_root_config = {}
- cluster_root_config['cluster'] = cluster_config
- cluster_root_config['name'] = 'horse-cluster'
- config_dict['clusters'].append(cluster_root_config)
-
- refresh_kube_config_loader = RefreshKubeConfigLoader(config_dict=config_dict)
- refresh_kube_config_loader._user = {}
-
- config_node = ConfigNode('command', 'test')
- config_node.__dict__['apiVersion'] = '2.0'
- config_node.__dict__['command'] = 'test'
-
- refresh_kube_config_loader._user['exec'] = config_node
-
- result = refresh_kube_config_loader._load_from_exec_plugin()
- assert result is not None