You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/07/02 15:44:46 UTC
[airflow] branch main updated: Force-remove container after DockerOperator execution (#23160)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 237d2225d6 Force-remove container after DockerOperator execution (#23160)
237d2225d6 is described below
commit 237d2225d6b92a5012a025ece93cd062382470ed
Author: zengbotang <zb...@126.com>
AuthorDate: Sat Jul 2 23:44:33 2022 +0800
Force-remove container after DockerOperator execution (#23160)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
Co-authored-by: eladkal <45...@users.noreply.github.com>
Co-authored-by: park.z <pa...@bybit.com>
---
airflow/providers/docker/operators/docker.py | 24 ++++++++++++++++++----
airflow/providers/docker/operators/docker_swarm.py | 5 +++--
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py
index 61f5f9e393..340285dcca 100644
--- a/airflow/providers/docker/operators/docker.py
+++ b/airflow/providers/docker/operators/docker.py
@@ -20,6 +20,7 @@ import ast
import io
import pickle
import tarfile
+import warnings
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Union
@@ -125,7 +126,7 @@ class DockerOperator(BaseOperator):
:param dns_search: Docker custom DNS search domain
:param auto_remove: Auto-removal of the container on daemon side when the
container's process exits.
- The default is False.
+ The default is ``never``; the other accepted values are ``success`` and ``force``.
:param shm_size: Size of ``/dev/shm`` in bytes. The size must be
greater than 0. If omitted uses system default.
:param tty: Allocate pseudo-TTY to the container
@@ -175,7 +176,7 @@ class DockerOperator(BaseOperator):
docker_conn_id: Optional[str] = None,
dns: Optional[List[str]] = None,
dns_search: Optional[List[str]] = None,
- auto_remove: bool = False,
+ auto_remove: str = "never",
shm_size: Optional[int] = None,
tty: bool = False,
privileged: bool = False,
@@ -189,7 +190,20 @@ class DockerOperator(BaseOperator):
) -> None:
super().__init__(**kwargs)
self.api_version = api_version
- self.auto_remove = auto_remove
+ if type(auto_remove) == bool:
+ warnings.warn(
+ "bool value for auto_remove is deprecated, please use 'never', 'success', or 'force' instead",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ if str(auto_remove) == "False":
+ self.auto_remove = "never"
+ elif str(auto_remove) == "True":
+ self.auto_remove = "success"
+ elif str(auto_remove) in ("never", "success", "force"):
+ self.auto_remove = auto_remove
+ else:
+ raise ValueError("unsupported auto_remove option, use 'never', 'success', or 'force' instead")
self.command = command
self.container_name = container_name
self.cpus = cpus
@@ -334,8 +348,10 @@ class DockerOperator(BaseOperator):
return None
return None
finally:
- if self.auto_remove:
+ if self.auto_remove == "success":
self.cli.remove_container(self.container['Id'])
+ elif self.auto_remove == "force":
+ self.cli.remove_container(self.container['Id'], force=True)
def _attempt_to_retrieve_result(self):
"""
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 1fbb35d8d8..451af6179c 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -160,11 +160,12 @@ class DockerSwarmOperator(DockerOperator):
self.log.info('Service status before exiting: %s', self._service_status())
break
+ self.log.info("auto_removeauto_removeauto_removeauto_removeauto_remove : %s", str(self.auto_remove))
if self.service and self._service_status() != 'complete':
- if self.auto_remove:
+ if self.auto_remove == "success":
self.cli.remove_service(self.service['ID'])
raise AirflowException('Service did not complete: ' + repr(self.service))
- elif self.auto_remove:
+ elif self.auto_remove == "success":
if not self.service:
raise Exception("The 'service' should be initialized before!")
self.cli.remove_service(self.service['ID'])