You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 19:35:31 UTC

[airflow] branch v1-10-test updated: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod (#7523)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 881c271  [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod (#7523)
881c271 is described below

commit 881c271bf6d57467ed78ee30d0b9e191a7971111
Author: Pete DeJoy <32...@users.noreply.github.com>
AuthorDate: Wed Feb 26 13:59:27 2020 -0500

    [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod (#7523)
    
    (cherry picked from commit 676c8515f7bdc8d2767a89484261056dc4bf4bed)
---
 airflow/config_templates/config.yml             | 11 +++++++++++
 airflow/config_templates/default_airflow.cfg    |  8 ++++++++
 airflow/executors/kubernetes_executor.py        |  8 +++++++-
 scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py |  2 ++
 tests/kubernetes/test_worker_configuration.py   | 11 +++++++++++
 5 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9535d5b..61705e3 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2179,6 +2179,17 @@
       type: string
       example: ~
       default: ""
+    - name: delete_option_kwargs
+      description: |
+        Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client
+        ``core_v1_api`` method when using the Kubernetes Executor.
+        This should be a JSON object and can contain any of the options listed in the ``V1DeleteOptions``
+        class defined here:
+        https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+      version_added: ~
+      type: string
+      example: '{"grace_period_seconds": 10}'
+      default: ""
     - name: run_as_user
       description: |
         Specifies the uid to run the first process of the worker pods containers as
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 9729403..71a5172 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1040,6 +1040,14 @@ tolerations =
 # The timeout is specified as [connect timeout, read timeout]
 kube_client_request_args =
 
+# Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client
+# ``core_v1_api`` method when using the Kubernetes Executor.
+# This should be a JSON object and can contain any of the options listed in the ``V1DeleteOptions``
+# class defined here:
+# https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+# Example: delete_option_kwargs = {{"grace_period_seconds": 10}}
+delete_option_kwargs =
+
 # Specifies the uid to run the first process of the worker pods containers as
 run_as_user = 50000
 
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 7b31b45..b6784a8 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -220,6 +220,12 @@ class KubeConfig:
             self.kube_client_request_args = {}
         self._validate()
 
+        delete_option_kwargs = conf.get(self.kubernetes_section, 'delete_option_kwargs')
+        if delete_option_kwargs:
+            self.delete_option_kwargs = json.loads(delete_option_kwargs)
+        else:
+            self.delete_option_kwargs = {}
+
     # pod security context items should return integers
     # and only return a blank string if contexts are not set.
     def _get_security_context_val(self, scontext):
@@ -458,7 +464,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
         """Deletes POD"""
         try:
             self.kube_client.delete_namespaced_pod(
-                pod_id, namespace, body=client.V1DeleteOptions(),
+                pod_id, namespace, body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
                 **self.kube_config.kube_client_request_args)
         except ApiException as e:
             # If the pod is already deleted
diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
index a578689..278a223 100755
--- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
+++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
@@ -115,6 +115,8 @@ def write_config(yaml_config_file_path, default_cfg_file_path):
                             configfile.write("# {}\n".format(single_line_desc))
 
                 if option["example"]:
+                    if not str(option["name"]).endswith("_template"):
+                        option["example"] = option["example"].replace("{", "{{").replace("}", "}}")
                     configfile.write("# Example: {} = {}\n".format(option["name"], option["example"]))
 
                 configfile.write("{}{} ={}\n".format(
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 0273ae8..48dc3e3 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -18,6 +18,7 @@
 
 import unittest
 import six
+from parameterized import parameterized
 
 from tests.compat import mock
 from tests.test_utils.config import conf_vars
@@ -116,6 +117,16 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
                                    'but not both$'):
             KubeConfig()
 
+    @parameterized.expand([
+        ('{"grace_period_seconds": 10}', {"grace_period_seconds": 10}),
+        ("", {})
+    ])
+    def test_delete_option_kwargs_config(self, config, expected_value):
+        with conf_vars({
+            ('kubernetes', 'delete_option_kwargs'): config,
+        }):
+            self.assertEqual(KubeConfig().delete_option_kwargs, expected_value)
+
     def test_worker_with_subpaths(self):
         self.kube_config.dags_volume_subpath = 'dags'
         self.kube_config.logs_volume_subpath = 'logs'