You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 19:35:31 UTC
[airflow] branch v1-10-test updated: [AIRFLOW-6843] Add
delete_option_kwargs to delete_namespaced_pod (#7523)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push:
new 881c271 [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod (#7523)
881c271 is described below
commit 881c271bf6d57467ed78ee30d0b9e191a7971111
Author: Pete DeJoy <32...@users.noreply.github.com>
AuthorDate: Wed Feb 26 13:59:27 2020 -0500
[AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod (#7523)
(cherry picked from commit 676c8515f7bdc8d2767a89484261056dc4bf4bed)
---
airflow/config_templates/config.yml | 11 +++++++++++
airflow/config_templates/default_airflow.cfg | 8 ++++++++
airflow/executors/kubernetes_executor.py | 8 +++++++-
scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py | 2 ++
tests/kubernetes/test_worker_configuration.py | 11 +++++++++++
5 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 9535d5b..61705e3 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2179,6 +2179,17 @@
type: string
example: ~
default: ""
+ - name: delete_option_kwargs
+ description: |
+ Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client
+ ``core_v1_api`` method when using the Kubernetes Executor.
+ This should be an object and can contain any of the options listed in the ``v1DeleteOptions``
+ class defined here:
+ https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+ version_added: ~
+ type: string
+ example: '{"grace_period_seconds": 10}'
+ default: ""
- name: run_as_user
description: |
Specifies the uid to run the first process of the worker pods containers as
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 9729403..71a5172 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1040,6 +1040,14 @@ tolerations =
# The timeout is specified as [connect timeout, read timeout]
kube_client_request_args =
+# Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client
+# ``core_v1_api`` method when using the Kubernetes Executor.
+# This should be an object and can contain any of the options listed in the ``v1DeleteOptions``
+# class defined here:
+# https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+# Example: delete_option_kwargs = {{"grace_period_seconds": 10}}
+delete_option_kwargs =
+
# Specifies the uid to run the first process of the worker pods containers as
run_as_user = 50000
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 7b31b45..b6784a8 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -220,6 +220,12 @@ class KubeConfig:
self.kube_client_request_args = {}
self._validate()
+ delete_option_kwargs = conf.get(self.kubernetes_section, 'delete_option_kwargs')
+ if delete_option_kwargs:
+ self.delete_option_kwargs = json.loads(delete_option_kwargs)
+ else:
+ self.delete_option_kwargs = {}
+
# pod security context items should return integers
# and only return a blank string if contexts are not set.
def _get_security_context_val(self, scontext):
@@ -458,7 +464,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
"""Deletes POD"""
try:
self.kube_client.delete_namespaced_pod(
- pod_id, namespace, body=client.V1DeleteOptions(),
+ pod_id, namespace, body=client.V1DeleteOptions(**self.kube_config.delete_option_kwargs),
**self.kube_config.kube_client_request_args)
except ApiException as e:
# If the pod is already deleted
diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
index a578689..278a223 100755
--- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
+++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
@@ -115,6 +115,8 @@ def write_config(yaml_config_file_path, default_cfg_file_path):
configfile.write("# {}\n".format(single_line_desc))
if option["example"]:
+ if not str(option["name"]).endswith("_template"):
+ option["example"] = option["example"].replace("{", "{{").replace("}", "}}")
configfile.write("# Example: {} = {}\n".format(option["name"], option["example"]))
configfile.write("{}{} ={}\n".format(
diff --git a/tests/kubernetes/test_worker_configuration.py b/tests/kubernetes/test_worker_configuration.py
index 0273ae8..48dc3e3 100644
--- a/tests/kubernetes/test_worker_configuration.py
+++ b/tests/kubernetes/test_worker_configuration.py
@@ -18,6 +18,7 @@
import unittest
import six
+from parameterized import parameterized
from tests.compat import mock
from tests.test_utils.config import conf_vars
@@ -116,6 +117,16 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase):
'but not both$'):
KubeConfig()
+ @parameterized.expand([
+ ('{"grace_period_seconds": 10}', {"grace_period_seconds": 10}),
+ ("", {})
+ ])
+ def test_delete_option_kwargs_config(self, config, expected_value):
+ with conf_vars({
+ ('kubernetes', 'delete_option_kwargs'): config,
+ }):
+ self.assertEqual(KubeConfig().delete_option_kwargs, expected_value)
+
def test_worker_with_subpaths(self):
self.kube_config.dags_volume_subpath = 'dags'
self.kube_config.logs_volume_subpath = 'logs'