You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/04/09 13:24:36 UTC
[airflow] branch main updated: Cleanup dup code now that k8s provider requires 2.3.0+ (#22845)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 04082ac091 Cleanup dup code now that k8s provider requires 2.3.0+ (#22845)
04082ac091 is described below
commit 04082ac091e92587b22c8323170ebe38bc68a19a
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Sat Apr 9 07:24:31 2022 -0600
Cleanup dup code now that k8s provider requires 2.3.0+ (#22845)
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 54 +---------------------
.../kubernetes/operators/test_kubernetes_pod.py | 35 +-------------
2 files changed, 3 insertions(+), 86 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index dd127fee76..a5eb8baca7 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -46,7 +46,7 @@ from airflow.providers.cncf.kubernetes.utils import xcom_sidecar
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase
from airflow.settings import pod_mutation_hook
from airflow.utils import yaml
-from airflow.utils.helpers import validate_key
+from airflow.utils.helpers import prune_dict, validate_key
from airflow.version import version as airflow_version
if sys.version_info >= (3, 8):
@@ -545,7 +545,7 @@ class KubernetesPodOperator(BaseOperator):
one in a dry_run) and excludes all empty elements.
"""
pod = self.build_pod_request_obj()
- print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))
+ print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))
class _suppress(AbstractContextManager):
@@ -571,53 +571,3 @@ class _suppress(AbstractContextManager):
logger = logging.getLogger()
logger.error(str(excinst), exc_info=True)
return caught_error
-
-
-def _prune_dict(val: Any, mode='strict'):
- """
- Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should
- be the one used if possible, but this one is included to avoid having to
- bump min airflow version. This function will be removed once the min airflow version
- is bumped to 2.3.
-
- Given dict ``val``, returns new dict based on ``val`` with all
- empty elements removed.
-
- What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
- then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
- will be removed if ``bool(x) is False``.
- """
-
- def is_empty(x):
- if mode == 'strict':
- return x is None
- elif mode == 'truthy':
- return bool(x) is False
- raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
-
- if isinstance(val, dict):
- new_dict = {}
- for k, v in val.items():
- if is_empty(v):
- continue
- elif isinstance(v, (list, dict)):
- new_val = _prune_dict(v, mode=mode)
- if new_val:
- new_dict[k] = new_val
- else:
- new_dict[k] = v
- return new_dict
- elif isinstance(val, list):
- new_list = []
- for v in val:
- if is_empty(v):
- continue
- elif isinstance(v, (list, dict)):
- new_val = _prune_dict(v, mode=mode)
- if new_val:
- new_list.append(new_val)
- else:
- new_list.append(v)
- return new_list
- else:
- return val
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 936b9d1419..ebe6030f76 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -23,11 +23,7 @@ from kubernetes.client import ApiClient, models as k8s
from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance
from airflow.models.xcom import XCom
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
- KubernetesPodOperator,
- _prune_dict,
- _suppress,
-)
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress
from airflow.utils import timezone
from airflow.utils.types import DagRunType
@@ -858,32 +854,3 @@ def test__suppress():
raise ValueError("failure")
mock_error.assert_called_once_with("failure", exc_info=True)
-
-
-@pytest.mark.parametrize(
- 'mode, expected',
- [
- (
- 'strict',
- {
- 'b': '',
- 'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
- 'd': ['', 0, '1'],
- 'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
- },
- ),
- (
- 'truthy',
- {
- 'c': {'c': 'hi', 'd': ['1']},
- 'd': ['1'],
- 'e': [{'c': 'hi', 'd': ['1']}, ['1']],
- },
- ),
- ],
-)
-def test__prune_dict(mode, expected):
- l1 = ['', 0, '1', None]
- d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
- d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
- assert _prune_dict(d2, mode=mode) == expected