You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/05/17 15:03:42 UTC

[airflow] branch master updated: Replace DockerOperator's 'volumes' arg for 'mounts' (#15843)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 12995cf  Replace DockerOperator's 'volumes' arg for 'mounts' (#15843)
12995cf is described below

commit 12995cfb9a90d1f93511a4a4ab692323e62cc318
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Mon May 17 23:03:18 2021 +0800

    Replace DockerOperator's 'volumes' arg for 'mounts' (#15843)
    
    * Replace DockerOperator's 'volumes' arg for 'mounts'
    
    All existing 'volumes' usages are also migrated to use 'mounts' instead.
    
    This also includes a fix to DockerSwarmOperator's inability to mount
    things into the container; the new argument is also passed to Swarm, so
    users can mount by setting 'mounts' on DockerSwarmOperator as well.
---
 airflow/providers/docker/CHANGELOG.rst                   | 16 ++++++++++++++++
 .../docker/example_dags/example_docker_copy_data.py      | 10 ++++++----
 airflow/providers/docker/operators/docker.py             | 16 ++++++++--------
 airflow/providers/docker/operators/docker_swarm.py       |  1 +
 airflow/providers/docker/provider.yaml                   |  1 +
 docs/conf.py                                             |  1 +
 docs/exts/docs_build/third_party_inventories.py          |  1 +
 tests/providers/docker/operators/test_docker.py          | 10 +++++++---
 tests/providers/docker/operators/test_docker_swarm.py    |  3 +++
 9 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/docker/CHANGELOG.rst b/airflow/providers/docker/CHANGELOG.rst
index 14fb77e..8e315e4 100644
--- a/airflow/providers/docker/CHANGELOG.rst
+++ b/airflow/providers/docker/CHANGELOG.rst
@@ -19,6 +19,22 @@
 Changelog
 ---------
 
+2.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+Change in ``DockerOperator`` and ``DockerSwarmOperator``
+````````````````````````````````````````````````````````
+
+The ``volumes`` parameter in
+:class:`~airflow.providers.docker.operators.docker.DockerOperator` and
+:class:`~airflow.providers.docker.operators.docker_swarm.DockerSwarmOperator`
+was replaced by the ``mounts`` parameter, which uses the newer
+`mount syntax <https://docs.docker.com/storage/>`__ instead of the ``-v``/``--volume`` bind syntax.
+
+
 1.2.0
 .....
 
diff --git a/airflow/providers/docker/example_dags/example_docker_copy_data.py b/airflow/providers/docker/example_dags/example_docker_copy_data.py
index a96ed92..7adb5ca 100644
--- a/airflow/providers/docker/example_dags/example_docker_copy_data.py
+++ b/airflow/providers/docker/example_dags/example_docker_copy_data.py
@@ -27,6 +27,8 @@ TODO: Review the workflow, change it accordingly to
 
 from datetime import timedelta
 
+from docker.types import Mount
+
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import ShortCircuitOperator
@@ -80,9 +82,9 @@ t_move = DockerOperator(
     docker_url="tcp://localhost:2375",  # replace it with swarm/docker endpoint
     image="centos:latest",
     network_mode="bridge",
-    volumes=[
-        "/your/host/input_dir/path:/your/input_dir/path",
-        "/your/host/output_dir/path:/your/output_dir/path",
+    mounts=[
+        Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"),
+        Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"),
     ],
     command=[
         "/bin/bash",
@@ -105,7 +107,7 @@ t_print = DockerOperator(
     api_version="1.19",
     docker_url="tcp://localhost:2375",
     image="centos:latest",
-    volumes=["/your/host/output_dir/path:/your/output_dir/path"],
+    mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
     command=print_templated_cmd,
     task_id="print",
     dag=dag,
diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py
index 31093cd..29a6f79 100644
--- a/airflow/providers/docker/operators/docker.py
+++ b/airflow/providers/docker/operators/docker.py
@@ -21,6 +21,7 @@ from tempfile import TemporaryDirectory
 from typing import Dict, Iterable, List, Optional, Union
 
 from docker import APIClient, tls
+from docker.types import Mount
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
@@ -95,9 +96,9 @@ class DockerOperator(BaseOperator):
     :type tmp_dir: str
     :param user: Default user inside the docker container.
     :type user: int or str
-    :param volumes: List of volumes to mount into the container, e.g.
-        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
-    :type volumes: list
+    :param mounts: List of volumes to mount into the container. Each item should
+        be a :py:class:`docker.types.Mount` instance.
+    :type mounts: list[docker.types.Mount]
     :param entrypoint: Overwrite the default ENTRYPOINT of the image
     :type entrypoint: str or list
     :param working_dir: Working directory to
@@ -157,7 +158,7 @@ class DockerOperator(BaseOperator):
         tls_ssl_version: Optional[str] = None,
         tmp_dir: str = '/tmp/airflow',
         user: Optional[Union[str, int]] = None,
-        volumes: Optional[List[str]] = None,
+        mounts: Optional[List[Mount]] = None,
         entrypoint: Optional[Union[str, List[str]]] = None,
         working_dir: Optional[str] = None,
         xcom_all: bool = False,
@@ -196,7 +197,7 @@ class DockerOperator(BaseOperator):
         self.tls_ssl_version = tls_ssl_version
         self.tmp_dir = tmp_dir
         self.user = user
-        self.volumes = volumes or []
+        self.mounts = mounts or []
         self.entrypoint = entrypoint
         self.working_dir = working_dir
         self.xcom_all = xcom_all
@@ -230,17 +231,16 @@ class DockerOperator(BaseOperator):
         self.log.info('Starting docker container from image %s', self.image)
 
         with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir:
-            self.volumes.append(f'{host_tmp_dir}:{self.tmp_dir}')
-
             if not self.cli:
                 raise Exception("The 'cli' should be initialized before!")
+            tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind")
             self.container = self.cli.create_container(
                 command=self.format_command(self.command),
                 name=self.container_name,
                 environment={**self.environment, **self._private_environment},
                 host_config=self.cli.create_host_config(
                     auto_remove=False,
-                    binds=self.volumes,
+                    mounts=self.mounts + [tmp_mount],
                     network_mode=self.network_mode,
                     shm_size=self.shm_size,
                     dns=self.dns,
diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index ca315ad..00a1589 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -117,6 +117,7 @@ class DockerSwarmOperator(DockerOperator):
                 container_spec=types.ContainerSpec(
                     image=self.image,
                     command=self.format_command(self.command),
+                    mounts=self.mounts,
                     env=self.environment,
                     user=self.user,
                     tty=self.tty,
diff --git a/airflow/providers/docker/provider.yaml b/airflow/providers/docker/provider.yaml
index f686529..184c625 100644
--- a/airflow/providers/docker/provider.yaml
+++ b/airflow/providers/docker/provider.yaml
@@ -22,6 +22,7 @@ description: |
     `Docker <https://docs.docker.com/install/>`__
 
 versions:
+  - 2.0.0
   - 1.2.0
   - 1.1.0
   - 1.0.2
diff --git a/docs/conf.py b/docs/conf.py
index 33376eb..39426d6 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -521,6 +521,7 @@ intersphinx_mapping = {
     for pkg_name in [
         'boto3',
         'celery',
+        'docker',
         'hdfs',
         'jinja2',
         'mongodb',
diff --git a/docs/exts/docs_build/third_party_inventories.py b/docs/exts/docs_build/third_party_inventories.py
index de8a930..6b03e67 100644
--- a/docs/exts/docs_build/third_party_inventories.py
+++ b/docs/exts/docs_build/third_party_inventories.py
@@ -18,6 +18,7 @@
 THIRD_PARTY_INDEXES = {
     'boto3': 'https://boto3.amazonaws.com/v1/documentation/api/latest',
     'celery': 'https://docs.celeryproject.org/en/stable',
+    'docker': 'https://docker-py.readthedocs.io/en/stable',
     'hdfs': 'https://hdfscli.readthedocs.io/en/latest',
     'jinja2': 'https://jinja.palletsprojects.com/en/2.11.x',
     'mongodb': 'https://pymongo.readthedocs.io/en/3.11.3',
diff --git a/tests/providers/docker/operators/test_docker.py b/tests/providers/docker/operators/test_docker.py
index 4c9936c..a2442b4 100644
--- a/tests/providers/docker/operators/test_docker.py
+++ b/tests/providers/docker/operators/test_docker.py
@@ -25,6 +25,7 @@ from airflow.exceptions import AirflowException
 
 try:
     from docker import APIClient
+    from docker.types import Mount
 
     from airflow.providers.docker.hooks.docker import DockerHook
     from airflow.providers.docker.operators.docker import DockerOperator
@@ -66,7 +67,7 @@ class TestDockerOperator(unittest.TestCase):
             network_mode='bridge',
             owner='unittest',
             task_id='unittest',
-            volumes=['/host/path:/container/path'],
+            mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
             entrypoint='["sh", "-c"]',
             working_dir='/container/path',
             shm_size=1000,
@@ -92,7 +93,10 @@ class TestDockerOperator(unittest.TestCase):
             tty=True,
         )
         self.client_mock.create_host_config.assert_called_once_with(
-            binds=['/host/path:/container/path', '/mkdtemp:/tmp/airflow'],
+            mounts=[
+                Mount(source='/host/path', target='/container/path', type='bind'),
+                Mount(source='/mkdtemp', target='/tmp/airflow', type='bind'),
+            ],
             network_mode='bridge',
             shm_size=1000,
             cpu_shares=1024,
@@ -238,7 +242,7 @@ class TestDockerOperator(unittest.TestCase):
             'network_mode': 'bridge',
             'owner': 'unittest',
             'task_id': 'unittest',
-            'volumes': ['/host/path:/container/path'],
+            'mounts': [Mount(source='/host/path', target='/container/path', type='bind')],
             'working_dir': '/container/path',
             'shm_size': 1000,
             'host_tmp_dir': '/host/airflow',
diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py
index bb7bdf3..c41e1e2 100644
--- a/tests/providers/docker/operators/test_docker_swarm.py
+++ b/tests/providers/docker/operators/test_docker_swarm.py
@@ -22,6 +22,7 @@ from unittest import mock
 import pytest
 import requests
 from docker import APIClient
+from docker.types import Mount
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
@@ -65,6 +66,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
             mem_limit='128m',
             user='unittest',
             task_id='unittest',
+            mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
             auto_remove=True,
             tty=True,
         )
@@ -77,6 +79,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
             image='ubuntu:latest',
             command='env',
             user='unittest',
+            mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
             tty=True,
             env={'UNIT': 'TEST', 'AIRFLOW_TMP_DIR': '/tmp/airflow'},
         )