Posted to commits@airflow.apache.org by el...@apache.org on 2022/03/11 07:25:59 UTC

[airflow] branch main updated: additional information in the ECSOperator around support of launch_type=EXTERNAL (#22093)

This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e63f6e3  additional information in the ECSOperator around support of launch_type=EXTERNAL (#22093)
e63f6e3 is described below

commit e63f6e36d14a8cd2462e80f26fb4809ab8698380
Author: 094459 <ri...@gmail.com>
AuthorDate: Fri Mar 11 07:25:11 2022 +0000

    additional information in the ECSOperator around support of launch_type=EXTERNAL (#22093)
    
    * additional information in the ECSOperator around support of launch_type=EXTERNAL
---
 .../{example_ecs_fargate.py => example_ecs_ec2.py} | 23 ++++--
 .../amazon/aws/example_dags/example_ecs_fargate.py | 14 +++-
 airflow/providers/amazon/aws/operators/ecs.py      |  2 +-
 .../operators/ecs.rst                              | 86 ++++++++++++++++++++--
 tests/providers/amazon/aws/operators/test_ecs.py   |  7 ++
 5 files changed, 116 insertions(+), 16 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py b/airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py
similarity index 73%
copy from airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py
copy to airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py
index 207ef99..5e0c386 100644
--- a/airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py
+++ b/airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py
@@ -21,7 +21,7 @@ from airflow import DAG
 from airflow.providers.amazon.aws.operators.ecs import EcsOperator
 
 with DAG(
-    dag_id='example_ecs_fargate',
+    dag_id='example_ecs_ec2',
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     tags=['example'],
@@ -33,20 +33,27 @@ with DAG(
         task_id="hello_world",
         cluster=os.environ.get("CLUSTER_NAME", "existing_cluster_name"),
         task_definition=os.environ.get("TASK_DEFINITION", "existing_task_definition_name"),
-        launch_type="FARGATE",
+        launch_type="EXTERNAL|EC2",
+        aws_conn_id="aws_ecs",
         overrides={
             "containerOverrides": [
                 {
-                    "name": "existing_container_name",
+                    "name": "hello-world-container",
                     "command": ["echo", "hello", "world"],
                 },
             ],
         },
-        network_configuration={
-            "awsvpcConfiguration": {
-                "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
-                "assignPublicIp": "ENABLED",
-            },
+        tags={
+            "Customer": "X",
+            "Project": "Y",
+            "Application": "Z",
+            "Version": "0.0.1",
+            "Environment": "Development",
         },
+        # [START howto_awslogs_ecs]
+        awslogs_group="/ecs/hello-world",
+        awslogs_region="aws-region",
+        awslogs_stream_prefix="ecs/hello-world-container",
+        # [END howto_awslogs_ecs]
     )
     # [END howto_operator_ecs]
diff --git a/airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py b/airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py
index 207ef99..1e48367 100644
--- a/airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py
+++ b/airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py
@@ -34,19 +34,29 @@ with DAG(
         cluster=os.environ.get("CLUSTER_NAME", "existing_cluster_name"),
         task_definition=os.environ.get("TASK_DEFINITION", "existing_task_definition_name"),
         launch_type="FARGATE",
+        aws_conn_id="aws_ecs",
         overrides={
             "containerOverrides": [
                 {
-                    "name": "existing_container_name",
+                    "name": "hello-world-container",
                     "command": ["echo", "hello", "world"],
                 },
             ],
         },
         network_configuration={
             "awsvpcConfiguration": {
+                "securityGroups": [os.environ.get("SECURITY_GROUP_ID", "sg-123abc")],
                 "subnets": [os.environ.get("SUBNET_ID", "subnet-123456ab")],
-                "assignPublicIp": "ENABLED",
             },
         },
+        tags={
+            "Customer": "X",
+            "Project": "Y",
+            "Application": "Z",
+            "Version": "0.0.1",
+            "Environment": "Development",
+        },
+        awslogs_group="/ecs/hello-world",
+        awslogs_stream_prefix="prefix_b/hello-world-container",
     )
     # [END howto_operator_ecs]
diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py
index ca52e0a..b6bae92 100644
--- a/airflow/providers/amazon/aws/operators/ecs.py
+++ b/airflow/providers/amazon/aws/operators/ecs.py
@@ -179,7 +179,7 @@ class EcsOperator(BaseOperator):
         (http://boto3.readthedocs.io/en/latest/guide/configuration.html).
     :param region_name: region name to use in AWS Hook.
         Override the region_name in connection (if provided)
-    :param launch_type: the launch type on which to run your task ('EC2' or 'FARGATE')
+    :param launch_type: the launch type on which to run your task ('EC2', 'EXTERNAL', or 'FARGATE')
     :param capacity_provider_strategy: the capacity provider strategy to use for the task.
         When capacity_provider_strategy is specified, the launch_type parameter is omitted.
         If no capacity_provider_strategy or launch_type is specified,
diff --git a/docs/apache-airflow-providers-amazon/operators/ecs.rst b/docs/apache-airflow-providers-amazon/operators/ecs.rst
index d56907f..df991a4 100644
--- a/docs/apache-airflow-providers-amazon/operators/ecs.rst
+++ b/docs/apache-airflow-providers-amazon/operators/ecs.rst
@@ -28,24 +28,100 @@ Airflow provides operators to run Task Definitions on an ECS cluster.
 Prerequisite Tasks
 ^^^^^^^^^^^^^^^^^^
 
 .. include:: _partials/prerequisite_tasks.rst
+
+* You will need to have created your ECS Cluster and a Task Definition before you can use this Operator. The Task Definition contains details of the containerized application you want to run.
 
 .. _howto/operator:EcsOperator:
 
-Run a Task
-^^^^^^^^^^
+Overview
+^^^^^^^^
 
-To run a task defined in an Amazon ECS cluster you can use
+To run a Task Definition on an Amazon ECS cluster, you can use
 :class:`~airflow.providers.amazon.aws.operators.ecs.EcsOperator`.
 
-Before using EcsOperator *cluster*, *task definition*, and *container* need to be created.
+This Operator supports running your containers in ECS Clusters that are serverless (FARGATE), backed by EC2 instances (EC2), or backed by external resources (EXTERNAL). The parameters you need to configure for this Operator depend on which ``launch_type`` you want to use.
+
+Launch Types
+------------
+.. code-block::
+
+    launch_type="EC2|FARGATE|EXTERNAL"
+
+* If you are using AWS Fargate as the compute resource in your ECS Cluster, set ``launch_type`` to FARGATE. The FARGATE launch type also requires you to provide ``network_configuration`` parameters.
+* If you are using EC2 instances as the compute resources in your ECS Cluster, set ``launch_type`` to EC2.
+* If you have integrated external resources into your ECS Cluster, for example using ECS Anywhere, and want to run your containers on those external resources, set ``launch_type`` to EXTERNAL (a minimal sketch follows this list).
+
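+As an illustration, a minimal task using the EXTERNAL launch type might look like the sketch below; the cluster, task definition, and container names are illustrative placeholders, not values your account will have:
+
+.. code-block:: python
+
+    run_on_external = EcsOperator(
+        task_id="run_on_external",
+        cluster="existing_cluster_name",
+        task_definition="existing_task_definition_name",
+        launch_type="EXTERNAL",
+        overrides={
+            "containerOverrides": [
+                {"name": "hello-world-container", "command": ["echo", "hello", "world"]},
+            ],
+        },
+    )
+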
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py
+    :language: python
+    :start-after: [START howto_operator_ecs]
+    :end-before: [END howto_operator_ecs]
+
 
 .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs_fargate.py
     :language: python
-    :dedent: 4
     :start-after: [START howto_operator_ecs]
     :end-before: [END howto_operator_ecs]
 
+
+CloudWatch Logging
+------------------
+
+To stream logs to AWS CloudWatch, you need to define the parameters below. Using the example Operators above, we would add these additional parameters to enable logging to CloudWatch. You will need to ensure that you have the appropriate level of permissions (see the next section).
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs_ec2.py
+    :language: python
+    :start-after: [START howto_awslogs_ecs]
+    :end-before: [END howto_awslogs_ecs]
+
+IAM Permissions
+---------------
+
+You will need to ensure you have the following IAM permissions to run Tasks via this Operator. In this example, the Operator will have permissions to run Tasks on an ECS Cluster called "cluster a" in a specific AWS region and account.
+
+.. code-block::
+
+        {
+            "Effect": "Allow",
+            "Action": [
+                "ecs:RunTask",
+                "ecs:DescribeTasks"
+            ],
+            "Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:cluster/{cluster a}" ]
+        }
+
+If you set ``reattach=True`` (the default is ``False``), you will need to add the following additional Actions to the IAM policy.
+
+.. code-block::
+
+        "ecs:DescribeTaskDefinition",
+        "ecs:ListTasks"
+
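+Putting these together, a complete policy statement for ``reattach=True`` might look like the following (the region, account number, and cluster name are placeholders, as above):
+
+.. code-block::
+
+        {
+            "Effect": "Allow",
+            "Action": [
+                "ecs:RunTask",
+                "ecs:DescribeTasks",
+                "ecs:DescribeTaskDefinition",
+                "ecs:ListTasks"
+            ],
+            "Resource": [ "arn:aws:ecs:{aws region}:{aws account number}:cluster/{cluster a}" ]
+        }
+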
+**CloudWatch Permissions**
+
+If you plan on streaming Apache Airflow logs into AWS CloudWatch, you will need to ensure that you have configured the appropriate permissions. The statement below is expressed using the AWS CDK (``aws_iam``):
+
+.. code-block::
+
+        iam.PolicyStatement(
+            actions=[
+                "logs:CreateLogStream",
+                "logs:CreateLogGroup",
+                "logs:PutLogEvents",
+                "logs:GetLogEvents",
+                "logs:GetLogRecord",
+                "logs:GetLogGroupFields",
+                "logs:GetQueryResults",
+            ],
+            effect=iam.Effect.ALLOW,
+            resources=[
+                "arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/*"
+            ],
+        )
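+
+If you manage IAM as raw policy documents rather than with the AWS CDK, an equivalent statement might look like this (the log group and log stream names are placeholders):
+
+.. code-block::
+
+        {
+            "Effect": "Allow",
+            "Action": [
+                "logs:CreateLogStream",
+                "logs:CreateLogGroup",
+                "logs:PutLogEvents",
+                "logs:GetLogEvents",
+                "logs:GetLogRecord",
+                "logs:GetLogGroupFields",
+                "logs:GetQueryResults"
+            ],
+            "Resource": [ "arn:aws:logs:{aws region}:{aws account number}:log-group:{aws-log-group-name}:log-stream:{aws-log-stream-name}/*" ]
+        }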
+
+
 More information
 ----------------
 
diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py
index 3a2bcce..1dde03b 100644
--- a/tests/providers/amazon/aws/operators/test_ecs.py
+++ b/tests/providers/amazon/aws/operators/test_ecs.py
@@ -112,6 +112,13 @@ class TestEcsOperator(unittest.TestCase):
                 {'launchType': 'EC2'},
             ],
             [
+                'EXTERNAL',
+                None,
+                None,
+                None,
+                {'launchType': 'EXTERNAL'},
+            ],
+            [
                 'FARGATE',
                 None,
                 'LATEST',