You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by ma...@apache.org on 2018/06/03 13:45:57 UTC

[incubator-mxnet] branch master updated: [MXNET-454] Move distributed Docker cache from S3 to Docker Hub (#11018)

This is an automated email from the ASF dual-hosted git repository.

marcoabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new f107397  [MXNET-454] Move distributed Docker cache from S3 to Docker Hub (#11018)
f107397 is described below

commit f107397b753ff08bb19e7572bc1a9ebedd832f88
Author: Marco de Abreu <ma...@users.noreply.github.com>
AuthorDate: Sun Jun 3 15:45:30 2018 +0200

    [MXNET-454] Move distributed Docker cache from S3 to Docker Hub (#11018)
    
    * Add test cases for Docker cache and move from S3 to Docker registry
    
    * Remove unnecessary args
    
    * Fix lint
    
    * Address review
---
 Jenkinsfile                 |   2 +-
 ci/Jenkinsfile_docker_cache |   8 +-
 ci/build.py                 |  67 ++++++-----
 ci/docker_cache.py          | 264 ++++++++++++++++----------------------------
 ci/test_docker_cache.py     | 252 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 391 insertions(+), 202 deletions(-)

diff --git a/Jenkinsfile b/Jenkinsfile
index c3fe535..288f9a4 100644
--- a/Jenkinsfile
+++ b/Jenkinsfile
@@ -93,7 +93,7 @@ echo ${libs} | sed -e 's/,/ /g' | xargs md5sum
 }
 
 def docker_run(platform, function_name, use_nvidia, shared_mem = '500m') {
-  def command = "ci/build.py --download-docker-cache --docker-cache-bucket ${env.DOCKER_CACHE_BUCKET} %USE_NVIDIA% --platform %PLATFORM% --shm-size %SHARED_MEM% /work/runtime_functions.sh %FUNCTION_NAME%"
+  def command = "ci/build.py --docker-registry ${env.DOCKER_CACHE_REGISTRY} %USE_NVIDIA% --platform %PLATFORM% --shm-size %SHARED_MEM% /work/runtime_functions.sh %FUNCTION_NAME%"
   command = command.replaceAll('%USE_NVIDIA%', use_nvidia ? '--nvidiadocker' : '')
   command = command.replaceAll('%PLATFORM%', platform)
   command = command.replaceAll('%FUNCTION_NAME%', function_name)
diff --git a/ci/Jenkinsfile_docker_cache b/ci/Jenkinsfile_docker_cache
index 8a0428b..eba3a49 100644
--- a/ci/Jenkinsfile_docker_cache
+++ b/ci/Jenkinsfile_docker_cache
@@ -49,11 +49,11 @@ def init_git() {
 
 try {
   stage("Docker cache build & publish") {
-    node('mxnetlinux-cpu') {
+    node('restricted-mxnetlinux-cpu') {
       ws('workspace/docker_cache') {
         timeout(time: total_timeout, unit: 'MINUTES') {
           init_git()
-          sh "ci/docker_cache.py --docker-cache-bucket ${env.DOCKER_CACHE_BUCKET}"
+          sh "ci/docker_cache.py --docker-registry ${env.DOCKER_CACHE_REGISTRY}"
         }
       }
     }
@@ -62,13 +62,13 @@ try {
   // set build status to success at the end
   currentBuild.result = "SUCCESS"
 } catch (caughtError) {
-  node("mxnetlinux-cpu") {
+  node("restricted-mxnetlinux-cpu") {
     sh "echo caught ${caughtError}"
     err = caughtError
     currentBuild.result = "FAILURE"
   }
 } finally {
-  node("mxnetlinux-cpu") {
+  node("restricted-mxnetlinux-cpu") {
     // Only send email if master failed
     if (currentBuild.result == "FAILURE" && env.BRANCH_NAME == "master") {
       emailext body: 'Build for MXNet branch ${BRANCH_NAME} has broken. Please view the build at ${BUILD_URL}', replyTo: '${EMAIL}', subject: '[BUILD FAILED] Branch ${BRANCH_NAME} build ${BUILD_NUMBER}', to: '${EMAIL}'
diff --git a/ci/build.py b/ci/build.py
index deae1d7..e52fa79 100755
--- a/ci/build.py
+++ b/ci/build.py
@@ -48,8 +48,8 @@ def get_platforms(path: Optional[str]="docker"):
     return platforms
 
 
-def get_docker_tag(platform: str) -> str:
-    return "mxnet/build.{0}".format(platform)
+def get_docker_tag(platform: str, registry: str) -> str:
+    return "{0}/build.{1}".format(registry, platform)
 
 
 def get_dockerfile(platform: str, path="docker") -> str:
@@ -60,19 +60,19 @@ def get_docker_binary(use_nvidia_docker: bool) -> str:
     return "nvidia-docker" if use_nvidia_docker else "docker"
 
 
-def build_docker(platform: str, docker_binary: str) -> None:
+def build_docker(platform: str, docker_binary: str, registry: str) -> None:
     """
     Build a container for the given platform
     :param platform: Platform
     :param docker_binary: docker binary to use (docker/nvidia-docker)
+    :param registry: Dockerhub registry name
     :return: Id of the top level image
     """
 
-    tag = get_docker_tag(platform)
+    tag = get_docker_tag(platform=platform, registry=registry)
     logging.info("Building container tagged '%s' with %s", tag, docker_binary)
     cmd = [docker_binary, "build",
         "-f", get_dockerfile(platform),
-        "--rm=false",  # Keep intermediary layers to prime the build cache
         "--build-arg", "USER_ID={}".format(os.getuid()),
         "--cache-from", tag,
         "-t", tag,
@@ -118,11 +118,12 @@ def buildir() -> str:
 
 def container_run(platform: str,
                   docker_binary: str,
+                  docker_registry: str,
                   shared_memory_size: str,
                   command: List[str],
                   dry_run: bool = False,
                   into_container: bool = False) -> str:
-    tag = get_docker_tag(platform)
+    tag = get_docker_tag(platform=platform, registry=docker_registry)
     mx_root = get_mxnet_root()
     local_build_folder = buildir()
     # We need to create it first, otherwise it will be created by the docker daemon with root only permissions
@@ -159,6 +160,19 @@ def container_run(platform: str,
 def list_platforms() -> str:
     print("\nSupported platforms:\n{}".format('\n'.join(get_platforms())))
 
+
+def load_docker_cache(tag, docker_registry) -> None:
+    if docker_registry:
+        try:
+            import docker_cache
+            logging.info('Docker cache download is enabled')
+            docker_cache.load_docker_cache(registry=docker_registry, docker_tag=tag)
+        except Exception:
+            logging.exception('Unable to retrieve Docker cache. Continue without...')
+    else:
+        logging.info('Distributed docker cache disabled')
+
+
 def main() -> int:
     # We need to be in the same directory than the script so the commands in the dockerfiles work as
     # expected. But the script can be invoked from a different path
@@ -207,12 +221,9 @@ def main() -> int:
                         help="go in a shell inside the container",
                         action='store_true')
 
-    parser.add_argument("--download-docker-cache",
-                        help="Download the docker cache from our central repository instead of rebuilding locally",
-                        action='store_true')
-
-    parser.add_argument("--docker-cache-bucket",
-                        help="S3 docker cache bucket, e.g. mxnet-ci-docker-cache",
+    parser.add_argument("--docker-registry",
+                        help="Dockerhub registry name to retrieve cache from",
+                        default='mxnetci',
                         type=str)
 
     parser.add_argument("command",
@@ -220,6 +231,7 @@ def main() -> int:
                         nargs='*', action='append', type=str)
 
     args = parser.parse_args()
+    docker_registry = args.docker_registry
     command = list(chain(*args.command))
     docker_binary = get_docker_binary(args.nvidiadocker)
     shared_memory_size = args.shared_memory_size
@@ -229,44 +241,43 @@ def main() -> int:
         list_platforms()
     elif args.platform:
         platform = args.platform
-        tag = get_docker_tag(platform)
-        if args.download_docker_cache:
-            import docker_cache
-            logging.info('Docker cache download is enabled')
-            docker_cache.load_docker_cache(bucket_name=args.docker_cache_bucket, docker_tag=tag)
-        build_docker(platform, docker_binary)
+        tag = get_docker_tag(platform=platform, registry=docker_registry)
+        load_docker_cache(tag=tag, docker_registry=args.docker_registry)
+        build_docker(platform, docker_binary, registry=docker_registry)
         if args.build_only:
             logging.warning("Container was just built. Exiting due to build-only.")
             return 0
 
         if command:
-            container_run(platform, docker_binary, shared_memory_size, command)
+            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=shared_memory_size,
+                          command=command, docker_registry=docker_registry)
         elif args.print_docker_run:
-            print(container_run(platform, docker_binary, shared_memory_size, [], True))
+            print(container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=shared_memory_size,
+                                command=[], dry_run=True, docker_registry=docker_registry))
         elif args.into_container:
-            container_run(platform, docker_binary, shared_memory_size, [], False, True)
+            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=shared_memory_size,
+                          command=[], dry_run=False, into_container=True, docker_registry=docker_registry)
         else:
             cmd = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)]
             logging.info("No command specified, trying default build: %s", ' '.join(cmd))
-            container_run(platform, docker_binary, shared_memory_size, cmd)
+            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=shared_memory_size,
+                          command=cmd, docker_registry=docker_registry)
 
     elif args.all:
         platforms = get_platforms()
         logging.info("Building for all architectures: {}".format(platforms))
         logging.info("Artifacts will be produced in the build/ directory.")
         for platform in platforms:
-            if args.download_docker_cache:
-                import docker_cache
-                tag = get_docker_tag(platform)
-                logging.info('Docker cache download is enabled')
-                docker_cache.load_docker_cache(bucket_name=args.docker_cache_bucket, docker_tag=tag)
+            tag = get_docker_tag(platform=platform, registry=docker_registry)
+            load_docker_cache(tag=tag, docker_registry=args.docker_registry)
             build_docker(platform, docker_binary)
             if args.build_only:
                 continue
             build_platform = "build_{}".format(platform)
             cmd = ["/work/mxnet/ci/docker/runtime_functions.sh", build_platform]
             shutil.rmtree(buildir(), ignore_errors=True)
-            container_run(platform, docker_binary, shared_memory_size, cmd)
+            container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=shared_memory_size,
+                          command=cmd, docker_registry=docker_registry)
             plat_buildir = os.path.join(get_mxnet_root(), build_platform)
             shutil.move(buildir(), plat_buildir)
             logging.info("Built files left in: %s", plat_buildir)
diff --git a/ci/docker_cache.py b/ci/docker_cache.py
index 7fdfbcf..16abb9e 100755
--- a/ci/docker_cache.py
+++ b/ci/docker_cache.py
@@ -28,237 +28,164 @@ import os
 import logging
 import argparse
 import sys
-import boto3
-import tempfile
-import pprint
-import threading
-import build as build_util
-import botocore
 import subprocess
-from botocore.handlers import disable_signing
-from subprocess import call, check_call, CalledProcessError
+import json
+import build as build_util
 from joblib import Parallel, delayed
 
-S3_METADATA_IMAGE_ID_KEY = 'docker-image-id'
-LOG_PROGRESS_PERCENTAGE_THRESHOLD = 10
-
-cached_aws_session = None
-
-
-class ProgressPercentage(object):
-    def __init__(self, object_name, size):
-        self._object_name = object_name
-        self._size = size
-        self._seen_so_far = 0
-        self._last_percentage = 0
-        self._lock = threading.Lock()
 
-    def __call__(self, bytes_amount) -> None:
-        # To simplify we'll assume this is hooked up
-        # to a single filename.
-        with self._lock:
-            self._seen_so_far += bytes_amount
-            percentage = int((self._seen_so_far / self._size) * 100)
-            if (percentage - self._last_percentage) >= LOG_PROGRESS_PERCENTAGE_THRESHOLD:
-                self._last_percentage = percentage
-                logging.info('{}% of {}'.format(percentage, self._object_name))
 
-
-def build_save_containers(platforms, bucket) -> int:
+def build_save_containers(platforms, registry, load_cache) -> int:
     """
     Entry point to build and upload all built dockerimages in parallel
     :param platforms: List of platforms
-    :param bucket: S3 bucket name
+    :param registry: Docker registry name
+    :param load_cache: Load cache before building
     :return: 1 if error occurred, 0 otherwise
     """
     if len(platforms) == 0:
         return 0
 
     platform_results = Parallel(n_jobs=len(platforms), backend="multiprocessing")(
-        delayed(_build_save_container)(platform, bucket)
+        delayed(_build_save_container)(platform, registry, load_cache)
         for platform in platforms)
 
     is_error = False
     for platform_result in platform_results:
         if platform_result is not None:
-            logging.error('Failed to generate {}'.format(platform_result))
+            logging.error('Failed to generate %s', platform_result)
             is_error = True
 
     return 1 if is_error else 0
 
 
-def _build_save_container(platform, bucket) -> str:
+def _build_save_container(platform, registry, load_cache) -> str:
     """
     Build image for passed platform and upload the cache to the specified S3 bucket
     :param platform: Platform
-    :param bucket: Target s3 bucket
+    :param registry: Docker registry name
+    :param load_cache: Load cache before building
     :return: Platform if failed, None otherwise
     """
-    docker_tag = build_util.get_docker_tag(platform)
+    docker_tag = build_util.get_docker_tag(platform=platform, registry=registry)
 
     # Preload cache
-    # TODO: Allow to disable this in order to allow clean rebuilds
-    load_docker_cache(bucket_name=bucket, docker_tag=docker_tag)
+    if load_cache:
+        load_docker_cache(registry=registry, docker_tag=docker_tag)
 
     # Start building
-    logging.debug('Building {} as {}'.format(platform, docker_tag))
+    logging.debug('Building %s as %s', platform, docker_tag)
     try:
-        image_id = build_util.build_docker(docker_binary='docker', platform=platform)
-        logging.info('Built {} as {}'.format(docker_tag, image_id))
+        image_id = build_util.build_docker(docker_binary='docker', platform=platform, registry=registry)
+        logging.info('Built %s as %s', docker_tag, image_id)
 
-        # Compile and upload tarfile
-        _compile_upload_cache_file(bucket_name=bucket, docker_tag=docker_tag, image_id=image_id)
+        # Push cache to registry
+        _upload_image(registry=registry, docker_tag=docker_tag, image_id=image_id)
         return None
     except Exception:
-        logging.exception('Unexpected exception during build of {}'.format(docker_tag))
+        logging.exception('Unexpected exception during build of %s', docker_tag)
         return platform
         # Error handling is done by returning the errorous platform name. This is necessary due to
         # Parallel being unable to handle exceptions
 
 
-def _compile_upload_cache_file(bucket_name, docker_tag, image_id) -> None:
+def _upload_image(registry, docker_tag, image_id) -> None:
     """
     Upload the passed image by id, tag it with docker tag and upload to S3 bucket
-    :param bucket_name: S3 bucket name
+    :param registry: Docker registry name
     :param docker_tag: Docker tag
     :param image_id: Image id
     :return: None
     """
-    session = _get_aws_session()
-    s3_object = session.resource('s3').Object(bucket_name, docker_tag)
-
-    remote_image_id = _get_remote_image_id(s3_object)
-    if remote_image_id == image_id:
-        logging.info('{} ({}) for {} has not been updated - skipping'.format(docker_tag, image_id, docker_tag))
-        return
-    else:
-        logging.debug('Cached image {} differs from local {} for {}'.format(remote_image_id, image_id, docker_tag))
+    _login_dockerhub()
+    # We don't have to retag the image since it is already in the right format
+    logging.info('Uploading %s (%s) to %s', docker_tag, image_id, registry)
+    push_cmd = ['docker', 'push', docker_tag]
+    subprocess.check_call(push_cmd)
 
-    # Compile layers into tarfile
-    with tempfile.TemporaryDirectory() as temp_dir:
-        tar_file_path = _format_docker_cache_filepath(output_dir=temp_dir, docker_tag=docker_tag)
-        logging.debug('Writing layers of {} to {}'.format(docker_tag, tar_file_path))
-        history_cmd = ['docker', 'history', '-q', docker_tag]
 
-        image_ids_b = subprocess.check_output(history_cmd)
-        image_ids_str = image_ids_b.decode('utf-8').strip()
-        layer_ids = [id.strip() for id in image_ids_str.split('\n') if id != '<missing>']
-
-        # docker_tag is important to preserve the image name. Otherwise, the --cache-from feature will not be able to
-        # reference the loaded cache later on. The other layer ids are added to ensure all intermediary layers
-        # are preserved to allow resuming the cache at any point
-        cmd = ['docker', 'save', '-o', tar_file_path, docker_tag]
-        cmd.extend(layer_ids)
-        try:
-            check_call(cmd)
-        except CalledProcessError as e:
-            logging.error('Error during save of {} at {}. Command: {}'.
-                          format(docker_tag, tar_file_path, pprint.pprint(cmd)))
-            return
-
-        # Upload file
-        logging.info('Uploading {} to S3'.format(docker_tag))
-        with open(tar_file_path, 'rb') as data:
-            s3_object.upload_fileobj(
-                Fileobj=data,
-                Callback=ProgressPercentage(object_name=docker_tag, size=os.path.getsize(tar_file_path)),
-                ExtraArgs={"Metadata": {S3_METADATA_IMAGE_ID_KEY: image_id}})
-            logging.info('Uploaded {} to S3'.format(docker_tag))
-
-
-def _get_remote_image_id(s3_object) -> str:
+def _login_dockerhub():
     """
-    Get the image id of the docker cache which is represented by the S3 object
-    :param s3_object: S3 object
-    :return: Image id as string or None if object does not exist
+    Login to the Docker Hub account
+    :return: None
     """
-    try:
-        if S3_METADATA_IMAGE_ID_KEY in s3_object.metadata:
-            cached_image_id = s3_object.metadata[S3_METADATA_IMAGE_ID_KEY]
-            return cached_image_id
-        else:
-            logging.debug('No cached image available for {}'.format(s3_object.key))
-    except botocore.exceptions.ClientError as e:
-        if e.response['Error']['Code'] == "404":
-            logging.debug('{} does not exist in S3 yet'.format(s3_object.key))
-        else:
-            raise
+    dockerhub_credentials = _get_dockerhub_credentials()
+    login_cmd = ['docker', 'login', '--username', dockerhub_credentials['username'], '--password',
+                 dockerhub_credentials['password']]
+    subprocess.check_call(login_cmd)
 
-    return None
 
-
-def load_docker_cache(bucket_name, docker_tag) -> None:
+def load_docker_cache(registry, docker_tag) -> None:
     """
-    Load the precompiled docker cache from the passed S3 bucket
-    :param bucket_name: S3 bucket name
+    Load the precompiled docker cache from the registry
+    :param registry: Docker registry name
     :param docker_tag: Docker tag to load
     :return: None
     """
-    # Allow anonymous access
-    s3_resource = boto3.resource('s3')
-    s3_resource.meta.client.meta.events.register('choose-signer.s3.*', disable_signing)
-    s3_object = s3_resource.Object(bucket_name, docker_tag)
-
-    # Check if cache is still valid and exists
-    remote_image_id = _get_remote_image_id(s3_object)
-    if remote_image_id:
-        if _docker_layer_exists(remote_image_id):
-            logging.info('Local docker cache already present for {}'.format(docker_tag))
-            return
-        else:
-            logging.info('Local docker cache not present for {}'.format(docker_tag))
-
-        # Download using public S3 endpoint (without requiring credentials)
-        with tempfile.TemporaryDirectory() as temp_dir:
-            tar_file_path = os.path.join(temp_dir, 'layers.tar')
-            s3_object.download_file(
-                Filename=tar_file_path,
-                Callback=ProgressPercentage(object_name=docker_tag, size=s3_object.content_length))
-
-            # Load layers
-            cmd = ['docker', 'load', '-i', tar_file_path]
-            try:
-                check_call(cmd)
-                logging.info('Docker cache for {} loaded successfully'.format(docker_tag))
-            except CalledProcessError as e:
-                logging.error('Error during load of docker cache for {} at {}'.format(docker_tag, tar_file_path))
-                logging.exception(e)
-                return
-    else:
-        logging.info('No cached remote image of {} present'.format(docker_tag))
-
-
-def _docker_layer_exists(layer_id) -> bool:
-    """
-    Check if the docker cache contains the layer with the passed id
-    :param layer_id: layer id
-    :return: True if exists, False otherwise
-    """
-    cmd = ['docker', 'images', '-q']
-    image_ids_b = subprocess.check_output(cmd)
-    image_ids_str = image_ids_b.decode('utf-8').strip()
-    return layer_id in [id.strip() for id in image_ids_str.split('\n')]
+    # We don't have to retag the image since it's already in the right format
+    logging.info('Loading Docker cache for %s from %s', docker_tag, registry)
+    pull_cmd = ['docker', 'pull', docker_tag]
+    subprocess.call(pull_cmd)  # Don't throw an error if the image does not exist
 
 
-def _get_aws_session() -> boto3.Session:  # pragma: no cover
+def delete_local_docker_cache(docker_tag):
     """
-    Get the boto3 AWS session
-    :return: Session object
+    Delete the local docker cache for the entire docker image chain
+    :param docker_tag: Docker tag
+    :return: None
     """
-    global cached_aws_session
-    if cached_aws_session:
-        return cached_aws_session
-
-    session = boto3.Session()  # Uses IAM user credentials
-    cached_aws_session = session
-    return session
+    history_cmd = ['docker', 'history', '-q', docker_tag]
 
+    try:
+        image_ids_b = subprocess.check_output(history_cmd)
+        image_ids_str = image_ids_b.decode('utf-8').strip()
+        layer_ids = [id.strip() for id in image_ids_str.split('\n') if id != '<missing>']
 
-def _format_docker_cache_filepath(output_dir, docker_tag) -> str:
-    return os.path.join(output_dir, docker_tag.replace('/', '_') + '.tar')
+        delete_cmd = ['docker', 'image', 'rm', '--force']
+        delete_cmd.extend(layer_ids)
+        subprocess.check_call(delete_cmd)
+    except subprocess.CalledProcessError as error:
+        # Could be caused by the image not being present
+        logging.debug('Error during local cache deletion %s', error)
+
+
+def _get_dockerhub_credentials():  # pragma: no cover
+    import boto3
+    import botocore
+    secret_name = os.environ['DOCKERHUB_SECRET_NAME']
+    endpoint_url = os.environ['DOCKERHUB_SECRET_ENDPOINT_URL']
+    region_name = os.environ['DOCKERHUB_SECRET_ENDPOINT_REGION']
+
+    session = boto3.Session()
+    client = session.client(
+        service_name='secretsmanager',
+        region_name=region_name,
+        endpoint_url=endpoint_url
+    )
+    try:
+        get_secret_value_response = client.get_secret_value(
+            SecretId=secret_name
+        )
+    except botocore.exceptions.ClientError as client_error:
+        if client_error.response['Error']['Code'] == 'ResourceNotFoundException':
+            logging.exception("The requested secret %s was not found", secret_name)
+        elif client_error.response['Error']['Code'] == 'InvalidRequestException':
+            logging.exception("The request was invalid due to:")
+        elif client_error.response['Error']['Code'] == 'InvalidParameterException':
+            logging.exception("The request had invalid params:")
+        else:
+            raise
+    else:
+        secret = get_secret_value_response['SecretString']
+        secret_dict = json.loads(secret)
+        return secret_dict
 
 
 def main() -> int:
+    """
+    Utility to create and publish the Docker cache to Docker Hub
+    :return:
+    """
     # We need to be in the same directory than the script so the commands in the dockerfiles work as
     # expected. But the script can be invoked from a different path
     base = os.path.split(os.path.realpath(__file__))[0]
@@ -275,17 +202,16 @@ def main() -> int:
 
     logging.basicConfig(format='{}: %(asctime)-15s %(message)s'.format(script_name()))
 
-    parser = argparse.ArgumentParser(description="Utility for preserving and loading Docker cache",epilog="")
-    parser.add_argument("--docker-cache-bucket",
-                        help="S3 docker cache bucket, e.g. mxnet-ci-docker-cache",
+    parser = argparse.ArgumentParser(description="Utility for preserving and loading Docker cache", epilog="")
+    parser.add_argument("--docker-registry",
+                        help="Docker hub registry name",
                         type=str,
                         required=True)
 
     args = parser.parse_args()
 
     platforms = build_util.get_platforms()
-    _get_aws_session()  # Init AWS credentials
-    return build_save_containers(platforms=platforms, bucket=args.docker_cache_bucket)
+    return build_save_containers(platforms=platforms, registry=args.docker_registry, load_cache=True)
 
 
 if __name__ == '__main__':
diff --git a/ci/test_docker_cache.py b/ci/test_docker_cache.py
new file mode 100644
index 0000000..fa8833f
--- /dev/null
+++ b/ci/test_docker_cache.py
@@ -0,0 +1,252 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Distributed Docker cache tests
+"""
+
+import unittest.mock
+import tempfile
+import os
+import logging
+import subprocess
+import sys
+
+sys.path.append(os.path.dirname(__file__))
+import docker_cache
+import build as build_util
+
+DOCKERFILE_DIR = 'docker'
+DOCKER_REGISTRY_NAME = 'test_registry'
+DOCKER_REGISTRY_PORT = 5000
+DOCKER_REGISTRY_PATH = 'localhost:{}'.format(DOCKER_REGISTRY_PORT)
+
+class RedirectSubprocessOutput(object):
+    """
+    Redirect the output of all subprocess.call calls to a readable buffer instead of writing it to stdout/stderr.
+    The output can then be retrieved with get_output.
+    """
+    def __enter__(self):
+        self.buf_output = tempfile.TemporaryFile()
+
+        def trampoline(*popenargs, **kwargs):
+            self.call(*popenargs, **kwargs)
+
+        self.old_method = subprocess.call
+        subprocess.call = trampoline
+        return self
+
+    def __exit__(self, *args):
+        logging.info('Releasing docker output buffer:\n%s', self.get_output())
+        subprocess.call = self.old_method
+        self.buf_output.close()
+
+    def call(self, *popenargs, **kwargs):
+        """
+        Replace subprocess.call
+        :param popenargs:
+        :param timeout:
+        :param kwargs:
+        :return:
+        """
+        kwargs['stderr'] = subprocess.STDOUT
+        kwargs['stdout'] = self.buf_output
+        return self.old_method(*popenargs, **kwargs)
+
+    def get_output(self):
+        self.buf_output.seek(0)
+        return self.buf_output.read().decode('utf-8')
+
+
+class TestDockerCache(unittest.TestCase):
+    """
+    Test utility class
+    """
+    def setUp(self):
+        logging.getLogger().setLevel(logging.DEBUG)
+
+        # We need to be in the same directory as the script so the commands in the dockerfiles work as
+        # expected. But the script can be invoked from a different path
+        base = os.path.split(os.path.realpath(__file__))[0]
+        os.chdir(base)
+
+        # Stop in case previous execution was dirty
+        try:
+            self._stop_local_docker_registry()
+        except Exception:
+            pass
+
+        # Start up docker registry
+        self._start_local_docker_registry()
+
+    def tearDown(self):
+        # Stop docker registry
+        self._stop_local_docker_registry()
+
+    @classmethod
+    def _start_local_docker_registry(cls):
+        # https://docs.docker.com/registry/deploying/#run-a-local-registry
+        start_cmd = [
+            'docker', 'run', '-d', '-p', '{}:{}'.format(DOCKER_REGISTRY_PORT, DOCKER_REGISTRY_PORT),
+            '--name', DOCKER_REGISTRY_NAME, 'registry:2'
+        ]
+        subprocess.check_call(start_cmd)
+
+    @classmethod
+    def _stop_local_docker_registry(cls):
+        # https://docs.docker.com/registry/deploying/#run-a-local-registry
+        stop_cmd = ['docker', 'container', 'stop', DOCKER_REGISTRY_NAME]
+        subprocess.check_call(stop_cmd)
+
+        clean_cmd = ['docker', 'container', 'rm', '-v', DOCKER_REGISTRY_NAME]
+        subprocess.check_call(clean_cmd)
+
+    def test_full_cache(self):
+        """
+        Verify that the cache can be restored entirely: build the image, upload it, delete the local
+        cache and expect a build with cache loading enabled to hit the cache on every step
+        :return:
+        """
+        platform = 'test_full_cache'
+        docker_tag = build_util.get_docker_tag(platform=platform, registry=DOCKER_REGISTRY_PATH)
+        dockerfile_path = os.path.join(DOCKERFILE_DIR, 'Dockerfile.build.' + platform)
+        dockerfile_content = """
+                FROM busybox
+                RUN touch ~/file1
+                RUN touch ~/file2
+                RUN touch ~/file3
+                RUN touch ~/file4
+                """
+
+        # Plain build of the test platform through build_util
+        def local_build():
+            build_util.build_docker(docker_binary='docker', platform=platform, registry=DOCKER_REGISTRY_PATH)
+
+        # Build through docker_cache with loading of the published cache enabled
+        def cached_build():
+            docker_cache.build_save_containers(
+                platforms=[platform], registry=DOCKER_REGISTRY_PATH, load_cache=True)
+
+        try:
+            with open(dockerfile_path, 'w') as dockerfile_handle:
+                dockerfile_handle.write(dockerfile_content)
+
+            # Warm up: with the local cache deleted, all four steps have to be executed
+            docker_cache.delete_local_docker_cache(docker_tag=docker_tag)
+            _assert_docker_build(lambda_func=local_build, expected_cache_hit_count=0, expected_cache_miss_count=4)
+
+            # Assert local cache is properly primed: the same build now hits the cache on every step
+            _assert_docker_build(lambda_func=local_build, expected_cache_hit_count=4, expected_cache_miss_count=0)
+
+            # Upload and clean local cache
+            docker_cache.build_save_containers(platforms=[platform], registry=DOCKER_REGISTRY_PATH, load_cache=False)
+            docker_cache.delete_local_docker_cache(docker_tag=docker_tag)
+
+            # Build with clean local cache and cache loading enabled; every step is expected to be a hit
+            _assert_docker_build(lambda_func=cached_build, expected_cache_hit_count=4, expected_cache_miss_count=0)
+        finally:
+            # Delete dockerfile
+            os.remove(dockerfile_path)
+            docker_cache.delete_local_docker_cache(docker_tag=docker_tag)
+
+
+
+    def test_partial_cache(self):
+        """
+        Verify that a published cache is picked up partially when the Dockerfile changes: the steps shared
+        with the cached image are expected to hit the cache, the remaining steps are expected to be rebuilt
+        :return:
+        """
+        platform = 'test_partial_cache'
+        docker_tag = build_util.get_docker_tag(platform=platform, registry=DOCKER_REGISTRY_PATH)
+        dockerfile_path = os.path.join(DOCKERFILE_DIR, 'Dockerfile.build.' + platform)
+
+        # Both Dockerfiles share RUN statements 1-3 and diverge at the fourth one
+        initial_dockerfile = """
+                FROM busybox
+                RUN touch ~/file1
+                RUN touch ~/file2
+                RUN touch ~/file3
+                RUN touch ~/file4
+                """
+        changed_dockerfile = """
+                FROM busybox
+                RUN touch ~/file1
+                RUN touch ~/file2
+                RUN touch ~/file3
+                RUN touch ~/file5
+                RUN touch ~/file4
+                RUN touch ~/file6
+                """
+
+        # Plain build of the test platform through build_util
+        def local_build():
+            build_util.build_docker(docker_binary='docker', platform=platform, registry=DOCKER_REGISTRY_PATH)
+
+        # Build through docker_cache with loading of the published cache enabled
+        def cached_build():
+            docker_cache.build_save_containers(
+                platforms=[platform], registry=DOCKER_REGISTRY_PATH, load_cache=True)
+
+        try:
+            # Write initial Dockerfile
+            with open(dockerfile_path, 'w') as dockerfile_handle:
+                dockerfile_handle.write(initial_dockerfile)
+
+            # Warm up: with the local cache deleted, all four steps have to be executed
+            docker_cache.delete_local_docker_cache(docker_tag=docker_tag)
+            _assert_docker_build(lambda_func=local_build, expected_cache_hit_count=0, expected_cache_miss_count=4)
+
+            # Assert local cache is properly primed: the same build now hits the cache on every step
+            _assert_docker_build(lambda_func=local_build, expected_cache_hit_count=4, expected_cache_miss_count=0)
+
+            # Upload and clean local cache
+            docker_cache.build_save_containers(platforms=[platform], registry=DOCKER_REGISTRY_PATH, load_cache=False)
+            docker_cache.delete_local_docker_cache(docker_tag=docker_tag)
+
+            # Replace Dockerfile with the second one, resulting in a partial cache hit
+            with open(dockerfile_path, 'w') as dockerfile_handle:
+                dockerfile_handle.write(changed_dockerfile)
+
+            # The cache loaded from the registry stems from the first Dockerfile, so only the three shared
+            # steps are expected to hit it while the three remaining steps are executed
+            _assert_docker_build(lambda_func=cached_build, expected_cache_hit_count=3, expected_cache_miss_count=3)
+
+        finally:
+            # Delete dockerfile
+            os.remove(dockerfile_path)
+            docker_cache.delete_local_docker_cache(docker_tag=docker_tag)
+
+
+def _assert_docker_build(lambda_func, expected_cache_hit_count: int, expected_cache_miss_count: int):
+    """Run lambda_func and assert the captured output shows the expected cache hits and misses."""
+    with RedirectSubprocessOutput() as redirected_output:
+        lambda_func()
+        log = redirected_output.get_output()
+        miss_count, hit_count = log.count('Running in'), log.count('Using cache')
+        assert miss_count == expected_cache_miss_count, \
+            'Expected {} "Running in", got {}. Log:{}'.format(expected_cache_miss_count, miss_count, log)
+        assert hit_count == expected_cache_hit_count, \
+            'Expected {} "Using cache", got {}. Log:{}'.format(expected_cache_hit_count, hit_count, log)
+
+
+if __name__ == '__main__':  # only taken when this file is executed directly as a script
+    import nose2  # imported here so that nose2 is only required for direct execution
+    nose2.main()  # hand over to nose2's command line entry point

-- 
To stop receiving notification emails like this one, please contact
marcoabreu@apache.org.