Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/28 18:26:01 UTC
[airflow] 01/04: Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 60a2b900ef1e880df970880f865085f023c73701
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Mar 28 17:32:36 2022 +0200
Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"
This reverts commit d39197fd13b0d96c2ab84ca3f1f13391dbf59572.
---
UPDATING.md | 6 +-
airflow/kubernetes/kube_client.py | 47 +-
airflow/kubernetes/refresh_config.py | 124 +++++
airflow/providers/cncf/kubernetes/CHANGELOG.rst | 231 +--------
airflow/providers/cncf/kubernetes/__init__.py | 27 --
.../backcompat/backwards_compat_converters.py | 22 +-
.../providers/cncf/kubernetes/backcompat/pod.py | 27 +-
.../kubernetes/backcompat/pod_runtime_info_env.py | 18 +-
.../providers/cncf/kubernetes/backcompat/volume.py | 2 +
.../cncf/kubernetes/backcompat/volume_mount.py | 4 +
.../kubernetes/example_dags/example_kubernetes.py | 163 -------
.../providers/cncf/kubernetes/hooks/kubernetes.py | 97 ++--
.../cncf/kubernetes/operators/kubernetes_pod.py | 517 ++++++++++-----------
.../cncf/kubernetes/operators/spark_kubernetes.py | 16 +-
airflow/providers/cncf/kubernetes/provider.yaml | 8 -
.../cncf/kubernetes/sensors/spark_kubernetes.py | 15 +-
setup.py | 2 +-
tests/kubernetes/test_client.py | 22 +-
tests/kubernetes/test_refresh_config.py | 37 ++
19 files changed, 537 insertions(+), 848 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index 6532160..a706258 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -84,9 +84,7 @@ https://developers.google.com/style/inclusive-documentation
## Airflow 2.2.5
-### Minimum kubernetes version bumped from 3.0.0 to 21.7.0
-
-No change in behavior is expected. This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759).
+No breaking changes.
## Airflow 2.2.4
@@ -1381,7 +1379,7 @@ delete this option.
#### `airflow.models.dagbag.DagBag`
-Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property
+Passing `store_serialized_dags` argument to `DagBag.__init__` and accessing `DagBag.store_serialized_dags` property
are deprecated and will be removed in future versions.
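
For reference, the token-refresh issue behind the UPDATING note above: exec-based credential
plugins such as aws-iam-authenticator return an ExecCredential status containing a token and an
expirationTimestamp. A minimal sketch of turning that timestamp into epoch seconds, mirroring the
_parse_timestamp helper restored further below (the status dict is a hypothetical plugin output):

    import calendar

    import pendulum

    # Hypothetical ExecCredential status as emitted by aws-iam-authenticator.
    status = {
        "token": "k8s-aws-v1.example-token",
        "expirationTimestamp": "2022-03-28T18:00:00Z",
    }

    # Convert the RFC 3339 timestamp to epoch seconds so it can later be
    # compared against time.time() to decide whether a refresh is due.
    expire_ts = calendar.timegm(pendulum.parse(status["expirationTimestamp"]).timetuple())
    print(expire_ts)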
diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index aa49715..1c20bd3 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -25,10 +25,39 @@ log = logging.getLogger(__name__)
try:
from kubernetes import client, config
from kubernetes.client import Configuration
+ from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
+ from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config
+
has_kubernetes = True
+ def _get_kube_config(
+ in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
+ ) -> Optional[Configuration]:
+ if in_cluster:
+ # load_incluster_config sets the default configuration with config populated by k8s
+ config.load_incluster_config()
+ return None
+ else:
+ # this block can be replaced with just config.load_kube_config once
+ # the refresh_config module is replaced with the upstream fix
+ cfg = RefreshConfiguration()
+ load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
+ return cfg
+
+ def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
+ """
+ This is a workaround for supporting api token refresh in the k8s client.
+
+ The function can be replaced with `return client.CoreV1Api()` once the
+ upstream client supports token refresh.
+ """
+ if cfg:
+ return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
+ else:
+ return client.CoreV1Api()
+
def _disable_verify_ssl() -> None:
configuration = Configuration()
configuration.verify_ssl = False
@@ -101,19 +130,17 @@ def get_kube_client(
if not has_kubernetes:
raise _import_err
+ if not in_cluster:
+ if cluster_context is None:
+ cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
+ if config_file is None:
+ config_file = conf.get('kubernetes', 'config_file', fallback=None)
+
if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
_enable_tcp_keepalive()
if not conf.getboolean('kubernetes', 'verify_ssl'):
_disable_verify_ssl()
- if in_cluster:
- config.load_incluster_config()
- else:
- if cluster_context is None:
- cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
- if config_file is None:
- config_file = conf.get('kubernetes', 'config_file', fallback=None)
- config.load_kube_config(config_file=config_file, context=cluster_context)
-
- return client.CoreV1Api()
+ client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
+ return _get_client_with_patched_configuration(client_conf)
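
A minimal usage sketch of the client factory restored above, assuming an out-of-cluster
kubeconfig (the namespace is illustrative):

    from airflow.kubernetes.kube_client import get_kube_client

    # cluster_context and config_file fall back to the [kubernetes] section
    # of airflow.cfg when left unset.
    v1 = get_kube_client(in_cluster=False)

    # The returned CoreV1Api is backed by RefreshConfiguration, so an expired
    # exec-plugin token is refreshed transparently on subsequent requests.
    pods = v1.list_namespaced_pod(namespace="default")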
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
new file mode 100644
index 0000000..2564951
--- /dev/null
+++ b/airflow/kubernetes/refresh_config.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+NOTE: this module can be removed once the upstream client supports token refresh
+see: https://github.com/kubernetes-client/python/issues/741
+"""
+
+import calendar
+import logging
+import os
+import time
+from typing import Optional, cast
+
+import pendulum
+from kubernetes.client import Configuration
+from kubernetes.config.exec_provider import ExecProvider
+from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader
+
+from airflow.utils import yaml
+
+
+def _parse_timestamp(ts_str: str) -> int:
+    parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
+    return calendar.timegm(parsed_dt.timetuple())
+
+
+class RefreshKubeConfigLoader(KubeConfigLoader):
+    """
+    Patched KubeConfigLoader. This subclass takes expirationTimestamp into
+    account and sets an api key refresh callback hook in the Configuration object.
+    """
+
+    def __init__(self, *args, **kwargs):
+        KubeConfigLoader.__init__(self, *args, **kwargs)
+        self.api_key_expire_ts = None
+
+    def _load_from_exec_plugin(self):
+        """
+        We override the _load_from_exec_plugin method to also read and store the
+        expiration timestamp for aws-iam-authenticator. It will later be
+        used for api token refresh.
+        """
+        if 'exec' not in self._user:
+            return None
+        try:
+            status = ExecProvider(self._user['exec']).run()
+            if 'token' not in status:
+                logging.error('exec: missing token field in plugin output')
+                return None
+            self.token = f"Bearer {status['token']}"
+            ts_str = status.get('expirationTimestamp')
+            if ts_str:
+                self.api_key_expire_ts = _parse_timestamp(ts_str)
+            return True
+        except Exception as e:
+            logging.error(str(e))
+            return None
+
+    def refresh_api_key(self, client_configuration):
+        """Refresh the API key if expired"""
+        if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts:
+            self.load_and_set(client_configuration)
+
+    def load_and_set(self, client_configuration):
+        KubeConfigLoader.load_and_set(self, client_configuration)
+        client_configuration.refresh_api_key = self.refresh_api_key
+
+
+class RefreshConfiguration(Configuration):
+    """
+    Patched Configuration. This subclass takes the api key refresh callback hook
+    into account.
+    """
+
+    def __init__(self, *args, **kwargs):
+        Configuration.__init__(self, *args, **kwargs)
+        self.refresh_api_key = None
+
+    def get_api_key_with_prefix(self, identifier):
+        if self.refresh_api_key:
+            self.refresh_api_key(self)
+        return Configuration.get_api_key_with_prefix(self, identifier)
+
+
+def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]:
+    """
+    Adapted from the upstream _get_kube_config_loader_for_yaml_file function; changed
+    KubeConfigLoader to RefreshKubeConfigLoader.
+    """
+    with open(filename) as f:
+        return RefreshKubeConfigLoader(
+            config_dict=yaml.safe_load(f),
+            config_base_path=os.path.abspath(os.path.dirname(filename)),
+            **kwargs,
+        )
+
+
+def load_kube_config(client_configuration, config_file=None, context=None):
+    """
+    Adapted from the upstream load_kube_config function, with these changes:
+    - removed the persist_config argument since it's not being used
+    - removed the `client_configuration is None` branch since we always pass
+      in a client configuration
+    """
+    if config_file is None:
+        config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
+
+    loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None)
+    loader.load_and_set(client_configuration)
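
Taken together, the pieces above fire like this (a sketch; assumes the default ~/.kube/config
location):

    from kubernetes import client

    from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config

    cfg = RefreshConfiguration()
    load_kube_config(client_configuration=cfg)  # defaults to ~/.kube/config

    # Every request resolves its auth header via get_api_key_with_prefix, which
    # first calls the refresh_api_key hook installed by RefreshKubeConfigLoader;
    # once the stored expirationTimestamp has passed, load_and_set re-runs the
    # exec plugin and swaps in a fresh bearer token before the request goes out.
    v1 = client.CoreV1Api(api_client=client.ApiClient(configuration=cfg))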
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 5bcf569..7f686b2 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -19,231 +19,6 @@
Changelog
---------
-3.1.2
-.....
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Fix mistakenly added install_requires for all providers (#22382)``
-* ``Fix "run_id" k8s and elasticsearch compatibility with Airflow 2.1 (#22385)``
-
-Misc
-~~~~
-
-* ``Remove RefreshConfiguration workaround for K8s token refreshing (#20759)``
-
-3.1.1
-.....
-
-Misc
-~~~~~
-
-* ``Add Trove classifiers in PyPI (Framework :: Apache Airflow :: Provider)``
-
-3.1.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Add map_index label to mapped KubernetesPodOperator (#21916)``
-* ``Change KubePodOperator labels from execution_date to run_id (#21960)``
-
-Misc
-~~~~
-
-* ``Support for Python 3.10``
-* ``Fix Kubernetes example with wrong operator casing (#21898)``
-* ``Remove types from KPO docstring (#21826)``
-
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
- * ``Add pre-commit check for docstring param types (#21398)``
-
-3.0.2
-.....
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Add missed deprecations for cncf (#20031)``
-
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
- * ``Remove ':type' lines now sphinx-autoapi supports typehints (#20951)``
- * ``Make ''delete_pod'' change more prominent in K8s changelog (#20753)``
- * ``Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)``
- * ``Add optional features in providers. (#21074)``
- * ``Add documentation for January 2021 providers release (#21257)``
-
-3.0.1
-.....
-
-
-Misc
-~~~~
-
-* ``Update Kubernetes library version (#18797)``
-
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
-
-3.0.0
-.....
-
-Breaking changes
-~~~~~~~~~~~~~~~~
-
-* ``Parameter is_delete_operator_pod default is changed to True (#20575)``
-* ``Simplify KubernetesPodOperator (#19572)``
-* ``Move pod_mutation_hook call from PodManager to KubernetesPodOperator (#20596)``
-* ``Rename ''PodLauncher'' to ''PodManager'' (#20576)``
-
-Parameter is_delete_operator_pod has new default
-````````````````````````````````````````````````
-
-Previously, the default for param ``is_delete_operator_pod`` was ``False``, which means that
-after a task runs, its pod is not deleted by the operator and remains on the
-cluster indefinitely. With this release, we change the default to ``True``.
-
-Notes on changes KubernetesPodOperator and PodLauncher
-``````````````````````````````````````````````````````
-
-.. warning:: Many methods in ``KubernetesPodOperator`` and ``PodLauncher`` have been renamed.
- If you have subclassed ``KubernetesPodOperator`` you will need to update your subclass to reflect
- the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``.
-
-Overview
-''''''''
-
-Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly,
-then you don't need to worry about this change. If however you have subclassed ``KubernetesPodOperator``, what
-follows are some notes on the changes in this release.
-
-One of the principal goals of the refactor is to clearly separate the "get or create pod" and
-"wait for pod completion" phases. Previously the "wait for pod completion" logic would be invoked
-differently depending on whether the operator were to "attach to an existing pod" (e.g. after a
-worker failure) or "create a new pod" and this resulted in some code duplication and a bit more
-nesting of logic. With this refactor we encapsulate the "get or create" step
-into method ``KubernetesPodOperator.get_or_create_pod``, and pull the monitoring and XCom logic up
-into the top level of ``execute`` because it can be the same for "attached" pods and "new" pods.
-
-The ``KubernetesPodOperator.get_or_create_pod`` tries first to find an existing pod using labels
-specific to the task instance (see ``KubernetesPodOperator.find_pod``).
-If one does not exist, it ``creates a pod <~.PodManager.create_pod>``.
-
-The "waiting" part of execution has three components. The first step is to wait for the pod to leave the
-``Pending`` phase (``~.KubernetesPodOperator.await_pod_start``). Next, if configured to do so,
-the operator will follow the base container logs and forward these logs to the task logger until
-the ``base`` container is done. If not configured to harvest the
-logs, the operator will instead call ``KubernetesPodOperator.await_container_completion``.
-Either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom
-value from the base container, we ``await pod completion <~.PodManager.await_pod_completion>``.
-
-Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or
-created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``.
-
-After the pod terminates, we execute different cleanup tasks depending on whether the pod terminated successfully.
-
-If the pod terminates *unsuccessfully*, we attempt to log the pod events ``PodLauncher.read_pod_events``. If
-additionally the task is configured *not* to delete the pod after termination, we apply a label ``KubernetesPodOperator.patch_already_checked``
-indicating that the pod failed and should not be "reattached to" in a retry. If the task is configured
-to delete its pod, we delete it ``KubernetesPodOperator.process_pod_deletion``. Finally,
-we raise an AirflowException to fail the task instance.
-
-If the pod terminates successfully, we delete the pod ``KubernetesPodOperator.process_pod_deletion``
-(if configured to delete the pod) and push XCom (if configured to push XCom).
-
-Details on method renames, refactors, and deletions
-'''''''''''''''''''''''''''''''''''''''''''''''''''
-
-In ``KubernetesPodOperator``:
-
-* Method ``create_pod_launcher`` is converted to cached property ``pod_manager``
-* Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client``
-* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``.
-* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion. With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
-* Method ``create_pod_request_obj`` is renamed ``build_pod_request_obj``. It now takes argument ``context`` in order to add TI-specific pod labels; previously they were added after return.
-* Method ``create_labels_for_pod`` is renamed ``_get_ti_pod_labels``. This method doesn't return *all* labels, but only those specific to the TI. We also add parameter ``include_try_number`` to control the inclusion of this label instead of possibly filtering it out later.
-* Method ``_get_pod_identifying_label_string`` is renamed ``_build_find_pod_label_selector``
-* Method ``_try_numbers_match`` is removed.
-* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``. Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
-
-In class ``PodManager`` (formerly ``PodLauncher``):
-
-* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``.
-* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion``
-* Methods ``pod_not_started``, ``pod_is_running``, ``process_status``, and ``_task_status`` are removed. These were needed due to the way in which pod ``phase`` was mapped to task instance states; but we no longer do such a mapping and instead deal with pod phases directly and untransformed.
-* Method ``_extract_xcom`` is renamed ``extract_xcom``.
-* Method ``read_pod_logs`` now takes kwarg ``container_name``
-
-
-Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``):
-
-* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager``
-* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased.
-* The ``airflow.settings.pod_mutation_hook`` is no longer called in
- ``cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async``. For ``KubernetesPodOperator``,
- mutation now occurs in ``build_pod_request_obj``.
-* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task
- completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a
- temporary basis for debugging purposes and therefore pod deletion is the more sensible default.
-
-Features
-~~~~~~~~
-
-* ``Add params config, in_cluster, and cluster_context to KubernetesHook (#19695)``
-* ``Implement dry_run for KubernetesPodOperator (#20573)``
-* ``Clarify docstring for ''build_pod_request_obj'' in K8s providers (#20574)``
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Fix Volume/VolumeMount KPO DeprecationWarning (#19726)``
-
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
- * ``Fix cached_property MyPy declaration and related MyPy errors (#20226)``
- * ``Use typed Context EVERYWHERE (#20565)``
- * ``Fix template_fields type to have MyPy friendly Sequence type (#20571)``
- * ``Even more typing in operators (template_fields/ext) (#20608)``
- * ``Update documentation for provider December 2021 release (#20523)``
-
-2.2.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Added namespace as a template field in the KPO. (#19718)``
-* ``Decouple name randomization from name kwarg (#19398)``
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Checking event.status.container_statuses before filtering (#19713)``
-* ``Coalesce 'extra' params to None in KubernetesHook (#19694)``
-* ``Change to correct type in KubernetesPodOperator (#19459)``
-
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
- * ``Fix duplicate changelog entries (#19759)``
-
-2.1.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Add more type hints to PodLauncher (#18928)``
-* ``Add more information to PodLauncher timeout error (#17953)``
-
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
- * ``Update docstring to let users use 'node_selector' (#19057)``
- * ``Add pre-commit hook for common misspelling check in files (#18964)``
-
2.0.3
.....
@@ -269,8 +44,7 @@ Bug Fixes
* ``Fix using XCom with ''KubernetesPodOperator'' (#17760)``
* ``Import Hooks lazily individually in providers manager (#17682)``
-.. Below changes are excluded from the changelog. Move them to
- appropriate section above if needed. Do not delete the lines(!):
+.. Review and move the new changes to one of the sections above:
* ``Fix messed-up changelog in 3 providers (#17380)``
* ``Fix static checks (#17256)``
* ``Update spark_kubernetes.py (#17237)``
@@ -291,7 +65,10 @@ Bug Fixes
.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
+ * ``Fixed wrongly escaped characters in amazon's changelog (#17020)``
* ``Simplify 'default_args' in Kubernetes example DAGs (#16870)``
+ * ``Enable using custom pod launcher in Kubernetes Pod Operator (#16945)``
+ * ``Prepare documentation for July release of providers. (#17015)``
* ``Updating task dependencies (#16624)``
* ``Removes pylint from our toolchain (#16682)``
* ``Prepare documentation for July release of providers. (#17015)``
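
The removed 3.0.0 notes above describe the refactored execute flow ("get or create pod", wait,
xcom, cleanup). A standalone sketch of that orchestration, with stubs standing in for the
PodManager/KubernetesPodOperator methods named in the notes:

    class ExecuteFlowSketch:
        """Stubs standing in for the methods named in the 3.0.0 notes."""

        def get_or_create_pod(self):
            # Reattach to a pod found via TI-specific labels, else create one.
            return {"name": "example-pod"}

        def await_pod_start(self, pod):
            pass  # block until the pod leaves the Pending phase

        def follow_container_logs(self, pod):
            pass  # stream the 'base' container logs into the task log

        def await_container_completion(self, pod):
            pass

        def extract_xcom(self, pod):
            # The sidecar exposes /airflow/xcom/return.json from the base container.
            return [1, 2, 3, 4]

        def await_pod_completion(self, pod):
            return pod

        def cleanup(self, pod, remote_pod):
            # Delete the pod or patch the 'already_checked' label, depending on
            # configuration; raise on unsuccessful termination.
            pass

        def execute(self, get_logs=True, do_xcom_push=True):
            pod = remote_pod = result = None
            try:
                pod = self.get_or_create_pod()
                self.await_pod_start(pod)
                if get_logs:
                    self.follow_container_logs(pod)
                else:
                    self.await_container_completion(pod)
                if do_xcom_push:
                    result = self.extract_xcom(pod)
                remote_pod = self.await_pod_completion(pod)
            finally:
                self.cleanup(pod, remote_pod)
            return result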
diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py
index 0998e31..217e5db 100644
--- a/airflow/providers/cncf/kubernetes/__init__.py
+++ b/airflow/providers/cncf/kubernetes/__init__.py
@@ -15,30 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import sys
-
-if sys.version_info < (3, 7):
- # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that
- # v1.Pod et al. are not pickleable on Python 3.6.
-
- # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this
- # method.
-
- # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means
- # that we can update the version used in this provider and have it work for older versions
- import copyreg
- import logging
-
- def _reduce_Logger(logger):
- if logging.getLogger(logger.name) is not logger:
- import pickle
-
- raise pickle.PicklingError('logger cannot be pickled')
- return logging.getLogger, (logger.name,)
-
- def _reduce_RootLogger(logger):
- return logging.getLogger, ()
-
- if logging.Logger not in copyreg.dispatch_table:
- copyreg.pickle(logging.Logger, _reduce_Logger)
- copyreg.pickle(logging.RootLogger, _reduce_RootLogger)
diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index bf2b832..4c6404f 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -21,16 +21,18 @@ from typing import List
from kubernetes.client import ApiClient, models as k8s
from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources
+from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
-def _convert_kube_model_object(obj, new_class):
+def _convert_kube_model_object(obj, old_class, new_class):
convert_op = getattr(obj, "to_k8s_client_obj", None)
if callable(convert_op):
return obj.to_k8s_client_obj()
elif isinstance(obj, new_class):
return obj
else:
- raise AirflowException(f"Expected {new_class}, got {type(obj)}")
+ raise AirflowException(f"Expected {old_class} or {new_class}, got {type(obj)}")
def _convert_from_dict(obj, new_class):
@@ -50,7 +52,9 @@ def convert_volume(volume) -> k8s.V1Volume:
:param volume:
:return: k8s.V1Volume
"""
- return _convert_kube_model_object(volume, k8s.V1Volume)
+ from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
+
+ return _convert_kube_model_object(volume, Volume, k8s.V1Volume)
def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
@@ -60,7 +64,9 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
:param volume_mount:
:return: k8s.V1VolumeMount
"""
- return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)
+ from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount
+
+ return _convert_kube_model_object(volume_mount, VolumeMount, k8s.V1VolumeMount)
def convert_resources(resources) -> k8s.V1ResourceRequirements:
@@ -71,10 +77,8 @@ def convert_resources(resources) -> k8s.V1ResourceRequirements:
:return: k8s.V1ResourceRequirements
"""
if isinstance(resources, dict):
- from airflow.providers.cncf.kubernetes.backcompat.pod import Resources
-
resources = Resources(**resources)
- return _convert_kube_model_object(resources, k8s.V1ResourceRequirements)
+ return _convert_kube_model_object(resources, Resources, k8s.V1ResourceRequirements)
def convert_port(port) -> k8s.V1ContainerPort:
@@ -84,7 +88,7 @@ def convert_port(port) -> k8s.V1ContainerPort:
:param port:
:return: k8s.V1ContainerPort
"""
- return _convert_kube_model_object(port, k8s.V1ContainerPort)
+ return _convert_kube_model_object(port, Port, k8s.V1ContainerPort)
def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]:
@@ -112,7 +116,7 @@ def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
:param pod_runtime_info_envs:
:return:
"""
- return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar)
+ return _convert_kube_model_object(pod_runtime_info_envs, PodRuntimeInfoEnv, k8s.V1EnvVar)
def convert_image_pull_secrets(image_pull_secrets) -> List[k8s.V1LocalObjectReference]:
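
A sketch of the converter dispatch restored above, assuming the legacy backcompat Volume
accepts name/configs and implements to_k8s_client_obj():

    from kubernetes.client import models as k8s

    from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
        convert_volume,
    )
    from airflow.providers.cncf.kubernetes.backcompat.volume import Volume

    # Legacy object: converted through its to_k8s_client_obj() hook.
    legacy = Volume(name="data", configs={"emptyDir": {}})
    assert isinstance(convert_volume(legacy), k8s.V1Volume)

    # Native client object: passed through unchanged; anything else raises
    # AirflowException naming both the old and the new class.
    native = k8s.V1Volume(name="data", empty_dir=k8s.V1EmptyDirVolumeSource())
    assert convert_volume(native) is native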
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod.py b/airflow/providers/cncf/kubernetes/backcompat/pod.py
index 7f18117..30a7128 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/pod.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/pod.py
@@ -14,29 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1ResourceRequirements`
-and :mod:`kubernetes.client.models.V1ContainerPort`.
-"""
-
-import warnings
+"""Classes for interacting with Kubernetes API"""
from kubernetes.client import models as k8s
-warnings.warn(
- (
- "This module is deprecated. Please use `kubernetes.client.models.V1ResourceRequirements`"
- " and `kubernetes.client.models.V1ContainerPort`."
- ),
- DeprecationWarning,
- stacklevel=2,
-)
-
class Resources:
- """backwards compat for Resources."""
+ """backwards compat for Resources"""
__slots__ = (
'request_memory',
@@ -50,12 +34,19 @@ class Resources:
"""
:param request_memory: requested memory
+ :type request_memory: str
:param request_cpu: requested CPU number
+ :type request_cpu: float | str
:param request_ephemeral_storage: requested ephemeral storage
+ :type request_ephemeral_storage: str
:param limit_memory: limit for memory usage
+ :type limit_memory: str
:param limit_cpu: Limit for CPU used
+ :type limit_cpu: float | str
:param limit_gpu: Limits for GPU used
+ :type limit_gpu: int
:param limit_ephemeral_storage: Limit for ephemeral storage
+ :type limit_ephemeral_storage: float | str
"""
def __init__(
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
index f08aecf..f76e0d7 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
@@ -14,25 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1EnvVar`.
-"""
-
-import warnings
+"""Classes for interacting with Kubernetes API"""
import kubernetes.client.models as k8s
-warnings.warn(
- "This module is deprecated. Please use `kubernetes.client.models.V1EnvVar`.",
- DeprecationWarning,
- stacklevel=2,
-)
-
class PodRuntimeInfoEnv:
- """Defines Pod runtime information as environment variable."""
+ """Defines Pod runtime information as environment variable"""
def __init__(self, name, field_path):
"""
@@ -40,7 +28,9 @@ class PodRuntimeInfoEnv:
Full list of options can be found in kubernetes documentation.
:param name: the name of the environment variable
+ :type: name: str
:param field_path: path to pod runtime info. Ex: metadata.namespace | status.podIP
+ :type: field_path: str
"""
self.name = name
self.field_path = field_path
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume.py b/airflow/providers/cncf/kubernetes/backcompat/volume.py
index c51ce8a..e5b4d00 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/volume.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/volume.py
@@ -35,8 +35,10 @@ class Volume:
and Persistent Volumes
:param name: the name of the volume mount
+ :type name: str
:param configs: dictionary of any features needed for volume. We purposely keep this
vague since there are multiple volume types with changing configs.
+ :type configs: dict
"""
self.name = name
self.configs = configs
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
index f9faed9..b77ab47 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
@@ -38,9 +38,13 @@ class VolumeMount:
running container.
:param name: the name of the volume mount
+ :type name: str
:param mount_path:
+ :type mount_path: str
:param sub_path: subpath within the volume mount
+ :type sub_path: Optional[str]
:param read_only: whether to access pod with read-only mode
+ :type read_only: bool
"""
self.name = name
self.mount_path = mount_path
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
deleted file mode 100644
index b65dae9..0000000
--- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-This is an example dag for using the KubernetesPodOperator.
-"""
-
-from datetime import datetime
-
-from kubernetes.client import models as k8s
-
-from airflow import DAG
-from airflow.kubernetes.secret import Secret
-from airflow.operators.bash import BashOperator
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
-
-# [START howto_operator_k8s_cluster_resources]
-secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
-secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
-secret_all_keys = Secret('env', None, 'airflow-secrets-2')
-volume_mount = k8s.V1VolumeMount(
- name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
-)
-
-configmaps = [
- k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')),
- k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')),
-]
-
-volume = k8s.V1Volume(
- name='test-volume',
- persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
-)
-
-port = k8s.V1ContainerPort(name='http', container_port=80)
-
-init_container_volume_mounts = [
- k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
-]
-
-init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
-
-init_container = k8s.V1Container(
- name="init-container",
- image="ubuntu:16.04",
- env=init_environments,
- volume_mounts=init_container_volume_mounts,
- command=["bash", "-cx"],
- args=["echo 10"],
-)
-
-affinity = k8s.V1Affinity(
- node_affinity=k8s.V1NodeAffinity(
- preferred_during_scheduling_ignored_during_execution=[
- k8s.V1PreferredSchedulingTerm(
- weight=1,
- preference=k8s.V1NodeSelectorTerm(
- match_expressions=[
- k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
- ]
- ),
- )
- ]
- ),
- pod_affinity=k8s.V1PodAffinity(
- required_during_scheduling_ignored_during_execution=[
- k8s.V1WeightedPodAffinityTerm(
- weight=1,
- pod_affinity_term=k8s.V1PodAffinityTerm(
- label_selector=k8s.V1LabelSelector(
- match_expressions=[
- k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
- ]
- ),
- topology_key="failure-domain.beta.kubernetes.io/zone",
- ),
- )
- ]
- ),
-)
-
-tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
-
-# [END howto_operator_k8s_cluster_resources]
-
-
-with DAG(
- dag_id='example_kubernetes_operator',
- schedule_interval=None,
- start_date=datetime(2021, 1, 1),
- tags=['example'],
-) as dag:
- k = KubernetesPodOperator(
- namespace='default',
- image="ubuntu:16.04",
- cmds=["bash", "-cx"],
- arguments=["echo", "10"],
- labels={"foo": "bar"},
- secrets=[secret_file, secret_env, secret_all_keys],
- ports=[port],
- volumes=[volume],
- volume_mounts=[volume_mount],
- env_from=configmaps,
- name="airflow-test-pod",
- task_id="task",
- affinity=affinity,
- is_delete_operator_pod=True,
- hostnetwork=False,
- tolerations=tolerations,
- init_containers=[init_container],
- priority_class_name="medium",
- )
-
- # [START howto_operator_k8s_private_image]
- quay_k8s = KubernetesPodOperator(
- namespace='default',
- image='quay.io/apache/bash',
- image_pull_secrets=[k8s.V1LocalObjectReference('testquay')],
- cmds=["bash", "-cx"],
- arguments=["echo", "10", "echo pwd"],
- labels={"foo": "bar"},
- name="airflow-private-image-pod",
- is_delete_operator_pod=True,
- in_cluster=True,
- task_id="task-two",
- get_logs=True,
- )
- # [END howto_operator_k8s_private_image]
-
- # [START howto_operator_k8s_write_xcom]
- write_xcom = KubernetesPodOperator(
- namespace='default',
- image='alpine',
- cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
- name="write-xcom",
- do_xcom_push=True,
- is_delete_operator_pod=True,
- in_cluster=True,
- task_id="write-xcom",
- get_logs=True,
- )
-
- pod_task_xcom_result = BashOperator(
- bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
- task_id="pod_task_xcom_result",
- )
- # [END howto_operator_k8s_write_xcom]
-
- write_xcom >> pod_task_xcom_result
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 3830503..e230dba 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -14,21 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-import sys
import tempfile
from typing import Any, Dict, Generator, Optional, Tuple, Union
-if sys.version_info >= (3, 8):
+try:
from functools import cached_property
-else:
+except ImportError:
from cached_property import cached_property
-
from kubernetes import client, config, watch
try:
import airflow.utils.yaml as yaml
except ImportError:
- import yaml # type: ignore[no-redef]
+ import yaml
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
@@ -61,6 +59,7 @@ class KubernetesHook(BaseHook):
:param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
to Kubernetes cluster.
+ :type conn_id: str
"""
conn_name_attr = 'kubernetes_conn_id'
@@ -86,13 +85,10 @@ class KubernetesHook(BaseHook):
"extra__kubernetes__namespace": StringField(
lazy_gettext('Namespace'), widget=BS3TextFieldWidget()
),
- "extra__kubernetes__cluster_context": StringField(
- lazy_gettext('Cluster context'), widget=BS3TextFieldWidget()
- ),
}
@staticmethod
- def get_ui_field_behaviour() -> Dict[str, Any]:
+ def get_ui_field_behaviour() -> Dict:
"""Returns custom field behaviour"""
return {
"hidden_fields": ['host', 'schema', 'login', 'password', 'port', 'extra'],
@@ -100,49 +96,25 @@ class KubernetesHook(BaseHook):
}
def __init__(
- self,
- conn_id: Optional[str] = default_conn_name,
- client_configuration: Optional[client.Configuration] = None,
- cluster_context: Optional[str] = None,
- config_file: Optional[str] = None,
- in_cluster: Optional[bool] = None,
+ self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None
) -> None:
super().__init__()
self.conn_id = conn_id
self.client_configuration = client_configuration
- self.cluster_context = cluster_context
- self.config_file = config_file
- self.in_cluster = in_cluster
-
- @staticmethod
- def _coalesce_param(*params):
- for param in params:
- if param is not None:
- return param
def get_conn(self) -> Any:
"""Returns kubernetes api session for use with requests"""
- if self.conn_id:
- connection = self.get_connection(self.conn_id)
- extras = connection.extra_dejson
- else:
- extras = {}
- in_cluster = self._coalesce_param(
- self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None
- )
- cluster_context = self._coalesce_param(
- self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None
- )
- kubeconfig_path = self._coalesce_param(
- self.config_file, extras.get("extra__kubernetes__kube_config_path") or None
- )
- kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+ connection = self.get_connection(self.conn_id)
+ extras = connection.extra_dejson
+ in_cluster = extras.get("extra__kubernetes__in_cluster")
+ kubeconfig_path = extras.get("extra__kubernetes__kube_config_path")
+ kubeconfig = extras.get("extra__kubernetes__kube_config")
num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])
if num_selected_configuration > 1:
raise AirflowException(
- "Invalid connection configuration. Options kube_config_path, "
- "kube_config, in_cluster are mutually exclusive. "
+ "Invalid connection configuration. Options extra__kubernetes__kube_config_path, "
+ "extra__kubernetes__kube_config, extra__kubernetes__in_cluster are mutually exclusive. "
"You can only use one option at a time."
)
if in_cluster:
@@ -153,9 +125,7 @@ class KubernetesHook(BaseHook):
if kubeconfig_path is not None:
self.log.debug("loading kube_config from: %s", kubeconfig_path)
config.load_kube_config(
- config_file=kubeconfig_path,
- client_configuration=self.client_configuration,
- context=cluster_context,
+ config_file=kubeconfig_path, client_configuration=self.client_configuration
)
return client.ApiClient()
@@ -165,17 +135,12 @@ class KubernetesHook(BaseHook):
temp_config.write(kubeconfig.encode())
temp_config.flush()
config.load_kube_config(
- config_file=temp_config.name,
- client_configuration=self.client_configuration,
- context=cluster_context,
+ config_file=temp_config.name, client_configuration=self.client_configuration
)
return client.ApiClient()
self.log.debug("loading kube_config from: default file")
- config.load_kube_config(
- client_configuration=self.client_configuration,
- context=cluster_context,
- )
+ config.load_kube_config(client_configuration=self.client_configuration)
return client.ApiClient()
@cached_property
@@ -183,10 +148,6 @@ class KubernetesHook(BaseHook):
"""Cached Kubernetes API client"""
return self.get_conn()
- @cached_property
- def core_v1_client(self):
- return client.CoreV1Api(api_client=self.api_client)
-
def create_custom_object(
self, group: str, version: str, plural: str, body: Union[str, dict], namespace: Optional[str] = None
):
@@ -194,10 +155,15 @@ class KubernetesHook(BaseHook):
Creates custom resource definition object in Kubernetes
:param group: api group
+ :type group: str
:param version: api version
+ :type version: str
:param plural: api plural
+ :type plural: str
:param body: crd object definition
+ :type body: Union[str, dict]
:param namespace: kubernetes namespace
+ :type namespace: str
"""
api = client.CustomObjectsApi(self.api_client)
if namespace is None:
@@ -220,10 +186,15 @@ class KubernetesHook(BaseHook):
Get custom resource definition object from Kubernetes
:param group: api group
+ :type group: str
:param version: api version
+ :type version: str
:param plural: api plural
+ :type plural: str
:param name: crd object name
+ :type name: str
:param namespace: kubernetes namespace
+ :type namespace: str
"""
api = client.CustomObjectsApi(self.api_client)
if namespace is None:
@@ -236,14 +207,12 @@ class KubernetesHook(BaseHook):
except client.rest.ApiException as e:
raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")
- def get_namespace(self) -> Optional[str]:
+ def get_namespace(self) -> str:
"""Returns the namespace that defined in the connection"""
- if self.conn_id:
- connection = self.get_connection(self.conn_id)
- extras = connection.extra_dejson
- namespace = extras.get("extra__kubernetes__namespace", "default")
- return namespace
- return None
+ connection = self.get_connection(self.conn_id)
+ extras = connection.extra_dejson
+ namespace = extras.get("extra__kubernetes__namespace", "default")
+ return namespace
def get_pod_log_stream(
self,
@@ -255,8 +224,10 @@ class KubernetesHook(BaseHook):
Retrieves a log stream for a container in a kubernetes pod.
:param pod_name: pod name
+ :type pod_name: str
:param container: container name
:param namespace: kubernetes namespace
+ :type namespace: str
"""
api = client.CoreV1Api(self.api_client)
watcher = watch.Watch()
@@ -280,8 +251,10 @@ class KubernetesHook(BaseHook):
Retrieves a container's log from the specified pod.
:param pod_name: pod name
+ :type pod_name: str
:param container: container name
:param namespace: kubernetes namespace
+ :type namespace: str
"""
api = client.CoreV1Api(self.api_client)
return api.read_namespaced_pod_log(
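
A usage sketch of the hook as restored above ('k8s_default' is a hypothetical connection id):

    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

    # At most one of the extras extra__kubernetes__in_cluster,
    # extra__kubernetes__kube_config_path, or extra__kubernetes__kube_config may
    # be set on the connection; get_conn() raises AirflowException otherwise.
    hook = KubernetesHook(conn_id="k8s_default")
    api_client = hook.get_conn()       # kubernetes.client.ApiClient
    namespace = hook.get_namespace()   # extra__kubernetes__namespace, else "default"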
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index dd127fe..747f8b0 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -15,16 +15,17 @@
# specific language governing permissions and limitations
# under the License.
"""Executes task in a Kubernetes POD"""
-import json
-import logging
import re
-import sys
import warnings
-from contextlib import AbstractContextManager
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
from kubernetes.client import CoreV1Api, models as k8s
+try:
+ import airflow.utils.yaml as yaml
+except ImportError:
+ import yaml
+
from airflow.exceptions import AirflowException
from airflow.kubernetes import kube_client, pod_generator
from airflow.kubernetes.pod_generator import PodGenerator
@@ -42,27 +43,15 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters im
convert_volume,
convert_volume_mount,
)
-from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
-from airflow.settings import pod_mutation_hook
-from airflow.utils import yaml
+from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar
from airflow.utils.helpers import validate_key
+from airflow.utils.state import State
from airflow.version import version as airflow_version
-if sys.version_info >= (3, 8):
- from functools import cached_property
-else:
- from cached_property import cached_property
-
if TYPE_CHECKING:
import jinja2
- from airflow.utils.context import Context
-
-
-class PodReattachFailure(AirflowException):
- """When we expect to be able to find a pod but cannot."""
-
class KubernetesPodOperator(BaseOperator):
"""
@@ -79,66 +68,101 @@ class KubernetesPodOperator(BaseOperator):
simplifies the authorization process.
:param namespace: the namespace to run within kubernetes.
+ :type namespace: str
:param image: Docker image you wish to launch. Defaults to hub.docker.com,
but fully qualified URLs will point to custom repositories. (templated)
+ :type image: str
:param name: name of the pod in which the task will run, will be used (plus a random
- suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain,
- containing only [a-z0-9.-]).
- :param random_name_suffix: if True, will generate a random suffix.
+ suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
+ :type name: str
:param cmds: entrypoint of the container. (templated)
The docker image's entrypoint is used if this is not provided.
+ :type cmds: list[str]
:param arguments: arguments of the entrypoint. (templated)
The docker image's CMD is used if this is not provided.
- :param ports: ports for the launched pod.
- :param volume_mounts: volumeMounts for the launched pod.
- :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes.
+ :type arguments: list[str]
+ :param ports: ports for launched pod.
+ :type ports: list[k8s.V1ContainerPort]
+ :param volume_mounts: volumeMounts for launched pod.
+ :type volume_mounts: list[k8s.V1VolumeMount]
+ :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
+ :type volumes: list[k8s.V1Volume]
:param env_vars: Environment variables initialized in the container. (templated)
+ :type env_vars: list[k8s.V1EnvVar]
:param secrets: Kubernetes secrets to inject in the container.
They can be exposed as environment vars or files in a volume.
+ :type secrets: list[airflow.kubernetes.secret.Secret]
:param in_cluster: run kubernetes client with in_cluster configuration.
+ :type in_cluster: bool
:param cluster_context: context that points to kubernetes cluster.
Ignored when in_cluster is True. If None, current-context is used.
+ :type cluster_context: str
:param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor
+ :type reattach_on_restart: bool
:param labels: labels to apply to the Pod. (templated)
+ :type labels: dict
:param startup_timeout_seconds: timeout in seconds to startup the pod.
+ :type startup_timeout_seconds: int
:param get_logs: get the stdout of the container as logs of the tasks.
+ :type get_logs: bool
:param image_pull_policy: Specify a policy to cache or always pull an image.
+ :type image_pull_policy: str
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
- :param resources: resources for the launched pod.
- :param affinity: affinity scheduling rules for the launched pod.
+ :type annotations: dict
+ :param resources: A dict containing resources requests and limits.
+ Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
+ and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
+ See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
+ :type resources: k8s.V1ResourceRequirements
+ :param affinity: A dict containing a group of affinity scheduling rules.
+ :type affinity: k8s.V1Affinity
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
- :param node_selector: A dict containing a group of scheduling rules.
+ :type config_file: str
+ :param node_selectors: A dict containing a group of scheduling rules.
+ :type node_selectors: dict
:param image_pull_secrets: Any image pull secrets to be given to the pod.
If more than one secret is required, provide a
comma separated list: secret_a,secret_b
+ :type image_pull_secrets: List[k8s.V1LocalObjectReference]
:param service_account_name: Name of the service account
+ :type service_account_name: str
:param is_delete_operator_pod: What to do when the pod reaches its final
- state, or the execution is interrupted. If True (default), delete the
- pod; if False, leave the pod.
+ state, or the execution is interrupted.
+ If False (default): do nothing; if True: delete the pod
+ :type is_delete_operator_pod: bool
:param hostnetwork: If True enable host networking on the pod.
+ :type hostnetwork: bool
:param tolerations: A list of kubernetes tolerations.
+ :type tolerations: List[k8s.V1Toleration]
:param security_context: security options the pod should run with (PodSecurityContext).
+ :type security_context: dict
:param dnspolicy: dnspolicy for the pod.
+ :type dnspolicy: str
:param schedulername: Specify a schedulername for the pod
+ :type schedulername: str
:param full_pod_spec: The complete podSpec
+ :type full_pod_spec: kubernetes.client.models.V1Pod
:param init_containers: init container for the launched Pod
+ :type init_containers: list[kubernetes.client.models.V1Container]
:param log_events_on_failure: Log the pod's events if a failure occurs
+ :type log_events_on_failure: bool
:param do_xcom_push: If True, the content of the file
/airflow/xcom/return.json in the container will also be pushed to an
XCom when the container completes.
+ :type do_xcom_push: bool
:param pod_template_file: path to pod template file (templated)
+ :type pod_template_file: str
:param priority_class_name: priority class name for the launched Pod
+ :type priority_class_name: str
:param termination_grace_period: Termination grace period if task killed in UI,
defaults to kubernetes default
+ :type termination_grace_period: int
"""
- BASE_CONTAINER_NAME = 'base'
- POD_CHECKED_KEY = 'already_checked'
-
- template_fields: Sequence[str] = (
+ template_fields: Iterable[str] = (
'image',
'cmds',
'arguments',
@@ -146,16 +170,16 @@ class KubernetesPodOperator(BaseOperator):
'labels',
'config_file',
'pod_template_file',
- 'namespace',
)
+ # fmt: off
def __init__(
+ # fmt: on
self,
*,
namespace: Optional[str] = None,
image: Optional[str] = None,
name: Optional[str] = None,
- random_name_suffix: Optional[bool] = True,
cmds: Optional[List[str]] = None,
arguments: Optional[List[str]] = None,
ports: Optional[List[k8s.V1ContainerPort]] = None,
@@ -179,7 +203,7 @@ class KubernetesPodOperator(BaseOperator):
node_selector: Optional[dict] = None,
image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None,
service_account_name: Optional[str] = None,
- is_delete_operator_pod: bool = True,
+ is_delete_operator_pod: bool = False,
hostnetwork: bool = False,
tolerations: Optional[List[k8s.V1Toleration]] = None,
security_context: Optional[Dict] = None,
@@ -191,9 +215,9 @@ class KubernetesPodOperator(BaseOperator):
do_xcom_push: bool = False,
pod_template_file: Optional[str] = None,
priority_class_name: Optional[str] = None,
- pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None,
+ pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None,
termination_grace_period: Optional[int] = None,
- configmaps: Optional[List[str]] = None,
+ configmaps: Optional[str] = None,
**kwargs,
) -> None:
if kwargs.get('xcom_push') is not None:
@@ -240,9 +264,8 @@ class KubernetesPodOperator(BaseOperator):
self.service_account_name = service_account_name
self.is_delete_operator_pod = is_delete_operator_pod
self.hostnetwork = hostnetwork
- self.tolerations = (
- [convert_toleration(toleration) for toleration in tolerations] if tolerations else []
- )
+ self.tolerations = [convert_toleration(toleration) for toleration in tolerations] \
+ if tolerations else []
self.security_context = security_context or {}
self.dnspolicy = dnspolicy
self.schedulername = schedulername
@@ -252,15 +275,14 @@ class KubernetesPodOperator(BaseOperator):
self.priority_class_name = priority_class_name
self.pod_template_file = pod_template_file
self.name = self._set_name(name)
- self.random_name_suffix = random_name_suffix
self.termination_grace_period = termination_grace_period
- self.pod_request_obj: Optional[k8s.V1Pod] = None
- self.pod: Optional[k8s.V1Pod] = None
+ self.client: CoreV1Api = None
+ self.pod: k8s.V1Pod = None
def _render_nested_template_fields(
self,
content: Any,
- context: 'Context',
+ context: Dict,
jinja_env: "jinja2.Environment",
seen_oids: set,
) -> None:
@@ -269,31 +291,27 @@ class KubernetesPodOperator(BaseOperator):
self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids)
return
- super()._render_nested_template_fields(content, context, jinja_env, seen_oids)
+ super()._render_nested_template_fields(
+ content,
+ context,
+ jinja_env,
+ seen_oids
+ )
@staticmethod
- def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool = True) -> dict:
+ def create_labels_for_pod(context) -> dict:
"""
Generate labels for the pod to track the pod in case of Operator crash
:param context: task context provided by airflow DAG
:return: dict
"""
- if not context:
- return {}
-
- ti = context['ti']
- run_id = context['run_id']
-
- labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
-
- # If running on Airflow 2.3+:
- map_index = getattr(ti, 'map_index', -1)
- if map_index >= 0:
- labels['map_index'] = map_index
-
- if include_try_number:
- labels.update(try_number=ti.try_number)
+ labels = {
+ 'dag_id': context['dag'].dag_id,
+ 'task_id': context['task'].task_id,
+ 'execution_date': context['ts'],
+ 'try_number': context['ti'].try_number,
+ }
# In the case of sub dags this is just useful
if context['dag'].is_subdag:
labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
@@ -304,127 +322,101 @@ class KubernetesPodOperator(BaseOperator):
labels[label_id] = safe_label
return labels
- @cached_property
- def pod_manager(self) -> PodManager:
- return PodManager(kube_client=self.client)
+ def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
+ return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
- @cached_property
- def client(self) -> CoreV1Api:
- # todo: use airflow Connection / hook to authenticate to the cluster
- kwargs: Dict[str, Any] = dict(
- cluster_context=self.cluster_context,
- config_file=self.config_file,
- )
- if self.in_cluster is not None:
- kwargs.update(in_cluster=self.in_cluster)
- return kube_client.get_kube_client(**kwargs)
-
- def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
- """Returns an already-running pod for this task instance if one exists."""
- label_selector = self._build_find_pod_label_selector(context)
- pod_list = self.client.list_namespaced_pod(
- namespace=namespace,
- label_selector=label_selector,
- ).items
-
- pod = None
- num_pods = len(pod_list)
- if num_pods > 1:
- raise AirflowException(f'More than one pod running with labels {label_selector}')
- elif num_pods == 1:
- pod = pod_list[0]
- self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
- self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
- self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
- return pod
+ def execute(self, context) -> Optional[str]:
+ try:
+ if self.in_cluster is not None:
+ client = kube_client.get_kube_client(
+ in_cluster=self.in_cluster,
+ cluster_context=self.cluster_context,
+ config_file=self.config_file,
+ )
+ else:
+ client = kube_client.get_kube_client(
+ cluster_context=self.cluster_context, config_file=self.config_file
+ )
- def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
- if self.reattach_on_restart:
- pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
- if pod:
- return pod
- self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
- self.pod_manager.create_pod(pod=pod_request_obj)
- return pod_request_obj
+ self.client = client
- def await_pod_start(self, pod):
- try:
- self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
- except PodLaunchFailedException:
- if self.log_events_on_failure:
- for event in self.pod_manager.read_pod_events(pod).items:
- self.log.error("Pod Event: %s - %s", event.reason, event.message)
- raise
+ self.pod = self.create_pod_request_obj()
+ self.namespace = self.pod.metadata.namespace
- def extract_xcom(self, pod):
- """Retrieves xcom value and kills xcom sidecar container"""
- result = self.pod_manager.extract_xcom(pod)
- self.log.info("xcom result: \n%s", result)
- return json.loads(result)
+ # Add combination of labels to uniquely identify a running pod
+ labels = self.create_labels_for_pod(context)
- def execute(self, context: 'Context'):
- remote_pod = None
- try:
- self.pod_request_obj = self.build_pod_request_obj(context)
- self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
- pod_request_obj=self.pod_request_obj,
- context=context,
- )
- self.await_pod_start(pod=self.pod)
+ label_selector = self._get_pod_identifying_label_string(labels)
- if self.get_logs:
- self.pod_manager.fetch_container_logs(
- pod=self.pod,
- container_name=self.BASE_CONTAINER_NAME,
- follow=True,
- )
- else:
- self.pod_manager.await_container_completion(
- pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+ pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+
+ if len(pod_list.items) > 1 and self.reattach_on_restart:
+ raise AirflowException(
+ f'More than one pod running with labels: {label_selector}'
)
- if self.do_xcom_push:
- result = self.extract_xcom(pod=self.pod)
- remote_pod = self.pod_manager.await_pod_completion(self.pod)
- finally:
- self.cleanup(
- pod=self.pod or self.pod_request_obj,
- remote_pod=remote_pod,
- )
- ti = context['ti']
- ti.xcom_push(key='pod_name', value=self.pod.metadata.name)
- ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace)
- if self.do_xcom_push:
+ launcher = self.create_pod_launcher()
+
+ if len(pod_list.items) == 1:
+ try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
+ final_state, remote_pod, result = self.handle_pod_overlap(
+ labels, try_numbers_match, launcher, pod_list.items[0]
+ )
+ else:
+ self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
+ final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+ if final_state != State.SUCCESS:
+ raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
+ context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
+ context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
return result
+ except AirflowException as ex:
+ raise AirflowException(f'Pod Launching failed: {ex}')
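
As a usage sketch of the execute() path above (image, names and namespace are hypothetical, not taken from this commit):

    # Hypothetical KubernetesPodOperator task: execute() creates the kube
    # client, builds the pod request, then launches or reattaches to a pod.
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    compute_pi = KubernetesPodOperator(
        task_id="compute_pi",
        name="compute-pi",
        namespace="default",
        image="perl:5.34",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        get_logs=True,
        is_delete_operator_pod=True,
    )
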
- def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
- pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
- if pod_phase != PodPhase.SUCCEEDED:
- if self.log_events_on_failure:
- with _suppress(Exception):
- for event in self.pod_manager.read_pod_events(pod).items:
- self.log.error("Pod Event: %s - %s", event.reason, event.message)
- if not self.is_delete_operator_pod:
- with _suppress(Exception):
- self.patch_already_checked(pod)
- with _suppress(Exception):
- self.process_pod_deletion(pod)
- raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
- else:
- with _suppress(Exception):
- self.process_pod_deletion(pod)
+ def handle_pod_overlap(
+ self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
+ ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+ """
+ In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
+ this function will either continue to monitor the existing pod or launch a new pod
+ based on the `reattach_on_restart` parameter.
- def process_pod_deletion(self, pod):
- if self.is_delete_operator_pod:
- self.log.info("Deleting pod: %s", pod.metadata.name)
- self.pod_manager.delete_pod(pod)
+ :param labels: labels used to determine whether a matching pod is already running
+ :type labels: dict
+ :param try_numbers_match: whether the try numbers match; only needed for logging purposes
+ :type try_numbers_match: bool
+ :param launcher: PodLauncher
+ :param pod: Pod found with matching labels
+ """
+ if try_numbers_match:
+ log_line = f"found a running pod with labels {labels} and the same try_number."
+ else:
+ log_line = f"found a running pod with labels {labels} but a different try_number."
+
+ # In case of failed pods, we should reattach the first time, but only once,
+ # as the task will have already failed.
+ if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
+ log_line += " Will attach to this pod and monitor instead of starting new one"
+ self.log.info(log_line)
+ self.pod = pod
+ final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
else:
- self.log.info("skipping deleting pod: %s", pod.metadata.name)
+ log_line += f"creating pod with labels {labels} and launcher {launcher}"
+ self.log.info(log_line)
+ final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+ return final_state, remote_pod, result
- def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
- labels = self._get_ti_pod_labels(context, include_try_number=False)
- label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
- return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+ @staticmethod
+ def _get_pod_identifying_label_string(labels) -> str:
+ label_strings = [
+ f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
+ ]
+ return ','.join(label_strings) + ',already_checked!=True'
+
+ @staticmethod
+ def _try_numbers_match(context, pod) -> bool:
+ return pod.metadata.labels['try_number'] == context['ti'].try_number
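
Putting the two static helpers above together: a sketch of the selector used to look up an existing pod (label values are hypothetical and assumed already sanitized):

    # Mirrors _get_pod_identifying_label_string: try_number is excluded so a
    # retried task can still find its pod, and already_checked!=True filters
    # out pods that patch_already_checked() has marked.
    labels = {
        'dag_id': 'example_dag',
        'task_id': 'example_task',
        'execution_date': '2022-03-28T0000000000',
        'try_number': '2',
    }
    selector = ','.join(
        f'{k}={v}' for k, v in sorted(labels.items()) if k != 'try_number'
    ) + ',already_checked!=True'
    # -> 'dag_id=example_dag,execution_date=2022-03-28T0000000000,
    #     task_id=example_task,already_checked!=True'
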
def _set_name(self, name):
if name is None:
@@ -435,29 +427,11 @@ class KubernetesPodOperator(BaseOperator):
validate_key(name, max_length=220)
return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
- def patch_already_checked(self, pod: k8s.V1Pod):
- """Add an "already checked" annotation to ensure we don't reattach on retries"""
- pod.metadata.labels[self.POD_CHECKED_KEY] = "True"
- body = PodGenerator.serialize_pod(pod)
- self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
-
- def on_kill(self) -> None:
- if self.pod:
- pod = self.pod
- kwargs = dict(
- name=pod.metadata.name,
- namespace=pod.metadata.namespace,
- )
- if self.termination_grace_period is not None:
- kwargs.update(grace_period_seconds=self.termination_grace_period)
- self.client.delete_namespaced_pod(**kwargs)
-
- def build_pod_request_obj(self, context=None):
+ def create_pod_request_obj(self) -> k8s.V1Pod:
"""
- Returns V1Pod object based on pod template file, full pod spec, and other operator parameters.
+ Creates a V1Pod based on user parameters. Note that a `full_pod_spec` or `pod_template_file`
+ will supersede all other values.
- The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod
- template file.
"""
self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id)
if self.pod_template_file:
@@ -476,7 +450,7 @@ class KubernetesPodOperator(BaseOperator):
metadata=k8s.V1ObjectMeta(
namespace=self.namespace,
labels=self.labels,
- name=self.name,
+ name=PodGenerator.make_unique_pod_id(self.name),
annotations=self.annotations,
),
spec=k8s.V1PodSpec(
@@ -487,7 +461,7 @@ class KubernetesPodOperator(BaseOperator):
containers=[
k8s.V1Container(
image=self.image,
- name=self.BASE_CONTAINER_NAME,
+ name="base",
command=self.cmds,
ports=self.ports,
image_pull_policy=self.image_pull_policy,
@@ -512,112 +486,89 @@ class KubernetesPodOperator(BaseOperator):
pod = PodGenerator.reconcile_pods(pod_template, pod)
- if self.random_name_suffix:
- pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
-
for secret in self.secrets:
self.log.debug("Adding secret to task %s", self.task_id)
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
pod = xcom_sidecar.add_xcom_sidecar(pod)
+ return pod
+
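
Stripped down, the request object assembled above is a plain kubernetes client model, roughly equivalent to the following (image and name are hypothetical; the real method also merges pod_template_file, full_pod_spec, secrets and the xcom sidecar):

    # Minimal sketch of the V1Pod that create_pod_request_obj builds.
    from kubernetes.client import models as k8s

    pod = k8s.V1Pod(
        api_version="v1",
        kind="Pod",
        metadata=k8s.V1ObjectMeta(
            namespace="default",
            name="example-task-a1b2c3d4",  # suffix added by PodGenerator.make_unique_pod_id
            labels={"kubernetes_pod_operator": "True"},
        ),
        spec=k8s.V1PodSpec(
            restart_policy="Never",
            containers=[
                k8s.V1Container(
                    name="base",
                    image="python:3.9-slim",
                    command=["python", "-c", "print('hello')"],
                )
            ],
        ),
    )
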
+ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+ """
+ Creates a new pod and monitors it for the duration of the task
- labels = self._get_ti_pod_labels(context)
- self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels)
+ :param labels: labels used to track pod
+ :param launcher: pod launcher that will manage launching and monitoring pods
+ :return: the final pod state, the remote pod object, and the result (if any)
+ """
+ self.log.debug(
+ "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id
+ )
# Merge Pod Identifying labels with labels passed to operator
- pod.metadata.labels.update(labels)
+ self.pod.metadata.labels.update(labels)
# Add Airflow Version to the label
# And a label to identify that pod is launched by KubernetesPodOperator
- pod.metadata.labels.update(
+ self.pod.metadata.labels.update(
{
'airflow_version': airflow_version.replace('+', '-'),
'kubernetes_pod_operator': 'True',
}
)
- pod_mutation_hook(pod)
- return pod
-
- def dry_run(self) -> None:
- """
- Prints out the pod definition that would be created by this operator.
- Does not include labels specific to the task instance (since there isn't
- one in a dry_run) and excludes all empty elements.
- """
- pod = self.build_pod_request_obj()
- print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))
-
-
-class _suppress(AbstractContextManager):
- """
- This behaves the same as ``contextlib.suppress`` but logs the suppressed
- exceptions as errors with traceback.
-
- The caught exception is also stored on the context manager instance under
- attribute ``exception``.
- """
-
- def __init__(self, *exceptions):
- self._exceptions = exceptions
- self.exception = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, exctype, excinst, exctb):
- caught_error = exctype is not None and issubclass(exctype, self._exceptions)
- if caught_error:
- self.exception = excinst
- logger = logging.getLogger()
- logger.error(str(excinst), exc_info=True)
- return caught_error
+ self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
+ final_state = None
+ try:
+ launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
+ final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
+ except AirflowException:
+ if self.log_events_on_failure:
+ for event in launcher.read_pod_events(self.pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason, event.message)
+ raise
+ finally:
+ if self.is_delete_operator_pod:
+ self.log.debug("Deleting pod for task %s", self.task_id)
+ launcher.delete_pod(self.pod)
+ elif final_state != State.SUCCESS:
+ self.patch_already_checked(self.pod)
+ return final_state, remote_pod, result
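
For reference, the `_suppress` context manager removed above behaves like `contextlib.suppress`, except that the error is logged and retained; a minimal usage sketch:

    # The suppressed exception is logged with a traceback and kept on the
    # context manager instead of propagating.
    with _suppress(ValueError) as caught:
        int("not a number")   # ValueError is logged, not raised
    print(caught.exception)   # -> ValueError('invalid literal for int() ...')
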
-def _prune_dict(val: Any, mode='strict'):
- """
- Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should
- be the one used if possible, but this one is included to avoid having to
- bump min airflow version. This function will be removed once the min airflow version
- is bumped to 2.3.
+ def patch_already_checked(self, pod: k8s.V1Pod):
+ """Add an "already tried annotation to ensure we only retry once"""
+ pod.metadata.labels["already_checked"] = "True"
+ body = PodGenerator.serialize_pod(pod)
+ self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
- Given dict ``val``, returns new dict based on ``val`` with all
- empty elements removed.
+ def monitor_launched_pod(self, launcher, pod) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+ """
+ Monitors, until completion, a pod that was created by a previous KubernetesPodOperator
- What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
- then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
- will be removed if ``bool(x) is False``.
- """
+ :param launcher: pod launcher that will manage launching and monitoring pods
+ :param pod: pod spec used to find the pod via the k8s API
+ :return: the final pod state, the remote pod object, and the result (if any)
+ """
+ try:
+ (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
+ finally:
+ if self.is_delete_operator_pod:
+ launcher.delete_pod(pod)
+ if final_state != State.SUCCESS:
+ if self.log_events_on_failure:
+ for event in launcher.read_pod_events(pod).items:
+ self.log.error("Pod Event: %s - %s", event.reason, event.message)
+ if not self.is_delete_operator_pod:
+ self.patch_already_checked(pod)
+ raise AirflowException(f'Pod returned a failure: {final_state}')
+ return final_state, remote_pod, result
- def is_empty(x):
- if mode == 'strict':
- return x is None
- elif mode == 'truthy':
- return bool(x) is False
- raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
-
- if isinstance(val, dict):
- new_dict = {}
- for k, v in val.items():
- if is_empty(v):
- continue
- elif isinstance(v, (list, dict)):
- new_val = _prune_dict(v, mode=mode)
- if new_val:
- new_dict[k] = new_val
- else:
- new_dict[k] = v
- return new_dict
- elif isinstance(val, list):
- new_list = []
- for v in val:
- if is_empty(v):
- continue
- elif isinstance(v, (list, dict)):
- new_val = _prune_dict(v, mode=mode)
- if new_val:
- new_list.append(new_val)
- else:
- new_list.append(v)
- return new_list
- else:
- return val
+ def on_kill(self) -> None:
+ if self.pod:
+ pod: k8s.V1Pod = self.pod
+ namespace = pod.metadata.namespace
+ name = pod.metadata.name
+ kwargs = {}
+ if self.termination_grace_period is not None:
+ kwargs = {"grace_period_seconds": self.termination_grace_period}
+ self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs)
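
Similarly, the two modes of the removed `_prune_dict` helper behave as documented above; a small sketch:

    val = {'a': None, 'b': '', 'c': 0, 'd': {'e': None}, 'f': [None, 1]}
    _prune_dict(val, mode='strict')   # -> {'b': '', 'c': 0, 'f': [1]}  (only None removed)
    _prune_dict(val, mode='truthy')   # -> {'f': [1]}  (all falsey values removed)
    # Containers that become empty after pruning are dropped in both modes,
    # which is why 'd' disappears even in strict mode.
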
diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 1029687..9779292 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -15,14 +15,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Optional
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
class SparkKubernetesOperator(BaseOperator):
"""
@@ -34,15 +31,20 @@ class SparkKubernetesOperator(BaseOperator):
:param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
path to a '.yaml', '.yml' or '.json' file, or a JSON string.
+ :type application_file: str
:param namespace: kubernetes namespace to put sparkApplication
+ :type namespace: str
:param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
for the Kubernetes cluster.
+ :type kubernetes_conn_id: str
:param api_group: kubernetes api group of sparkApplication
+ :type api_group: str
:param api_version: kubernetes api version of sparkApplication
+ :type api_version: str
"""
- template_fields: Sequence[str] = ('application_file', 'namespace')
- template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
+ template_fields = ['application_file', 'namespace']
+ template_ext = ('.yaml', '.yml', '.json')
ui_color = '#f4a460'
def __init__(
@@ -62,7 +64,7 @@ class SparkKubernetesOperator(BaseOperator):
self.api_group = api_group
self.api_version = api_version
- def execute(self, context: 'Context'):
+ def execute(self, context):
self.log.info("Creating sparkApplication")
hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
response = hook.create_custom_object(
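
A usage sketch for the operator (file path, namespace and connection id are hypothetical):

    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

    submit = SparkKubernetesOperator(
        task_id="spark_pi_submit",
        namespace="spark-apps",
        application_file="example_spark_pi.yaml",  # templated; .yaml/.yml/.json supported
        kubernetes_conn_id="kubernetes_default",
    )
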
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index b5b5054..c7878ba 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -22,14 +22,6 @@ description: |
`Kubernetes <https://kubernetes.io/>`__
versions:
- - 3.1.2
- - 3.1.1
- - 3.1.0
- - 3.0.2
- - 3.0.1
- - 3.0.0
- - 2.2.0
- - 2.1.0
- 2.0.3
- 2.0.2
- 2.0.1
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 15ac40b..da29e79 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Dict, Optional
from kubernetes import client
@@ -23,9 +23,6 @@ from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base import BaseSensorOperator
-if TYPE_CHECKING:
- from airflow.utils.context import Context
-
class SparkKubernetesSensor(BaseSensorOperator):
"""
@@ -36,15 +33,21 @@ class SparkKubernetesSensor(BaseSensorOperator):
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
:param application_name: spark Application resource name
+ :type application_name: str
:param namespace: the kubernetes namespace where the sparkApplication resides
+ :type namespace: str
:param kubernetes_conn_id: The :ref:`kubernetes connection<howto/connection:kubernetes>`
to the Kubernetes cluster.
+ :type kubernetes_conn_id: str
:param attach_log: determines whether logs for the driver pod should be appended to the sensor log
+ :type attach_log: bool
:param api_group: kubernetes api group of sparkApplication
+ :type api_group: str
:param api_version: kubernetes api version of sparkApplication
+ :type api_version: str
"""
- template_fields: Sequence[str] = ("application_name", "namespace")
+ template_fields = ("application_name", "namespace")
FAILURE_STATES = ("FAILED", "UNKNOWN")
SUCCESS_STATES = ("COMPLETED",)
@@ -94,7 +97,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
e,
)
- def poke(self, context: 'Context') -> bool:
+ def poke(self, context: Dict) -> bool:
self.log.info("Poking: %s", self.application_name)
response = self.hook.get_custom_object(
group=self.api_group,
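
And a matching usage sketch for the sensor (names hypothetical):

    from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

    monitor = SparkKubernetesSensor(
        task_id="spark_pi_monitor",
        namespace="spark-apps",
        application_name="spark-pi",  # the sparkApplication created by the operator above
        kubernetes_conn_id="kubernetes_default",
        attach_log=True,
    )
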
diff --git a/setup.py b/setup.py
index 0e6ae83..794c33b 100644
--- a/setup.py
+++ b/setup.py
@@ -406,7 +406,7 @@ kerberos = [
]
kubernetes = [
'cryptography>=2.0.0',
- 'kubernetes>=21.7.0',
+ 'kubernetes>=3.0.0, <12.0.0',
]
kylin = ['kylinpy>=2.6']
ldap = [
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index ce040cf..9228e9b 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -22,21 +22,25 @@ from unittest import mock
from kubernetes.client import Configuration
from urllib3.connection import HTTPConnection, HTTPSConnection
-from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
+from airflow.kubernetes.kube_client import (
+ RefreshConfiguration,
+ _disable_verify_ssl,
+ _enable_tcp_keepalive,
+ get_kube_client,
+)
class TestClient(unittest.TestCase):
@mock.patch('airflow.kubernetes.kube_client.config')
- def test_load_cluster_config(self, config):
- get_kube_client(in_cluster=True)
- assert config.load_incluster_config.called
- assert config.load_kube_config.not_called
+ def test_load_cluster_config(self, _):
+ client = get_kube_client(in_cluster=True)
+ assert not isinstance(client.api_client.configuration, RefreshConfiguration)
@mock.patch('airflow.kubernetes.kube_client.config')
- def test_load_file_config(self, config):
- get_kube_client(in_cluster=False)
- assert config.load_incluster_config.not_called
- assert config.load_kube_config.called
+ @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
+ def test_load_file_config(self, _, _2):
+ client = get_kube_client(in_cluster=False)
+ assert isinstance(client.api_client.configuration, RefreshConfiguration)
def test_enable_tcp_keepalive(self):
socket_options = [
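
For context, `_enable_tcp_keepalive` patches urllib3's default socket options along these lines (a sketch; the exact option values are assumptions, and the TCP_KEEP* constants are platform-dependent):

    import socket

    from urllib3.connection import HTTPConnection, HTTPSConnection

    socket_options = [
        (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 120),
        (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30),
        (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 6),
    ]
    HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + socket_options
    HTTPSConnection.default_socket_options = HTTPSConnection.default_socket_options + socket_options
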
diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py
new file mode 100644
index 0000000..a0753e2
--- /dev/null
+++ b/tests/kubernetes/test_refresh_config.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+import pytest
+from pendulum.parsing import ParserError
+
+from airflow.kubernetes.refresh_config import _parse_timestamp
+
+
+class TestRefreshKubeConfigLoader(TestCase):
+ def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self):
+ ts = _parse_timestamp("2020-01-13T13:42:20Z")
+ assert 1578922940 == ts
+
+ def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self):
+ ts = _parse_timestamp("2020-01-13T13:42:20+0600")
+ assert 1578922940 == ts
+
+ def test_parse_timestamp_should_throw_exception(self):
+ with pytest.raises(ParserError):
+ _parse_timestamp("foobar")
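
These tests pin down the contract of `_parse_timestamp`; a minimal implementation that satisfies them (a sketch, not necessarily the exact code in refresh_config.py):

    import calendar

    import pendulum

    def _parse_timestamp(ts_str: str) -> int:
        # pendulum raises pendulum.parsing.ParserError on unparseable input,
        # which is what the last test expects.
        parsed_dt = pendulum.parse(ts_str)
        # timegm reads the wall-clock fields as UTC, so the "+0600" offset is
        # not applied; that is exactly what the second test asserts.
        return calendar.timegm(parsed_dt.timetuple())
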