You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ds...@apache.org on 2021/12/30 22:21:56 UTC

[airflow] branch main updated: Implement dry_run for KubernetesPodOperator (#20573)

This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d56ff76  Implement dry_run for KubernetesPodOperator (#20573)
d56ff76 is described below

commit d56ff765e15f9fcd582bc6d1ec0e83b0fedf476a
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Thu Dec 30 14:21:25 2021 -0800

    Implement dry_run for KubernetesPodOperator (#20573)
    
    Calling task.dry_run() will print out the kubectl manifest for the pod that would be created (excluding labels that are derived from the task instance context).
---
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 59 ++++++++++++++++++++++
 airflow/utils/helpers.py                           | 45 +++++++++++++++++
 .../operators.rst                                  | 25 +++++++++
 .../kubernetes/operators/test_kubernetes_pod.py    | 35 ++++++++++++-
 tests/utils/test_helpers.py                        | 29 +++++++++++
 5 files changed, 192 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 8cc1bc4..0991e4d 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -578,6 +578,15 @@ class KubernetesPodOperator(BaseOperator):
         pod_mutation_hook(pod)
         return pod
 
+    def dry_run(self) -> None:
+        """
+        Prints out, as YAML, the pod definition that would be created by this operator.
+        Does not include labels specific to the task instance (since there isn't
+        one in a dry_run) and excludes ``None`` values and containers left empty.
+        """
+        pod = self.build_pod_request_obj()
+        print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))
+
 
 class _suppress(AbstractContextManager):
     """
@@ -602,3 +611,53 @@ class _suppress(AbstractContextManager):
             logger = logging.getLogger()
             logger.error(str(excinst), exc_info=True)
         return caught_error
+
+
+def _prune_dict(val: Any, mode='strict'):
+    """
+    Note: this is duplicated from ``airflow.utils.helpers.prune_dict``.  That one should
+    be the one used if possible, but this one is included to avoid having to
+    bump min airflow version.  This function will be removed once the min airflow version
+    is bumped to 2.3.
+
+    Given dict or list ``val``, returns a new dict or list based on ``val`` with all
+    empty elements removed recursively; nested containers left empty are dropped too.
+
+    What constitutes "empty" is controlled by the ``mode`` parameter.  If mode is ``'strict'``
+    then ``x`` is empty only if ``x is None``.  If mode is ``'truthy'``, then ``x`` is empty
+    if ``bool(x) is False``.
+    """
+
+    def is_empty(x):
+        if mode == 'strict':
+            return x is None
+        elif mode == 'truthy':
+            return bool(x) is False
+        raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
+
+    if isinstance(val, dict):
+        new_dict = {}
+        for k, v in val.items():
+            if is_empty(v):
+                continue
+            elif isinstance(v, (list, dict)):
+                new_val = _prune_dict(v, mode=mode)
+                if new_val:
+                    new_dict[k] = new_val
+            else:
+                new_dict[k] = v
+        return new_dict
+    elif isinstance(val, list):
+        new_list = []
+        for v in val:
+            if is_empty(v):
+                continue
+            elif isinstance(v, (list, dict)):
+                new_val = _prune_dict(v, mode=mode)
+                if new_val:
+                    new_list.append(new_val)
+            else:
+                new_list.append(v)
+        return new_list
+    else:
+        return val
diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py
index 1ff57ef..c75a017 100644
--- a/airflow/utils/helpers.py
+++ b/airflow/utils/helpers.py
@@ -310,3 +310,48 @@ def exactly_one(*args) -> bool:
             "Not supported for iterable args. Use `*` to unpack your iterable in the function call."
         )
     return sum(map(bool, args)) == 1
+
+
+def prune_dict(val: Any, mode='strict'):
+    """
+    Given dict or list ``val``, returns a new dict or list based on ``val`` with all
+    empty elements removed recursively; nested containers left empty are dropped too.
+
+    What constitutes "empty" is controlled by the ``mode`` parameter.  If mode is ``'strict'``
+    then ``x`` is empty only if ``x is None``.  If mode is ``'truthy'``, then ``x`` is empty
+    if ``bool(x) is False``.
+    """
+
+    def is_empty(x):
+        if mode == 'strict':
+            return x is None
+        elif mode == 'truthy':
+            return bool(x) is False
+        raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")
+
+    if isinstance(val, dict):
+        new_dict = {}
+        for k, v in val.items():
+            if is_empty(v):
+                continue
+            elif isinstance(v, (list, dict)):
+                new_val = prune_dict(v, mode=mode)
+                if new_val:
+                    new_dict[k] = new_val
+            else:
+                new_dict[k] = v
+        return new_dict
+    elif isinstance(val, list):
+        new_list = []
+        for v in val:
+            if is_empty(v):
+                continue
+            elif isinstance(v, (list, dict)):
+                new_val = prune_dict(v, mode=mode)
+                if new_val:
+                    new_list.append(new_val)
+            else:
+                new_list.append(v)
+        return new_list
+    else:
+        return val
diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
index 354d956..92e067c 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst
@@ -49,6 +49,31 @@ dependencies that are not available through the public PyPI repository. It also
 YAML file using the ``pod_template_file`` parameter.
 Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.
 
+Debugging KubernetesPodOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+You can print out the Kubernetes manifest for the pod that would be created at runtime by calling
+:meth:`~.KubernetesPodOperator.dry_run` on an instance of the operator.
+
+.. code-block:: python
+
+    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+        KubernetesPodOperator,
+    )
+
+    k = KubernetesPodOperator(
+        name="hello-dry-run",
+        image="debian",
+        cmds=["bash", "-cx"],
+        arguments=["echo", "10"],
+        labels={"foo": "bar"},
+        task_id="dry_run_demo",
+        do_xcom_push=True,
+    )
+
+    k.dry_run()
+
+
 How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index ff39c17..b9502d3 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -26,7 +26,11 @@ from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance
 from airflow.models.xcom import IN_MEMORY_DAGRUN_ID
 from airflow.operators.dummy import DummyOperator
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+    KubernetesPodOperator,
+    _prune_dict,
+    _suppress,
+)
 from airflow.utils import timezone
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
@@ -834,3 +838,32 @@ def test__suppress():
             raise ValueError("failure")
 
         mock_error.assert_called_once_with("failure", exc_info=True)
+
+
+@pytest.mark.parametrize(
+    'mode, expected',
+    [
+        (
+            'strict',
+            {
+                'b': '',
+                'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
+                'd': ['', 0, '1'],
+                'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
+            },
+        ),
+        (
+            'truthy',
+            {
+                'c': {'c': 'hi', 'd': ['1']},
+                'd': ['1'],
+                'e': [{'c': 'hi', 'd': ['1']}, ['1']],
+            },
+        ),
+    ],
+)
+def test__prune_dict(mode, expected):
+    l1 = ['', 0, '1', None]
+    d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
+    d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
+    assert _prune_dict(d2, mode=mode) == expected
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
index 7b4424d..c36f408 100644
--- a/tests/utils/test_helpers.py
+++ b/tests/utils/test_helpers.py
@@ -26,6 +26,7 @@ from airflow.utils.helpers import (
     build_airflow_url_with_query,
     exactly_one,
     merge_dicts,
+    prune_dict,
     validate_group_key,
     validate_key,
 )
@@ -262,3 +263,31 @@ class TestHelpers:
     def test_exactly_one_should_fail(self):
         with pytest.raises(ValueError):
             exactly_one([True, False])
+
+    @pytest.mark.parametrize(
+        'mode, expected',
+        [
+            (
+                'strict',
+                {
+                    'b': '',
+                    'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
+                    'd': ['', 0, '1'],
+                    'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
+                },
+            ),
+            (
+                'truthy',
+                {
+                    'c': {'c': 'hi', 'd': ['1']},
+                    'd': ['1'],
+                    'e': [{'c': 'hi', 'd': ['1']}, ['1']],
+                },
+            ),
+        ],
+    )
+    def test_prune_dict(self, mode, expected):
+        l1 = ['', 0, '1', None]
+        d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
+        d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
+        assert prune_dict(d2, mode=mode) == expected