You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by ma...@apache.org on 2018/08/28 19:16:43 UTC

[incubator-mxnet] branch master updated: A solution to prevent zombie containers locally and in CI (#12381)

This is an automated email from the ASF dual-hosted git repository.

marcoabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new e2a3eef  A solution to prevent zombie containers locally and in CI (#12381)
e2a3eef is described below

commit e2a3eef349cb6643c08a7840d8cbd43b38fedfd5
Author: Pedro Larroy <92...@users.noreply.github.com>
AuthorDate: Tue Aug 28 21:16:31 2018 +0200

    A solution to prevent zombie containers locally and in CI (#12381)
    
    Fix pylint, mypy, and pycharm code inspection warnings
---
 ci/README.md |  14 +++
 ci/build.py  | 304 ++++++++++++++++++++++++++++++++++++++++++++---------------
 2 files changed, 243 insertions(+), 75 deletions(-)

diff --git a/ci/README.md b/ci/README.md
index 548e9cb..6930875 100644
--- a/ci/README.md
+++ b/ci/README.md
@@ -59,6 +59,20 @@ To work inside a container with a shell you can do:
 When building, the artifacts are located in the build/ directory in the project root. In case
 `build.py -a` is invoked, the artifacts are located in build.<platform>/
 
+# Docker container cleanup (Zombie containers)
+Docker has a client-server architecture, so when the program that is executing the docker client
+dies or receives a signal, the container keeps running, as it was started by the docker daemon.
+We implement signal handlers that catch SIGTERM and SIGINT and clean up containers before exit. In
+Jenkins there's not enough time between SIGTERM and SIGKILL, so we guarantee that containers are not
+left running by propagating the environment variables used by the Jenkins process tree killer to
+identify which processes to kill when the job is stopped. This has the effect of stopping the
+container, given that the process inside the container is terminated.
+
+How to test that this is working properly: on the console, hit ^C while a container is running
+(not just building) and check that the container is stopped by running `docker ps` in another
+terminal. In Jenkins this has been tested by stopping a job that has containers running and
+verifying that the container stops shortly afterwards by running `docker ps`.
+
 ## Add a platform
 
 To add a platform, you should add the appropriate dockerfile in
diff --git a/ci/build.py b/ci/build.py
index f1a5e99..df9e97b 100755
--- a/ci/build.py
+++ b/ci/build.py
@@ -23,26 +23,67 @@
 """
 
 __author__ = 'Marco de Abreu, Kellen Sunderland, Anton Chernov, Pedro Larroy'
-__version__ = '0.2'
+__version__ = '0.3'
 
 import argparse
 import glob
 import logging
+import os
 import re
 import shutil
 import subprocess
 import sys
 import tempfile
-from copy import deepcopy
 from itertools import chain
-from subprocess import call, check_call, check_output
+from subprocess import check_call, check_output
 from typing import *
 from util import *
+import docker
+import docker.models
+import docker.errors
+import signal
+import atexit
 import pprint
-import requests
 
 
-CCACHE_MAXSIZE = '500G'
+class Cleanup:
+    """A class to cleanup containers"""
+    def __init__(self):
+        self.containers = set()
+        self.docker_stop_timeout = 3
+
+    def add_container(self, container: docker.models.containers.Container):
+        assert isinstance(container, docker.models.containers.Container)
+        self.containers.add(container)
+
+    def remove_container(self, container: docker.models.containers.Container):
+        assert isinstance(container, docker.models.containers.Container)
+        self.containers.remove(container)
+
+    def _cleanup_containers(self):
+        if self.containers:
+            logging.warning("Cleaning up containers")
+        else:
+            return
+        # noinspection PyBroadException
+        try:
+            stop_timeout = int(os.environ.get("DOCKER_STOP_TIMEOUT", self.docker_stop_timeout))
+        except Exception:
+            stop_timeout = 3
+        for container in self.containers:
+            try:
+                container.stop(timeout=stop_timeout)
+                logging.info("☠: stopped container %s", trim_container_id(container.id))
+                container.remove()
+                logging.info("🚽: removed container %s", trim_container_id(container.id))
+            except Exception as e:
+                logging.exception(e)
+        self.containers.clear()
+        logging.info("Cleaning up containers finished.")
+
+    def __call__(self):
+        """Perform cleanup"""
+        self._cleanup_containers()
 
 
 def get_dockerfiles_path():
@@ -115,7 +156,10 @@ def build_docker(platform: str, docker_binary: str, registry: str, num_retries:
     run_cmd()
     # Get image id by reading the tag. It's guaranteed (except race condition) that the tag exists. Otherwise, the
     # check_call would have failed
-    return _get_local_image_id(docker_binary=docker_binary, docker_tag=tag)
+    image_id = _get_local_image_id(docker_binary=docker_binary, docker_tag=tag)
+    if not image_id:
+        raise FileNotFoundError('Unable to find docker image id matching with {}'.format(tag))
+    return image_id
 
 
 def _get_local_image_id(docker_binary, docker_tag):
@@ -137,10 +181,11 @@ def buildir() -> str:
 
 
 def default_ccache_dir() -> str:
+    """:return: ccache directory for the current platform"""
     # Share ccache across containers
     if 'CCACHE_DIR' in os.environ:
+        ccache_dir = os.path.realpath(os.environ['CCACHE_DIR'])
         try:
-            ccache_dir = os.path.realpath(os.environ['CCACHE_DIR'])
             os.makedirs(ccache_dir, exist_ok=True)
             return ccache_dir
         except PermissionError:
@@ -154,14 +199,41 @@ def default_ccache_dir() -> str:
     return os.path.join(tempfile.gettempdir(), "ci_ccache")
 
 
+def trim_container_id(cid):
+    """:return: trimmed container id"""
+    return cid[:12]
+
+
 def container_run(platform: str,
-                  docker_binary: str,
+                  nvidia_runtime: bool,
                   docker_registry: str,
                   shared_memory_size: str,
                   local_ccache_dir: str,
                   command: List[str],
-                  dry_run: bool = False,
-                  interactive: bool = False) -> int:
+                  cleanup: Cleanup,
+                  dry_run: bool = False) -> int:
+    """Run command in a container"""
+    container_wait_s = 600
+    #
+    # Environment setup
+    #
+    environment = {
+        'CCACHE_MAXSIZE': '500G',
+        'CCACHE_TEMPDIR': '/tmp/ccache',  # temp dir should be local and not shared
+        'CCACHE_DIR': '/work/ccache',  # this path is inside the container as /work/ccache is
+                                       # mounted
+        'CCACHE_LOGFILE': '/tmp/ccache.log',  # a container-scoped log, useful for ccache
+                                              # verification.
+    }
+    # These variables are passed to the container to the process tree killer can find runaway
+    # process inside the container
+    # https://wiki.jenkins.io/display/JENKINS/ProcessTreeKiller
+    # https://github.com/jenkinsci/jenkins/blob/578d6bacb33a5e99f149de504c80275796f0b231/core/src/main/java/hudson/model/Run.java#L2393
+    #
+    jenkins_env_vars = ['BUILD_NUMBER', 'BUILD_ID', 'BUILD_TAG']
+    environment.update({k: os.environ[k] for k in jenkins_env_vars if k in os.environ})
+    environment.update({k: os.environ[k] for k in ['CCACHE_MAXSIZE'] if k in os.environ})
+
     tag = get_docker_tag(platform=platform, registry=docker_registry)
     mx_root = get_mxnet_root()
     local_build_folder = buildir()
@@ -169,39 +241,107 @@ def container_run(platform: str,
     os.makedirs(local_build_folder, exist_ok=True)
     os.makedirs(local_ccache_dir, exist_ok=True)
     logging.info("Using ccache directory: %s", local_ccache_dir)
-    runlist = [docker_binary, 'run', '--rm', '-t',
-               '--shm-size={}'.format(shared_memory_size),
-               '-v', "{}:/work/mxnet".format(mx_root),  # mount mxnet root
-               '-v', "{}:/work/build".format(local_build_folder),  # mount mxnet/build for storing build artifacts
-               '-v', "{}:/work/ccache".format(local_ccache_dir),
-               '-u', '{}:{}'.format(os.getuid(), os.getgid()),
-               '-e', 'CCACHE_MAXSIZE={}'.format(CCACHE_MAXSIZE),
-               '-e', 'CCACHE_TEMPDIR=/tmp/ccache',  # temp dir should be local and not shared
-               '-e', "CCACHE_DIR=/work/ccache",  # this path is inside the container as /work/ccache is mounted
-               '-e', "CCACHE_LOGFILE=/tmp/ccache.log",  # a container-scoped log, useful for ccache verification.
-               tag]
-    runlist.extend(command)
-    cmd = '\\\n\t'.join(runlist)
+    docker_client = docker.from_env()
+    # Equivalent command
+    docker_cmd_list = [
+        get_docker_binary(nvidia_runtime),
+        'run',
+        '--rm',
+        '--shm-size={}'.format(shared_memory_size),
+        # mount mxnet root
+        '-v', "{}:/work/mxnet".format(mx_root),
+        # mount mxnet/build for storing build
+        '-v', "{}:/work/build".format(local_build_folder),
+        '-v', "{}:/work/ccache".format(local_ccache_dir),
+        '-u', '{}:{}'.format(os.getuid(), os.getgid()),
+        '-e', 'CCACHE_MAXSIZE={}'.format(environment['CCACHE_MAXSIZE']),
+        # temp dir should be local and not shared
+        '-e', 'CCACHE_TEMPDIR={}'.format(environment['CCACHE_TEMPDIR']),
+        # this path is inside the container as /work/ccache is mounted
+        '-e', "CCACHE_DIR={}".format(environment['CCACHE_DIR']),
+        # a container-scoped log, useful for ccache verification.
+        '-e', "CCACHE_LOGFILE={}".format(environment['CCACHE_LOGFILE']),
+        '-ti',
+        tag]
+    docker_cmd_list.extend(command)
+    docker_cmd = ' \\\n\t'.join(docker_cmd_list)
+    logging.info("Running %s in container %s", command, tag)
+    logging.info("Executing the equivalent of:\n%s\n", docker_cmd)
+    # return code of the command inside docker
     ret = 0
-    if not dry_run and not interactive:
-        logging.info("Running %s in container %s", command, tag)
-        logging.info("Executing:\n%s\n", cmd)
-        ret = call(runlist)
-
-    if not dry_run and interactive:
-        into_cmd = deepcopy(runlist)
-        # -ti can't be after the tag, as is interpreted as a command so hook it up after the -u argument
-        idx = into_cmd.index('-u') + 2
-        into_cmd[idx:idx] = ['-ti']
-        cmd = ' \\\n\t'.join(into_cmd)
-        logging.info("Executing:\n%s\n", cmd)
-        ret = call(into_cmd)
-
-    if not dry_run and not interactive and ret != 0:
-        logging.error("Running of command in container failed (%s):\n%s\n", ret, cmd)
-        logging.error("You can get into the container by adding the -i option")
-        raise subprocess.CalledProcessError(ret, cmd)
+    if not dry_run:
+        #############################
+        #
+        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT, signal.SIGTERM})
+        # noinspection PyShadowingNames
+        runtime = None
+        if nvidia_runtime:
+            # noinspection PyShadowingNames
+            # runc is default (docker info | grep -i runtime)
+            runtime = 'nvidia'
+
+        container = docker_client.containers.run(
+            tag,
+            runtime=runtime,
+            detach=True,
+            command=command,
+            shm_size=shared_memory_size,
+            user='{}:{}'.format(os.getuid(), os.getgid()),
+            volumes={
+                mx_root:
+                    {'bind': '/work/mxnet', 'mode': 'rw'},
+                local_build_folder:
+                    {'bind': '/work/build', 'mode': 'rw'},
+                local_ccache_dir:
+                    {'bind': '/work/ccache', 'mode': 'rw'},
+            },
+            environment=environment)
+        logging.info("Started container: %s", trim_container_id(container.id))
+        # Race condition:
+        # If the previous call is interrupted then it's possible that the container is not cleaned up
+        # We avoid by masking the signals temporarily
+        cleanup.add_container(container)
+        signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT, signal.SIGTERM})
+        #
+        #############################
+
+        stream = container.logs(stream=True, stdout=True, stderr=True)
+        sys.stdout.flush()
+        for chunk in stream:
+            sys.stdout.buffer.write(chunk)
+            sys.stdout.buffer.flush()
+        sys.stdout.flush()
+        stream.close()
+        try:
+            logging.info("Waiting for status of container %s for %d s.",
+                         trim_container_id(container.id),
+                         container_wait_s)
+            wait_result = container.wait(timeout=container_wait_s)
+            logging.info("Container exit status: %s", wait_result)
+            ret = wait_result.get('StatusCode', 200)
+        except Exception as e:
+            logging.exception(e)
+            ret = 150
+
+        # Stop
+        try:
+            logging.info("Stopping container: %s", trim_container_id(container.id))
+            container.stop()
+        except Exception as e:
+            logging.exception(e)
+            ret = 151
 
+        # Remove
+        try:
+            logging.info("Removing container: %s", trim_container_id(container.id))
+            container.remove()
+        except Exception as e:
+            logging.exception(e)
+            ret = 152
+        cleanup.remove_container(container)
+        containers = docker_client.containers.list()
+        if containers:
+            logging.info("Other running containers: %s", [trim_container_id(x.id) for x in containers])
     return ret
 
 
@@ -210,12 +350,13 @@ def list_platforms() -> str:
 
 
 def load_docker_cache(tag, docker_registry) -> None:
+    """Imports tagged container from the given docker registry"""
     if docker_registry:
+        # noinspection PyBroadException
         try:
             import docker_cache
             logging.info('Docker cache download is enabled from registry %s', docker_registry)
             docker_cache.load_docker_cache(registry=docker_registry, docker_tag=tag)
-        # noinspection PyBroadException
         except Exception:
             logging.exception('Unable to retrieve Docker cache. Continue without...')
     else:
@@ -231,6 +372,7 @@ def log_environment():
 
 
 def script_name() -> str:
+    """:returns: script name with leading paths removed"""
     return os.path.split(sys.argv[0])[1]
 
 
@@ -274,10 +416,6 @@ def main() -> int:
                         help="print docker run command for manual inspection",
                         action='store_true')
 
-    parser.add_argument("-i", "--interactive",
-                        help="go in a shell inside the container",
-                        action='store_true')
-
     parser.add_argument("-d", "--docker-registry",
                         help="Dockerhub registry name to retrieve cache from. Default is 'mxnetci'",
                         default='mxnetci',
@@ -299,7 +437,7 @@ def main() -> int:
 
     parser.add_argument("--ccache-dir",
                         default=default_ccache_dir(),
-                        help="Ccache directory",
+                        help="ccache directory",
                         type=str)
 
     args = parser.parse_args()
@@ -310,6 +448,20 @@ def main() -> int:
     command = list(chain(*args.command))
     docker_binary = get_docker_binary(args.nvidiadocker)
 
+    # Cleanup on signals and exit
+    cleanup = Cleanup()
+
+    def signal_handler(signum, _):
+        signal.pthread_sigmask(signal.SIG_BLOCK, {signum})
+        logging.warning("Signal %d received, cleaning up...", signum)
+        cleanup()
+        logging.warning("done. Exiting with error.")
+        sys.exit(1)
+
+    atexit.register(cleanup)
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
+
     if args.list:
         print(list_platforms())
     elif args.platform:
@@ -323,38 +475,42 @@ def main() -> int:
             logging.warning("Container was just built. Exiting due to build-only.")
             return 0
 
+        # noinspection PyUnusedLocal
+        ret = 0
         if command:
-            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size,
-                          command=command, docker_registry=args.docker_registry,
-                          local_ccache_dir=args.ccache_dir, interactive=args.interactive)
+            ret = container_run(
+                platform=platform, nvidia_runtime=args.nvidiadocker,
+                shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry,
+                local_ccache_dir=args.ccache_dir, cleanup=cleanup)
         elif args.print_docker_run:
-            print(container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size,
-                                command=[], dry_run=True, docker_registry=args.docker_registry,
-                                local_ccache_dir=args.ccache_dir))
-        elif args.interactive:
-            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size,
-                          command=command, docker_registry=args.docker_registry,
-                          local_ccache_dir=args.ccache_dir, interactive=args.interactive)
-
+            command = []
+            ret = container_run(
+                platform=platform, nvidia_runtime=args.nvidiadocker,
+                shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry,
+                local_ccache_dir=args.ccache_dir, dry_run=True, cleanup=cleanup)
         else:
             # With no commands, execute a build function for the target platform
-            assert not args.interactive, "when running with -i must provide a command"
-            cmd = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)]
-            logging.info("No command specified, trying default build: %s", ' '.join(cmd))
-            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size,
-                          command=cmd, docker_registry=args.docker_registry,
-                          local_ccache_dir=args.ccache_dir)
+            command = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)]
+            logging.info("No command specified, trying default build: %s", ' '.join(command))
+            ret = container_run(
+                platform=platform, nvidia_runtime=args.nvidiadocker,
+                shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry,
+                local_ccache_dir=args.ccache_dir, cleanup=cleanup)
+
+        if ret != 0:
+            logging.critical("Execution of %s failed with status: %d", command, ret)
+            return ret
 
     elif args.all:
         platforms = get_platforms()
-        logging.info("Building for all architectures: {}".format(platforms))
+        logging.info("Building for all architectures: %s", platforms)
         logging.info("Artifacts will be produced in the build/ directory.")
         for platform in platforms:
             tag = get_docker_tag(platform=platform, registry=args.docker_registry)
             if use_cache():
                 load_docker_cache(tag=tag, docker_registry=args.docker_registry)
-            build_docker(platform, docker_binary, args.docker_registry, num_retries=args.docker_build_retries,
-                         use_cache=use_cache())
+            build_docker(platform, docker_binary=docker_binary, registry=args.docker_registry,
+                         num_retries=args.docker_build_retries, use_cache=use_cache())
             if args.build_only:
                 continue
             shutil.rmtree(buildir(), ignore_errors=True)
@@ -362,11 +518,13 @@ def main() -> int:
             plat_buildir = os.path.abspath(os.path.join(get_mxnet_root(), '..',
                                                         "mxnet_{}".format(build_platform)))
             if os.path.exists(plat_buildir):
-                logging.warning("{} already exists, skipping".format(plat_buildir))
+                logging.warning("%s already exists, skipping", plat_buildir)
                 continue
             command = ["/work/mxnet/ci/docker/runtime_functions.sh", build_platform]
-            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size,
-                          command=command, docker_registry=args.docker_registry, local_ccache_dir=args.ccache_dir)
+            container_run(
+                platform=platform, nvidia_runtime=args.nvidiadocker,
+                shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry,
+                local_ccache_dir=args.ccache_dir, cleanup=cleanup)
             shutil.move(buildir(), plat_buildir)
             logging.info("Built files left in: %s", plat_buildir)
 
@@ -389,13 +547,9 @@ Examples:
 
     Will print a docker run command to get inside the container in a shell
 
-./build.py -p armv7 --interactive
-
-    Will execute a shell into the container
-
 ./build.py -a
 
-    Builds for all platforms and leaves artifacts in build_<platform>.
+    Builds for all platforms and leaves artifacts in build_<platform>
 
     """)