You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2021/12/30 22:21:56 UTC
[airflow] branch main updated: Implement dry_run for KubernetesPodOperator (#20573)
This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d56ff76 Implement dry_run for KubernetesPodOperator (#20573)
d56ff76 is described below
commit d56ff765e15f9fcd582bc6d1ec0e83b0fedf476a
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Dec 30 14:21:25 2021 -0800
Implement dry_run for KubernetesPodOperator (#20573)
Calling task.dry_run() will print out the kubectl manifest for the pod that would be created (excluding labels that are derived from the task instance context).
---
.../cncf/kubernetes/operators/kubernetes_pod.py | 59 ++++++++++++++++++++++
airflow/utils/helpers.py | 45 +++++++++++++++++
.../operators.rst | 25 +++++++++
.../kubernetes/operators/test_kubernetes_pod.py | 35 ++++++++++++-
tests/utils/test_helpers.py | 29 +++++++++++
5 files changed, 192 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 8cc1bc4..0991e4d 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -578,6 +578,15 @@ class KubernetesPodOperator(BaseOperator):
pod_mutation_hook(pod)
return pod
+    def dry_run(self) -> None:
+        """
+        Print the YAML manifest of the pod this operator would create, without creating it.
+
+        Labels specific to the task instance are left out, since a dry run has no task
+        instance. ``None`` values and empty lists/dicts (including ones emptied by pruning)
+        are stripped; other falsy values such as ``""`` or ``0`` are kept.
+        """
+        pod_spec = self.build_pod_request_obj().to_dict()
+        manifest = _prune_dict(pod_spec, mode='strict')
+        print(yaml.dump(manifest))
+
class _suppress(AbstractContextManager):
"""
@@ -602,3 +611,53 @@ class _suppress(AbstractContextManager):
logger = logging.getLogger()
logger.error(str(excinst), exc_info=True)
return caught_error
+
+
+def _prune_dict(val: Any, mode: str = 'strict'):
+    """
+    Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should
+    be the one used if possible, but this one is included to avoid having to
+    bump min airflow version. This function will be removed once the min airflow version
+    is bumped to 2.3.
+
+    Given dict ``val``, returns new dict based on ``val`` with all
+    empty elements removed. Lists are pruned the same way; any other value
+    is returned unchanged.
+
+    What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
+    then ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
+    will be removed if ``bool(x) is False``. In both modes, nested lists and dicts
+    that are empty after pruning are removed as well.
+
+    :raises ValueError: if ``mode`` is neither 'strict' nor 'truthy'.
+    """
+    # Validate up front so a bad ``mode`` fails even when ``val`` is empty or a scalar,
+    # instead of only when the first element happens to be inspected.
+    if mode not in ('strict', 'truthy'):
+        raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
+
+    drop = object()  # sentinel: "remove this element from its parent container"
+
+    def is_empty(x):
+        if mode == 'strict':
+            return x is None
+        return bool(x) is False
+
+    def prune_child(v):
+        # Returns the (recursively pruned) element, or ``drop`` if it should be removed.
+        if is_empty(v):
+            return drop
+        if isinstance(v, (list, dict)):
+            v = _prune_dict(v, mode=mode)
+            if not v:
+                # Containers left empty by pruning are dropped regardless of mode.
+                return drop
+        return v
+
+    if isinstance(val, dict):
+        pruned_pairs = ((k, prune_child(v)) for k, v in val.items())
+        return {k: v for k, v in pruned_pairs if v is not drop}
+    if isinstance(val, list):
+        pruned_values = (prune_child(v) for v in val)
+        return [v for v in pruned_values if v is not drop]
+    return val
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 1ff57ef..c75a017 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -310,3 +310,48 @@ def exactly_one(*args) -> bool:
"Not supported for iterable args. Use `*` to unpack your iterable in the function call."
)
return sum(map(bool, args)) == 1
+
+
+def prune_dict(val: Any, mode: str = 'strict'):
+    """
+    Given dict ``val``, returns new dict based on ``val`` with all
+    empty elements removed. Lists are pruned the same way; any other value
+    is returned unchanged.
+
+    What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
+    then ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
+    will be removed if ``bool(x) is False``. In both modes, nested lists and dicts
+    that are empty after pruning are removed as well.
+
+    :raises ValueError: if ``mode`` is neither 'strict' nor 'truthy'.
+    """
+    # Validate up front so a bad ``mode`` fails even when ``val`` is empty or a scalar,
+    # instead of only when the first element happens to be inspected.
+    if mode not in ('strict', 'truthy'):
+        raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
+
+    drop = object()  # sentinel: "remove this element from its parent container"
+
+    def is_empty(x):
+        if mode == 'strict':
+            return x is None
+        return bool(x) is False
+
+    def prune_child(v):
+        # Returns the (recursively pruned) element, or ``drop`` if it should be removed.
+        if is_empty(v):
+            return drop
+        if isinstance(v, (list, dict)):
+            v = prune_dict(v, mode=mode)
+            if not v:
+                # Containers left empty by pruning are dropped regardless of mode.
+                return drop
+        return v
+
+    if isinstance(val, dict):
+        pruned_pairs = ((k, prune_child(v)) for k, v in val.items())
+        return {k: v for k, v in pruned_pairs if v is not drop}
+    if isinstance(val, list):
+        pruned_values = (prune_child(v) for v in val)
+        return [v for v in pruned_values if v is not drop]
+    return val
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 354d956..92e067c 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -49,6 +49,31 @@ dependencies that are not available through the public PyPI repository. It also
YAML file using the ``pod_template_file`` parameter.
Ultimately, it allows Airflow to act as a job orchestrator - no matter the language those jobs are written in.
+Debugging KubernetesPodOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can print out the Kubernetes manifest for the pod that would be created at runtime by calling
+:meth:`~.KubernetesPodOperator.dry_run` on an instance of the operator.
+
+.. code-block:: python
+
+    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+        KubernetesPodOperator,
+    )
+
+    k = KubernetesPodOperator(
+        name="hello-dry-run",
+        image="debian",
+        cmds=["bash", "-cx"],
+        arguments=["echo", "10"],
+        labels={"foo": "bar"},
+        task_id="dry_run_demo",
+        do_xcom_push=True,
+    )
+
+    k.dry_run()
+
+
How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index ff39c17..b9502d3 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -26,7 +26,11 @@ from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance
from airflow.models.xcom import IN_MEMORY_DAGRUN_ID
from airflow.operators.dummy import DummyOperator
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+ KubernetesPodOperator,
+ _prune_dict,
+ _suppress,
+)
from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
@@ -834,3 +838,32 @@ def test__suppress():
raise ValueError("failure")
mock_error.assert_called_once_with("failure", exc_info=True)
+
+
+@pytest.mark.parametrize(
+    'mode, expected',
+    [
+        (
+            'strict',
+            {
+                'b': '',
+                'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
+                'd': ['', 0, '1'],
+                'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
+            },
+        ),
+        (
+            'truthy',
+            {
+                'c': {'c': 'hi', 'd': ['1']},
+                'd': ['1'],
+                'e': [{'c': 'hi', 'd': ['1']}, ['1']],
+            },
+        ),
+    ],
+)
+def test__prune_dict(mode, expected):
+    """``_prune_dict`` strips ``None`` (strict) or every falsy value (truthy) at any depth."""
+    mixed_list = ['', 0, '1', None]
+    inner = {'a': None, 'b': '', 'c': 'hi', 'd': mixed_list}
+    outer = {
+        'a': None,
+        'b': '',
+        'c': inner,
+        'd': mixed_list,
+        'e': [None, '', 0, inner, mixed_list, ['']],
+    }
+    result = _prune_dict(outer, mode=mode)
+    assert result == expected
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 7b4424d..c36f408 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -26,6 +26,7 @@ from airflow.utils.helpers import (
build_airflow_url_with_query,
exactly_one,
merge_dicts,
+ prune_dict,
validate_group_key,
validate_key,
)
@@ -262,3 +263,31 @@ class TestHelpers:
def test_exactly_one_should_fail(self):
with pytest.raises(ValueError):
exactly_one([True, False])
+
+    @pytest.mark.parametrize(
+        'mode, expected',
+        [
+            (
+                'strict',
+                {
+                    'b': '',
+                    'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
+                    'd': ['', 0, '1'],
+                    'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
+                },
+            ),
+            (
+                'truthy',
+                {
+                    'c': {'c': 'hi', 'd': ['1']},
+                    'd': ['1'],
+                    'e': [{'c': 'hi', 'd': ['1']}, ['1']],
+                },
+            ),
+        ],
+    )
+    def test_prune_dict(self, mode, expected):
+        """``prune_dict`` strips ``None`` (strict) or every falsy value (truthy) at any depth."""
+        mixed_list = ['', 0, '1', None]
+        inner = {'a': None, 'b': '', 'c': 'hi', 'd': mixed_list}
+        outer = {
+            'a': None,
+            'b': '',
+            'c': inner,
+            'd': mixed_list,
+            'e': [None, '', 0, inner, mixed_list, ['']],
+        }
+        result = prune_dict(outer, mode=mode)
+        assert result == expected