Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/28 18:26:01 UTC

[airflow] 01/04: Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 60a2b900ef1e880df970880f865085f023c73701
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Mar 28 17:32:36 2022 +0200

    Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"
    
    This reverts commit d39197fd13b0d96c2ab84ca3f1f13391dbf59572.
---
 UPDATING.md                                        |   6 +-
 airflow/kubernetes/kube_client.py                  |  47 +-
 airflow/kubernetes/refresh_config.py               | 124 +++++
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    | 231 +--------
 airflow/providers/cncf/kubernetes/__init__.py      |  27 --
 .../backcompat/backwards_compat_converters.py      |  22 +-
 .../providers/cncf/kubernetes/backcompat/pod.py    |  27 +-
 .../kubernetes/backcompat/pod_runtime_info_env.py  |  18 +-
 .../providers/cncf/kubernetes/backcompat/volume.py |   2 +
 .../cncf/kubernetes/backcompat/volume_mount.py     |   4 +
 .../kubernetes/example_dags/example_kubernetes.py  | 163 -------
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  97 ++--
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 517 ++++++++++-----------
 .../cncf/kubernetes/operators/spark_kubernetes.py  |  16 +-
 airflow/providers/cncf/kubernetes/provider.yaml    |   8 -
 .../cncf/kubernetes/sensors/spark_kubernetes.py    |  15 +-
 setup.py                                           |   2 +-
 tests/kubernetes/test_client.py                    |  22 +-
 tests/kubernetes/test_refresh_config.py            |  37 ++
 19 files changed, 537 insertions(+), 848 deletions(-)
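
For orientation: this revert restores the token-refresh workaround that #20759 had removed. A minimal sketch of how the restored pieces fit together, using only names visible in the diff below (the Airflow config lookup and error handling are elided, so treat this as an illustration rather than the exact get_kube_client body):

    from kubernetes import client
    from kubernetes.client.api_client import ApiClient

    from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config

    # Build a Configuration whose bearer token can be refreshed lazily.
    cfg = RefreshConfiguration()
    load_kube_config(client_configuration=cfg)  # defaults to ~/.kube/config

    # Route CoreV1Api through an ApiClient holding the patched configuration;
    # every request re-checks token expiry via the hook that
    # RefreshKubeConfigLoader installed on cfg.
    v1 = client.CoreV1Api(api_client=ApiClient(configuration=cfg))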

diff --git a/UPDATING.md b/UPDATING.md
index 6532160..a706258 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -84,9 +84,7 @@ https://developers.google.com/style/inclusive-documentation
 
 ## Airflow 2.2.5
 
-### Minimum kubernetes version bumped from 3.0.0 to 21.7.0
-
-No change in behavior is expected.  This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759).
+No breaking changes.
 
 ## Airflow 2.2.4
 
@@ -1381,7 +1379,7 @@ delete this option.
 
 #### `airflow.models.dagbag.DagBag`
 
-Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property
+Passing `store_serialized_dags` argument to `DagBag.__init__` and accessing `DagBag.store_serialized_dags` property
 are deprecated and will be removed in future versions.
 
 
diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index aa49715..1c20bd3 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -25,10 +25,39 @@ log = logging.getLogger(__name__)
 try:
     from kubernetes import client, config
     from kubernetes.client import Configuration
+    from kubernetes.client.api_client import ApiClient
     from kubernetes.client.rest import ApiException
 
+    from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config
+
     has_kubernetes = True
 
+    def _get_kube_config(
+        in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
+    ) -> Optional[Configuration]:
+        if in_cluster:
+            # load_incluster_config sets the default configuration with config populated by k8s
+            config.load_incluster_config()
+            return None
+        else:
+            # this block can be replaced with just config.load_kube_config once the
+            # refresh_config module is replaced by the upstream fix
+            cfg = RefreshConfiguration()
+            load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
+            return cfg
+
+    def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
+        """
+        This is a workaround to support API token refresh in the k8s client.
+
+        The function can be replaced with `return client.CoreV1Api()` once the
+        upstream client supports token refresh.
+        """
+        if cfg:
+            return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
+        else:
+            return client.CoreV1Api()
+
     def _disable_verify_ssl() -> None:
         configuration = Configuration()
         configuration.verify_ssl = False
@@ -101,19 +130,17 @@ def get_kube_client(
     if not has_kubernetes:
         raise _import_err
 
+    if not in_cluster:
+        if cluster_context is None:
+            cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
+        if config_file is None:
+            config_file = conf.get('kubernetes', 'config_file', fallback=None)
+
     if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
         _enable_tcp_keepalive()
 
     if not conf.getboolean('kubernetes', 'verify_ssl'):
         _disable_verify_ssl()
 
-    if in_cluster:
-        config.load_incluster_config()
-    else:
-        if cluster_context is None:
-            cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
-        if config_file is None:
-            config_file = conf.get('kubernetes', 'config_file', fallback=None)
-        config.load_kube_config(config_file=config_file, context=cluster_context)
-
-    return client.CoreV1Api()
+    client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
+    return _get_client_with_patched_configuration(client_conf)
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
new file mode 100644
index 0000000..2564951
--- /dev/null
+++ b/airflow/kubernetes/refresh_config.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+NOTE: this module can be removed once the upstream client supports token refresh
+see: https://github.com/kubernetes-client/python/issues/741
+"""
+
+import calendar
+import logging
+import os
+import time
+from typing import Optional, cast
+
+import pendulum
+from kubernetes.client import Configuration
+from kubernetes.config.exec_provider import ExecProvider
+from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader
+
+from airflow.utils import yaml
+
+
+def _parse_timestamp(ts_str: str) -> int:
+    parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
+    return calendar.timegm(parsed_dt.timetuple())
+
+
+class RefreshKubeConfigLoader(KubeConfigLoader):
+    """
+    Patched KubeConfigLoader: this subclass takes expirationTimestamp into
+    account and sets an API-key refresh callback hook on the Configuration object.
+    """
+
+    def __init__(self, *args, **kwargs):
+        KubeConfigLoader.__init__(self, *args, **kwargs)
+        self.api_key_expire_ts = None
+
+    def _load_from_exec_plugin(self):
+        """
+        We override the _load_from_exec_plugin method to also read and store
+        the expiration timestamp for aws-iam-authenticator. It is later used
+        for API token refresh.
+        """
+        if 'exec' not in self._user:
+            return None
+        try:
+            status = ExecProvider(self._user['exec']).run()
+            if 'token' not in status:
+                logging.error('exec: missing token field in plugin output')
+                return None
+            self.token = f"Bearer {status['token']}"
+            ts_str = status.get('expirationTimestamp')
+            if ts_str:
+                self.api_key_expire_ts = _parse_timestamp(ts_str)
+            return True
+        except Exception as e:
+            logging.error(str(e))
+            return None
+
+    def refresh_api_key(self, client_configuration):
+        """Refresh API key if expired"""
+        if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts:
+            self.load_and_set(client_configuration)
+
+    def load_and_set(self, client_configuration):
+        KubeConfigLoader.load_and_set(self, client_configuration)
+        client_configuration.refresh_api_key = self.refresh_api_key
+
+
+class RefreshConfiguration(Configuration):
+    """
+    Patched Configuration: this subclass honors the API-key refresh callback
+    hook installed by RefreshKubeConfigLoader.
+    """
+
+    def __init__(self, *args, **kwargs):
+        Configuration.__init__(self, *args, **kwargs)
+        self.refresh_api_key = None
+
+    def get_api_key_with_prefix(self, identifier):
+        if self.refresh_api_key:
+            self.refresh_api_key(self)
+        return Configuration.get_api_key_with_prefix(self, identifier)
+
+
+def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]:
+    """
+    Adapted from the upstream _get_kube_config_loader_for_yaml_file function, with
+    KubeConfigLoader changed to RefreshKubeConfigLoader.
+    """
+    with open(filename) as f:
+        return RefreshKubeConfigLoader(
+            config_dict=yaml.safe_load(f),
+            config_base_path=os.path.abspath(os.path.dirname(filename)),
+            **kwargs,
+        )
+
+
+def load_kube_config(client_configuration, config_file=None, context=None):
+    """
+    Adapted from the upstream load_kube_config function, with two changes:
+        - removed the persist_config argument since it is not used
+        - removed the `client_configuration is None` branch since we always
+          pass in a client configuration
+    """
+    if config_file is None:
+        config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
+
+    loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None)
+    loader.load_and_set(client_configuration)
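
To make the callback chain concrete: the generated kubernetes client asks its Configuration for the authorization header before every request, and the get_api_key_with_prefix override above is where the refresh gets its chance to run. A rough walkthrough of one cycle, assuming a kubeconfig user backed by an exec plugin that reports expirationTimestamp (e.g. aws-iam-authenticator):

    from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config

    cfg = RefreshConfiguration()
    load_kube_config(client_configuration=cfg)
    # load_and_set() has run the exec plugin once, recorded
    # api_key_expire_ts, and installed cfg.refresh_api_key.

    # Every API call funnels through this lookup; once time.time() passes
    # the recorded expiry, refresh_api_key() re-runs load_and_set(), which
    # re-executes the plugin and mints a fresh bearer token.
    header = cfg.get_api_key_with_prefix('authorization')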
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 5bcf569..7f686b2 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -19,231 +19,6 @@
 Changelog
 ---------
 
-3.1.2
-.....
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Fix mistakenly added install_requires for all providers (#22382)``
-* ``Fix "run_id" k8s and elasticsearch compatibility with Airflow 2.1 (#22385)``
-
-Misc
-~~~~
-
-* ``Remove RefreshConfiguration workaround for K8s token refreshing (#20759)``
-
-3.1.1
-.....
-
-Misc
-~~~~~
-
-* ``Add Trove classifiers in PyPI (Framework :: Apache Airflow :: Provider)``
-
-3.1.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Add map_index label to mapped KubernetesPodOperator (#21916)``
-* ``Change KubePodOperator labels from execution_date to run_id (#21960)``
-
-Misc
-~~~~
-
-* ``Support for Python 3.10``
-* ``Fix Kubernetes example with wrong operator casing (#21898)``
-* ``Remove types from KPO docstring (#21826)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Add pre-commit check for docstring param types (#21398)``
-
-3.0.2
-.....
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Add missed deprecations for cncf (#20031)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Remove ':type' lines now sphinx-autoapi supports typehints (#20951)``
-   * ``Make ''delete_pod'' change more prominent in K8s changelog (#20753)``
-   * ``Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)``
-   * ``Add optional features in providers. (#21074)``
-   * ``Add documentation for January 2021 providers release (#21257)``
-
-3.0.1
-.....
-
-
-Misc
-~~~~
-
-* ``Update Kubernetes library version (#18797)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-
-3.0.0
-.....
-
-Breaking changes
-~~~~~~~~~~~~~~~~
-
-* ``Parameter is_delete_operator_pod default is changed to True (#20575)``
-* ``Simplify KubernetesPodOperator (#19572)``
-* ``Move pod_mutation_hook call from PodManager to KubernetesPodOperator (#20596)``
-* ``Rename ''PodLauncher'' to ''PodManager'' (#20576)``
-
-Parameter is_delete_operator_pod has new default
-````````````````````````````````````````````````
-
-Previously, the default for param ``is_delete_operator_pod`` was ``False``, which means that
-after a task runs, its pod is not deleted by the operator and remains on the
-cluster indefinitely.  With this release, we change the default to ``True``.
-
-Notes on changes KubernetesPodOperator and PodLauncher
-``````````````````````````````````````````````````````
-
-.. warning:: Many methods in ``KubernetesPodOperator`` and ``PodLauncher`` have been renamed.
-    If you have subclassed ``KubernetesPodOperator`` you will need to update your subclass to reflect
-    the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``.
-
-Overview
-''''''''
-
-Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly,
-then you don't need to worry about this change.  If however you have subclassed ``KubernetesPodOperator``, what
-follows are some notes on the changes in this release.
-
-One of the principal goals of the refactor is to clearly separate the "get or create pod" and
-"wait for pod completion" phases.  Previously the "wait for pod completion" logic would be invoked
-differently depending on whether the operator were to  "attach to an existing pod" (e.g. after a
-worker failure) or "create a new pod" and this resulted in some code duplication and a bit more
-nesting of logic.  With this refactor we encapsulate  the "get or create" step
-into method ``KubernetesPodOperator.get_or_create_pod``, and pull the monitoring and XCom logic up
-into the top level of ``execute`` because it can be the same for "attached" pods and "new" pods.
-
-The ``KubernetesPodOperator.get_or_create_pod`` tries first to find an existing pod using labels
-specific to the task instance (see ``KubernetesPodOperator.find_pod``).
-If one does not exist, it ``creates a pod <~.PodManager.create_pod>``.
-
-The "waiting" part of execution has three components.  The first step is to wait for the pod to leave the
-``Pending`` phase (``~.KubernetesPodOperator.await_pod_start``). Next, if configured to do so,
-the operator will follow the base container logs and forward these logs to the task logger until
-the ``base`` container is done. If not configured to harvest the
-logs, the operator will instead call ``KubernetesPodOperator.await_container_completion``.
-Either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom
-value from the base container, we ``await pod completion <~.PodManager.await_pod_completion>``.
-
-Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or
-created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``.
-
-After the pod terminates, we execute different cleanup tasks depending on whether the pod terminated successfully.
-
-If the pod terminates *unsuccessfully*, we attempt to log the pod events (``PodLauncher.read_pod_events``). If
-additionally the task is configured *not* to delete the pod after termination, we apply a label (``KubernetesPodOperator.patch_already_checked``)
-indicating that the pod failed and should not be "reattached to" in a retry.  If the task is configured
-to delete its pod, we delete it (``KubernetesPodOperator.process_pod_deletion``).  Finally,
-we raise an AirflowException to fail the task instance.
-
-If the pod terminates successfully, we delete the pod (``KubernetesPodOperator.process_pod_deletion``)
-(if configured to delete the pod) and push XCom (if configured to push XCom).
-
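Condensing the flow described above into pseudocode may help; this is an approximation assembled from the method names in this changelog (signatures simplified, branches reduced to flags), not the provider's exact ``execute`` body:

    def execute(self, context):
        # Get or create: reattach to a matching pod if one already exists.
        self.pod = self.get_or_create_pod(self.build_pod_request_obj(context), context)

        # Wait: identical for "found" and "new" pods.
        self.await_pod_start(self.pod)
        if self.get_logs:
            self.follow_container_logs(self.pod)      # forward base-container logs
        else:
            self.await_container_completion(self.pod)
        result = self.extract_xcom(self.pod) if self.do_xcom_push else None
        remote_pod = self.await_pod_completion(self.pod)

        # Cleanup: behavior depends on the terminal pod phase.
        if remote_pod.status.phase != 'Succeeded':
            if self.log_events_on_failure:
                self.read_pod_events(self.pod)
            if self.is_delete_operator_pod:
                self.process_pod_deletion(self.pod)
            else:
                self.patch_already_checked(self.pod)  # block reattach on retry
            raise AirflowException('Pod did not succeed')
        if self.is_delete_operator_pod:
            self.process_pod_deletion(self.pod)
        return result
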
-Details on method renames, refactors, and deletions
-'''''''''''''''''''''''''''''''''''''''''''''''''''
-
-In ``KubernetesPodOperator``:
-
-* Method ``create_pod_launcher`` is converted to cached property ``pod_manager``
-* Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client``
-* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``.
-* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion.  With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
-* Method ``create_pod_request_obj`` is renamed ``build_pod_request_obj``.  It now takes argument ``context`` in order to add TI-specific pod labels; previously they were added after return.
-* Method ``create_labels_for_pod`` is renamed ``_get_ti_pod_labels``.  This method doesn't return *all* labels, but only those specific to the TI. We also add parameter ``include_try_number`` to control the inclusion of this label instead of possibly filtering it out later.
-* Method ``_get_pod_identifying_label_string`` is renamed ``_build_find_pod_label_selector``
-* Method ``_try_numbers_match`` is removed.
-* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc.  Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``.  Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
-
-In class ``PodManager`` (formerly ``PodLauncher``):
-
-* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``.
-* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion``
-* Methods ``pod_not_started``, ``pod_is_running``, ``process_status``, and ``_task_status`` are removed.  These were needed due to the way in which pod ``phase`` was mapped to task instance states; but we no longer do such a mapping and instead deal with pod phases directly and untransformed.
-* Method ``_extract_xcom`` is renamed ``extract_xcom``.
-* Method ``read_pod_logs`` now takes kwarg ``container_name``.
-
-
-Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``):
-
-* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager``
-* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased.
-* The ``airflow.settings.pod_mutation_hook`` is no longer called in
-  ``cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async``. For ``KubernetesPodOperator``,
-  mutation now occurs in ``build_pod_request_obj``.
-* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task
-  completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a
-  temporary basis for debugging purposes and therefore pod deletion is the more sensible default.
-
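As a concrete illustration of that default (a hypothetical task, not taken from this diff), keeping a failed pod around for inspection becomes an explicit opt-out:

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    inspect_me = KubernetesPodOperator(
        task_id='inspect-me',
        name='inspect-me',
        namespace='default',
        image='alpine',
        cmds=['sh', '-c', 'exit 1'],
        # Override the new True default so the failed pod survives for
        # `kubectl describe pod` / `kubectl logs` debugging.
        is_delete_operator_pod=False,
    )
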
-Features
-~~~~~~~~
-
-* ``Add params config, in_cluster, and cluster_context to KubernetesHook (#19695)``
-* ``Implement dry_run for KubernetesPodOperator (#20573)``
-* ``Clarify docstring for ''build_pod_request_obj'' in K8s providers (#20574)``
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Fix Volume/VolumeMount KPO DeprecationWarning (#19726)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-     * ``Fix cached_property MyPy declaration and related MyPy errors (#20226)``
-     * ``Use typed Context EVERYWHERE (#20565)``
-     * ``Fix template_fields type to have MyPy friendly Sequence type (#20571)``
-     * ``Even more typing in operators (template_fields/ext) (#20608)``
-     * ``Update documentation for provider December 2021 release (#20523)``
-
-2.2.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Added namespace as a template field in the KPO. (#19718)``
-* ``Decouple name randomization from name kwarg (#19398)``
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Checking event.status.container_statuses before filtering (#19713)``
-* ``Coalesce 'extra' params to None in KubernetesHook (#19694)``
-* ``Change to correct type in KubernetesPodOperator (#19459)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Fix duplicate changelog entries (#19759)``
-
-2.1.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Add more type hints to PodLauncher (#18928)``
-* ``Add more information to PodLauncher timeout error (#17953)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Update docstring to let users use 'node_selector' (#19057)``
-   * ``Add pre-commit hook for common misspelling check in files (#18964)``
-
 2.0.3
 .....
 
@@ -269,8 +44,7 @@ Bug Fixes
 * ``Fix using XCom with ''KubernetesPodOperator'' (#17760)``
 * ``Import Hooks lazily individually in providers manager (#17682)``
 
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
+.. Review and move the new changes to one of the sections above:
    * ``Fix messed-up changelog in 3 providers (#17380)``
    * ``Fix static checks (#17256)``
    * ``Update spark_kubernetes.py (#17237)``
@@ -291,7 +65,10 @@ Bug Fixes
 
 .. Below changes are excluded from the changelog. Move them to
    appropriate section above if needed. Do not delete the lines(!):
+   * ``Fixed wrongly escaped characters in amazon's changelog (#17020)``
    * ``Simplify 'default_args' in Kubernetes example DAGs (#16870)``
+   * ``Enable using custom pod launcher in Kubernetes Pod Operator (#16945)``
+   * ``Prepare documentation for July release of providers. (#17015)``
    * ``Updating task dependencies (#16624)``
    * ``Removes pylint from our toolchain (#16682)``
    * ``Prepare documentation for July release of providers. (#17015)``
diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py
index 0998e31..217e5db 100644
--- a/airflow/providers/cncf/kubernetes/__init__.py
+++ b/airflow/providers/cncf/kubernetes/__init__.py
@@ -15,30 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import sys
-
-if sys.version_info < (3, 7):
-    # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that
-    # v1.Pod et al. are not pickleable on Python 3.6.
-
-    # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this
-    # method.
-
-    # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means
-    # that we can update the version used in this provider and have it work for older versions
-    import copyreg
-    import logging
-
-    def _reduce_Logger(logger):
-        if logging.getLogger(logger.name) is not logger:
-            import pickle
-
-            raise pickle.PicklingError('logger cannot be pickled')
-        return logging.getLogger, (logger.name,)
-
-    def _reduce_RootLogger(logger):
-        return logging.getLogger, ()
-
-    if logging.Logger not in copyreg.dispatch_table:
-        copyreg.pickle(logging.Logger, _reduce_Logger)
-        copyreg.pickle(logging.RootLogger, _reduce_RootLogger)
diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index bf2b832..4c6404f 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -21,16 +21,18 @@ from typing import List
 from kubernetes.client import ApiClient, models as k8s
 
 from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources
+from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
 
 
-def _convert_kube_model_object(obj, new_class):
+def _convert_kube_model_object(obj, old_class, new_class):
     convert_op = getattr(obj, "to_k8s_client_obj", None)
     if callable(convert_op):
         return obj.to_k8s_client_obj()
     elif isinstance(obj, new_class):
         return obj
     else:
-        raise AirflowException(f"Expected {new_class}, got {type(obj)}")
+        raise AirflowException(f"Expected {old_class} or {new_class}, got {type(obj)}")
 
 
 def _convert_from_dict(obj, new_class):
@@ -50,7 +52,9 @@ def convert_volume(volume) -> k8s.V1Volume:
     :param volume:
     :return: k8s.V1Volume
     """
-    return _convert_kube_model_object(volume, k8s.V1Volume)
+    from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
+
+    return _convert_kube_model_object(volume, Volume, k8s.V1Volume)
 
 
 def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
@@ -60,7 +64,9 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
     :param volume_mount:
     :return: k8s.V1VolumeMount
     """
-    return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)
+    from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount
+
+    return _convert_kube_model_object(volume_mount, VolumeMount, k8s.V1VolumeMount)
 
 
 def convert_resources(resources) -> k8s.V1ResourceRequirements:
@@ -71,10 +77,8 @@ def convert_resources(resources) -> k8s.V1ResourceRequirements:
     :return: k8s.V1ResourceRequirements
     """
     if isinstance(resources, dict):
-        from airflow.providers.cncf.kubernetes.backcompat.pod import Resources
-
         resources = Resources(**resources)
-    return _convert_kube_model_object(resources, k8s.V1ResourceRequirements)
+    return _convert_kube_model_object(resources, Resources, k8s.V1ResourceRequirements)
 
 
 def convert_port(port) -> k8s.V1ContainerPort:
@@ -84,7 +88,7 @@ def convert_port(port) -> k8s.V1ContainerPort:
     :param port:
     :return: k8s.V1ContainerPort
     """
-    return _convert_kube_model_object(port, k8s.V1ContainerPort)
+    return _convert_kube_model_object(port, Port, k8s.V1ContainerPort)
 
 
 def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]:
@@ -112,7 +116,7 @@ def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
     :param pod_runtime_info_envs:
     :return:
     """
-    return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar)
+    return _convert_kube_model_object(pod_runtime_info_envs, PodRuntimeInfoEnv, k8s.V1EnvVar)
 
 
 def convert_image_pull_secrets(image_pull_secrets) -> List[k8s.V1LocalObjectReference]:
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod.py b/airflow/providers/cncf/kubernetes/backcompat/pod.py
index 7f18117..30a7128 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/pod.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/pod.py
@@ -14,29 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1ResourceRequirements`
-and :mod:`kubernetes.client.models.V1ContainerPort`.
-"""
-
-import warnings
+"""Classes for interacting with Kubernetes API"""
 
 from kubernetes.client import models as k8s
 
-warnings.warn(
-    (
-        "This module is deprecated. Please use `kubernetes.client.models.V1ResourceRequirements`"
-        " and `kubernetes.client.models.V1ContainerPort`."
-    ),
-    DeprecationWarning,
-    stacklevel=2,
-)
-
 
 class Resources:
-    """backwards compat for Resources."""
+    """backwards compat for Resources"""
 
     __slots__ = (
         'request_memory',
@@ -50,12 +34,19 @@ class Resources:
 
     """
     :param request_memory: requested memory
+    :type request_memory: str
     :param request_cpu: requested CPU number
+    :type request_cpu: float | str
     :param request_ephemeral_storage: requested ephemeral storage
+    :type request_ephemeral_storage: str
     :param limit_memory: limit for memory usage
+    :type limit_memory: str
     :param limit_cpu: Limit for CPU used
+    :type limit_cpu: float | str
     :param limit_gpu: Limits for GPU used
+    :type limit_gpu: int
     :param limit_ephemeral_storage: Limit for ephemeral storage
+    :type limit_ephemeral_storage: float | str
     """
 
     def __init__(
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
index f08aecf..f76e0d7 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
@@ -14,25 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1EnvVar`.
-"""
-
-import warnings
+"""Classes for interacting with Kubernetes API"""
 
 import kubernetes.client.models as k8s
 
-warnings.warn(
-    "This module is deprecated. Please use `kubernetes.client.models.V1EnvVar`.",
-    DeprecationWarning,
-    stacklevel=2,
-)
-
 
 class PodRuntimeInfoEnv:
-    """Defines Pod runtime information as environment variable."""
+    """Defines Pod runtime information as environment variable"""
 
     def __init__(self, name, field_path):
         """
@@ -40,7 +28,9 @@ class PodRuntimeInfoEnv:
         Full list of options can be found in kubernetes documentation.
 
         :param name: the name of the environment variable
+        :type name: str
         :param field_path: path to pod runtime info. Ex: metadata.namespace | status.podIP
+        :type field_path: str
         """
         self.name = name
         self.field_path = field_path
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume.py b/airflow/providers/cncf/kubernetes/backcompat/volume.py
index c51ce8a..e5b4d00 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/volume.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/volume.py
@@ -35,8 +35,10 @@ class Volume:
         and Persistent Volumes
 
         :param name: the name of the volume mount
+        :type name: str
         :param configs: dictionary of any features needed for volume. We purposely keep this
             vague since there are multiple volume types with changing configs.
+        :type configs: dict
         """
         self.name = name
         self.configs = configs
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
index f9faed9..b77ab47 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
@@ -38,9 +38,13 @@ class VolumeMount:
         running container.
 
         :param name: the name of the volume mount
+        :type name: str
         :param mount_path:
+        :type mount_path: str
         :param sub_path: subpath within the volume mount
+        :type sub_path: Optional[str]
         :param read_only: whether to access pod with read-only mode
+        :type read_only: bool
         """
         self.name = name
         self.mount_path = mount_path
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
deleted file mode 100644
index b65dae9..0000000
--- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-This is an example dag for using the KubernetesPodOperator.
-"""
-
-from datetime import datetime
-
-from kubernetes.client import models as k8s
-
-from airflow import DAG
-from airflow.kubernetes.secret import Secret
-from airflow.operators.bash import BashOperator
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
-
-# [START howto_operator_k8s_cluster_resources]
-secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
-secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
-secret_all_keys = Secret('env', None, 'airflow-secrets-2')
-volume_mount = k8s.V1VolumeMount(
-    name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
-)
-
-configmaps = [
-    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')),
-    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')),
-]
-
-volume = k8s.V1Volume(
-    name='test-volume',
-    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
-)
-
-port = k8s.V1ContainerPort(name='http', container_port=80)
-
-init_container_volume_mounts = [
-    k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
-]
-
-init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
-
-init_container = k8s.V1Container(
-    name="init-container",
-    image="ubuntu:16.04",
-    env=init_environments,
-    volume_mounts=init_container_volume_mounts,
-    command=["bash", "-cx"],
-    args=["echo 10"],
-)
-
-affinity = k8s.V1Affinity(
-    node_affinity=k8s.V1NodeAffinity(
-        preferred_during_scheduling_ignored_during_execution=[
-            k8s.V1PreferredSchedulingTerm(
-                weight=1,
-                preference=k8s.V1NodeSelectorTerm(
-                    match_expressions=[
-                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
-                    ]
-                ),
-            )
-        ]
-    ),
-    pod_affinity=k8s.V1PodAffinity(
-        required_during_scheduling_ignored_during_execution=[
-            k8s.V1WeightedPodAffinityTerm(
-                weight=1,
-                pod_affinity_term=k8s.V1PodAffinityTerm(
-                    label_selector=k8s.V1LabelSelector(
-                        match_expressions=[
-                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
-                        ]
-                    ),
-                    topology_key="failure-domain.beta.kubernetes.io/zone",
-                ),
-            )
-        ]
-    ),
-)
-
-tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
-
-# [END howto_operator_k8s_cluster_resources]
-
-
-with DAG(
-    dag_id='example_kubernetes_operator',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-) as dag:
-    k = KubernetesPodOperator(
-        namespace='default',
-        image="ubuntu:16.04",
-        cmds=["bash", "-cx"],
-        arguments=["echo", "10"],
-        labels={"foo": "bar"},
-        secrets=[secret_file, secret_env, secret_all_keys],
-        ports=[port],
-        volumes=[volume],
-        volume_mounts=[volume_mount],
-        env_from=configmaps,
-        name="airflow-test-pod",
-        task_id="task",
-        affinity=affinity,
-        is_delete_operator_pod=True,
-        hostnetwork=False,
-        tolerations=tolerations,
-        init_containers=[init_container],
-        priority_class_name="medium",
-    )
-
-    # [START howto_operator_k8s_private_image]
-    quay_k8s = KubernetesPodOperator(
-        namespace='default',
-        image='quay.io/apache/bash',
-        image_pull_secrets=[k8s.V1LocalObjectReference('testquay')],
-        cmds=["bash", "-cx"],
-        arguments=["echo", "10", "echo pwd"],
-        labels={"foo": "bar"},
-        name="airflow-private-image-pod",
-        is_delete_operator_pod=True,
-        in_cluster=True,
-        task_id="task-two",
-        get_logs=True,
-    )
-    # [END howto_operator_k8s_private_image]
-
-    # [START howto_operator_k8s_write_xcom]
-    write_xcom = KubernetesPodOperator(
-        namespace='default',
-        image='alpine',
-        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
-        name="write-xcom",
-        do_xcom_push=True,
-        is_delete_operator_pod=True,
-        in_cluster=True,
-        task_id="write-xcom",
-        get_logs=True,
-    )
-
-    pod_task_xcom_result = BashOperator(
-        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
-        task_id="pod_task_xcom_result",
-    )
-    # [END howto_operator_k8s_write_xcom]
-
-    write_xcom >> pod_task_xcom_result
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 3830503..e230dba 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -14,21 +14,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import sys
 import tempfile
 from typing import Any, Dict, Generator, Optional, Tuple, Union
 
-if sys.version_info >= (3, 8):
+try:
     from functools import cached_property
-else:
+except ImportError:
     from cached_property import cached_property
-
 from kubernetes import client, config, watch
 
 try:
     import airflow.utils.yaml as yaml
 except ImportError:
-    import yaml  # type: ignore[no-redef]
+    import yaml
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
@@ -61,6 +59,7 @@ class KubernetesHook(BaseHook):
 
     :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
         to Kubernetes cluster.
+    :type conn_id: str
     """
 
     conn_name_attr = 'kubernetes_conn_id'
@@ -86,13 +85,10 @@ class KubernetesHook(BaseHook):
             "extra__kubernetes__namespace": StringField(
                 lazy_gettext('Namespace'), widget=BS3TextFieldWidget()
             ),
-            "extra__kubernetes__cluster_context": StringField(
-                lazy_gettext('Cluster context'), widget=BS3TextFieldWidget()
-            ),
         }
 
     @staticmethod
-    def get_ui_field_behaviour() -> Dict[str, Any]:
+    def get_ui_field_behaviour() -> Dict:
         """Returns custom field behaviour"""
         return {
             "hidden_fields": ['host', 'schema', 'login', 'password', 'port', 'extra'],
@@ -100,49 +96,25 @@ class KubernetesHook(BaseHook):
         }
 
     def __init__(
-        self,
-        conn_id: Optional[str] = default_conn_name,
-        client_configuration: Optional[client.Configuration] = None,
-        cluster_context: Optional[str] = None,
-        config_file: Optional[str] = None,
-        in_cluster: Optional[bool] = None,
+        self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None
     ) -> None:
         super().__init__()
         self.conn_id = conn_id
         self.client_configuration = client_configuration
-        self.cluster_context = cluster_context
-        self.config_file = config_file
-        self.in_cluster = in_cluster
-
-    @staticmethod
-    def _coalesce_param(*params):
-        for param in params:
-            if param is not None:
-                return param
 
     def get_conn(self) -> Any:
         """Returns kubernetes api session for use with requests"""
-        if self.conn_id:
-            connection = self.get_connection(self.conn_id)
-            extras = connection.extra_dejson
-        else:
-            extras = {}
-        in_cluster = self._coalesce_param(
-            self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None
-        )
-        cluster_context = self._coalesce_param(
-            self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None
-        )
-        kubeconfig_path = self._coalesce_param(
-            self.config_file, extras.get("extra__kubernetes__kube_config_path") or None
-        )
-        kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+        connection = self.get_connection(self.conn_id)
+        extras = connection.extra_dejson
+        in_cluster = extras.get("extra__kubernetes__in_cluster")
+        kubeconfig_path = extras.get("extra__kubernetes__kube_config_path")
+        kubeconfig = extras.get("extra__kubernetes__kube_config")
         num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])
 
         if num_selected_configuration > 1:
             raise AirflowException(
-                "Invalid connection configuration. Options kube_config_path, "
-                "kube_config, in_cluster are mutually exclusive. "
+                "Invalid connection configuration. Options extra__kubernetes__kube_config_path, "
+                "extra__kubernetes__kube_config, extra__kubernetes__in_cluster are mutually exclusive. "
                 "You can only use one option at a time."
             )
         if in_cluster:
@@ -153,9 +125,7 @@ class KubernetesHook(BaseHook):
         if kubeconfig_path is not None:
             self.log.debug("loading kube_config from: %s", kubeconfig_path)
             config.load_kube_config(
-                config_file=kubeconfig_path,
-                client_configuration=self.client_configuration,
-                context=cluster_context,
+                config_file=kubeconfig_path, client_configuration=self.client_configuration
             )
             return client.ApiClient()
 
@@ -165,17 +135,12 @@ class KubernetesHook(BaseHook):
                 temp_config.write(kubeconfig.encode())
                 temp_config.flush()
                 config.load_kube_config(
-                    config_file=temp_config.name,
-                    client_configuration=self.client_configuration,
-                    context=cluster_context,
+                    config_file=temp_config.name, client_configuration=self.client_configuration
                 )
             return client.ApiClient()
 
         self.log.debug("loading kube_config from: default file")
-        config.load_kube_config(
-            client_configuration=self.client_configuration,
-            context=cluster_context,
-        )
+        config.load_kube_config(client_configuration=self.client_configuration)
         return client.ApiClient()
 
     @cached_property
@@ -183,10 +148,6 @@ class KubernetesHook(BaseHook):
         """Cached Kubernetes API client"""
         return self.get_conn()
 
-    @cached_property
-    def core_v1_client(self):
-        return client.CoreV1Api(api_client=self.api_client)
-
     def create_custom_object(
         self, group: str, version: str, plural: str, body: Union[str, dict], namespace: Optional[str] = None
     ):
@@ -194,10 +155,15 @@ class KubernetesHook(BaseHook):
         Creates custom resource definition object in Kubernetes
 
         :param group: api group
+        :type group: str
         :param version: api version
+        :type version: str
         :param plural: api plural
+        :type plural: str
         :param body: crd object definition
+        :type body: Union[str, dict]
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CustomObjectsApi(self.api_client)
         if namespace is None:
@@ -220,10 +186,15 @@ class KubernetesHook(BaseHook):
         Get custom resource definition object from Kubernetes
 
         :param group: api group
+        :type group: str
         :param version: api version
+        :type version: str
         :param plural: api plural
+        :type plural: str
         :param name: crd object name
+        :type name: str
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CustomObjectsApi(self.api_client)
         if namespace is None:
@@ -236,14 +207,12 @@ class KubernetesHook(BaseHook):
         except client.rest.ApiException as e:
             raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")
 
-    def get_namespace(self) -> Optional[str]:
+    def get_namespace(self) -> str:
         """Returns the namespace that defined in the connection"""
-        if self.conn_id:
-            connection = self.get_connection(self.conn_id)
-            extras = connection.extra_dejson
-            namespace = extras.get("extra__kubernetes__namespace", "default")
-            return namespace
-        return None
+        connection = self.get_connection(self.conn_id)
+        extras = connection.extra_dejson
+        namespace = extras.get("extra__kubernetes__namespace", "default")
+        return namespace
 
     def get_pod_log_stream(
         self,
@@ -255,8 +224,10 @@ class KubernetesHook(BaseHook):
         Retrieves a log stream for a container in a kubernetes pod.
 
         :param pod_name: pod name
+        :type pod_name: str
         :param container: container name
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CoreV1Api(self.api_client)
         watcher = watch.Watch()
@@ -280,8 +251,10 @@ class KubernetesHook(BaseHook):
         Retrieves a container's log from the specified pod.
 
         :param pod_name: pod name
+        :type pod_name: str
         :param container: container name
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CoreV1Api(self.api_client)
         return api.read_namespaced_pod_log(
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index dd127fe..747f8b0 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -15,16 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 """Executes task in a Kubernetes POD"""
-import json
-import logging
 import re
-import sys
 import warnings
-from contextlib import AbstractContextManager
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
 
 from kubernetes.client import CoreV1Api, models as k8s
 
+try:
+    import airflow.utils.yaml as yaml
+except ImportError:
+    import yaml
+
 from airflow.exceptions import AirflowException
 from airflow.kubernetes import kube_client, pod_generator
 from airflow.kubernetes.pod_generator import PodGenerator
@@ -42,27 +43,15 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters im
     convert_volume,
     convert_volume_mount,
 )
-from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
-from airflow.settings import pod_mutation_hook
-from airflow.utils import yaml
+from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar
 from airflow.utils.helpers import validate_key
+from airflow.utils.state import State
 from airflow.version import version as airflow_version
 
-if sys.version_info >= (3, 8):
-    from functools import cached_property
-else:
-    from cached_property import cached_property
-
 if TYPE_CHECKING:
     import jinja2
 
-    from airflow.utils.context import Context
-
-
-class PodReattachFailure(AirflowException):
-    """When we expect to be able to find a pod but cannot."""
-
 
 class KubernetesPodOperator(BaseOperator):
     """
@@ -79,66 +68,101 @@ class KubernetesPodOperator(BaseOperator):
         simplifies the authorization process.
 
     :param namespace: the namespace to run within kubernetes.
+    :type namespace: str
     :param image: Docker image you wish to launch. Defaults to hub.docker.com,
         but fully qualified URLS will point to custom repositories. (templated)
+    :type image: str
     :param name: name of the pod in which the task will run, will be used (plus a random
-        suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain,
-        containing only [a-z0-9.-]).
-    :param random_name_suffix: if True, will generate a random suffix.
+        suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
+    :type name: str
     :param cmds: entrypoint of the container. (templated)
         The docker image's entrypoint is used if this is not provided.
+    :type cmds: list[str]
     :param arguments: arguments of the entrypoint. (templated)
         The docker image's CMD is used if this is not provided.
-    :param ports: ports for the launched pod.
-    :param volume_mounts: volumeMounts for the launched pod.
-    :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes.
+    :type arguments: list[str]
+    :param ports: ports for launched pod.
+    :type ports: list[k8s.V1ContainerPort]
+    :param volume_mounts: volumeMounts for launched pod.
+    :type volume_mounts: list[k8s.V1VolumeMount]
+    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
+    :type volumes: list[k8s.V1Volume]
     :param env_vars: Environment variables initialized in the container. (templated)
+    :type env_vars: list[k8s.V1EnvVar]
     :param secrets: Kubernetes secrets to inject in the container.
         They can be exposed as environment vars or files in a volume.
+    :type secrets: list[airflow.kubernetes.secret.Secret]
     :param in_cluster: run kubernetes client with in_cluster configuration.
+    :type in_cluster: bool
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used.
+    :type cluster_context: str
     :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor
+    :type reattach_on_restart: bool
     :param labels: labels to apply to the Pod. (templated)
+    :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :type startup_timeout_seconds: int
     :param get_logs: get the stdout of the container as logs of the tasks.
+    :type get_logs: bool
     :param image_pull_policy: Specify a policy to cache or always pull an image.
+    :type image_pull_policy: str
     :param annotations: non-identifying metadata you can attach to the Pod.
         Can be a large range of data, and can include characters
         that are not permitted by labels.
-    :param resources: resources for the launched pod.
-    :param affinity: affinity scheduling rules for the launched pod.
+    :type annotations: dict
+    :param resources: A dict containing resources requests and limits.
+        Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
+        and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
+        See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
+    :type resources: k8s.V1ResourceRequirements
+    :param affinity: A dict containing a group of affinity scheduling rules.
+    :type affinity: k8s.V1Affinity
     :param config_file: The path to the Kubernetes config file. (templated)
         If not specified, default value is ``~/.kube/config``
-    :param node_selector: A dict containing a group of scheduling rules.
+    :type config_file: str
+    :param node_selectors: A dict containing a group of scheduling rules.
+    :type node_selectors: dict
     :param image_pull_secrets: Any image pull secrets to be given to the pod.
         If more than one secret is required, provide a
         comma separated list: secret_a,secret_b
+    :type image_pull_secrets: List[k8s.V1LocalObjectReference]
     :param service_account_name: Name of the service account
+    :type service_account_name: str
     :param is_delete_operator_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted. If True (default), delete the
-        pod; if False, leave the pod.
+        state, or the execution is interrupted.
+        If False (default): do nothing; if True: delete the pod.
+    :type is_delete_operator_pod: bool
     :param hostnetwork: If True enable host networking on the pod.
+    :type hostnetwork: bool
     :param tolerations: A list of kubernetes tolerations.
+    :type tolerations: List[k8s.V1Toleration]
     :param security_context: security options the pod should run with (PodSecurityContext).
+    :type security_context: dict
     :param dnspolicy: dnspolicy for the pod.
+    :type dnspolicy: str
     :param schedulername: Specify a schedulername for the pod
+    :type schedulername: str
     :param full_pod_spec: The complete podSpec
+    :type full_pod_spec: kubernetes.client.models.V1Pod
     :param init_containers: init container for the launched Pod
+    :type init_containers: list[kubernetes.client.models.V1Container]
     :param log_events_on_failure: Log the pod's events if a failure occurs
+    :type log_events_on_failure: bool
     :param do_xcom_push: If True, the content of the file
         /airflow/xcom/return.json in the container will also be pushed to an
         XCom when the container completes.
+    :type do_xcom_push: bool
     :param pod_template_file: path to pod template file (templated)
+    :type pod_template_file: str
     :param priority_class_name: priority class name for the launched Pod
+    :type priority_class_name: str
     :param termination_grace_period: Termination grace period if task killed in UI,
         defaults to kubernetes default
+    :type termination_grace_period: int
     """
 
-    BASE_CONTAINER_NAME = 'base'
-    POD_CHECKED_KEY = 'already_checked'
-
-    template_fields: Sequence[str] = (
+    template_fields: Iterable[str] = (
         'image',
         'cmds',
         'arguments',
@@ -146,16 +170,16 @@ class KubernetesPodOperator(BaseOperator):
         'labels',
         'config_file',
         'pod_template_file',
-        'namespace',
     )
 
+    # fmt: off
     def __init__(
+        # fmt: on
         self,
         *,
         namespace: Optional[str] = None,
         image: Optional[str] = None,
         name: Optional[str] = None,
-        random_name_suffix: Optional[bool] = True,
         cmds: Optional[List[str]] = None,
         arguments: Optional[List[str]] = None,
         ports: Optional[List[k8s.V1ContainerPort]] = None,
@@ -179,7 +203,7 @@ class KubernetesPodOperator(BaseOperator):
         node_selector: Optional[dict] = None,
         image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None,
         service_account_name: Optional[str] = None,
-        is_delete_operator_pod: bool = True,
+        is_delete_operator_pod: bool = False,
         hostnetwork: bool = False,
         tolerations: Optional[List[k8s.V1Toleration]] = None,
         security_context: Optional[Dict] = None,
@@ -191,9 +215,9 @@ class KubernetesPodOperator(BaseOperator):
         do_xcom_push: bool = False,
         pod_template_file: Optional[str] = None,
         priority_class_name: Optional[str] = None,
-        pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None,
+        pod_runtime_info_envs: Optional[List[PodRuntimeInfoEnv]] = None,
         termination_grace_period: Optional[int] = None,
-        configmaps: Optional[List[str]] = None,
+        configmaps: Optional[str] = None,
         **kwargs,
     ) -> None:
         if kwargs.get('xcom_push') is not None:
@@ -240,9 +264,8 @@ class KubernetesPodOperator(BaseOperator):
         self.service_account_name = service_account_name
         self.is_delete_operator_pod = is_delete_operator_pod
         self.hostnetwork = hostnetwork
-        self.tolerations = (
-            [convert_toleration(toleration) for toleration in tolerations] if tolerations else []
-        )
+        self.tolerations = [convert_toleration(toleration) for toleration in tolerations] \
+            if tolerations else []
         self.security_context = security_context or {}
         self.dnspolicy = dnspolicy
         self.schedulername = schedulername
@@ -252,15 +275,14 @@ class KubernetesPodOperator(BaseOperator):
         self.priority_class_name = priority_class_name
         self.pod_template_file = pod_template_file
         self.name = self._set_name(name)
-        self.random_name_suffix = random_name_suffix
         self.termination_grace_period = termination_grace_period
-        self.pod_request_obj: Optional[k8s.V1Pod] = None
-        self.pod: Optional[k8s.V1Pod] = None
+        self.client: Optional[CoreV1Api] = None
+        self.pod: Optional[k8s.V1Pod] = None
 
     def _render_nested_template_fields(
         self,
         content: Any,
-        context: 'Context',
+        context: Dict,
         jinja_env: "jinja2.Environment",
         seen_oids: set,
     ) -> None:
@@ -269,31 +291,27 @@ class KubernetesPodOperator(BaseOperator):
             self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids)
             return
 
-        super()._render_nested_template_fields(content, context, jinja_env, seen_oids)
+        super()._render_nested_template_fields(
+            content,
+            context,
+            jinja_env,
+            seen_oids
+        )
 
     @staticmethod
-    def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool = True) -> dict:
+    def create_labels_for_pod(context) -> dict:
         """
         Generate labels for the pod to track the pod in case of Operator crash
 
         :param context: task context provided by airflow DAG
         :return: dict
         """
-        if not context:
-            return {}
-
-        ti = context['ti']
-        run_id = context['run_id']
-
-        labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
-
-        # If running on Airflow 2.3+:
-        map_index = getattr(ti, 'map_index', -1)
-        if map_index >= 0:
-            labels['map_index'] = map_index
-
-        if include_try_number:
-            labels.update(try_number=ti.try_number)
+        labels = {
+            'dag_id': context['dag'].dag_id,
+            'task_id': context['task'].task_id,
+            'execution_date': context['ts'],
+            'try_number': context['ti'].try_number,
+        }
        # In the case of sub dags, also record the parent dag id
         if context['dag'].is_subdag:
             labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
@@ -304,127 +322,101 @@ class KubernetesPodOperator(BaseOperator):
             labels[label_id] = safe_label
         return labels
 
-    @cached_property
-    def pod_manager(self) -> PodManager:
-        return PodManager(kube_client=self.client)
+    def create_pod_launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
 
-    @cached_property
-    def client(self) -> CoreV1Api:
-        # todo: use airflow Connection / hook to authenticate to the cluster
-        kwargs: Dict[str, Any] = dict(
-            cluster_context=self.cluster_context,
-            config_file=self.config_file,
-        )
-        if self.in_cluster is not None:
-            kwargs.update(in_cluster=self.in_cluster)
-        return kube_client.get_kube_client(**kwargs)
-
-    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
-        """Returns an already-running pod for this task instance if one exists."""
-        label_selector = self._build_find_pod_label_selector(context)
-        pod_list = self.client.list_namespaced_pod(
-            namespace=namespace,
-            label_selector=label_selector,
-        ).items
-
-        pod = None
-        num_pods = len(pod_list)
-        if num_pods > 1:
-            raise AirflowException(f'More than one pod running with labels {label_selector}')
-        elif num_pods == 1:
-            pod = pod_list[0]
-            self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
-            self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
-            self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
-        return pod
+    def execute(self, context) -> Optional[str]:
+        try:
+            if self.in_cluster is not None:
+                client = kube_client.get_kube_client(
+                    in_cluster=self.in_cluster,
+                    cluster_context=self.cluster_context,
+                    config_file=self.config_file,
+                )
+            else:
+                client = kube_client.get_kube_client(
+                    cluster_context=self.cluster_context, config_file=self.config_file
+                )
 
-    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
-        if self.reattach_on_restart:
-            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
-            if pod:
-                return pod
-        self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
-        self.pod_manager.create_pod(pod=pod_request_obj)
-        return pod_request_obj
+            self.client = client
 
-    def await_pod_start(self, pod):
-        try:
-            self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
-        except PodLaunchFailedException:
-            if self.log_events_on_failure:
-                for event in self.pod_manager.read_pod_events(pod).items:
-                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            raise
+            self.pod = self.create_pod_request_obj()
+            self.namespace = self.pod.metadata.namespace
 
-    def extract_xcom(self, pod):
-        """Retrieves xcom value and kills xcom sidecar container"""
-        result = self.pod_manager.extract_xcom(pod)
-        self.log.info("xcom result: \n%s", result)
-        return json.loads(result)
+            # Add combination of labels to uniquely identify a running pod
+            labels = self.create_labels_for_pod(context)
 
-    def execute(self, context: 'Context'):
-        remote_pod = None
-        try:
-            self.pod_request_obj = self.build_pod_request_obj(context)
-            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
-                pod_request_obj=self.pod_request_obj,
-                context=context,
-            )
-            self.await_pod_start(pod=self.pod)
+            label_selector = self._get_pod_identifying_label_string(labels)
 
-            if self.get_logs:
-                self.pod_manager.fetch_container_logs(
-                    pod=self.pod,
-                    container_name=self.BASE_CONTAINER_NAME,
-                    follow=True,
-                )
-            else:
-                self.pod_manager.await_container_completion(
-                    pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+
+            if len(pod_list.items) > 1 and self.reattach_on_restart:
+                raise AirflowException(
+                    f'More than one pod running with labels: {label_selector}'
                 )
 
-            if self.do_xcom_push:
-                result = self.extract_xcom(pod=self.pod)
-            remote_pod = self.pod_manager.await_pod_completion(self.pod)
-        finally:
-            self.cleanup(
-                pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
-            )
-        ti = context['ti']
-        ti.xcom_push(key='pod_name', value=self.pod.metadata.name)
-        ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace)
-        if self.do_xcom_push:
+            launcher = self.create_pod_launcher()
+
+            if len(pod_list.items) == 1:
+                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
+                final_state, remote_pod, result = self.handle_pod_overlap(
+                    labels, try_numbers_match, launcher, pod_list.items[0]
+                )
+            else:
+                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
+                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+            if final_state != State.SUCCESS:
+                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
+            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
+            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
             return result
+        except AirflowException as ex:
+            raise AirflowException(f'Pod Launching failed: {ex}')
 
-    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
-        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
-        if pod_phase != PodPhase.SUCCEEDED:
-            if self.log_events_on_failure:
-                with _suppress(Exception):
-                    for event in self.pod_manager.read_pod_events(pod).items:
-                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            if not self.is_delete_operator_pod:
-                with _suppress(Exception):
-                    self.patch_already_checked(pod)
-            with _suppress(Exception):
-                self.process_pod_deletion(pod)
-            raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
-        else:
-            with _suppress(Exception):
-                self.process_pod_deletion(pod)
+    def handle_pod_overlap(
+        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
+    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+        """
+
+        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
+        this function will either continue to monitor the existing pod or launch a new pod
+        based on the `reattach_on_restart` parameter.
 
-    def process_pod_deletion(self, pod):
-        if self.is_delete_operator_pod:
-            self.log.info("Deleting pod: %s", pod.metadata.name)
-            self.pod_manager.delete_pod(pod)
+        :param labels: labels used to determine if a pod is repeated
+        :type labels: dict
+        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
+        :type try_numbers_match: bool
+        :param launcher: PodLauncher
+        :param pod: Pod found with matching labels
+        """
+        if try_numbers_match:
+            log_line = f"found a running pod with labels {labels} and the same try_number."
+        else:
+            log_line = f"found a running pod with labels {labels} but a different try_number."
+
+        # In the case of failed pods, reattach the first time only; after that
+        # the task will already have failed.
+        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
+            log_line += " Will attach to this pod and monitor instead of starting new one"
+            self.log.info(log_line)
+            self.pod = pod
+            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
         else:
-            self.log.info("skipping deleting pod: %s", pod.metadata.name)
+            log_line += f"creating pod with labels {labels} and launcher {launcher}"
+            self.log.info(log_line)
+            final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+        return final_state, remote_pod, result
 
-    def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
-        labels = self._get_ti_pod_labels(context, include_try_number=False)
-        label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
-        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+    @staticmethod
+    def _get_pod_identifying_label_string(labels) -> str:
+        label_strings = [
+            f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
+        ]
+        return ','.join(label_strings) + ',already_checked!=True'
+
+    @staticmethod
+    def _try_numbers_match(context, pod) -> bool:
+        return pod.metadata.labels['try_number'] == context['ti'].try_number
 
     def _set_name(self, name):
         if name is None:
@@ -435,29 +427,11 @@ class KubernetesPodOperator(BaseOperator):
         validate_key(name, max_length=220)
         return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
 
-    def patch_already_checked(self, pod: k8s.V1Pod):
-        """Add an "already checked" annotation to ensure we don't reattach on retries"""
-        pod.metadata.labels[self.POD_CHECKED_KEY] = "True"
-        body = PodGenerator.serialize_pod(pod)
-        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
-
-    def on_kill(self) -> None:
-        if self.pod:
-            pod = self.pod
-            kwargs = dict(
-                name=pod.metadata.name,
-                namespace=pod.metadata.namespace,
-            )
-            if self.termination_grace_period is not None:
-                kwargs.update(grace_period_seconds=self.termination_grace_period)
-            self.client.delete_namespaced_pod(**kwargs)
-
-    def build_pod_request_obj(self, context=None):
+    def create_pod_request_obj(self) -> k8s.V1Pod:
         """
-        Returns V1Pod object based on pod template file, full pod spec, and other operator parameters.
+        Creates a V1Pod based on user parameters. Note that a `full_pod_spec` or
+        `pod_template_file` will supersede all other values.
 
-        The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod
-        template file.
         """
         self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id)
         if self.pod_template_file:
@@ -476,7 +450,7 @@ class KubernetesPodOperator(BaseOperator):
             metadata=k8s.V1ObjectMeta(
                 namespace=self.namespace,
                 labels=self.labels,
-                name=self.name,
+                name=PodGenerator.make_unique_pod_id(self.name),
                 annotations=self.annotations,
             ),
             spec=k8s.V1PodSpec(
@@ -487,7 +461,7 @@ class KubernetesPodOperator(BaseOperator):
                 containers=[
                     k8s.V1Container(
                         image=self.image,
-                        name=self.BASE_CONTAINER_NAME,
+                        name="base",
                         command=self.cmds,
                         ports=self.ports,
                         image_pull_policy=self.image_pull_policy,
@@ -512,112 +486,89 @@ class KubernetesPodOperator(BaseOperator):
 
         pod = PodGenerator.reconcile_pods(pod_template, pod)
 
-        if self.random_name_suffix:
-            pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
-
         for secret in self.secrets:
             self.log.debug("Adding secret to task %s", self.task_id)
             pod = secret.attach_to_pod(pod)
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
             pod = xcom_sidecar.add_xcom_sidecar(pod)
+        return pod
+
+    def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+        """
+        Creates a new pod and monitors it for the duration of the task
 
-        labels = self._get_ti_pod_labels(context)
-        self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels)
+        :param labels: labels used to track pod
+        :param launcher: pod launcher that will manage launching and monitoring pods
+        :return: the final state, the launched pod, and the xcom result (if any)
+        """
+        self.log.debug(
+            "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id
+        )
 
         # Merge Pod Identifying labels with labels passed to operator
-        pod.metadata.labels.update(labels)
+        self.pod.metadata.labels.update(labels)
         # Add Airflow Version to the label
         # And a label to identify that pod is launched by KubernetesPodOperator
-        pod.metadata.labels.update(
+        self.pod.metadata.labels.update(
             {
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_pod_operator': 'True',
             }
         )
-        pod_mutation_hook(pod)
-        return pod
-
-    def dry_run(self) -> None:
-        """
-        Prints out the pod definition that would be created by this operator.
-        Does not include labels specific to the task instance (since there isn't
-        one in a dry_run) and excludes all empty elements.
-        """
-        pod = self.build_pod_request_obj()
-        print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))
-
-
-class _suppress(AbstractContextManager):
-    """
-    This behaves the same as ``contextlib.suppress`` but logs the suppressed
-    exceptions as errors with traceback.
-
-    The caught exception is also stored on the context manager instance under
-    attribute ``exception``.
-    """
-
-    def __init__(self, *exceptions):
-        self._exceptions = exceptions
-        self.exception = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exctype, excinst, exctb):
-        caught_error = exctype is not None and issubclass(exctype, self._exceptions)
-        if caught_error:
-            self.exception = excinst
-            logger = logging.getLogger()
-            logger.error(str(excinst), exc_info=True)
-        return caught_error
 
+        self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
+        final_state = None
+        try:
+            launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
+            final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
+        except AirflowException:
+            if self.log_events_on_failure:
+                for event in launcher.read_pod_events(self.pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
+        finally:
+            if self.is_delete_operator_pod:
+                self.log.debug("Deleting pod for task %s", self.task_id)
+                launcher.delete_pod(self.pod)
+            elif final_state != State.SUCCESS:
+                self.patch_already_checked(self.pod)
+        return final_state, remote_pod, result
 
-def _prune_dict(val: Any, mode='strict'):
-    """
-    Note: this is duplicated from ``airflow.utils.helpers.prune_dict``.  That one should
-    be the one used if possible, but this one is included to avoid having to
-    bump min airflow version.  This function will be removed once the min airflow version
-    is bumped to 2.3.
+    def patch_already_checked(self, pod: k8s.V1Pod):
+        """Add an "already tried annotation to ensure we only retry once"""
+        pod.metadata.labels["already_checked"] = "True"
+        body = PodGenerator.serialize_pod(pod)
+        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
 
-    Given dict ``val``, returns new dict based on ``val`` with all
-    empty elements removed.
+    def monitor_launched_pod(self, launcher, pod) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+        """
+        Monitors a pod created by a previous KubernetesPodOperator run until it completes
 
-    What constitutes "empty" is controlled by the ``mode`` parameter.  If mode is 'strict'
-    then only ``None`` elements will be removed.  If mode is ``truthy``, then element ``x``
-    will be removed if ``bool(x) is False``.
-    """
+        :param launcher: pod launcher that will manage launching and monitoring pods
+        :param pod: pod spec used to locate the pod via the k8s API
+        :return: the final state, the remote pod, and the xcom result (if any)
+        """
+        try:
+            (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
+        finally:
+            if self.is_delete_operator_pod:
+                launcher.delete_pod(pod)
+        if final_state != State.SUCCESS:
+            if self.log_events_on_failure:
+                for event in launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            if not self.is_delete_operator_pod:
+                self.patch_already_checked(pod)
+            raise AirflowException(f'Pod returned a failure: {final_state}')
+        return final_state, remote_pod, result
 
-    def is_empty(x):
-        if mode == 'strict':
-            return x is None
-        elif mode == 'truthy':
-            return bool(x) is False
-        raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
-
-    if isinstance(val, dict):
-        new_dict = {}
-        for k, v in val.items():
-            if is_empty(v):
-                continue
-            elif isinstance(v, (list, dict)):
-                new_val = _prune_dict(v, mode=mode)
-                if new_val:
-                    new_dict[k] = new_val
-            else:
-                new_dict[k] = v
-        return new_dict
-    elif isinstance(val, list):
-        new_list = []
-        for v in val:
-            if is_empty(v):
-                continue
-            elif isinstance(v, (list, dict)):
-                new_val = _prune_dict(v, mode=mode)
-                if new_val:
-                    new_list.append(new_val)
-            else:
-                new_list.append(v)
-        return new_list
-    else:
-        return val
+    def on_kill(self) -> None:
+        if self.pod:
+            pod: k8s.V1Pod = self.pod
+            namespace = pod.metadata.namespace
+            name = pod.metadata.name
+            kwargs = {}
+            if self.termination_grace_period is not None:
+                kwargs = {"grace_period_seconds": self.termination_grace_period}
+            self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs)
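
For context on the reattach behavior this revert restores: the selector built by
_get_pod_identifying_label_string drops try_number (so a retried task can find a pod
from an earlier try) and filters on already_checked!=True (so pods patched by
patch_already_checked are not picked up again). A minimal standalone sketch, with
illustrative label values only:

    labels = {
        'dag_id': 'example_dag',
        'task_id': 'example_task',
        'execution_date': '2022-03-28T00:00:00+00:00',
        'try_number': '1',
    }
    # Mirrors the restored helper: sort labels, skip try_number, exclude checked pods.
    label_strings = [
        f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
    ]
    print(','.join(label_strings) + ',already_checked!=True')
    # dag_id=example_dag,execution_date=2022-03-28T00:00:00+00:00,task_id=example_task,already_checked!=True
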
diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 1029687..9779292 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -15,14 +15,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
 
 class SparkKubernetesOperator(BaseOperator):
     """
@@ -34,15 +31,20 @@ class SparkKubernetesOperator(BaseOperator):
 
     :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
         path to a '.yaml', '.yml' or '.json' file, or a JSON string. (templated)
+    :type application_file: str
     :param namespace: kubernetes namespace to put sparkApplication
+    :type namespace: str
     :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
        for the Kubernetes cluster.
+    :type kubernetes_conn_id: str
     :param api_group: kubernetes api group of sparkApplication
+    :type api_group: str
     :param api_version: kubernetes api version of sparkApplication
+    :type api_version: str
     """
 
-    template_fields: Sequence[str] = ('application_file', 'namespace')
-    template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
+    template_fields = ['application_file', 'namespace']
+    template_ext = ('.yaml', '.yml', '.json')
     ui_color = '#f4a460'
 
     def __init__(
@@ -62,7 +64,7 @@ class SparkKubernetesOperator(BaseOperator):
         self.api_group = api_group
         self.api_version = api_version
 
-    def execute(self, context: 'Context'):
+    def execute(self, context):
         self.log.info("Creating sparkApplication")
         hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
         response = hook.create_custom_object(
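
As a usage sketch for the operator above (hypothetical DAG id, file name and schedule;
not taken from this commit), a sparkApplication is typically submitted like this:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

    with DAG('spark_pi', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
        submit = SparkKubernetesOperator(
            task_id='spark_pi_submit',
            application_file='spark-pi.yaml',  # templated; .yaml/.yml/.json extensions render as templates
            namespace='default',
            kubernetes_conn_id='kubernetes_default',
        )
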
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index b5b5054..c7878ba 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -22,14 +22,6 @@ description: |
     `Kubernetes <https://kubernetes.io/>`__
 
 versions:
-  - 3.1.2
-  - 3.1.1
-  - 3.1.0
-  - 3.0.2
-  - 3.0.1
-  - 3.0.0
-  - 2.2.0
-  - 2.1.0
   - 2.0.3
   - 2.0.2
   - 2.0.1
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 15ac40b..da29e79 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Dict, Optional
 
 from kubernetes import client
 
@@ -23,9 +23,6 @@ from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.sensors.base import BaseSensorOperator
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
 
 class SparkKubernetesSensor(BaseSensorOperator):
     """
@@ -36,15 +33,21 @@ class SparkKubernetesSensor(BaseSensorOperator):
         https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
 
     :param application_name: spark Application resource name
+    :type application_name: str
     :param namespace: the kubernetes namespace where the sparkApplication resides
+    :type namespace: str
     :param kubernetes_conn_id: The :ref:`kubernetes connection<howto/connection:kubernetes>`
         to Kubernetes cluster.
+    :type kubernetes_conn_id: str
     :param attach_log: determines whether logs for driver pod should be appended to the sensor log
+    :type attach_log: bool
     :param api_group: kubernetes api group of sparkApplication
+    :type api_group: str
     :param api_version: kubernetes api version of sparkApplication
+    :type api_version: str
     """
 
-    template_fields: Sequence[str] = ("application_name", "namespace")
+    template_fields = ("application_name", "namespace")
     FAILURE_STATES = ("FAILED", "UNKNOWN")
     SUCCESS_STATES = ("COMPLETED",)
 
@@ -94,7 +97,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
                 e,
             )
 
-    def poke(self, context: 'Context') -> bool:
+    def poke(self, context: Dict) -> bool:
         self.log.info("Poking: %s", self.application_name)
         response = self.hook.get_custom_object(
             group=self.api_group,
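
Continuing the submit sketch from the operator diff above, the sensor pairs with it
like this (hypothetical task ids; the XCom lookup assumes the operator pushes the
create_custom_object response, whose metadata.name identifies the application):

    from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

    monitor = SparkKubernetesSensor(
        task_id='spark_pi_monitor',
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        namespace='default',
        attach_log=True,
    )
    submit >> monitor
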
diff --git a/setup.py b/setup.py
index 0e6ae83..794c33b 100644
--- a/setup.py
+++ b/setup.py
@@ -406,7 +406,7 @@ kerberos = [
 ]
 kubernetes = [
     'cryptography>=2.0.0',
-    'kubernetes>=21.7.0',
+    'kubernetes>=3.0.0, <12.0.0',
 ]
 kylin = ['kylinpy>=2.6']
 ldap = [
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index ce040cf..9228e9b 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -22,21 +22,25 @@ from unittest import mock
 from kubernetes.client import Configuration
 from urllib3.connection import HTTPConnection, HTTPSConnection
 
-from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
+from airflow.kubernetes.kube_client import (
+    RefreshConfiguration,
+    _disable_verify_ssl,
+    _enable_tcp_keepalive,
+    get_kube_client,
+)
 
 
 class TestClient(unittest.TestCase):
     @mock.patch('airflow.kubernetes.kube_client.config')
-    def test_load_cluster_config(self, config):
-        get_kube_client(in_cluster=True)
-        assert config.load_incluster_config.called
-        assert config.load_kube_config.not_called
+    def test_load_cluster_config(self, _):
+        client = get_kube_client(in_cluster=True)
+        assert not isinstance(client.api_client.configuration, RefreshConfiguration)
 
     @mock.patch('airflow.kubernetes.kube_client.config')
-    def test_load_file_config(self, config):
-        get_kube_client(in_cluster=False)
-        assert config.load_incluster_config.not_called
-        assert config.load_kube_config.called
+    @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
+    def test_load_file_config(self, _, _2):
+        client = get_kube_client(in_cluster=False)
+        assert isinstance(client.api_client.configuration, RefreshConfiguration)
 
     def test_enable_tcp_keepalive(self):
         socket_options = [
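
The two tests above capture the restored behavior of get_kube_client; a quick manual
check (a sketch, assuming each mode runs in an environment where its config can load)
would look like:

    from airflow.kubernetes.kube_client import RefreshConfiguration, get_kube_client

    # kubeconfig path: backed by RefreshConfiguration, so exec-based credentials
    # (for example EKS tokens) are re-read when they expire.
    client = get_kube_client(in_cluster=False)
    print(isinstance(client.api_client.configuration, RefreshConfiguration))  # True

    # in-cluster path: a plain Configuration; the service account handles tokens.
    client = get_kube_client(in_cluster=True)
    print(isinstance(client.api_client.configuration, RefreshConfiguration))  # False
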
diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py
new file mode 100644
index 0000000..a0753e2
--- /dev/null
+++ b/tests/kubernetes/test_refresh_config.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+import pytest
+from pendulum.parsing import ParserError
+
+from airflow.kubernetes.refresh_config import _parse_timestamp
+
+
+class TestRefreshKubeConfigLoader(TestCase):
+    def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self):
+        ts = _parse_timestamp("2020-01-13T13:42:20Z")
+        assert 1578922940 == ts
+
+    def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self):
+        ts = _parse_timestamp("2020-01-13T13:42:20+0600")
+        assert 1578922940 == ts
+
+    def test_parse_timestamp_should_throw_exception(self):
+        with pytest.raises(ParserError):
+            _parse_timestamp("foobar")