You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/07/02 15:44:46 UTC

[airflow] branch main updated: Force-remove container after DockerOperator execution (#23160)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 237d2225d6 Force-remove container after DockerOperator execution (#23160)
237d2225d6 is described below

commit 237d2225d6b92a5012a025ece93cd062382470ed
Author: zengbotang <zb...@126.com>
AuthorDate: Sat Jul 2 23:44:33 2022 +0800

    Force-remove container after DockerOperator execution (#23160)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    Co-authored-by: eladkal <45...@users.noreply.github.com>
    Co-authored-by: park.z <pa...@bybit.com>
---
 airflow/providers/docker/operators/docker.py       | 24 ++++++++++++++++++----
 airflow/providers/docker/operators/docker_swarm.py |  5 +++--
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py
index 61f5f9e393..340285dcca 100644
--- a/airflow/providers/docker/operators/docker.py
+++ b/airflow/providers/docker/operators/docker.py
@@ -20,6 +20,7 @@ import ast
 import io
 import pickle
 import tarfile
+import warnings
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Union
 
@@ -125,7 +126,7 @@ class DockerOperator(BaseOperator):
     :param dns_search: Docker custom DNS search domain
     :param auto_remove: Auto-removal of the container on daemon side when the
         container's process exits.
-        The default is False.
+        The default is never.
     :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
         greater than 0. If omitted uses system default.
     :param tty: Allocate pseudo-TTY to the container
@@ -175,7 +176,7 @@ class DockerOperator(BaseOperator):
         docker_conn_id: Optional[str] = None,
         dns: Optional[List[str]] = None,
         dns_search: Optional[List[str]] = None,
-        auto_remove: bool = False,
+        auto_remove: str = "never",
         shm_size: Optional[int] = None,
         tty: bool = False,
         privileged: bool = False,
@@ -189,7 +190,20 @@ class DockerOperator(BaseOperator):
     ) -> None:
         super().__init__(**kwargs)
         self.api_version = api_version
-        self.auto_remove = auto_remove
+        if type(auto_remove) == bool:
+            warnings.warn(
+                "bool value for auto_remove is deprecated, please use 'never', 'success', or 'force' instead",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+        if str(auto_remove) == "False":
+            self.auto_remove = "never"
+        elif str(auto_remove) == "True":
+            self.auto_remove = "success"
+        elif str(auto_remove) in ("never", "success", "force"):
+            self.auto_remove = auto_remove
+        else:
+            raise ValueError("unsupported auto_remove option, use 'never', 'success', or 'force' instead")
         self.command = command
         self.container_name = container_name
         self.cpus = cpus
@@ -334,8 +348,10 @@ class DockerOperator(BaseOperator):
                     return None
             return None
         finally:
-            if self.auto_remove:
+            if self.auto_remove == "success":
                 self.cli.remove_container(self.container['Id'])
+            elif self.auto_remove == "force":
+                self.cli.remove_container(self.container['Id'], force=True)
 
     def _attempt_to_retrieve_result(self):
         """
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 1fbb35d8d8..451af6179c 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -160,11 +160,12 @@ class DockerSwarmOperator(DockerOperator):
                 self.log.info('Service status before exiting: %s', self._service_status())
                 break
 
+        self.log.info("auto_removeauto_removeauto_removeauto_removeauto_remove : %s", str(self.auto_remove))
         if self.service and self._service_status() != 'complete':
-            if self.auto_remove:
+            if self.auto_remove == "success":
                 self.cli.remove_service(self.service['ID'])
             raise AirflowException('Service did not complete: ' + repr(self.service))
-        elif self.auto_remove:
+        elif self.auto_remove == "success":
             if not self.service:
                 raise Exception("The 'service' should be initialized before!")
             self.cli.remove_service(self.service['ID'])