You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/28 18:15:57 UTC

[airflow] branch v2-2-test updated (825a1f2 -> 3da4cb7)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 825a1f2  Revert "Fix handling some None parameters in kubernetes 23 libs. (#21905)"
 discard ea476f9  Revert "Update Kubernetes library version (#18797)"
 discard e0a6b67  Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"
    omit e0754a4  Fix rat-exclides and issue template licence (#22550)
    omit 52499a7  Add 2.2.5 to CHANGELOG.txt and UPDATING.md
     new 6bbcb33  Fix rat-exclides and issue template licence (#22550)
     new 9597880  Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"
     new cbfae04  Revert "Update Kubernetes library version (#18797)"
     new d11c2e9  Revert "Fix handling some None parameters in kubernetes 23 libs. (#21905)"
     new 3da4cb7  Add 2.2.5 to CHANGELOG.txt and UPDATING.md

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (825a1f2)
            \
             N -- N -- N   refs/heads/v2-2-test (3da4cb7)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.txt | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

[airflow] 04/05: Revert "Fix handling some None parameters in kubernetes 23 libs. (#21905)"

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d11c2e9fab7773dfbab25bda8b20a07e8639ecb2
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Mar 28 17:38:20 2022 +0200

    Revert "Fix handling some None parameters in kubernetes 23 libs. (#21905)"
    
    This reverts commit 24c84f0acee622f6bba4b8d73f2e9a6831cd364a.
---
 airflow/kubernetes/pod_generator_deprecated.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/airflow/kubernetes/pod_generator_deprecated.py b/airflow/kubernetes/pod_generator_deprecated.py
index bb3c980..c4d83d6 100644
--- a/airflow/kubernetes/pod_generator_deprecated.py
+++ b/airflow/kubernetes/pod_generator_deprecated.py
@@ -206,8 +206,7 @@ class PodGenerator:
 
         self.container.command = cmds or []
         self.container.args = args or []
-        if image_pull_policy:
-            self.container.image_pull_policy = image_pull_policy
+        self.container.image_pull_policy = image_pull_policy
         self.container.ports = ports or []
         self.container.resources = resources
         self.container.volume_mounts = volume_mounts or []
@@ -216,8 +215,7 @@ class PodGenerator:
         self.spec = k8s.V1PodSpec(containers=[])
         self.spec.security_context = security_context
         self.spec.tolerations = tolerations
-        if dnspolicy:
-            self.spec.dns_policy = dnspolicy
+        self.spec.dns_policy = dnspolicy
         self.spec.scheduler_name = schedulername
         self.spec.host_network = hostnetwork
         self.spec.affinity = affinity
@@ -225,8 +223,7 @@ class PodGenerator:
         self.spec.init_containers = init_containers
         self.spec.volumes = volumes or []
         self.spec.node_selector = node_selectors
-        if restart_policy:
-            self.spec.restart_policy = restart_policy
+        self.spec.restart_policy = restart_policy
         self.spec.priority_class_name = priority_class_name
 
         self.spec.image_pull_secrets = []

[airflow] 02/05: Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9597880fc8f2334a76339feb0ad4c97efd111560
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Mar 28 17:32:36 2022 +0200

    Revert "Remove RefreshConfiguration workaround for K8s token refreshing (#20759)"
    
    This reverts commit d39197fd13b0d96c2ab84ca3f1f13391dbf59572.
---
 UPDATING.md                                        |   6 +-
 airflow/kubernetes/kube_client.py                  |  47 +-
 airflow/kubernetes/refresh_config.py               | 124 +++++
 airflow/providers/cncf/kubernetes/CHANGELOG.rst    | 231 +--------
 airflow/providers/cncf/kubernetes/__init__.py      |  27 --
 .../backcompat/backwards_compat_converters.py      |  22 +-
 .../providers/cncf/kubernetes/backcompat/pod.py    |  27 +-
 .../kubernetes/backcompat/pod_runtime_info_env.py  |  18 +-
 .../providers/cncf/kubernetes/backcompat/volume.py |   2 +
 .../cncf/kubernetes/backcompat/volume_mount.py     |   4 +
 .../kubernetes/example_dags/example_kubernetes.py  | 163 -------
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  97 ++--
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 517 ++++++++++-----------
 .../cncf/kubernetes/operators/spark_kubernetes.py  |  16 +-
 airflow/providers/cncf/kubernetes/provider.yaml    |   8 -
 .../cncf/kubernetes/sensors/spark_kubernetes.py    |  15 +-
 setup.py                                           |   2 +-
 tests/kubernetes/test_client.py                    |  22 +-
 tests/kubernetes/test_refresh_config.py            |  37 ++
 19 files changed, 537 insertions(+), 848 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index b096881..c942199 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -83,9 +83,7 @@ https://developers.google.com/style/inclusive-documentation
 
 ## Airflow 2.2.5
 
-### Minimum kubernetes version bumped from 3.0.0 to  21.7.0
-
-No change in behavior is expected.  This was necessary in order to take advantage of a [bugfix](https://github.com/kubernetes-client/python-base/commit/70b78cd8488068c014b6d762a0c8d358273865b4) concerning refreshing of Kubernetes API tokens with EKS, which enabled the removal of some [workaround code](https://github.com/apache/airflow/pull/20759).
+No breaking changes.
 
 ### Deprecation: `Connection.extra` must be JSON-encoded dict
 
@@ -1382,7 +1380,7 @@ delete this option.
 
 #### `airflow.models.dagbag.DagBag`
 
-Passing `store_serialized_dags` argument to DagBag.__init__ and accessing `DagBag.store_serialized_dags` property
+Passing `store_serialized_dags` argument to `DagBag.__init__` and accessing `DagBag.store_serialized_dags` property
 are deprecated and will be removed in future versions.
 
 
diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index aa49715..1c20bd3 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -25,10 +25,39 @@ log = logging.getLogger(__name__)
 try:
     from kubernetes import client, config
     from kubernetes.client import Configuration
+    from kubernetes.client.api_client import ApiClient
     from kubernetes.client.rest import ApiException
 
+    from airflow.kubernetes.refresh_config import RefreshConfiguration, load_kube_config
+
     has_kubernetes = True
 
+    def _get_kube_config(
+        in_cluster: bool, cluster_context: Optional[str], config_file: Optional[str]
+    ) -> Optional[Configuration]:
+        if in_cluster:
+            # load_incluster_config set default configuration with config populated by k8s
+            config.load_incluster_config()
+            return None
+        else:
+            # this block can be replaced with just config.load_kube_config once
+            # refresh_config module is replaced with upstream fix
+            cfg = RefreshConfiguration()
+            load_kube_config(client_configuration=cfg, config_file=config_file, context=cluster_context)
+            return cfg
+
+    def _get_client_with_patched_configuration(cfg: Optional[Configuration]) -> client.CoreV1Api:
+        """
+        This is a workaround for supporting api token refresh in k8s client.
+
+        The function can be replace with `return client.CoreV1Api()` once the
+        upstream client supports token refresh.
+        """
+        if cfg:
+            return client.CoreV1Api(api_client=ApiClient(configuration=cfg))
+        else:
+            return client.CoreV1Api()
+
     def _disable_verify_ssl() -> None:
         configuration = Configuration()
         configuration.verify_ssl = False
@@ -101,19 +130,17 @@ def get_kube_client(
     if not has_kubernetes:
         raise _import_err
 
+    if not in_cluster:
+        if cluster_context is None:
+            cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
+        if config_file is None:
+            config_file = conf.get('kubernetes', 'config_file', fallback=None)
+
     if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
         _enable_tcp_keepalive()
 
     if not conf.getboolean('kubernetes', 'verify_ssl'):
         _disable_verify_ssl()
 
-    if in_cluster:
-        config.load_incluster_config()
-    else:
-        if cluster_context is None:
-            cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
-        if config_file is None:
-            config_file = conf.get('kubernetes', 'config_file', fallback=None)
-        config.load_kube_config(config_file=config_file, context=cluster_context)
-
-    return client.CoreV1Api()
+    client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
+    return _get_client_with_patched_configuration(client_conf)
diff --git a/airflow/kubernetes/refresh_config.py b/airflow/kubernetes/refresh_config.py
new file mode 100644
index 0000000..2564951
--- /dev/null
+++ b/airflow/kubernetes/refresh_config.py
@@ -0,0 +1,124 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+NOTE: this module can be removed once upstream client supports token refresh
+see: https://github.com/kubernetes-client/python/issues/741
+"""
+
+import calendar
+import logging
+import os
+import time
+from typing import Optional, cast
+
+import pendulum
+from kubernetes.client import Configuration
+from kubernetes.config.exec_provider import ExecProvider
+from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION, KubeConfigLoader
+
+from airflow.utils import yaml
+
+
+def _parse_timestamp(ts_str: str) -> int:
+    parsed_dt = cast(pendulum.DateTime, pendulum.parse(ts_str))
+    return calendar.timegm(parsed_dt.timetuple())
+
+
+class RefreshKubeConfigLoader(KubeConfigLoader):
+    """
+    Patched KubeConfigLoader, this subclass takes expirationTimestamp into
+    account and sets api key refresh callback hook in Configuration object
+    """
+
+    def __init__(self, *args, **kwargs):
+        KubeConfigLoader.__init__(self, *args, **kwargs)
+        self.api_key_expire_ts = None
+
+    def _load_from_exec_plugin(self):
+        """
+        We override _load_from_exec_plugin method to also read and store
+        expiration timestamp for aws-iam-authenticator. It will be later
+        used for api token refresh.
+        """
+        if 'exec' not in self._user:
+            return None
+        try:
+            status = ExecProvider(self._user['exec']).run()
+            if 'token' not in status:
+                logging.error('exec: missing token field in plugin output')
+                return None
+            self.token = f"Bearer {status['token']}"
+            ts_str = status.get('expirationTimestamp')
+            if ts_str:
+                self.api_key_expire_ts = _parse_timestamp(ts_str)
+            return True
+        except Exception as e:
+            logging.error(str(e))
+            return None
+
+    def refresh_api_key(self, client_configuration):
+        """Refresh API key if expired"""
+        if self.api_key_expire_ts and time.time() >= self.api_key_expire_ts:
+            self.load_and_set(client_configuration)
+
+    def load_and_set(self, client_configuration):
+        KubeConfigLoader.load_and_set(self, client_configuration)
+        client_configuration.refresh_api_key = self.refresh_api_key
+
+
+class RefreshConfiguration(Configuration):
+    """
+    Patched Configuration, this subclass takes api key refresh callback hook
+    into account
+    """
+
+    def __init__(self, *args, **kwargs):
+        Configuration.__init__(self, *args, **kwargs)
+        self.refresh_api_key = None
+
+    def get_api_key_with_prefix(self, identifier):
+        if self.refresh_api_key:
+            self.refresh_api_key(self)
+        return Configuration.get_api_key_with_prefix(self, identifier)
+
+
+def _get_kube_config_loader_for_yaml_file(filename, **kwargs) -> Optional[RefreshKubeConfigLoader]:
+    """
+    Adapted from the upstream _get_kube_config_loader_for_yaml_file function, changed
+    KubeConfigLoader to RefreshKubeConfigLoader
+    """
+    with open(filename) as f:
+        return RefreshKubeConfigLoader(
+            config_dict=yaml.safe_load(f),
+            config_base_path=os.path.abspath(os.path.dirname(filename)),
+            **kwargs,
+        )
+
+
+def load_kube_config(client_configuration, config_file=None, context=None):
+    """
+    Adapted from the upstream load_kube_config function, changes:
+        - removed persist_config argument since it's not being used
+        - remove `client_configuration is None` branch since we always pass
+        in client configuration
+    """
+    if config_file is None:
+        config_file = os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)
+
+    loader = _get_kube_config_loader_for_yaml_file(config_file, active_context=context, config_persister=None)
+    loader.load_and_set(client_configuration)
diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
index 5bcf569..7f686b2 100644
--- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst
+++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst
@@ -19,231 +19,6 @@
 Changelog
 ---------
 
-3.1.2
-.....
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Fix mistakenly added install_requires for all providers (#22382)``
-* ``Fix "run_id" k8s and elasticsearch compatibility with Airflow 2.1 (#22385)``
-
-Misc
-~~~~
-
-* ``Remove RefreshConfiguration workaround for K8s token refreshing (#20759)``
-
-3.1.1
-.....
-
-Misc
-~~~~~
-
-* ``Add Trove classifiers in PyPI (Framework :: Apache Airflow :: Provider)``
-
-3.1.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Add map_index label to mapped KubernetesPodOperator (#21916)``
-* ``Change KubePodOperator labels from exeuction_date to run_id (#21960)``
-
-Misc
-~~~~
-
-* ``Support for Python 3.10``
-* ``Fix Kubernetes example with wrong operator casing (#21898)``
-* ``Remove types from KPO docstring (#21826)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Add pre-commit check for docstring param types (#21398)``
-
-3.0.2
-.....
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Add missed deprecations for cncf (#20031)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Remove ':type' lines now sphinx-autoapi supports typehints (#20951)``
-   * ``Make ''delete_pod'' change more prominent in K8s changelog (#20753)``
-   * ``Fix MyPy Errors for providers: Tableau, CNCF, Apache (#20654)``
-   * ``Add optional features in providers. (#21074)``
-   * ``Add documentation for January 2021 providers release (#21257)``
-
-3.0.1
-.....
-
-
-Misc
-~~~~
-
-* ``Update Kubernetes library version (#18797)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-
-3.0.0
-.....
-
-Breaking changes
-~~~~~~~~~~~~~~~~
-
-* ``Parameter is_delete_operator_pod default is changed to True (#20575)``
-* ``Simplify KubernetesPodOperator (#19572)``
-* ``Move pod_mutation_hook call from PodManager to KubernetesPodOperator (#20596)``
-* ``Rename ''PodLauncher'' to ''PodManager'' (#20576)``
-
-Parameter is_delete_operator_pod has new default
-````````````````````````````````````````````````
-
-Previously, the default for param ``is_delete_operator_pod`` was ``False``, which means that
-after a task runs, its pod is not deleted by the operator and remains on the
-cluster indefinitely.  With this release, we change the default to ``True``.
-
-Notes on changes KubernetesPodOperator and PodLauncher
-``````````````````````````````````````````````````````
-
-.. warning:: Many methods in ``KubernetesPodOperator`` and ``PodLauncher`` have been renamed.
-    If you have subclassed ``KubernetesPodOperator`` you will need to update your subclass to reflect
-    the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``.
-
-Overview
-''''''''
-
-Generally speaking if you did not subclass ``KubernetesPodOperator`` and you didn't use the ``PodLauncher`` class directly,
-then you don't need to worry about this change.  If however you have subclassed ``KubernetesPodOperator``, what
-follows are some notes on the changes in this release.
-
-One of the principal goals of the refactor is to clearly separate the "get or create pod" and
-"wait for pod completion" phases.  Previously the "wait for pod completion" logic would be invoked
-differently depending on whether the operator were to  "attach to an existing pod" (e.g. after a
-worker failure) or "create a new pod" and this resulted in some code duplication and a bit more
-nesting of logic.  With this refactor we encapsulate  the "get or create" step
-into method ``KubernetesPodOperator.get_or_create_pod``, and pull the monitoring and XCom logic up
-into the top level of ``execute`` because it can be the same for "attached" pods and "new" pods.
-
-The ``KubernetesPodOperator.get_or_create_pod`` tries first to find an existing pod using labels
-specific to the task instance (see ``KubernetesPodOperator.find_pod``).
-If one does not exist it ``creates a pod <~.PodManager.create_pod>``.
-
-The "waiting" part of execution has three components.  The first step is to wait for the pod to leave the
-``Pending`` phase (``~.KubernetesPodOperator.await_pod_start``). Next, if configured to do so,
-the operator will follow the base container logs and forward these logs to the task logger until
-the ``base`` container is done. If not configured to harvest the
-logs, the operator will instead ``KubernetesPodOperator.await_container_completion``
-either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom
-value from the base container, we ``await pod completion <~.PodManager.await_pod_completion>``.
-
-Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or
-created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``.
-
-After the pod terminates, we execute different cleanup tasks depending on whether the pod terminated successfully.
-
-If the pod terminates *unsuccessfully*, we attempt to log the pod events ``PodLauncher.read_pod_events>``. If
-additionally the task is configured *not* to delete the pod after termination, we apply a label ``KubernetesPodOperator.patch_already_checked>``
-indicating that the pod failed and should not be "reattached to" in a retry.  If the task is configured
-to delete its pod, we delete it ``KubernetesPodOperator.process_pod_deletion>``.  Finally,
-we raise an AirflowException to fail the task instance.
-
-If the pod terminates successfully, we delete the pod ``KubernetesPodOperator.process_pod_deletion>``
-(if configured to delete the pod) and push XCom (if configured to push XCom).
-
-Details on method renames, refactors, and deletions
-'''''''''''''''''''''''''''''''''''''''''''''''''''
-
-In ``KubernetesPodOperator``:
-
-* Method ``create_pod_launcher`` is converted to cached property ``pod_manager``
-* Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client``
-* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``.
-* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion.  With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same  whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
-* Method ``create_pod_request_obj`` is renamed ``build_pod_request_obj``.  It now takes argument ``context`` in order to add TI-specific pod labels; previously they were added after return.
-* Method ``create_labels_for_pod`` is renamed ``_get_ti_pod_labels``.  This method doesn't return *all* labels, but only those specific to the TI. We also add parameter ``include_try_number`` to control the inclusion of this label instead of possibly filtering it out later.
-* Method ``_get_pod_identifying_label_string`` is renamed ``_build_find_pod_label_selector``
-* Method ``_try_numbers_match`` is removed.
-* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc.  Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``.  Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
-
-In class ``PodManager`` (formerly ``PodLauncher``):
-
-* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``.
-* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion``
-* Methods ``pod_not_started``, ``pod_is_running``, ``process_status``, and ``_task_status`` are removed.  These were needed due to the way in which pod ``phase`` was mapped to task instance states; but we no longer do such a mapping and instead deal with pod phases directly and untransformed.
-* Method ``_extract_xcom`` is renamed  ``extract_xcom``.
-* Method ``read_pod_logs`` now takes kwarg ``container_name``
-
-
-Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``):
-
-* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager``
-* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased.
-* The ``airflow.settings.pod_mutation_hook`` is no longer called in
-  ``cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async``. For ``KubernetesPodOperator``,
-  mutation now occurs in ``build_pod_request_obj``.
-* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task
-  completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a
-  temporary basis for debugging purposes and therefore pod deletion is the more sensible default.
-
-Features
-~~~~~~~~
-
-* ``Add params config, in_cluster, and cluster_context to KubernetesHook (#19695)``
-* ``Implement dry_run for KubernetesPodOperator (#20573)``
-* ``Clarify docstring for ''build_pod_request_obj'' in K8s providers (#20574)``
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Fix Volume/VolumeMount KPO DeprecationWarning (#19726)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-     * ``Fix cached_property MyPy declaration and related MyPy errors (#20226)``
-     * ``Use typed Context EVERYWHERE (#20565)``
-     * ``Fix template_fields type to have MyPy friendly Sequence type (#20571)``
-     * ``Even more typing in operators (template_fields/ext) (#20608)``
-     * ``Update documentation for provider December 2021 release (#20523)``
-
-2.2.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Added namespace as a template field in the KPO. (#19718)``
-* ``Decouple name randomization from name kwarg (#19398)``
-
-Bug Fixes
-~~~~~~~~~
-
-* ``Checking event.status.container_statuses before filtering (#19713)``
-* ``Coalesce 'extra' params to None in KubernetesHook (#19694)``
-* ``Change to correct type in KubernetesPodOperator (#19459)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Fix duplicate changelog entries (#19759)``
-
-2.1.0
-.....
-
-Features
-~~~~~~~~
-
-* ``Add more type hints to PodLauncher (#18928)``
-* ``Add more information to PodLauncher timeout error (#17953)``
-
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
-   * ``Update docstring to let users use 'node_selector' (#19057)``
-   * ``Add pre-commit hook for common misspelling check in files (#18964)``
-
 2.0.3
 .....
 
@@ -269,8 +44,7 @@ Bug Fixes
 * ``Fix using XCom with ''KubernetesPodOperator'' (#17760)``
 * ``Import Hooks lazily individually in providers manager (#17682)``
 
-.. Below changes are excluded from the changelog. Move them to
-   appropriate section above if needed. Do not delete the lines(!):
+.. Review and move the new changes to one of the sections above:
    * ``Fix messed-up changelog in 3 providers (#17380)``
    * ``Fix static checks (#17256)``
    * ``Update spark_kubernetes.py (#17237)``
@@ -291,7 +65,10 @@ Bug Fixes
 
 .. Below changes are excluded from the changelog. Move them to
    appropriate section above if needed. Do not delete the lines(!):
+   * ``Fixed wrongly escaped characters in amazon's changelog (#17020)``
    * ``Simplify 'default_args' in Kubernetes example DAGs (#16870)``
+   * ``Enable using custom pod launcher in Kubernetes Pod Operator (#16945)``
+   * ``Prepare documentation for July release of providers. (#17015)``
    * ``Updating task dependencies (#16624)``
    * ``Removes pylint from our toolchain (#16682)``
    * ``Prepare documentation for July release of providers. (#17015)``
diff --git a/airflow/providers/cncf/kubernetes/__init__.py b/airflow/providers/cncf/kubernetes/__init__.py
index 0998e31..217e5db 100644
--- a/airflow/providers/cncf/kubernetes/__init__.py
+++ b/airflow/providers/cncf/kubernetes/__init__.py
@@ -15,30 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import sys
-
-if sys.version_info < (3, 7):
-    # This is needed because the Python Kubernetes client >= 12.0 contains a logging object, meaning that
-    # v1.Pod et al. are not pickleable on Python 3.6.
-
-    # Python 3.7 added this via https://bugs.python.org/issue30520 in 2017 -- but Python 3.6 doesn't have this
-    # method.
-
-    # This is duplicated/backported from airflow.logging_config in 2.2, but by having it here as well it means
-    # that we can update the version used in this provider and have it work for older versions
-    import copyreg
-    import logging
-
-    def _reduce_Logger(logger):
-        if logging.getLogger(logger.name) is not logger:
-            import pickle
-
-            raise pickle.PicklingError('logger cannot be pickled')
-        return logging.getLogger, (logger.name,)
-
-    def _reduce_RootLogger(logger):
-        return logging.getLogger, ()
-
-    if logging.Logger not in copyreg.dispatch_table:
-        copyreg.pickle(logging.Logger, _reduce_Logger)
-        copyreg.pickle(logging.RootLogger, _reduce_RootLogger)
diff --git a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
index bf2b832..4c6404f 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py
@@ -21,16 +21,18 @@ from typing import List
 from kubernetes.client import ApiClient, models as k8s
 
 from airflow.exceptions import AirflowException
+from airflow.providers.cncf.kubernetes.backcompat.pod import Port, Resources
+from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
 
 
-def _convert_kube_model_object(obj, new_class):
+def _convert_kube_model_object(obj, old_class, new_class):
     convert_op = getattr(obj, "to_k8s_client_obj", None)
     if callable(convert_op):
         return obj.to_k8s_client_obj()
     elif isinstance(obj, new_class):
         return obj
     else:
-        raise AirflowException(f"Expected {new_class}, got {type(obj)}")
+        raise AirflowException(f"Expected {old_class} or {new_class}, got {type(obj)}")
 
 
 def _convert_from_dict(obj, new_class):
@@ -50,7 +52,9 @@ def convert_volume(volume) -> k8s.V1Volume:
     :param volume:
     :return: k8s.V1Volume
     """
-    return _convert_kube_model_object(volume, k8s.V1Volume)
+    from airflow.providers.cncf.kubernetes.backcompat.volume import Volume
+
+    return _convert_kube_model_object(volume, Volume, k8s.V1Volume)
 
 
 def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
@@ -60,7 +64,9 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
     :param volume_mount:
     :return: k8s.V1VolumeMount
     """
-    return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)
+    from airflow.providers.cncf.kubernetes.backcompat.volume_mount import VolumeMount
+
+    return _convert_kube_model_object(volume_mount, VolumeMount, k8s.V1VolumeMount)
 
 
 def convert_resources(resources) -> k8s.V1ResourceRequirements:
@@ -71,10 +77,8 @@ def convert_resources(resources) -> k8s.V1ResourceRequirements:
     :return: k8s.V1ResourceRequirements
     """
     if isinstance(resources, dict):
-        from airflow.providers.cncf.kubernetes.backcompat.pod import Resources
-
         resources = Resources(**resources)
-    return _convert_kube_model_object(resources, k8s.V1ResourceRequirements)
+    return _convert_kube_model_object(resources, Resources, k8s.V1ResourceRequirements)
 
 
 def convert_port(port) -> k8s.V1ContainerPort:
@@ -84,7 +88,7 @@ def convert_port(port) -> k8s.V1ContainerPort:
     :param port:
     :return: k8s.V1ContainerPort
     """
-    return _convert_kube_model_object(port, k8s.V1ContainerPort)
+    return _convert_kube_model_object(port, Port, k8s.V1ContainerPort)
 
 
 def convert_env_vars(env_vars) -> List[k8s.V1EnvVar]:
@@ -112,7 +116,7 @@ def convert_pod_runtime_info_env(pod_runtime_info_envs) -> k8s.V1EnvVar:
     :param pod_runtime_info_envs:
     :return:
     """
-    return _convert_kube_model_object(pod_runtime_info_envs, k8s.V1EnvVar)
+    return _convert_kube_model_object(pod_runtime_info_envs, PodRuntimeInfoEnv, k8s.V1EnvVar)
 
 
 def convert_image_pull_secrets(image_pull_secrets) -> List[k8s.V1LocalObjectReference]:
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod.py b/airflow/providers/cncf/kubernetes/backcompat/pod.py
index 7f18117..30a7128 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/pod.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/pod.py
@@ -14,29 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1ResourceRequirements`
-and :mod:`kubernetes.client.models.V1ContainerPort`.
-"""
-
-import warnings
+"""Classes for interacting with Kubernetes API"""
 
 from kubernetes.client import models as k8s
 
-warnings.warn(
-    (
-        "This module is deprecated. Please use `kubernetes.client.models.V1ResourceRequirements`"
-        " and `kubernetes.client.models.V1ContainerPort`."
-    ),
-    DeprecationWarning,
-    stacklevel=2,
-)
-
 
 class Resources:
-    """backwards compat for Resources."""
+    """backwards compat for Resources"""
 
     __slots__ = (
         'request_memory',
@@ -50,12 +34,19 @@ class Resources:
 
     """
     :param request_memory: requested memory
+    :type request_memory: str
     :param request_cpu: requested CPU number
+    :type request_cpu: float | str
     :param request_ephemeral_storage: requested ephemeral storage
+    :type request_ephemeral_storage: str
     :param limit_memory: limit for memory usage
+    :type limit_memory: str
     :param limit_cpu: Limit for CPU used
+    :type limit_cpu: float | str
     :param limit_gpu: Limits for GPU used
+    :type limit_gpu: int
     :param limit_ephemeral_storage: Limit for ephemeral storage
+    :type limit_ephemeral_storage: float | str
     """
 
     def __init__(
diff --git a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
index f08aecf..f76e0d7 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/pod_runtime_info_env.py
@@ -14,25 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-"""
-Classes for interacting with Kubernetes API.
-
-This module is deprecated. Please use :mod:`kubernetes.client.models.V1EnvVar`.
-"""
-
-import warnings
+"""Classes for interacting with Kubernetes API"""
 
 import kubernetes.client.models as k8s
 
-warnings.warn(
-    "This module is deprecated. Please use `kubernetes.client.models.V1EnvVar`.",
-    DeprecationWarning,
-    stacklevel=2,
-)
-
 
 class PodRuntimeInfoEnv:
-    """Defines Pod runtime information as environment variable."""
+    """Defines Pod runtime information as environment variable"""
 
     def __init__(self, name, field_path):
         """
@@ -40,7 +28,9 @@ class PodRuntimeInfoEnv:
         Full list of options can be found in kubernetes documentation.
 
         :param name: the name of the environment variable
+        :type: name: str
         :param field_path: path to pod runtime info. Ex: metadata.namespace | status.podIP
+        :type: field_path: str
         """
         self.name = name
         self.field_path = field_path
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume.py b/airflow/providers/cncf/kubernetes/backcompat/volume.py
index c51ce8a..e5b4d00 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/volume.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/volume.py
@@ -35,8 +35,10 @@ class Volume:
         and Persistent Volumes
 
         :param name: the name of the volume mount
+        :type name: str
         :param configs: dictionary of any features needed for volume. We purposely keep this
             vague since there are multiple volume types with changing configs.
+        :type configs: dict
         """
         self.name = name
         self.configs = configs
diff --git a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
index f9faed9..b77ab47 100644
--- a/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
+++ b/airflow/providers/cncf/kubernetes/backcompat/volume_mount.py
@@ -38,9 +38,13 @@ class VolumeMount:
         running container.
 
         :param name: the name of the volume mount
+        :type name: str
         :param mount_path:
+        :type mount_path: str
         :param sub_path: subpath within the volume mount
+        :type sub_path: Optional[str]
         :param read_only: whether to access pod with read-only mode
+        :type read_only: bool
         """
         self.name = name
         self.mount_path = mount_path
diff --git a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py b/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
deleted file mode 100644
index b65dae9..0000000
--- a/airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""
-This is an example dag for using the KubernetesPodOperator.
-"""
-
-from datetime import datetime
-
-from kubernetes.client import models as k8s
-
-from airflow import DAG
-from airflow.kubernetes.secret import Secret
-from airflow.operators.bash import BashOperator
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
-
-# [START howto_operator_k8s_cluster_resources]
-secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
-secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
-secret_all_keys = Secret('env', None, 'airflow-secrets-2')
-volume_mount = k8s.V1VolumeMount(
-    name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True
-)
-
-configmaps = [
-    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')),
-    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')),
-]
-
-volume = k8s.V1Volume(
-    name='test-volume',
-    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
-)
-
-port = k8s.V1ContainerPort(name='http', container_port=80)
-
-init_container_volume_mounts = [
-    k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True)
-]
-
-init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')]
-
-init_container = k8s.V1Container(
-    name="init-container",
-    image="ubuntu:16.04",
-    env=init_environments,
-    volume_mounts=init_container_volume_mounts,
-    command=["bash", "-cx"],
-    args=["echo 10"],
-)
-
-affinity = k8s.V1Affinity(
-    node_affinity=k8s.V1NodeAffinity(
-        preferred_during_scheduling_ignored_during_execution=[
-            k8s.V1PreferredSchedulingTerm(
-                weight=1,
-                preference=k8s.V1NodeSelectorTerm(
-                    match_expressions=[
-                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
-                    ]
-                ),
-            )
-        ]
-    ),
-    pod_affinity=k8s.V1PodAffinity(
-        required_during_scheduling_ignored_during_execution=[
-            k8s.V1WeightedPodAffinityTerm(
-                weight=1,
-                pod_affinity_term=k8s.V1PodAffinityTerm(
-                    label_selector=k8s.V1LabelSelector(
-                        match_expressions=[
-                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
-                        ]
-                    ),
-                    topology_key="failure-domain.beta.kubernetes.io/zone",
-                ),
-            )
-        ]
-    ),
-)
-
-tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]
-
-# [END howto_operator_k8s_cluster_resources]
-
-
-with DAG(
-    dag_id='example_kubernetes_operator',
-    schedule_interval=None,
-    start_date=datetime(2021, 1, 1),
-    tags=['example'],
-) as dag:
-    k = KubernetesPodOperator(
-        namespace='default',
-        image="ubuntu:16.04",
-        cmds=["bash", "-cx"],
-        arguments=["echo", "10"],
-        labels={"foo": "bar"},
-        secrets=[secret_file, secret_env, secret_all_keys],
-        ports=[port],
-        volumes=[volume],
-        volume_mounts=[volume_mount],
-        env_from=configmaps,
-        name="airflow-test-pod",
-        task_id="task",
-        affinity=affinity,
-        is_delete_operator_pod=True,
-        hostnetwork=False,
-        tolerations=tolerations,
-        init_containers=[init_container],
-        priority_class_name="medium",
-    )
-
-    # [START howto_operator_k8s_private_image]
-    quay_k8s = KubernetesPodOperator(
-        namespace='default',
-        image='quay.io/apache/bash',
-        image_pull_secrets=[k8s.V1LocalObjectReference('testquay')],
-        cmds=["bash", "-cx"],
-        arguments=["echo", "10", "echo pwd"],
-        labels={"foo": "bar"},
-        name="airflow-private-image-pod",
-        is_delete_operator_pod=True,
-        in_cluster=True,
-        task_id="task-two",
-        get_logs=True,
-    )
-    # [END howto_operator_k8s_private_image]
-
-    # [START howto_operator_k8s_write_xcom]
-    write_xcom = KubernetesPodOperator(
-        namespace='default',
-        image='alpine',
-        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
-        name="write-xcom",
-        do_xcom_push=True,
-        is_delete_operator_pod=True,
-        in_cluster=True,
-        task_id="write-xcom",
-        get_logs=True,
-    )
-
-    pod_task_xcom_result = BashOperator(
-        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
-        task_id="pod_task_xcom_result",
-    )
-    # [END howto_operator_k8s_write_xcom]
-
-    write_xcom >> pod_task_xcom_result
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 3830503..e230dba 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -14,21 +14,19 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import sys
 import tempfile
 from typing import Any, Dict, Generator, Optional, Tuple, Union
 
-if sys.version_info >= (3, 8):
+try:
     from functools import cached_property
-else:
+except ImportError:
     from cached_property import cached_property
-
 from kubernetes import client, config, watch
 
 try:
     import airflow.utils.yaml as yaml
 except ImportError:
-    import yaml  # type: ignore[no-redef]
+    import yaml
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
@@ -61,6 +59,7 @@ class KubernetesHook(BaseHook):
 
     :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
         to Kubernetes cluster.
+    :type conn_id: str
     """
 
     conn_name_attr = 'kubernetes_conn_id'
@@ -86,13 +85,10 @@ class KubernetesHook(BaseHook):
             "extra__kubernetes__namespace": StringField(
                 lazy_gettext('Namespace'), widget=BS3TextFieldWidget()
             ),
-            "extra__kubernetes__cluster_context": StringField(
-                lazy_gettext('Cluster context'), widget=BS3TextFieldWidget()
-            ),
         }
 
     @staticmethod
-    def get_ui_field_behaviour() -> Dict[str, Any]:
+    def get_ui_field_behaviour() -> Dict:
         """Returns custom field behaviour"""
         return {
             "hidden_fields": ['host', 'schema', 'login', 'password', 'port', 'extra'],
@@ -100,49 +96,25 @@ class KubernetesHook(BaseHook):
         }
 
     def __init__(
-        self,
-        conn_id: Optional[str] = default_conn_name,
-        client_configuration: Optional[client.Configuration] = None,
-        cluster_context: Optional[str] = None,
-        config_file: Optional[str] = None,
-        in_cluster: Optional[bool] = None,
+        self, conn_id: str = default_conn_name, client_configuration: Optional[client.Configuration] = None
     ) -> None:
         super().__init__()
         self.conn_id = conn_id
         self.client_configuration = client_configuration
-        self.cluster_context = cluster_context
-        self.config_file = config_file
-        self.in_cluster = in_cluster
-
-    @staticmethod
-    def _coalesce_param(*params):
-        for param in params:
-            if param is not None:
-                return param
 
     def get_conn(self) -> Any:
         """Returns kubernetes api session for use with requests"""
-        if self.conn_id:
-            connection = self.get_connection(self.conn_id)
-            extras = connection.extra_dejson
-        else:
-            extras = {}
-        in_cluster = self._coalesce_param(
-            self.in_cluster, extras.get("extra__kubernetes__in_cluster") or None
-        )
-        cluster_context = self._coalesce_param(
-            self.cluster_context, extras.get("extra__kubernetes__cluster_context") or None
-        )
-        kubeconfig_path = self._coalesce_param(
-            self.config_file, extras.get("extra__kubernetes__kube_config_path") or None
-        )
-        kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+        connection = self.get_connection(self.conn_id)
+        extras = connection.extra_dejson
+        in_cluster = extras.get("extra__kubernetes__in_cluster")
+        kubeconfig_path = extras.get("extra__kubernetes__kube_config_path")
+        kubeconfig = extras.get("extra__kubernetes__kube_config")
         num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])
 
         if num_selected_configuration > 1:
             raise AirflowException(
-                "Invalid connection configuration. Options kube_config_path, "
-                "kube_config, in_cluster are mutually exclusive. "
+                "Invalid connection configuration. Options extra__kubernetes__kube_config_path, "
+                "extra__kubernetes__kube_config, extra__kubernetes__in_cluster are mutually exclusive. "
                 "You can only use one option at a time."
             )
         if in_cluster:
@@ -153,9 +125,7 @@ class KubernetesHook(BaseHook):
         if kubeconfig_path is not None:
             self.log.debug("loading kube_config from: %s", kubeconfig_path)
             config.load_kube_config(
-                config_file=kubeconfig_path,
-                client_configuration=self.client_configuration,
-                context=cluster_context,
+                config_file=kubeconfig_path, client_configuration=self.client_configuration
             )
             return client.ApiClient()
 
@@ -165,17 +135,12 @@ class KubernetesHook(BaseHook):
                 temp_config.write(kubeconfig.encode())
                 temp_config.flush()
                 config.load_kube_config(
-                    config_file=temp_config.name,
-                    client_configuration=self.client_configuration,
-                    context=cluster_context,
+                    config_file=temp_config.name, client_configuration=self.client_configuration
                 )
             return client.ApiClient()
 
         self.log.debug("loading kube_config from: default file")
-        config.load_kube_config(
-            client_configuration=self.client_configuration,
-            context=cluster_context,
-        )
+        config.load_kube_config(client_configuration=self.client_configuration)
         return client.ApiClient()
 
     @cached_property
@@ -183,10 +148,6 @@ class KubernetesHook(BaseHook):
         """Cached Kubernetes API client"""
         return self.get_conn()
 
-    @cached_property
-    def core_v1_client(self):
-        return client.CoreV1Api(api_client=self.api_client)
-
     def create_custom_object(
         self, group: str, version: str, plural: str, body: Union[str, dict], namespace: Optional[str] = None
     ):
@@ -194,10 +155,15 @@ class KubernetesHook(BaseHook):
         Creates custom resource definition object in Kubernetes
 
         :param group: api group
+        :type group: str
         :param version: api version
+        :type version: str
         :param plural: api plural
+        :type plural: str
         :param body: crd object definition
+        :type body: Union[str, dict]
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CustomObjectsApi(self.api_client)
         if namespace is None:
@@ -220,10 +186,15 @@ class KubernetesHook(BaseHook):
         Get custom resource definition object from Kubernetes
 
         :param group: api group
+        :type group: str
         :param version: api version
+        :type version: str
         :param plural: api plural
+        :type plural: str
         :param name: crd object name
+        :type name: str
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CustomObjectsApi(self.api_client)
         if namespace is None:
@@ -236,14 +207,12 @@ class KubernetesHook(BaseHook):
         except client.rest.ApiException as e:
             raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")
 
-    def get_namespace(self) -> Optional[str]:
+    def get_namespace(self) -> str:
         """Returns the namespace that defined in the connection"""
-        if self.conn_id:
-            connection = self.get_connection(self.conn_id)
-            extras = connection.extra_dejson
-            namespace = extras.get("extra__kubernetes__namespace", "default")
-            return namespace
-        return None
+        connection = self.get_connection(self.conn_id)
+        extras = connection.extra_dejson
+        namespace = extras.get("extra__kubernetes__namespace", "default")
+        return namespace
 
     def get_pod_log_stream(
         self,
@@ -255,8 +224,10 @@ class KubernetesHook(BaseHook):
         Retrieves a log stream for a container in a kubernetes pod.
 
         :param pod_name: pod name
+        :type pod_name: str
         :param container: container name
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CoreV1Api(self.api_client)
         watcher = watch.Watch()
@@ -280,8 +251,10 @@ class KubernetesHook(BaseHook):
         Retrieves a container's log from the specified pod.
 
         :param pod_name: pod name
+        :type pod_name: str
         :param container: container name
         :param namespace: kubernetes namespace
+        :type namespace: str
         """
         api = client.CoreV1Api(self.api_client)
         return api.read_namespaced_pod_log(
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index dd127fe..747f8b0 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -15,16 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 """Executes task in a Kubernetes POD"""
-import json
-import logging
 import re
-import sys
 import warnings
-from contextlib import AbstractContextManager
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
 
 from kubernetes.client import CoreV1Api, models as k8s
 
+try:
+    import airflow.utils.yaml as yaml
+except ImportError:
+    import yaml
+
 from airflow.exceptions import AirflowException
 from airflow.kubernetes import kube_client, pod_generator
 from airflow.kubernetes.pod_generator import PodGenerator
@@ -42,27 +43,15 @@ from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters im
     convert_volume,
     convert_volume_mount,
 )
-from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
-from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
-from airflow.settings import pod_mutation_hook
-from airflow.utils import yaml
+from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv
+from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar
 from airflow.utils.helpers import validate_key
+from airflow.utils.state import State
 from airflow.version import version as airflow_version
 
-if sys.version_info >= (3, 8):
-    from functools import cached_property
-else:
-    from cached_property import cached_property
-
 if TYPE_CHECKING:
     import jinja2
 
-    from airflow.utils.context import Context
-
-
-class PodReattachFailure(AirflowException):
-    """When we expect to be able to find a pod but cannot."""
-
 
 class KubernetesPodOperator(BaseOperator):
     """
@@ -79,66 +68,101 @@ class KubernetesPodOperator(BaseOperator):
         simplifies the authorization process.
 
     :param namespace: the namespace to run within kubernetes.
+    :type namespace: str
     :param image: Docker image you wish to launch. Defaults to hub.docker.com,
         but fully qualified URLS will point to custom repositories. (templated)
+    :type image: str
     :param name: name of the pod in which the task will run, will be used (plus a random
-        suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain,
-        containing only [a-z0-9.-]).
-    :param random_name_suffix: if True, will generate a random suffix.
+        suffix) to generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
+    :type name: str
     :param cmds: entrypoint of the container. (templated)
         The docker images's entrypoint is used if this is not provided.
+    :type cmds: list[str]
     :param arguments: arguments of the entrypoint. (templated)
         The docker image's CMD is used if this is not provided.
-    :param ports: ports for the launched pod.
-    :param volume_mounts: volumeMounts for the launched pod.
-    :param volumes: volumes for the launched pod. Includes ConfigMaps and PersistentVolumes.
+    :type arguments: list[str]
+    :param ports: ports for launched pod.
+    :type ports: list[k8s.V1ContainerPort]
+    :param volume_mounts: volumeMounts for launched pod.
+    :type volume_mounts: list[k8s.V1VolumeMount]
+    :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes.
+    :type volumes: list[k8s.V1Volume]
     :param env_vars: Environment variables initialized in the container. (templated)
+    :type env_vars: list[k8s.V1EnvVar]
     :param secrets: Kubernetes secrets to inject in the container.
         They can be exposed as environment vars or files in a volume.
+    :type secrets: list[airflow.kubernetes.secret.Secret]
     :param in_cluster: run kubernetes client with in_cluster configuration.
+    :type in_cluster: bool
     :param cluster_context: context that points to kubernetes cluster.
         Ignored when in_cluster is True. If None, current-context is used.
+    :type cluster_context: str
     :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor
+    :type reattach_on_restart: bool
     :param labels: labels to apply to the Pod. (templated)
+    :type labels: dict
     :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :type startup_timeout_seconds: int
     :param get_logs: get the stdout of the container as logs of the tasks.
+    :type get_logs: bool
     :param image_pull_policy: Specify a policy to cache or always pull an image.
+    :type image_pull_policy: str
     :param annotations: non-identifying metadata you can attach to the Pod.
         Can be a large range of data, and can include characters
         that are not permitted by labels.
-    :param resources: resources for the launched pod.
-    :param affinity: affinity scheduling rules for the launched pod.
+    :type annotations: dict
+    :param resources: A dict containing resources requests and limits.
+        Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
+        and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources.
+        See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container
+    :type resources: k8s.V1ResourceRequirements
+    :param affinity: A dict containing a group of affinity scheduling rules.
+    :type affinity: k8s.V1Affinity
     :param config_file: The path to the Kubernetes config file. (templated)
         If not specified, default value is ``~/.kube/config``
-    :param node_selector: A dict containing a group of scheduling rules.
+    :type config_file: str
+    :param node_selectors: A dict containing a group of scheduling rules.
+    :type node_selectors: dict
     :param image_pull_secrets: Any image pull secrets to be given to the pod.
         If more than one secret is required, provide a
         comma separated list: secret_a,secret_b
+    :type image_pull_secrets: List[k8s.V1LocalObjectReference]
     :param service_account_name: Name of the service account
+    :type service_account_name: str
     :param is_delete_operator_pod: What to do when the pod reaches its final
-        state, or the execution is interrupted. If True (default), delete the
-        pod; if False, leave the pod.
+        state, or the execution is interrupted.
+        If False (default): do nothing, If True: delete the pod
+    :type is_delete_operator_pod: bool
     :param hostnetwork: If True enable host networking on the pod.
+    :type hostnetwork: bool
     :param tolerations: A list of kubernetes tolerations.
+    :type tolerations: List[k8s.V1Toleration]
     :param security_context: security options the pod should run with (PodSecurityContext).
+    :type security_context: dict
     :param dnspolicy: dnspolicy for the pod.
+    :type dnspolicy: str
     :param schedulername: Specify a schedulername for the pod
+    :type schedulername: str
     :param full_pod_spec: The complete podSpec
+    :type full_pod_spec: kubernetes.client.models.V1Pod
     :param init_containers: init container for the launched Pod
+    :type init_containers: list[kubernetes.client.models.V1Container]
     :param log_events_on_failure: Log the pod's events if a failure occurs
+    :type log_events_on_failure: bool
     :param do_xcom_push: If True, the content of the file
         /airflow/xcom/return.json in the container will also be pushed to an
         XCom when the container completes.
+    :type do_xcom_push: bool
     :param pod_template_file: path to pod template file (templated)
+    :type pod_template_file: str
     :param priority_class_name: priority class name for the launched Pod
+    :type priority_class_name: str
     :param termination_grace_period: Termination grace period if task killed in UI,
         defaults to kubernetes default
+    :type termination_grace_period: int
     """
 
-    BASE_CONTAINER_NAME = 'base'
-    POD_CHECKED_KEY = 'already_checked'
-
-    template_fields: Sequence[str] = (
+    template_fields: Iterable[str] = (
         'image',
         'cmds',
         'arguments',
@@ -146,16 +170,16 @@ class KubernetesPodOperator(BaseOperator):
         'labels',
         'config_file',
         'pod_template_file',
-        'namespace',
     )
 
+    # fmt: off
     def __init__(
+        # fmt: on
         self,
         *,
         namespace: Optional[str] = None,
         image: Optional[str] = None,
         name: Optional[str] = None,
-        random_name_suffix: Optional[bool] = True,
         cmds: Optional[List[str]] = None,
         arguments: Optional[List[str]] = None,
         ports: Optional[List[k8s.V1ContainerPort]] = None,
@@ -179,7 +203,7 @@ class KubernetesPodOperator(BaseOperator):
         node_selector: Optional[dict] = None,
         image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None,
         service_account_name: Optional[str] = None,
-        is_delete_operator_pod: bool = True,
+        is_delete_operator_pod: bool = False,
         hostnetwork: bool = False,
         tolerations: Optional[List[k8s.V1Toleration]] = None,
         security_context: Optional[Dict] = None,
@@ -191,9 +215,9 @@ class KubernetesPodOperator(BaseOperator):
         do_xcom_push: bool = False,
         pod_template_file: Optional[str] = None,
         priority_class_name: Optional[str] = None,
-        pod_runtime_info_envs: Optional[List[k8s.V1EnvVar]] = None,
+        pod_runtime_info_envs: List[PodRuntimeInfoEnv] = None,
         termination_grace_period: Optional[int] = None,
-        configmaps: Optional[List[str]] = None,
+        configmaps: Optional[str] = None,
         **kwargs,
     ) -> None:
         if kwargs.get('xcom_push') is not None:
@@ -240,9 +264,8 @@ class KubernetesPodOperator(BaseOperator):
         self.service_account_name = service_account_name
         self.is_delete_operator_pod = is_delete_operator_pod
         self.hostnetwork = hostnetwork
-        self.tolerations = (
-            [convert_toleration(toleration) for toleration in tolerations] if tolerations else []
-        )
+        self.tolerations = [convert_toleration(toleration) for toleration in tolerations] \
+            if tolerations else []
         self.security_context = security_context or {}
         self.dnspolicy = dnspolicy
         self.schedulername = schedulername
@@ -252,15 +275,14 @@ class KubernetesPodOperator(BaseOperator):
         self.priority_class_name = priority_class_name
         self.pod_template_file = pod_template_file
         self.name = self._set_name(name)
-        self.random_name_suffix = random_name_suffix
         self.termination_grace_period = termination_grace_period
-        self.pod_request_obj: Optional[k8s.V1Pod] = None
-        self.pod: Optional[k8s.V1Pod] = None
+        self.client: CoreV1Api = None
+        self.pod: k8s.V1Pod = None
 
     def _render_nested_template_fields(
         self,
         content: Any,
-        context: 'Context',
+        context: Dict,
         jinja_env: "jinja2.Environment",
         seen_oids: set,
     ) -> None:
@@ -269,31 +291,27 @@ class KubernetesPodOperator(BaseOperator):
             self._do_render_template_fields(content, ('value', 'name'), context, jinja_env, seen_oids)
             return
 
-        super()._render_nested_template_fields(content, context, jinja_env, seen_oids)
+        super()._render_nested_template_fields(
+            content,
+            context,
+            jinja_env,
+            seen_oids
+        )
 
     @staticmethod
-    def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool = True) -> dict:
+    def create_labels_for_pod(context) -> dict:
         """
         Generate labels for the pod to track the pod in case of Operator crash
 
         :param context: task context provided by airflow DAG
         :return: dict
         """
-        if not context:
-            return {}
-
-        ti = context['ti']
-        run_id = context['run_id']
-
-        labels = {'dag_id': ti.dag_id, 'task_id': ti.task_id, 'run_id': run_id}
-
-        # If running on Airflow 2.3+:
-        map_index = getattr(ti, 'map_index', -1)
-        if map_index >= 0:
-            labels['map_index'] = map_index
-
-        if include_try_number:
-            labels.update(try_number=ti.try_number)
+        labels = {
+            'dag_id': context['dag'].dag_id,
+            'task_id': context['task'].task_id,
+            'execution_date': context['ts'],
+            'try_number': context['ti'].try_number,
+        }
         # In the case of sub dags this is just useful
         if context['dag'].is_subdag:
             labels['parent_dag_id'] = context['dag'].parent_dag.dag_id
@@ -304,127 +322,101 @@ class KubernetesPodOperator(BaseOperator):
             labels[label_id] = safe_label
         return labels
 
-    @cached_property
-    def pod_manager(self) -> PodManager:
-        return PodManager(kube_client=self.client)
+    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
+        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
 
-    @cached_property
-    def client(self) -> CoreV1Api:
-        # todo: use airflow Connection / hook to authenticate to the cluster
-        kwargs: Dict[str, Any] = dict(
-            cluster_context=self.cluster_context,
-            config_file=self.config_file,
-        )
-        if self.in_cluster is not None:
-            kwargs.update(in_cluster=self.in_cluster)
-        return kube_client.get_kube_client(**kwargs)
-
-    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
-        """Returns an already-running pod for this task instance if one exists."""
-        label_selector = self._build_find_pod_label_selector(context)
-        pod_list = self.client.list_namespaced_pod(
-            namespace=namespace,
-            label_selector=label_selector,
-        ).items
-
-        pod = None
-        num_pods = len(pod_list)
-        if num_pods > 1:
-            raise AirflowException(f'More than one pod running with labels {label_selector}')
-        elif num_pods == 1:
-            pod = pod_list[0]
-            self.log.info("Found matching pod %s with labels %s", pod.metadata.name, pod.metadata.labels)
-            self.log.info("`try_number` of task_instance: %s", context['ti'].try_number)
-            self.log.info("`try_number` of pod: %s", pod.metadata.labels['try_number'])
-        return pod
+    def execute(self, context) -> Optional[str]:
+        try:
+            if self.in_cluster is not None:
+                client = kube_client.get_kube_client(
+                    in_cluster=self.in_cluster,
+                    cluster_context=self.cluster_context,
+                    config_file=self.config_file,
+                )
+            else:
+                client = kube_client.get_kube_client(
+                    cluster_context=self.cluster_context, config_file=self.config_file
+                )
 
-    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
-        if self.reattach_on_restart:
-            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
-            if pod:
-                return pod
-        self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
-        self.pod_manager.create_pod(pod=pod_request_obj)
-        return pod_request_obj
+            self.client = client
 
-    def await_pod_start(self, pod):
-        try:
-            self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
-        except PodLaunchFailedException:
-            if self.log_events_on_failure:
-                for event in self.pod_manager.read_pod_events(pod).items:
-                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            raise
+            self.pod = self.create_pod_request_obj()
+            self.namespace = self.pod.metadata.namespace
 
-    def extract_xcom(self, pod):
-        """Retrieves xcom value and kills xcom sidecar container"""
-        result = self.pod_manager.extract_xcom(pod)
-        self.log.info("xcom result: \n%s", result)
-        return json.loads(result)
+            # Add combination of labels to uniquely identify a running pod
+            labels = self.create_labels_for_pod(context)
 
-    def execute(self, context: 'Context'):
-        remote_pod = None
-        try:
-            self.pod_request_obj = self.build_pod_request_obj(context)
-            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
-                pod_request_obj=self.pod_request_obj,
-                context=context,
-            )
-            self.await_pod_start(pod=self.pod)
+            label_selector = self._get_pod_identifying_label_string(labels)
 
-            if self.get_logs:
-                self.pod_manager.fetch_container_logs(
-                    pod=self.pod,
-                    container_name=self.BASE_CONTAINER_NAME,
-                    follow=True,
-                )
-            else:
-                self.pod_manager.await_container_completion(
-                    pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+
+            if len(pod_list.items) > 1 and self.reattach_on_restart:
+                raise AirflowException(
+                    f'More than one pod running with labels: {label_selector}'
                 )
 
-            if self.do_xcom_push:
-                result = self.extract_xcom(pod=self.pod)
-            remote_pod = self.pod_manager.await_pod_completion(self.pod)
-        finally:
-            self.cleanup(
-                pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
-            )
-        ti = context['ti']
-        ti.xcom_push(key='pod_name', value=self.pod.metadata.name)
-        ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace)
-        if self.do_xcom_push:
+            launcher = self.create_pod_launcher()
+
+            if len(pod_list.items) == 1:
+                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
+                final_state, remote_pod, result = self.handle_pod_overlap(
+                    labels, try_numbers_match, launcher, pod_list.items[0]
+                )
+            else:
+                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
+                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+            if final_state != State.SUCCESS:
+                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
+            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
+            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
             return result
+        except AirflowException as ex:
+            raise AirflowException(f'Pod Launching failed: {ex}')
 
-    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
-        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
-        if pod_phase != PodPhase.SUCCEEDED:
-            if self.log_events_on_failure:
-                with _suppress(Exception):
-                    for event in self.pod_manager.read_pod_events(pod).items:
-                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
-            if not self.is_delete_operator_pod:
-                with _suppress(Exception):
-                    self.patch_already_checked(pod)
-            with _suppress(Exception):
-                self.process_pod_deletion(pod)
-            raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
-        else:
-            with _suppress(Exception):
-                self.process_pod_deletion(pod)
+    def handle_pod_overlap(
+        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
+    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+        """
+
+        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
+        this function will either continue to monitor the existing pod or launch a new pod
+        based on the `reattach_on_restart` parameter.
 
-    def process_pod_deletion(self, pod):
-        if self.is_delete_operator_pod:
-            self.log.info("Deleting pod: %s", pod.metadata.name)
-            self.pod_manager.delete_pod(pod)
+        :param labels: labels used to determine if a pod is repeated
+        :type labels: dict
+        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
+        :type try_numbers_match: bool
+        :param launcher: PodLauncher
+        :param pod: Pod found with matching labels
+        """
+        if try_numbers_match:
+            log_line = f"found a running pod with labels {labels} and the same try_number."
+        else:
+            log_line = f"found a running pod with labels {labels} but a different try_number."
+
+        # In case of failed pods, should reattach the first time, but only once
+        # as the task will have already failed.
+        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
+            log_line += " Will attach to this pod and monitor instead of starting new one"
+            self.log.info(log_line)
+            self.pod = pod
+            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
         else:
-            self.log.info("skipping deleting pod: %s", pod.metadata.name)
+            log_line += f"creating pod with labels {labels} and launcher {launcher}"
+            self.log.info(log_line)
+            final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
+        return final_state, remote_pod, result
 
-    def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
-        labels = self._get_ti_pod_labels(context, include_try_number=False)
-        label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
-        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+    @staticmethod
+    def _get_pod_identifying_label_string(labels) -> str:
+        label_strings = [
+            f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
+        ]
+        return ','.join(label_strings) + ',already_checked!=True'
+
+    @staticmethod
+    def _try_numbers_match(context, pod) -> bool:
+        return pod.metadata.labels['try_number'] == context['ti'].try_number
 
     def _set_name(self, name):
         if name is None:
@@ -435,29 +427,11 @@ class KubernetesPodOperator(BaseOperator):
         validate_key(name, max_length=220)
         return re.sub(r'[^a-z0-9.-]+', '-', name.lower())
 
-    def patch_already_checked(self, pod: k8s.V1Pod):
-        """Add an "already checked" annotation to ensure we don't reattach on retries"""
-        pod.metadata.labels[self.POD_CHECKED_KEY] = "True"
-        body = PodGenerator.serialize_pod(pod)
-        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
-
-    def on_kill(self) -> None:
-        if self.pod:
-            pod = self.pod
-            kwargs = dict(
-                name=pod.metadata.name,
-                namespace=pod.metadata.namespace,
-            )
-            if self.termination_grace_period is not None:
-                kwargs.update(grace_period_seconds=self.termination_grace_period)
-            self.client.delete_namespaced_pod(**kwargs)
-
-    def build_pod_request_obj(self, context=None):
+    def create_pod_request_obj(self) -> k8s.V1Pod:
         """
-        Returns V1Pod object based on pod template file, full pod spec, and other operator parameters.
+        Creates a V1Pod based on user parameters. Note that a `pod` or `pod_template_file`
+        will supersede all other values.
 
-        The V1Pod attributes are derived (in order of precedence) from operator params, full pod spec, pod
-        template file.
         """
         self.log.debug("Creating pod for KubernetesPodOperator task %s", self.task_id)
         if self.pod_template_file:
@@ -476,7 +450,7 @@ class KubernetesPodOperator(BaseOperator):
             metadata=k8s.V1ObjectMeta(
                 namespace=self.namespace,
                 labels=self.labels,
-                name=self.name,
+                name=PodGenerator.make_unique_pod_id(self.name),
                 annotations=self.annotations,
             ),
             spec=k8s.V1PodSpec(
@@ -487,7 +461,7 @@ class KubernetesPodOperator(BaseOperator):
                 containers=[
                     k8s.V1Container(
                         image=self.image,
-                        name=self.BASE_CONTAINER_NAME,
+                        name="base",
                         command=self.cmds,
                         ports=self.ports,
                         image_pull_policy=self.image_pull_policy,
@@ -512,112 +486,89 @@ class KubernetesPodOperator(BaseOperator):
 
         pod = PodGenerator.reconcile_pods(pod_template, pod)
 
-        if self.random_name_suffix:
-            pod.metadata.name = PodGenerator.make_unique_pod_id(pod.metadata.name)
-
         for secret in self.secrets:
             self.log.debug("Adding secret to task %s", self.task_id)
             pod = secret.attach_to_pod(pod)
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
             pod = xcom_sidecar.add_xcom_sidecar(pod)
+        return pod
+
+    def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
+        """
+        Creates a new pod and monitors for duration of task
 
-        labels = self._get_ti_pod_labels(context)
-        self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels)
+        :param labels: labels used to track pod
+        :param launcher: pod launcher that will manage launching and monitoring pods
+        :return:
+        """
+        self.log.debug(
+            "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id
+        )
 
         # Merge Pod Identifying labels with labels passed to operator
-        pod.metadata.labels.update(labels)
+        self.pod.metadata.labels.update(labels)
         # Add Airflow Version to the label
         # And a label to identify that pod is launched by KubernetesPodOperator
-        pod.metadata.labels.update(
+        self.pod.metadata.labels.update(
             {
                 'airflow_version': airflow_version.replace('+', '-'),
                 'kubernetes_pod_operator': 'True',
             }
         )
-        pod_mutation_hook(pod)
-        return pod
-
-    def dry_run(self) -> None:
-        """
-        Prints out the pod definition that would be created by this operator.
-        Does not include labels specific to the task instance (since there isn't
-        one in a dry_run) and excludes all empty elements.
-        """
-        pod = self.build_pod_request_obj()
-        print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))
-
-
-class _suppress(AbstractContextManager):
-    """
-    This behaves the same as ``contextlib.suppress`` but logs the suppressed
-    exceptions as errors with traceback.
-
-    The caught exception is also stored on the context manager instance under
-    attribute ``exception``.
-    """
-
-    def __init__(self, *exceptions):
-        self._exceptions = exceptions
-        self.exception = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exctype, excinst, exctb):
-        caught_error = exctype is not None and issubclass(exctype, self._exceptions)
-        if caught_error:
-            self.exception = excinst
-            logger = logging.getLogger()
-            logger.error(str(excinst), exc_info=True)
-        return caught_error
 
+        self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
+        final_state = None
+        try:
+            launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
+            final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
+        except AirflowException:
+            if self.log_events_on_failure:
+                for event in launcher.read_pod_events(self.pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
+        finally:
+            if self.is_delete_operator_pod:
+                self.log.debug("Deleting pod for task %s", self.task_id)
+                launcher.delete_pod(self.pod)
+            elif final_state != State.SUCCESS:
+                self.patch_already_checked(self.pod)
+        return final_state, remote_pod, result
 
-def _prune_dict(val: Any, mode='strict'):
-    """
-    Note: this is duplicated from ``airflow.utils.helpers.prune_dict``.  That one should
-    be the one used if possible, but this one is included to avoid having to
-    bump min airflow version.  This function will be removed once the min airflow version
-    is bumped to 2.3.
+    def patch_already_checked(self, pod: k8s.V1Pod):
+        """Add an "already tried annotation to ensure we only retry once"""
+        pod.metadata.labels["already_checked"] = "True"
+        body = PodGenerator.serialize_pod(pod)
+        self.client.patch_namespaced_pod(pod.metadata.name, pod.metadata.namespace, body)
 
-    Given dict ``val``, returns new dict based on ``val`` with all
-    empty elements removed.
+    def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]:
+        """
+        Monitors a pod to completion that was created by a previous KubernetesPodOperator
 
-    What constitutes "empty" is controlled by the ``mode`` parameter.  If mode is 'strict'
-    then only ``None`` elements will be removed.  If mode is ``truthy``, then element ``x``
-    will be removed if ``bool(x) is False``.
-    """
+        :param launcher: pod launcher that will manage launching and monitoring pods
+        :param pod: podspec used to find pod using k8s API
+        :return:
+        """
+        try:
+            (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs)
+        finally:
+            if self.is_delete_operator_pod:
+                launcher.delete_pod(pod)
+        if final_state != State.SUCCESS:
+            if self.log_events_on_failure:
+                for event in launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            if not self.is_delete_operator_pod:
+                self.patch_already_checked(pod)
+            raise AirflowException(f'Pod returned a failure: {final_state}')
+        return final_state, remote_pod, result
 
-    def is_empty(x):
-        if mode == 'strict':
-            return x is None
-        elif mode == 'truthy':
-            return bool(x) is False
-        raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
-
-    if isinstance(val, dict):
-        new_dict = {}
-        for k, v in val.items():
-            if is_empty(v):
-                continue
-            elif isinstance(v, (list, dict)):
-                new_val = _prune_dict(v, mode=mode)
-                if new_val:
-                    new_dict[k] = new_val
-            else:
-                new_dict[k] = v
-        return new_dict
-    elif isinstance(val, list):
-        new_list = []
-        for v in val:
-            if is_empty(v):
-                continue
-            elif isinstance(v, (list, dict)):
-                new_val = _prune_dict(v, mode=mode)
-                if new_val:
-                    new_list.append(new_val)
-            else:
-                new_list.append(v)
-        return new_list
-    else:
-        return val
+    def on_kill(self) -> None:
+        if self.pod:
+            pod: k8s.V1Pod = self.pod
+            namespace = pod.metadata.namespace
+            name = pod.metadata.name
+            kwargs = {}
+            if self.termination_grace_period is not None:
+                kwargs = {"grace_period_seconds": self.termination_grace_period}
+            self.client.delete_namespaced_pod(name=name, namespace=namespace, **kwargs)
diff --git a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
index 1029687..9779292 100644
--- a/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -15,14 +15,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Optional
 
 from airflow.models import BaseOperator
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
 
 class SparkKubernetesOperator(BaseOperator):
     """
@@ -34,15 +31,20 @@ class SparkKubernetesOperator(BaseOperator):
 
     :param application_file: Defines Kubernetes 'custom_resource_definition' of 'sparkApplication' as either a
         path to a '.json' file or a JSON string.
+    :type application_file:  str
     :param namespace: kubernetes namespace to put sparkApplication
+    :type namespace: str
     :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
         for the to Kubernetes cluster.
+    :type kubernetes_conn_id: str
     :param api_group: kubernetes api group of sparkApplication
+    :type api_group: str
     :param api_version: kubernetes api version of sparkApplication
+    :type api_version: str
     """
 
-    template_fields: Sequence[str] = ('application_file', 'namespace')
-    template_ext: Sequence[str] = ('.yaml', '.yml', '.json')
+    template_fields = ['application_file', 'namespace']
+    template_ext = ('.yaml', '.yml', '.json')
     ui_color = '#f4a460'
 
     def __init__(
@@ -62,7 +64,7 @@ class SparkKubernetesOperator(BaseOperator):
         self.api_group = api_group
         self.api_version = api_version
 
-    def execute(self, context: 'Context'):
+    def execute(self, context):
         self.log.info("Creating sparkApplication")
         hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
         response = hook.create_custom_object(
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml
index b5b5054..c7878ba 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -22,14 +22,6 @@ description: |
     `Kubernetes <https://kubernetes.io/>`__
 
 versions:
-  - 3.1.2
-  - 3.1.1
-  - 3.1.0
-  - 3.0.2
-  - 3.0.1
-  - 3.0.0
-  - 2.2.0
-  - 2.1.0
   - 2.0.3
   - 2.0.2
   - 2.0.1
diff --git a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
index 15ac40b..da29e79 100644
--- a/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import TYPE_CHECKING, Optional, Sequence
+from typing import Dict, Optional
 
 from kubernetes import client
 
@@ -23,9 +23,6 @@ from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
 from airflow.sensors.base import BaseSensorOperator
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
-
 
 class SparkKubernetesSensor(BaseSensorOperator):
     """
@@ -36,15 +33,21 @@ class SparkKubernetesSensor(BaseSensorOperator):
         https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.1.0-2.4.5/docs/api-docs.md#sparkapplication
 
     :param application_name: spark Application resource name
+    :type application_name:  str
     :param namespace: the kubernetes namespace where the sparkApplication reside in
+    :type namespace: str
     :param kubernetes_conn_id: The :ref:`kubernetes connection<howto/connection:kubernetes>`
         to Kubernetes cluster.
+    :type kubernetes_conn_id: str
     :param attach_log: determines whether logs for driver pod should be appended to the sensor log
+    :type attach_log: bool
     :param api_group: kubernetes api group of sparkApplication
+    :type api_group: str
     :param api_version: kubernetes api version of sparkApplication
+    :type api_version: str
     """
 
-    template_fields: Sequence[str] = ("application_name", "namespace")
+    template_fields = ("application_name", "namespace")
     FAILURE_STATES = ("FAILED", "UNKNOWN")
     SUCCESS_STATES = ("COMPLETED",)
 
@@ -94,7 +97,7 @@ class SparkKubernetesSensor(BaseSensorOperator):
                 e,
             )
 
-    def poke(self, context: 'Context') -> bool:
+    def poke(self, context: Dict) -> bool:
         self.log.info("Poking: %s", self.application_name)
         response = self.hook.get_custom_object(
             group=self.api_group,
diff --git a/setup.py b/setup.py
index 0e6ae83..794c33b 100644
--- a/setup.py
+++ b/setup.py
@@ -406,7 +406,7 @@ kerberos = [
 ]
 kubernetes = [
     'cryptography>=2.0.0',
-    'kubernetes>=21.7.0',
+    'kubernetes>=3.0.0, <12.0.0',
 ]
 kylin = ['kylinpy>=2.6']
 ldap = [
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index ce040cf..9228e9b 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -22,21 +22,25 @@ from unittest import mock
 from kubernetes.client import Configuration
 from urllib3.connection import HTTPConnection, HTTPSConnection
 
-from airflow.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive, get_kube_client
+from airflow.kubernetes.kube_client import (
+    RefreshConfiguration,
+    _disable_verify_ssl,
+    _enable_tcp_keepalive,
+    get_kube_client,
+)
 
 
 class TestClient(unittest.TestCase):
     @mock.patch('airflow.kubernetes.kube_client.config')
-    def test_load_cluster_config(self, config):
-        get_kube_client(in_cluster=True)
-        assert config.load_incluster_config.called
-        assert config.load_kube_config.not_called
+    def test_load_cluster_config(self, _):
+        client = get_kube_client(in_cluster=True)
+        assert not isinstance(client.api_client.configuration, RefreshConfiguration)
 
     @mock.patch('airflow.kubernetes.kube_client.config')
-    def test_load_file_config(self, config):
-        get_kube_client(in_cluster=False)
-        assert config.load_incluster_config.not_called
-        assert config.load_kube_config.called
+    @mock.patch('airflow.kubernetes.refresh_config._get_kube_config_loader_for_yaml_file')
+    def test_load_file_config(self, _, _2):
+        client = get_kube_client(in_cluster=False)
+        assert isinstance(client.api_client.configuration, RefreshConfiguration)
 
     def test_enable_tcp_keepalive(self):
         socket_options = [
diff --git a/tests/kubernetes/test_refresh_config.py b/tests/kubernetes/test_refresh_config.py
new file mode 100644
index 0000000..a0753e2
--- /dev/null
+++ b/tests/kubernetes/test_refresh_config.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+import pytest
+from pendulum.parsing import ParserError
+
+from airflow.kubernetes.refresh_config import _parse_timestamp
+
+
+class TestRefreshKubeConfigLoader(TestCase):
+    def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self):
+        ts = _parse_timestamp("2020-01-13T13:42:20Z")
+        assert 1578922940 == ts
+
+    def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self):
+        ts = _parse_timestamp("2020-01-13T13:42:20+0600")
+        assert 1578922940 == ts
+
+    def test_parse_timestamp_should_throw_exception(self):
+        with pytest.raises(ParserError):
+            _parse_timestamp("foobar")

[airflow] 05/05: Add 2.2.5 to CHANGELOG.txt and UPDATING.md

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3da4cb712c63035ddb6637a934cd52dd1aab85e3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Mar 24 02:39:28 2022 +0100

    Add 2.2.5 to CHANGELOG.txt and UPDATING.md
---
 CHANGELOG.txt | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
 UPDATING.md   |  3 +--
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 062b1e8..97dbf48 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,3 +1,54 @@
+Airflow 2.2.5, 2022-03-31
+-------------------------
+
+Bug Fixes
+"""""""""
+- Check and disallow a relative path for sqlite (#22530)
+- Fixed dask executor and tests (#22027)
+- Fix broken links to celery documentation (#22364)
+- Fix incorrect data provided to tries & landing times charts (#21928)
+- Fix assignment of unassigned triggers (#21770)
+- Fix triggerer ``--capacity`` parameter (#21753)
+- Fix graph autorefresh on page load (#21736)
+- Fix filesystem sensor for directories (#21729)
+- Fix stray ``order_by(TaskInstance.execution_date)`` (#21705)
+- Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
+- Log exception in local executor (#21667)
+- Disable ``default_pool`` delete on web ui (#21658)
+- Extends ``typing-extensions`` to be installed with python 3.8+ #21566 (#21567)
+- Dispose unused connection pool (#21565)
+- Fix logging JDBC SQL error when task fails (#21540)
+- Filter out default configs when overrides exist. (#21539)
+- Fix Resources ``__eq__`` check (#21442)
+- Fix ``max_active_runs=1`` not scheduling runs when ``min_file_process_interval`` is high (#21413)
+- Reduce DB load incurred by Stale DAG deactivation (#21399)
+- Fix race condition between triggerer and scheduler (#21316)
+- Fix trigger dag redirect from task instance log view (#21239)
+- Log traceback in trigger excs (#21213)
+- A trigger might use a connection; make sure we mask passwords (#21207)
+- Update ``ExternalTaskSensorLink`` to handle templated ``external_dag_id`` (#21192)
+- Ensure ``clear_task_instances`` sets valid run state (#21116)
+- Fix: Update custom connection field processing (#20883)
+- Truncate stack trace to DAG user code for exceptions raised during execution (#20731)
+- Fix duplicate trigger creation race condition (#20699)
+- Fix Tasks getting stuck in scheduled state (#19747)
+- Fix: Do not render undefined graph edges (#19684)
+- Set ``X-Frame-Options`` header to DENY only if ``X_FRAME_ENABLED`` is set to true. (#19491)
+
+Doc only changes
+""""""""""""""""
+- adding ``on_execute_callback`` to callbacks docs (#22362)
+- Add documentation on specifying a DB schema. (#22347)
+- Fix postgres part of pipeline example of tutorial (#21586)
+- Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
+- DB upgrade is required when updating Airflow (#22061)
+- Remove misleading MSSQL information from the docs (#21998)
+
+Misc
+""""
+- Add the new Airflow Trove Classifier to setup.cfg (#22241)
+- Rename ``to_delete`` to ``to_cancel`` in TriggerRunner (#20658)
+
 Airflow 2.2.4, 2022-02-22
 -------------------------
 
diff --git a/UPDATING.md b/UPDATING.md
index c942199..a706258 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -27,6 +27,7 @@ assists users migrating to a new version.
 **Table of contents**
 
 - [Main](#main)
+- [Airflow 2.2.5](#airflow-225)
 - [Airflow 2.2.4](#airflow-224)
 - [Airflow 2.2.3](#airflow-223)
 - [Airflow 2.2.2](#airflow-222)
@@ -85,8 +86,6 @@ https://developers.google.com/style/inclusive-documentation
 
 No breaking changes.
 
-### Deprecation: `Connection.extra` must be JSON-encoded dict
-
 ## Airflow 2.2.4
 
 ### Smart sensors deprecated

[airflow] 03/05: Revert "Update Kubernetes library version (#18797)"

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cbfae04a1e1fc26c08d9f710334b33a268b37b93
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Tue Jan 4 12:09:35 2022 +0000

    Revert "Update Kubernetes library version (#18797)"
    
    This reverts commit cb9cdf5285502381298bf459d000dc689c6aab2a.
---
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 8 ++------
 tests/kubernetes/test_client.py                        | 6 +-----
 2 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index eab45fa..4221ac2 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -40,11 +40,7 @@ from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 if TYPE_CHECKING:
-    try:
-        # Kube >= 19
-        from kubernetes.client.models.core_v1_event_list import CoreV1EventList as V1EventList
-    except ImportError:
-        from kubernetes.client.models.v1_event_list import V1EventList
+    from kubernetes.client.models.core_v1_event_list import CoreV1EventList
 
 
 class PodLaunchFailedException(AirflowException):
@@ -321,7 +317,7 @@ class PodManager(LoggingMixin):
             raise
 
     @tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
-    def read_pod_events(self, pod: V1Pod) -> "V1EventList":
+    def read_pod_events(self, pod: V1Pod) -> "CoreV1EventList":
         """Reads events from the POD"""
         try:
             return self._client.list_namespaced_event(
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index 9228e9b..bf5dcfc 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -63,9 +63,5 @@ class TestClient(unittest.TestCase):
 
         _disable_verify_ssl()
 
-        # Support wide range of kube client libraries
-        if hasattr(Configuration, 'get_default_copy'):
-            configuration = Configuration.get_default_copy()
-        else:
-            configuration = Configuration()
+        configuration = Configuration()
         self.assertFalse(configuration.verify_ssl)

[airflow] 01/05: Fix rat-exclides and issue template licence (#22550)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6bbcb332a9573eff7d0683f47f9a5fef65129b46
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sun Mar 27 14:03:56 2022 +0200

    Fix rat-exclides and issue template licence (#22550)
---
 .rat-excludes                |  5 +++++
 dev/ISSUE_TEMPLATE.md.jinja2 | 17 +++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/.rat-excludes b/.rat-excludes
index e611436..d1bb9e3 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -113,3 +113,8 @@ chart/values_schema.schema.json
 
 # autogenerated docker-compose.env
 scripts/ci/docker-compose/_docker_compose.env
+
+# Exclude robots from rat check
+airflow/www/static/robots.txt
+# Auto-generated files
+scripts/ci/installed_providers.txt
diff --git a/dev/ISSUE_TEMPLATE.md.jinja2 b/dev/ISSUE_TEMPLATE.md.jinja2
index 35be892..0e2c55c 100644
--- a/dev/ISSUE_TEMPLATE.md.jinja2
+++ b/dev/ISSUE_TEMPLATE.md.jinja2
@@ -1,4 +1,21 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
 
 We have a kind request for all the contributors to the latest [Apache Airflow RC {{version}}](https://pypi.org/project/apache-airflow/{{version}}/).