Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/30 14:27:58 UTC

[airflow] 09/11: docker new system test (#23167)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1bf8938fdfcb061dd234d91b9747168f48baa670
Author: Bowrna <ma...@gmail.com>
AuthorDate: Mon Jun 6 20:50:35 2022 +0530

    docker new system test (#23167)
    
    (cherry picked from commit 06856337a51139d66b1a39544e276e477c6b5ea1)
---
 airflow/providers/docker/example_dags/__init__.py  |  17 ----
 .../docker/example_dags/example_docker.py          |  51 ----------
 .../example_dags/example_docker_copy_data.py       | 101 -------------------
 docs/apache-airflow-providers-docker/index.rst     |  14 ++-
 docs/apache-airflow/tutorial_taskflow_api.rst      |   4 +-
 tests/system/providers/docker/example_docker.py    |  63 ++++++++++++
 .../providers/docker/example_docker_copy_data.py   | 110 +++++++++++++++++++++
 .../providers/docker}/example_docker_swarm.py      |  30 ++++--
 .../example_taskflow_api_etl_docker_virtualenv.py  |  39 +++++---
 9 files changed, 236 insertions(+), 193 deletions(-)

diff --git a/airflow/providers/docker/example_dags/__init__.py b/airflow/providers/docker/example_dags/__init__.py
deleted file mode 100644
index 217e5db960..0000000000
--- a/airflow/providers/docker/example_dags/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/airflow/providers/docker/example_dags/example_docker.py b/airflow/providers/docker/example_dags/example_docker.py
deleted file mode 100644
index 83f6744883..0000000000
--- a/airflow/providers/docker/example_dags/example_docker.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from datetime import datetime, timedelta
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.providers.docker.operators.docker import DockerOperator
-
-dag = DAG(
-    'docker_sample',
-    default_args={'retries': 1},
-    schedule_interval=timedelta(minutes=10),
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-)
-
-t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
-
-t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)
-
-t3 = DockerOperator(
-    docker_url='tcp://localhost:2375',  # Set your docker URL
-    command='/bin/sleep 30',
-    image='centos:latest',
-    network_mode='bridge',
-    task_id='docker_op_tester',
-    dag=dag,
-)
-
-
-t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag)
-
-
-t1 >> t2
-t1 >> t3
-t3 >> t4
diff --git a/airflow/providers/docker/example_dags/example_docker_copy_data.py b/airflow/providers/docker/example_dags/example_docker_copy_data.py
deleted file mode 100644
index 5ce78d02cd..0000000000
--- a/airflow/providers/docker/example_dags/example_docker_copy_data.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-This sample "listen to directory". move the new file and print it,
-using docker-containers.
-The following operators are being used: DockerOperator,
-BashOperator & ShortCircuitOperator.
-TODO: Review the workflow, change it accordingly to
-      your environment & enable the code.
-"""
-
-from datetime import datetime, timedelta
-
-from docker.types import Mount
-
-from airflow import DAG
-from airflow.operators.bash import BashOperator
-from airflow.operators.python import ShortCircuitOperator
-from airflow.providers.docker.operators.docker import DockerOperator
-
-dag = DAG(
-    "docker_sample_copy_data",
-    default_args={"retries": 1},
-    schedule_interval=timedelta(minutes=10),
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-)
-
-locate_file_cmd = """
-    sleep 10
-    find {{params.source_location}} -type f  -printf "%f\n" | head -1
-"""
-
-t_view = BashOperator(
-    task_id="view_file",
-    bash_command=locate_file_cmd,
-    do_xcom_push=True,
-    params={"source_location": "/your/input_dir/path"},
-    dag=dag,
-)
-
-t_is_data_available = ShortCircuitOperator(
-    task_id="check_if_data_available",
-    python_callable=lambda task_output: not task_output == "",
-    op_kwargs=dict(task_output=t_view.output),
-    dag=dag,
-)
-
-t_move = DockerOperator(
-    api_version="1.19",
-    docker_url="tcp://localhost:2375",  # replace it with swarm/docker endpoint
-    image="centos:latest",
-    network_mode="bridge",
-    mounts=[
-        Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"),
-        Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"),
-    ],
-    command=[
-        "/bin/bash",
-        "-c",
-        "/bin/sleep 30; "
-        "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};"
-        "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';",
-    ],
-    task_id="move_data",
-    do_xcom_push=True,
-    params={"source_location": "/your/input_dir/path", "target_location": "/your/output_dir/path"},
-    dag=dag,
-)
-
-t_print = DockerOperator(
-    api_version="1.19",
-    docker_url="tcp://localhost:2375",
-    image="centos:latest",
-    mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
-    command=f"cat {t_move.output}",
-    task_id="print",
-    dag=dag,
-)
-
-t_is_data_available.set_downstream(t_move)
-t_move.set_downstream(t_print)
-
-# Task dependencies created via `XComArgs`:
-#   t_view >> t_is_data_available
diff --git a/docs/apache-airflow-providers-docker/index.rst b/docs/apache-airflow-providers-docker/index.rst
index 3fa5c2b9c0..fc4ae5b6e6 100644
--- a/docs/apache-airflow-providers-docker/index.rst
+++ b/docs/apache-airflow-providers-docker/index.rst
@@ -39,7 +39,7 @@ Content
     :maxdepth: 1
     :caption: Resources
 
-    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/docker/example_dags>
+    Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/docker>
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-docker/>
     Installing from sources <installing-providers-from-sources>
 
@@ -84,3 +84,15 @@ PIP package         Version required
 ==================  ==================
 
 .. include:: ../../airflow/providers/docker/CHANGELOG.rst
+
+DockerOperator
+--------------
+Use the
+:class:`~airflow.providers.docker.operators.docker.DockerOperator`
+to execute a command in a Docker container.
+
+.. exampleinclude:: /../../tests/system/providers/docker/example_docker.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_docker]
+    :end-before: [END howto_operator_docker]
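For reference, here is a minimal standalone sketch of what the included [START howto_operator_docker] block demonstrates, with values mirroring the new tests/system/providers/docker/example_docker.py (the dag_id below is illustrative, not part of this commit):

    from datetime import datetime

    from airflow import models
    from airflow.providers.docker.operators.docker import DockerOperator

    with models.DAG(
        "docker_operator_sketch",  # illustrative dag_id, not part of this commit
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # Run a one-off command in a container created from the given image.
        run_in_container = DockerOperator(
            task_id="docker_op_tester",
            image="centos:latest",
            command="/bin/sleep 30",
            docker_url="unix://var/run/docker.sock",  # point at your Docker daemon
            network_mode="bridge",
        )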
diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst
index 0904465e98..00b3e43a2a 100644
--- a/docs/apache-airflow/tutorial_taskflow_api.rst
+++ b/docs/apache-airflow/tutorial_taskflow_api.rst
@@ -233,7 +233,7 @@ image must have a working Python installed and take in a bash command as the ``c
 
 Below is an example of using the ``@task.docker`` decorator to run a Python task.
 
-.. exampleinclude:: /../../airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
     :language: python
     :dedent: 4
     :start-after: [START transform_docker]
@@ -257,7 +257,7 @@ environment on the same machine, you can use the ``@task.virtualenv`` decorator
 decorator will allow you to create a new virtualenv with custom libraries and even a different
 Python version to run your function.
 
-.. exampleinclude:: /../../airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
     :language: python
     :dedent: 4
     :start-after: [START extract_virtualenv]
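For orientation, a hedged sketch of the two decorated tasks those exampleinclude directives pull in from the relocated file (parameter values follow the tutorial and are illustrative, not a verbatim copy):

    from airflow.decorators import task

    @task.virtualenv(
        use_dill=True,               # serialize return values with dill
        system_site_packages=False,  # build an isolated virtualenv
        requirements=["funcsigs"],   # extra libraries installed into it
    )
    def extract():
        # Imports live inside the function: it executes in the new virtualenv.
        import json

        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        return json.loads(data_string)

    @task.docker(image="python:3.9-slim-buster", multiple_outputs=True)
    def transform(order_data_dict: dict):
        # Executes inside the named image, which must have a working Python.
        total_order_value = sum(order_data_dict.values())
        return {"total_order_value": total_order_value}

Both functions only become tasks when called inside a DAG context, as the relocated example does.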
diff --git a/tests/system/providers/docker/example_docker.py b/tests/system/providers/docker/example_docker.py
new file mode 100644
index 0000000000..8a00c408b7
--- /dev/null
+++ b/tests/system/providers/docker/example_docker.py
@@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.providers.docker.operators.docker import DockerOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_test'
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "docker"],
+) as dag:
+    t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
+    t2 = BashOperator(task_id='sleep', bash_command='sleep 5', retries=3, dag=dag)
+    # [START howto_operator_docker]
+    t3 = DockerOperator(
+        docker_url='unix://var/run/docker.sock',  # Set your docker URL
+        command='/bin/sleep 30',
+        image='centos:latest',
+        network_mode='bridge',
+        task_id='docker_op_tester',
+        dag=dag,
+    )
+    # [END howto_operator_docker]
+
+    t4 = BashOperator(task_id='print_hello', bash_command='echo "hello world!!!"', dag=dag)
+    # t1 >> t2
+    # t1 >> t3
+    # t3 >> t4
+
+    (
+        # TEST BODY
+        t1
+        >> [t2, t3]
+        >> t4
+    )
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
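The trailing get_test_run() import is what lets pytest pick this file up as a system test. A rough, hypothetical sketch of the idea (the real helper lives in tests/system/utils and may differ):

    # Hypothetical, simplified stand-in for tests.system.utils.get_test_run:
    def get_test_run(dag):
        def test_run():
            # Execute the DAG once so pytest can collect this file as a test.
            dag.clear()
            dag.run(start_date=dag.start_date, end_date=dag.start_date)

        return test_run

    # Assumed invocation, per tests/system/README.md#run_via_pytest:
    #   pytest tests/system/providers/docker/example_docker.py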
diff --git a/tests/system/providers/docker/example_docker_copy_data.py b/tests/system/providers/docker/example_docker_copy_data.py
new file mode 100644
index 0000000000..d709bf14d5
--- /dev/null
+++ b/tests/system/providers/docker/example_docker_copy_data.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+This sample "listen to directory". move the new file and print it,
+using docker-containers.
+The following operators are being used: DockerOperator,
+BashOperator & ShortCircuitOperator.
+TODO: Review the workflow, change it accordingly to
+      your environment & enable the code.
+"""
+import os
+from datetime import datetime
+
+from docker.types import Mount
+
+from airflow import models
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import ShortCircuitOperator
+from airflow.providers.docker.operators.docker import DockerOperator
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_sample_copy_data'
+
+with models.DAG(
+    DAG_ID,
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "docker"],
+) as dag:
+
+    locate_file_cmd = """
+        sleep 10
+        find {{params.source_location}} -type f  -printf "%f\n" | head -1
+    """
+
+    t_view = BashOperator(
+        task_id="view_file",
+        bash_command=locate_file_cmd,
+        do_xcom_push=True,
+        params={"source_location": "/your/input_dir/path"},
+        dag=dag,
+    )
+
+    t_is_data_available = ShortCircuitOperator(
+        task_id="check_if_data_available",
+        python_callable=lambda task_output: not task_output == "",
+        op_kwargs=dict(task_output=t_view.output),
+        dag=dag,
+    )
+
+    t_move = DockerOperator(
+        api_version="1.19",
+        docker_url="tcp://localhost:2375",  # replace it with swarm/docker endpoint
+        image="centos:latest",
+        network_mode="bridge",
+        mounts=[
+            Mount(source="/your/host/input_dir/path", target="/your/input_dir/path", type="bind"),
+            Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind"),
+        ],
+        command=[
+            "/bin/bash",
+            "-c",
+            "/bin/sleep 30; "
+            "/bin/mv {{ params.source_location }}/" + str(t_view.output) + " {{ params.target_location }};"
+            "/bin/echo '{{ params.target_location }}/" + f"{t_view.output}';",
+        ],
+        task_id="move_data",
+        do_xcom_push=True,
+        params={"source_location": "/your/input_dir/path", "target_location": "/your/output_dir/path"},
+        dag=dag,
+    )
+
+    t_print = DockerOperator(
+        api_version="1.19",
+        docker_url="tcp://localhost:2375",
+        image="centos:latest",
+        mounts=[Mount(source="/your/host/output_dir/path", target="/your/output_dir/path", type="bind")],
+        command=f"cat {t_move.output}",
+        task_id="print",
+        dag=dag,
+    )
+
+    (
+        # TEST BODY
+        t_is_data_available
+        >> t_move
+        >> t_print
+    )
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
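The str(t_view.output) and f-string usages above lean on XComArg templating: rendering an XComArg yields a Jinja xcom_pull expression, and referencing it also wires the upstream dependency. A minimal self-contained sketch of that pattern (dag_id and task ids are illustrative):

    from datetime import datetime

    from airflow import models
    from airflow.operators.bash import BashOperator

    with models.DAG(
        "xcomarg_sketch",  # illustrative dag_id
        schedule_interval="@once",
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as sketch_dag:
        producer = BashOperator(task_id="produce", bash_command="echo hello", do_xcom_push=True)
        # producer.output is an XComArg; embedding it in a downstream template
        # both passes the pushed value and creates the produce >> consume edge.
        consumer = BashOperator(task_id="consume", bash_command=f"echo {producer.output}")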
diff --git a/airflow/providers/docker/example_dags/example_docker_swarm.py b/tests/system/providers/docker/example_docker_swarm.py
similarity index 67%
rename from airflow/providers/docker/example_dags/example_docker_swarm.py
rename to tests/system/providers/docker/example_docker_swarm.py
index 365a4b44a9..d216f11b94 100644
--- a/airflow/providers/docker/example_dags/example_docker_swarm.py
+++ b/tests/system/providers/docker/example_docker_swarm.py
@@ -15,24 +15,38 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import datetime, timedelta
+import os
+from datetime import datetime
 
-from airflow import DAG
+from airflow import models
 from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator
 
-dag = DAG(
-    'docker_swarm_sample',
-    schedule_interval=timedelta(minutes=10),
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_swarm_dag'
+
+with models.DAG(
+    dag_id=DAG_ID,
+    schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
-)
+    tags=['example', "docker"],
+) as dag:
 
-with dag as dag:
     t1 = DockerSwarmOperator(
         api_version='auto',
-        docker_url='tcp://localhost:2375',  # Set your docker URL
+        docker_url='unix://var/run/docker.sock',  # Set your docker URL
         command='/bin/sleep 10',
         image='centos:latest',
         auto_remove=True,
         task_id='sleep_with_swarm',
     )
+
+    (
+        # TEST BODY
+        t1
+    )
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py b/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
similarity index 77%
rename from airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
rename to tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
index c16588c10c..dc0f56dbb6 100644
--- a/airflow/providers/docker/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+++ b/tests/system/providers/docker/example_taskflow_api_etl_docker_virtualenv.py
@@ -19,15 +19,17 @@
 
 # [START tutorial]
 # [START import_module]
+import os
 from datetime import datetime
 
-from airflow.decorators import dag, task
+from airflow import models
+from airflow.decorators import task
 
 # [END import_module]
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = 'docker_taskflow'
 
 
-# [START instantiate_dag]
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
 def tutorial_taskflow_api_etl_docker_virtualenv():
     """
     ### TaskFlow API Tutorial Documentation
@@ -37,7 +39,6 @@ def tutorial_taskflow_api_etl_docker_virtualenv():
     located
     [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
     """
-    # [END instantiate_dag]
 
     # [START extract_virtualenv]
     @task.virtualenv(
@@ -98,14 +99,26 @@ def tutorial_taskflow_api_etl_docker_virtualenv():
     # [END main_flow]
 
 
-# The try/except here is because Airflow versions earlier than 2.2.0 don't support
-# the @task.docker decorator and we use this DAG in CI tests. Thus, in order not to
-# break the CI tests, we added this try/except here.
-try:
-    # [START dag_invocation]
-    tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv()
-    # [END dag_invocation]
-except AttributeError:
-    pass
+with models.DAG(
+    DAG_ID,
+    schedule_interval="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "docker"],
+) as dag:
+    # The try/except here is because Airflow versions earlier than 2.2.0 don't support
+    # the @task.docker decorator and we use this DAG in CI tests. Thus, in order not to
+    # break the CI tests, we added this try/except here.
+    try:
+        # [START dag_invocation]
+        tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv()
+        # [END dag_invocation]
+    except AttributeError:
+        pass
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
 
 # [END tutorial]
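As a side note on the retained try/except guard: an equivalent approach would be an explicit version check rather than catching AttributeError. A hedged alternative sketch, not what this commit does:

    from packaging import version

    from airflow import __version__ as airflow_version

    # Only instantiate the DAG where @task.docker exists (Airflow >= 2.2.0).
    if version.parse(airflow_version) >= version.parse("2.2.0"):
        tutorial_etl_dag = tutorial_taskflow_api_etl_docker_virtualenv()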