You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/08/19 10:59:59 UTC

[airflow] branch main updated: Add support for configs, secrets, networks and replicas for DockerSwarmOperator (#17474)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4da4c18  Add support for configs, secrets, networks and replicas for DockerSwarmOperator (#17474)
4da4c18 is described below

commit 4da4c186ecdcdae308fe8b4a7994c21faf42bc96
Author: AmineB <am...@gmail.com>
AuthorDate: Thu Aug 19 12:59:40 2021 +0200

    Add support for configs, secrets, networks and replicas for DockerSwarmOperator (#17474)
---
 airflow/providers/docker/operators/docker_swarm.py | 35 ++++++++++++++++++++--
 .../docker/operators/test_docker_swarm.py          | 19 ++++++++----
 2 files changed, 47 insertions(+), 7 deletions(-)

diff --git a/airflow/providers/docker/operators/docker_swarm.py b/airflow/providers/docker/operators/docker_swarm.py
index 00a1589..2d5373c 100644
--- a/airflow/providers/docker/operators/docker_swarm.py
+++ b/airflow/providers/docker/operators/docker_swarm.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """Run ephemeral Docker Swarm services"""
-from typing import Optional
+from typing import List, Optional, Union
 
 import requests
 from docker import types
@@ -93,13 +93,40 @@ class DockerSwarmOperator(DockerOperator):
         Supported only if the Docker engine is using json-file or journald logging drivers.
         The `tty` parameter should be set to use this with Python applications.
     :type enable_logging: bool
+    :param configs: List of docker configs to be exposed to the containers of the swarm service.
+        The configs are ConfigReference objects as per the docker api
+        [https://docker-py.readthedocs.io/en/stable/services.html#docker.models.services.ServiceCollection.create]_
+    :type configs: List[docker.types.ConfigReference]
+    :param secrets: List of docker secrets to be exposed to the containers of the swarm service.
+        The secrets are SecretReference objects as per the docker create_service api.
+        [https://docker-py.readthedocs.io/en/stable/services.html#docker.models.services.ServiceCollection.create]_
+    :type secrets: List[docker.types.SecretReference]
+    :param mode: Indicate whether a service should be deployed as a replicated or global service,
+        and associated parameters
+    :type mode: docker.types.ServiceMode
+    :param networks: List of network names or IDs or NetworkAttachmentConfig to attach the service to.
+    :type networks: List[Union[str, NetworkAttachmentConfig]]
     """
 
-    def __init__(self, *, image: str, enable_logging: bool = True, **kwargs) -> None:
+    def __init__(
+        self,
+        *,
+        image: str,
+        enable_logging: bool = True,
+        configs: Optional[List[types.ConfigReference]] = None,
+        secrets: Optional[List[types.SecretReference]] = None,
+        mode: Optional[types.ServiceMode] = None,
+        networks: Optional[List[Union[str, types.NetworkAttachmentConfig]]] = None,
+        **kwargs,
+    ) -> None:
         super().__init__(image=image, **kwargs)
 
         self.enable_logging = enable_logging
         self.service = None
+        self.configs = configs
+        self.secrets = secrets
+        self.mode = mode
+        self.networks = networks
 
     def execute(self, context) -> None:
         self.cli = self._get_cli()
@@ -121,12 +148,16 @@ class DockerSwarmOperator(DockerOperator):
                     env=self.environment,
                     user=self.user,
                     tty=self.tty,
+                    configs=self.configs,
+                    secrets=self.secrets,
                 ),
                 restart_policy=types.RestartPolicy(condition='none'),
                 resources=types.Resources(mem_limit=self.mem_limit),
+                networks=self.networks,
             ),
             name=f'airflow-{get_random_string()}',
             labels={'name': f'airflow__{self.dag_id}__{self.task_id}'},
+            mode=self.mode,
         )
 
         self.log.info('Service started: %s', str(self.service))
diff --git a/tests/providers/docker/operators/test_docker_swarm.py b/tests/providers/docker/operators/test_docker_swarm.py
index c41e1e2..8523644 100644
--- a/tests/providers/docker/operators/test_docker_swarm.py
+++ b/tests/providers/docker/operators/test_docker_swarm.py
@@ -21,8 +21,7 @@ from unittest import mock
 
 import pytest
 import requests
-from docker import APIClient
-from docker.types import Mount
+from docker import APIClient, types
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
@@ -66,22 +65,31 @@ class TestDockerSwarmOperator(unittest.TestCase):
             mem_limit='128m',
             user='unittest',
             task_id='unittest',
-            mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
+            mounts=[types.Mount(source='/host/path', target='/container/path', type='bind')],
             auto_remove=True,
             tty=True,
+            configs=[types.ConfigReference(config_id="dummy_cfg_id", config_name="dummy_cfg_name")],
+            secrets=[types.SecretReference(secret_id="dummy_secret_id", secret_name="dummy_secret_name")],
+            mode=types.ServiceMode(mode="replicated", replicas=3),
+            networks=["dummy_network"],
         )
         operator.execute(None)
 
         types_mock.TaskTemplate.assert_called_once_with(
-            container_spec=mock_obj, restart_policy=mock_obj, resources=mock_obj
+            container_spec=mock_obj,
+            restart_policy=mock_obj,
+            resources=mock_obj,
+            networks=["dummy_network"],
         )
         types_mock.ContainerSpec.assert_called_once_with(
             image='ubuntu:latest',
             command='env',
             user='unittest',
-            mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
+            mounts=[types.Mount(source='/host/path', target='/container/path', type='bind')],
             tty=True,
             env={'UNIT': 'TEST', 'AIRFLOW_TMP_DIR': '/tmp/airflow'},
+            configs=[types.ConfigReference(config_id="dummy_cfg_id", config_name="dummy_cfg_name")],
+            secrets=[types.SecretReference(secret_id="dummy_secret_id", secret_name="dummy_secret_name")],
         )
         types_mock.RestartPolicy.assert_called_once_with(condition='none')
         types_mock.Resources.assert_called_once_with(mem_limit='128m')
@@ -99,6 +107,7 @@ class TestDockerSwarmOperator(unittest.TestCase):
         assert csargs == (mock_obj,)
         assert cskwargs['labels'] == {'name': 'airflow__adhoc_airflow__unittest'}
         assert cskwargs['name'].startswith('airflow-')
+        assert cskwargs['mode'] == types.ServiceMode(mode="replicated", replicas=3)
         assert client_mock.tasks.call_count == 5
         client_mock.remove_service.assert_called_once_with('some_id')