Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/30 14:27:58 UTC
[airflow] 09/11: docker new system test (#23167)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1bf8938fdfcb061dd234d91b9747168f48baa670
Author: Bowrna <ma...@gmail.com>
AuthorDate: Mon Jun 6 20:50:35 2022 +0530
docker new system test (#23167)
(cherry picked from commit 06856337a51139d66b1a39544e276e477c6b5ea1)
---
airflow/providers/docker/example_dags/__init__.py | 17 ----
.../docker/example_dags/example_docker.py | 51 ----------
.../example_dags/example_docker_copy_data.py | 101 -------------------
docs/apache-airflow-providers-docker/index.rst | 14 ++-
docs/apache-airflow/tutorial_taskflow_api.rst | 4 +-
tests/system/providers/docker/example_docker.py | 63 ++++++++++++
.../providers/docker/example_docker_copy_data.py | 110 +++++++++++++++++++++
.../providers/docker}/example_docker_swarm.py | 30 ++++--
.../example_taskflow_api_etl_docker_virtualenv.py | 39 +++++---
9 files changed, 236 insertions(+), 193 deletions(-)
diff --git a/airflow/providers/docker/example_dags/__init__.py b/airflow/providers/docker/example_dags/__init__.py
deleted file mode 100644
index 217e5db960..0000000000
--- a/airflow/providers/docker/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/airflow/providers/docker/example_dags/example_docker.py b/airflow/providers/docker/example_dags/example_docker.py
deleted file mode 100644
index 83f6744883..0000000000
--- a/airflow/providers/docker/example_dags/example_docker.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from datetime import datetime, timedelta
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.providers.docker.operators.docker import DockerOperator
-
-dag = DAG(
- 'docker_sample',
- default_args={'retries': 1},
- schedule_interval=timedelta(minutes=10),
- start_date=datetime(2021, 1, 1),
- catchup=False,
-)
-
-t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
-
-t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)
-
-t3 = DockerOperator(
- docker_url='tcp://localhost:2375', # Set your docker URL
- command='/bin/sleep 30',
- image='centos:latest',
- network_mode='bridge',
- task_id='docker_op_tester',
- dag=dag,
-)
-
-
-t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag)
-
-
-t1 >> t2
-t1 >> t3
-t3 >> t4
diff --git a/airflow/providers/docker/example_dags/example_docker_copy_data.py b/airflow/providers/docker/example_dags/example_docker_copy_data.py
deleted file mode 100644
index 5ce78d02cd..0000000000
--- a/airflow/providers/docker/example_dags/example_docker_copy_data.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-This sample "listen to directory". move the new file and print it,
-using docker-containers.
-The following operators are being used: DockerOperator,
-BashOperator & ShortCircuitOperator.
-TODO: Review the workflow, change it accordingly to
- your environment & enable the code.
-"""
-
-from datetime import datetime, timedelta
-
-from docker.types import Mount
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.operators.python import ShortCircuitOperator
-from airflow.providers.docker.operators.docker import DockerOperator
-
-dag = DAG(
- "docker_sample_copy_data",
- default_args={"retries": 1},
- schedule_interval=timedelta(minutes=10),
- start_date=datetime(2021, 1, 1),
- catchup=False,
-)
-
-locate_file_cmd = """
- sleep 10
- find {{params.source_location}} -type f -printf "%f\n" | head -1
-"""
-
-t_view = BashOperator(
- task_id="view_file",
- bash_command=locate_file_cmd,
- do_xcom_push=True,
- params={"source_location": "/your/input_dir/path"},
- dag=dag,
-)
-
-t_is_data_available = ShortCircuitOperator(
- task_id="check_if_data_available",
- python_callable=lambda task_output: not task_output == "",
- op_kwargs=dict(task_output=t_view.output),
- dag=dag,
-)
-
-t_move = DockerOperator(
- api_version="1.19",
- docker_url="tcp://localhost:2375", # replace it with swarm/docker endpoint
- image="centos:latest",
- network_mode="bridge",
- mounts=[
- Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"),
- Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"),
- ],
- command=[
- "/bin/bash",
- "-c",
- "/bin/sleep 30; "
- "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};"
- "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';",
- ],
- task_id="move_data",
- do_xcom_push=True,
- params={"source_location": "/your/input_dir/path", "target_location": "/your/output_dir/path"},
- dag=dag,
-)
-
-t_print = DockerOperator(
- api_version="1.19",
- docker_url="tcp://localhost:2375",
- image="centos:latest",
- mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
- command=f"cat {t_move.output}",
- task_id="print",
- dag=dag,
-)
-
-t_is_data_available.set_downstream(t_move)
-t_move.set_downstream(t_print)
-
-# Task dependencies created via `XComArgs`:
-# t_view >> t_is_data_available
diff --git a/docs/apache-airflow-providers-docker/index.rst b/docs/apache-airflow-providers-docker/index.rst
index 3fa5c2b9c0..fc4ae5b6e6 100644
--- a/docs/apache-airflow-providers-docker/index.rst
+++ b/docs/apache-airflow-providers-docker/index.rst
@@ -39,7 +39,7 @@ Content
:maxdepth: 1
:caption: Resources
- Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/docker/example_dags>
+ Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/docker>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-docker/>
Installing from sources <installing-providers-from-sources>
@@ -84,3 +84,15 @@ PIP package Version required
================== ==================
.. include:: ../../airflow/providers/docker/CHANGELOG.rst
+
+DockerOperator
+--------------
+Use the
+:class:`~airflow.providers.docker.operators.docker.DockerOperator`
+to execute a command inside a Docker container.
+
+.. exampleinclude:: /../../tests/system/providers/docker/example_docker.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_docker]
+ :end-before: [END howto_operator_docker]
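The exampleinclude directive added above pulls in the block between the [START howto_operator_docker] and [END howto_operator_docker] markers in the new system test further down in this commit. As a minimal standalone sketch of the pattern the section documents (the DAG id is made up here; docker_url and image are the same placeholder values the example itself uses):

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.docker.operators.docker import DockerOperator

    with DAG(
        "docker_howto_sketch",
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        run_in_container = DockerOperator(
            task_id="docker_op_tester",
            image="centos:latest",                    # any image that ships the binary you run
            command="/bin/sleep 30",                  # executed inside the container
            docker_url="unix://var/run/docker.sock",  # point at your Docker daemon
            network_mode="bridge",
        )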
diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst
index 0904465e98..00b3e43a2a 100644
--- a/docs/apache-airflow/tutorial_taskflow_api.rst
+++ b/docs/apache-airflow/tutorial_taskflow_api.rst
@@ -233,7 +233,7 @@ image must have a working Python installed and take in a bash command as the ``c
Below is an example of using the ``@task.docker`` decorator to run a Python task.
-.. exampleinclude:: /../../airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
:language: python
:dedent: 4
:start-after: [START transform_docker]
@@ -257,7 +257,7 @@ environment on the same machine, you can use the ``@task.virtualenv`` decorator
decorator will allow you to create a new virtualenv with custom libraries and even a different
Python version to run your function.
-.. exampleinclude:: /../../airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
:language: python
:dedent: 4
:start-after: [START extract_virtualenv]
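Both exampleinclude blocks now point at the renamed system test file; the referenced transform_docker and extract_virtualenv snippets are not part of this hunk. A rough sketch of the two decorators the tutorial text describes (image, requirements and the function bodies are illustrative placeholders, not the tutorial's actual code):

    from airflow.decorators import task

    @task.virtualenv(requirements=["pandas"], system_site_packages=False)
    def extract():
        # Runs in a freshly created virtualenv that installs the listed requirements.
        return {"order_1": 100.0, "order_2": 250.0}

    @task.docker(image="python:3.9-slim", multiple_outputs=True)
    def transform(order_data: dict):
        # Runs inside a Docker container; the image must have a working Python.
        return {"total_order_value": sum(order_data.values())}

    # Both tasks would be instantiated by calling them inside a DAG definition.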
diff --git a/tests/system/providers/docker/example_docker.py b/tests/system/providers/docker/example_docker.py
new file mode 100644
index 0000000000..8a00c408b7
--- /dev/null
+++ b/tests/system/providers/docker/example_docker.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.docker.operators.docker import DockerOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_test'
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "docker"],
+) as dag:
+ t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
+ t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)
+ # [START howto_operator_docker]
+ t3 = DockerOperator(
+ docker_url='unix://var/run/docker.sock', # Set your docker URL
+ command='/bin/sleep 30',
+ image='centos:latest',
+ network_mode='bridge',
+ task_id='docker_op_tester',
+ dag=dag,
+ )
+ # [END howto_operator_docker]
+
+ t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag)
+ # t1 >> t2
+ # t1 >> t3
+ # t3 >> t4
+
+ (
+ # TEST BODY
+ t1
+ >> [t2, t3]
+ >> t4
+ )
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
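get_test_run comes from tests/system/utils, which is not part of this diff; per the README referenced above it exposes the example DAG as something pytest can collect and run. One plausible shape of that helper, written here purely as an assumption rather than the actual implementation:

    def get_test_run(dag):
        # Hypothetical sketch: return a pytest-collectable callable that clears
        # any previous state and executes the DAG end to end.
        def test_run():
            dag.clear()
            dag.run()
        return test_run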
diff --git a/tests/system/providers/docker/example_docker_copy_data.py b/tests/system/providers/docker/example_docker_copy_data.py
new file mode 100644
index 0000000000..d709bf14d5
--- /dev/null
+++ b/tests/system/providers/docker/example_docker_copy_data.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This sample "listen to directory". move the new file and print it,
+using docker-containers.
+The following operators are being used: DockerOperator,
+BashOperator & ShortCircuitOperator.
+TODO: Review the workflow, change it accordingly to
+ your environment & enable the code.
+"""
+import os
+from datetime import datetime
+
+from docker.types import Mount
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import ShortCircuitOperator
+from airflow.providers.docker.operators.docker import DockerOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_sample_copy_data'
+
+with models.DAG(
+ DAG_ID,
+ schedule_interval="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "docker"],
+) as dag:
+
+ locate_file_cmd = """
+ sleep 10
+ find {{params.source_location}} -type f -printf "%f\n" | head -1
+ """
+
+ t_view = BashOperator(
+ task_id="view_file",
+ bash_command=locate_file_cmd,
+ do_xcom_push=True,
+ params={"source_location": "/your/input_dir/path"},
+ dag=dag,
+ )
+
+ t_is_data_available = ShortCircuitOperator(
+ task_id="check_if_data_available",
+ python_callable=lambda task_output: not task_output == "",
+ op_kwargs=dict(task_output=t_view.output),
+ dag=dag,
+ )
+
+ t_move = DockerOperator(
+ api_version="1.19",
+ docker_url="tcp://localhost:2375", # replace it with swarm/docker endpoint
+ image="centos:latest",
+ network_mode="bridge",
+ mounts=[
+ Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"),
+ Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"),
+ ],
+ command=[
+ "/bin/bash",
+ "-c",
+ "/bin/sleep 30; "
+ "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};"
+ "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';",
+ ],
+ task_id="move_data",
+ do_xcom_push=True,
+ params={"source_location": "/your/input_dir/path", "target_location": "/your/output_dir/path"},
+ dag=dag,
+ )
+
+ t_print = DockerOperator(
+ api_version="1.19",
+ docker_url="tcp://localhost:2375",
+ image="centos:latest",
+ mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
+ command=f"cat {t_move.output}",
+ task_id="print",
+ dag=dag,
+ )
+
+ (
+ # TEST BODY
+ t_is_data_available
+ >> t_move
+ >> t_print
+ )
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
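Only the TEST BODY chain is declared explicitly; the t_view >> t_is_data_available edge still exists because t_view.output is an XComArg, and passing it through op_kwargs creates the dependency automatically. A minimal sketch of that mechanism (DAG and task names here are illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.python import ShortCircuitOperator

    with DAG(
        "xcomarg_sketch",
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ):
        @task
        def producer():
            return "some_file.txt"

        found = producer()  # an XComArg, not a plain string

        # Passing the XComArg wires producer >> gate automatically; the concrete
        # value is pulled from XCom when gate runs.
        gate = ShortCircuitOperator(
            task_id="gate",
            python_callable=lambda task_output: task_output != "",
            op_kwargs={"task_output": found},
        )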
diff --git a/airflow/providers/docker/example_dags/example_docker_swarm.py b/tests/system/providers/docker/example_docker_swarm.py
similarity index 67%
rename from airflow/providers/docker/example_dags/example_docker_swarm.py
rename to tests/system/providers/docker/example_docker_swarm.py
index 365a4b44a9..d216f11b94 100644
--- a/airflow/providers/docker/example_dags/example_docker_swarm.py
+++ b/tests/system/providers/docker/example_docker_swarm.py
@@ -15,24 +15,38 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime, timedelta
+import os
+from datetime import datetime
-from airflow import DAG
+from airflow import models
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
-dag = DAG(
- 'docker_swarm_sample',
- schedule_interval=timedelta(minutes=10),
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_swarm_dag'
+
+with models.DAG(
+ dag_id=DAG_ID,
+ schedule_interval='@once',
start_date=datetime(2021, 1, 1),
catchup=False,
-)
+ tags=['example', "docker"],
+) as dag:
-with dag as dag:
t1 = DockerSwarmOperator(
api_version='auto',
- docker_url='tcp://localhost:2375', # Set your docker URL
+ docker_url='unix://var/run/docker.sock', # Set your docker URL
command='/bin/sleep 10',
image='centos:latest',
auto_remove=True,
task_id='sleep_with_swarm',
)
+
+ (
+ # TEST BODY
+ t1
+ )
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py b/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
similarity index 77%
rename from airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
rename to tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
index c16588c10c..dc0f56dbb6 100644
--- a/airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+++ b/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
@@ -19,15 +19,17 @@
# [START tutorial]
# [START import_module]
+import os
from datetime import datetime
-from airflow.decorators import dag, task
+from airflow import models
+from airflow.decorators import task
# [END import_module]
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_taskflow'
-# [START instantiate_dag]
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
def tutorial_taskflow_api_etl_docker_virtualenv():
"""
### TaskFlow API Tutorial Documentation
@@ -37,7 +39,6 @@ def tutorial_taskflow_api_etl_docker_virtualenv():
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
- # [END instantiate_dag]
# [START extract_virtualenv]
@task.virtualenv(
@@ -98,14 +99,26 @@ def tutorial_taskflow_api_etl_docker_virtualenv():
# [END main_flow]
-# The try/except here is because Airflow versions less than 2.2.0 doesn't support
-# @task.docker decorator and we use this dag in CI test. Thus, in order not to
-# break the CI test, we added this try/except here.
-try:
- # [START dag_invocation]
- tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv()
- # [END dag_invocation]
-except AttributeError:
- pass
+with models.DAG(
+ DAG_ID,
+ schedule_interval="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "docker"],
+) as dag:
+ # The try/except here is because Airflow versions earlier than 2.2.0 don't support
+ # the @task.docker decorator, and we use this DAG in CI tests. Thus, in order not to
+ # break the CI tests, we added this try/except here.
+ try:
+ # [START dag_invocation]
+ tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv()
+ # [END dag_invocation]
+ except AttributeError:
+ pass
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
# [END tutorial]
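With the @dag decorator gone, calling tutorial_taskflow_api_etl_docker_virtualenv() inside the with models.DAG(...) block works because tasks instantiated while a DAG context is active attach to that DAG. A minimal sketch of the same pattern (names and task bodies are illustrative):

    from datetime import datetime

    from airflow import models
    from airflow.decorators import task

    def build_pipeline():
        @task
        def extract():
            return [1, 2, 3]

        @task
        def load(values):
            print(sum(values))

        load(extract())

    with models.DAG(
        "context_attach_sketch",
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # Tasks created inside build_pipeline() attach to this DAG because it is
        # the active DAG context at the moment they are instantiated.
        build_pipeline()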