Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/09 23:04:26 UTC

[airflow] branch main updated: Refactor monolithic ECS Operator into Operators, Sensors, and a Hook (#25413)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a1b7d43e0 Refactor monolithic ECS Operator into Operators, Sensors, and a Hook (#25413)
8a1b7d43e0 is described below

commit 8a1b7d43e05e38576a728f2c49e75a63093f9103
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Aug 9 16:04:19 2022 -0700

    Refactor monolithic ECS Operator into Operators, Sensors, and a Hook (#25413)
---
 airflow/contrib/operators/ecs_operator.py          |  24 +-
 .../amazon/aws/example_dags/example_ecs.py         | 164 ++++++-
 airflow/providers/amazon/aws/hooks/ecs.py          | 223 +++++++++
 airflow/providers/amazon/aws/operators/ecs.py      | 421 ++++++++++------
 airflow/providers/amazon/aws/sensors/ecs.py        | 187 ++++++++
 airflow/providers/amazon/provider.yaml             |   6 +
 .../operators/ecs.rst                              | 136 +++++-
 docs/spelling_wordlist.txt                         |   1 +
 tests/always/test_project_structure.py             |   2 +
 tests/providers/amazon/aws/hooks/test_ecs.py       | 210 ++++++++
 tests/providers/amazon/aws/operators/test_ecs.py   | 533 +++++++++++----------
 .../amazon/aws/operators/test_ecs_system.py        |  99 ----
 12 files changed, 1479 insertions(+), 527 deletions(-)

diff --git a/airflow/contrib/operators/ecs_operator.py b/airflow/contrib/operators/ecs_operator.py
index 68dbe0a433..aee3c19bcd 100644
--- a/airflow/contrib/operators/ecs_operator.py
+++ b/airflow/contrib/operators/ecs_operator.py
@@ -19,13 +19,29 @@
 
 import warnings
 
-from airflow.providers.amazon.aws.operators.ecs import (  # noqa
-    EcsOperator as ECSOperator,
-    EcsProtocol as ECSProtocol,
-)
+from airflow.providers.amazon.aws.hooks.ecs import EcsProtocol
+from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
+
+__all__ = ["EcsRunTaskOperator", "EcsProtocol"]
 
 warnings.warn(
     "This module is deprecated. Please use `airflow.providers.amazon.aws.operators.ecs`.",
     DeprecationWarning,
     stacklevel=2,
 )
+
+
+class EcsOperator(EcsRunTaskOperator):
+    """
+    This operator is deprecated.
+    Please use :class:`airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator`.
+    """
+
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "This class is deprecated. "
+            "Please use `airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator`.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(*args, **kwargs)
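
For DAG authors, migrating off the deprecated shim is an import change; a
minimal sketch (the new class accepts the same task arguments):

    # Deprecated import path (emits a DeprecationWarning):
    from airflow.contrib.operators.ecs_operator import EcsOperator

    # New import path introduced by this refactor:
    from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
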
diff --git a/airflow/providers/amazon/aws/example_dags/example_ecs.py b/airflow/providers/amazon/aws/example_dags/example_ecs.py
index b8f1f67b8e..45154a617d 100644
--- a/airflow/providers/amazon/aws/example_dags/example_ecs.py
+++ b/airflow/providers/amazon/aws/example_dags/example_ecs.py
@@ -14,35 +14,114 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import os
+
 from datetime import datetime
 
 from airflow import DAG
-from airflow.providers.amazon.aws.operators.ecs import EcsOperator
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.ecs import EcsClusterStates, EcsTaskStates
+from airflow.providers.amazon.aws.operators.ecs import (
+    EcsCreateClusterOperator,
+    EcsDeleteClusterOperator,
+    EcsDeregisterTaskDefinitionOperator,
+    EcsRegisterTaskDefinitionOperator,
+    EcsRunTaskOperator,
+)
+from airflow.providers.amazon.aws.sensors.ecs import (
+    EcsClusterStateSensor,
+    EcsTaskDefinitionStateSensor,
+    EcsTaskStateSensor,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+DAG_ID = 'new_ecs_refactor'
+ENV_ID = 'env1234why-2'
+
+# NOTE:  Creating a functional ECS Cluster which uses EC2 requires manually creating
+# and configuring a number of resources such as autoscaling groups and networking,
+# which is out of scope for this demo and time-consuming for a system test.
+# To simplify this demo and make it run in a reasonable length of time as a
+# system test, follow the steps below to create a new cluster on the AWS Console,
+# which handles all asset creation and configuration using default values:
+# 1. https://us-east-1.console.aws.amazon.com/ecs/home?region=us-east-1#/clusters
+# 2. Select "EC2 Linux + Networking" and hit "Next"
+# 3. Name your cluster in the first field and click Create
+# 4. Below, enter the name you provided and the subnets that were generated:
+EXISTING_CLUSTER_NAME = 'using-defaults'
+SUBNETS = ['subnet-08c6deb88019ef902']
+
 
 with DAG(
-    dag_id='example_ecs',
+    dag_id=DAG_ID,
     schedule_interval=None,
     start_date=datetime(2021, 1, 1),
     tags=['example'],
     catchup=False,
 ) as dag:
+    env_id = ENV_ID
+    cluster_name = f'{env_id}-cluster'
+    container_name = f'{env_id}-container'
+    family_name = f'{env_id}-task-definition'
+    asg_name = f'{env_id}-asg'
+
+    # [START howto_operator_ecs_create_cluster]
+    create_cluster = EcsCreateClusterOperator(
+        task_id='create_cluster',
+        cluster_name=cluster_name,
+        wait_for_completion=False,
+    )
+    # [END howto_operator_ecs_create_cluster]
 
-    # [START howto_operator_ecs]
-    hello_world = EcsOperator(
-        task_id="hello_world",
-        cluster=os.environ.get("CLUSTER_NAME", "existing_cluster_name"),
-        task_definition=os.environ.get("TASK_DEFINITION", "existing_task_definition_name"),
-        launch_type="EXTERNAL|EC2",
-        aws_conn_id="aws_ecs",
+    # [START howto_sensor_ecs_cluster_state]
+    await_cluster = EcsClusterStateSensor(
+        task_id='await_cluster',
+        cluster_name=cluster_name,
+    )
+    # [END howto_sensor_ecs_cluster_state]
+
+    # [START howto_operator_ecs_register_task_definition]
+    register_task = EcsRegisterTaskDefinitionOperator(
+        task_id='register_task',
+        family=family_name,
+        container_definitions=[
+            {
+                'name': container_name,
+                'image': 'ubuntu',
+                'workingDirectory': '/usr/bin',
+                'entryPoint': ['sh', '-c'],
+                'command': ['ls'],
+            }
+        ],
+        register_task_kwargs={
+            'cpu': '256',
+            'memory': '512',
+            'networkMode': 'awsvpc',
+        },
+    )
+    # [END howto_operator_ecs_register_task_definition]
+
+    # [START howto_sensor_ecs_task_definition_state]
+    await_task_definition = EcsTaskDefinitionStateSensor(
+        task_id='await_task_definition',
+        task_definition=register_task.output,
+    )
+    # [END howto_sensor_ecs_task_definition_state]
+
+    # [START howto_operator_ecs_run_task]
+    run_task = EcsRunTaskOperator(
+        task_id="run_task",
+        cluster=EXISTING_CLUSTER_NAME,
+        task_definition=register_task.output,
+        launch_type="EC2",
         overrides={
             "containerOverrides": [
                 {
-                    "name": "hello-world-container",
+                    "name": container_name,
                     "command": ["echo", "hello", "world"],
                 },
             ],
         },
+        network_configuration={'awsvpcConfiguration': {'subnets': SUBNETS}},
         tags={
             "Customer": "X",
             "Project": "Y",
@@ -50,10 +129,63 @@ with DAG(
             "Version": "0.0.1",
             "Environment": "Development",
         },
-        #    [START howto_awslogs_ecs]
+        # [START howto_awslogs_ecs]
         awslogs_group="/ecs/hello-world",
-        awslogs_region="aws-region",
-        awslogs_stream_prefix="ecs/hello-world-container"
-        #   [END howto_awslogs_ecs]
+        awslogs_region='us-east-1',
+        awslogs_stream_prefix="ecs/hello-world-container",
+        # [END howto_awslogs_ecs]
+        # NOTE: You must set `reattach=True` in order to get ecs_task_arn if you plan to use a Sensor.
+        reattach=True,
+    )
+    # [END howto_operator_ecs_run_task]
+
+    # [START howto_sensor_ecs_task_state]
+    # By default, EcsTaskStateSensor waits until the task has started, but the
+    # demo task runs so fast that the sensor misses it.  This sensor instead
+    # demonstrates how to wait until the ECS Task has completed by providing
+    # the target_state and failure_states parameters.
+    await_task_finish = EcsTaskStateSensor(
+        task_id='await_task_finish',
+        cluster=EXISTING_CLUSTER_NAME,
+        task='{{ ti.xcom_pull(key="ecs_task_arn") }}',
+        target_state=EcsTaskStates.STOPPED,
+        failure_states={EcsTaskStates.NONE},
+    )
+    # [END howto_sensor_ecs_task_state]
+
+    # [START howto_operator_ecs_deregister_task_definition]
+    deregister_task = EcsDeregisterTaskDefinitionOperator(
+        task_id='deregister_task',
+        trigger_rule=TriggerRule.ALL_DONE,
+        task_definition=register_task.output,
+    )
+    # [END howto_operator_ecs_deregister_task_definition]
+
+    # [START howto_operator_ecs_delete_cluster]
+    delete_cluster = EcsDeleteClusterOperator(
+        task_id='delete_cluster',
+        trigger_rule=TriggerRule.ALL_DONE,
+        cluster_name=cluster_name,
+        wait_for_completion=False,
+    )
+    # [END howto_operator_ecs_delete_cluster]
+
+    # [START howto_sensor_ecs_cluster_state_inactive]
+    await_delete_cluster = EcsClusterStateSensor(
+        task_id='await_delete_cluster',
+        cluster_name=cluster_name,
+        target_state=EcsClusterStates.INACTIVE,
+    )
+    # [END howto_sensor_ecs_cluster_state_inactive]
+
+    chain(
+        create_cluster,
+        await_cluster,
+        register_task,
+        await_task_definition,
+        run_task,
+        await_task_finish,
+        deregister_task,
+        delete_cluster,
+        await_delete_cluster,
     )
-    # [END howto_operator_ecs]
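
Note that `register_task.output` above is an XComArg: at run time it resolves
to the operator's return value, which for EcsRegisterTaskDefinitionOperator is
the registered task definition ARN. An equivalent explicit pull, shown here
only for illustration, would be:

    task_definition='{{ ti.xcom_pull(task_ids="register_task") }}'
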
diff --git a/airflow/providers/amazon/aws/hooks/ecs.py b/airflow/providers/amazon/aws/hooks/ecs.py
new file mode 100644
index 0000000000..9c94bdb8d4
--- /dev/null
+++ b/airflow/providers/amazon/aws/hooks/ecs.py
@@ -0,0 +1,223 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from collections import deque
+from datetime import datetime, timedelta
+from enum import Enum
+from logging import Logger
+from threading import Event, Thread
+from typing import Dict, Generator, Optional
+
+from botocore.exceptions import ClientError, ConnectionClosedError
+from botocore.waiter import Waiter
+
+from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
+from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
+from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
+from airflow.typing_compat import Protocol, runtime_checkable
+
+
+def should_retry(exception: Exception):
+    """Check if exception is related to ECS resource quota (CPU, MEM)."""
+    if isinstance(exception, EcsOperatorError):
+        return any(
+            quota_reason in failure['reason']
+            for quota_reason in ['RESOURCE:MEMORY', 'RESOURCE:CPU']
+            for failure in exception.failures
+        )
+    return False
+
+
+def should_retry_eni(exception: Exception):
+    """Check if exception is related to ENI (Elastic Network Interfaces)."""
+    if isinstance(exception, EcsTaskFailToStart):
+        return any(
+            eni_reason in exception.message
+            for eni_reason in ['network interface provisioning', 'ResourceInitializationError']
+        )
+    return False
+
+
+class EcsClusterStates(Enum):
+    """Contains the possible State values of an ECS Cluster."""
+
+    ACTIVE = 'ACTIVE'
+    PROVISIONING = 'PROVISIONING'
+    DEPROVISIONING = 'DEPROVISIONING'
+    FAILED = 'FAILED'
+    INACTIVE = 'INACTIVE'
+
+
+class EcsTaskDefinitionStates(Enum):
+    """Contains the possible State values of an ECS Task Definition."""
+
+    ACTIVE = 'ACTIVE'
+    INACTIVE = 'INACTIVE'
+
+
+class EcsTaskStates(Enum):
+    """Contains the possible State values of an ECS Task."""
+
+    PROVISIONING = 'PROVISIONING'
+    PENDING = 'PENDING'
+    ACTIVATING = 'ACTIVATING'
+    RUNNING = 'RUNNING'
+    DEACTIVATING = 'DEACTIVATING'
+    STOPPING = 'STOPPING'
+    DEPROVISIONING = 'DEPROVISIONING'
+    STOPPED = 'STOPPED'
+    NONE = 'NONE'
+
+
+class EcsHook(AwsGenericHook):
+    """
+    Interact with AWS Elastic Container Service, using the boto3 library.
+    The hook attribute ``conn`` exposes all of the methods listed in the documentation:
+
+    .. seealso::
+        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html
+        - https://docs.aws.amazon.com/AmazonECS/latest/APIReference/Welcome.html
+
+    Additional arguments (such as ``aws_conn_id`` or ``region_name``) may be specified and
+        are passed down to the underlying AwsBaseHook.
+
+    .. seealso::
+        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`
+
+    :param aws_conn_id: The Airflow connection used for AWS credentials.
+    """
+
+    def __init__(self, *args, **kwargs) -> None:
+        kwargs['client_type'] = 'ecs'
+        super().__init__(*args, **kwargs)
+
+    def get_cluster_state(self, cluster_name: str) -> str:
+        return self.conn.describe_clusters(clusters=[cluster_name])['clusters'][0]['status']
+
+    def get_task_definition_state(self, task_definition: str) -> str:
+        return self.conn.describe_task_definition(taskDefinition=task_definition)['taskDefinition']['status']
+
+    def get_task_state(self, cluster, task) -> str:
+        return self.conn.describe_tasks(cluster=cluster, tasks=[task])['tasks'][0]['lastStatus']
+
+
+class EcsTaskLogFetcher(Thread):
+    """
+    Fetches CloudWatch log events at a fixed interval in a separate thread
+    and sends the log events to the info channel of the provided logger.
+    """
+
+    def __init__(
+        self,
+        *,
+        log_group: str,
+        log_stream_name: str,
+        fetch_interval: timedelta,
+        logger: Logger,
+        aws_conn_id: Optional[str] = 'aws_default',
+        region_name: Optional[str] = None,
+    ):
+        super().__init__()
+        self._event = Event()
+
+        self.fetch_interval = fetch_interval
+
+        self.logger = logger
+        self.log_group = log_group
+        self.log_stream_name = log_stream_name
+
+        self.hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)
+
+    def run(self) -> None:
+        logs_to_skip = 0
+        while not self.is_stopped():
+            time.sleep(self.fetch_interval.total_seconds())
+            log_events = self._get_log_events(logs_to_skip)
+            for log_event in log_events:
+                self.logger.info(self._event_to_str(log_event))
+                logs_to_skip += 1
+
+    def _get_log_events(self, skip: int = 0) -> Generator:
+        try:
+            yield from self.hook.get_log_events(self.log_group, self.log_stream_name, skip=skip)
+        except ClientError as error:
+            if error.response['Error']['Code'] != 'ResourceNotFoundException':
+                self.logger.warning('Error on retrieving Cloudwatch log events: %s', error)
+
+            yield from ()
+        except ConnectionClosedError as error:
+            self.logger.warning('ConnectionClosedError on retrieving Cloudwatch log events: %s', error)
+            yield from ()
+
+    def _event_to_str(self, event: dict) -> str:
+        event_dt = datetime.utcfromtimestamp(event['timestamp'] / 1000.0)
+        formatted_event_dt = event_dt.strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]
+        message = event['message']
+        return f'[{formatted_event_dt}] {message}'
+
+    def get_last_log_messages(self, number_messages) -> list:
+        return [log['message'] for log in deque(self._get_log_events(), maxlen=number_messages)]
+
+    def get_last_log_message(self) -> Optional[str]:
+        try:
+            return self.get_last_log_messages(1)[0]
+        except IndexError:
+            return None
+
+    def is_stopped(self) -> bool:
+        return self._event.is_set()
+
+    def stop(self):
+        self._event.set()
+
+
+@runtime_checkable
+class EcsProtocol(Protocol):
+    """
+    A structured Protocol for ``boto3.client('ecs')``. This is used for type hints on
+    :py:meth:`.EcsBaseOperator.client`.
+
+    .. seealso::
+
+        - https://mypy.readthedocs.io/en/latest/protocols.html
+        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html
+    """
+
+    def run_task(self, **kwargs) -> Dict:
+        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task"""  # noqa: E501
+        ...
+
+    def get_waiter(self, x: str) -> Waiter:
+        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter"""  # noqa: E501
+        ...
+
+    def describe_tasks(self, cluster: str, tasks) -> Dict:
+        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks"""  # noqa: E501
+        ...
+
+    def stop_task(self, cluster, task, reason: str) -> Dict:
+        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task"""  # noqa: E501
+        ...
+
+    def describe_task_definition(self, taskDefinition: str) -> Dict:
+        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition"""  # noqa: E501
+        ...
+
+    def list_tasks(self, cluster: str, launchType: str, desiredStatus: str, family: str) -> Dict:
+        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks"""  # noqa: E501
+        ...
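
The new EcsHook can also be used on its own; a minimal sketch (the connection
id, region, and cluster name are illustrative):

    from airflow.providers.amazon.aws.hooks.ecs import EcsHook

    hook = EcsHook(aws_conn_id='aws_default', region_name='us-east-1')
    # Returns the cluster status string, e.g. 'ACTIVE' or 'INACTIVE'.
    state = hook.get_cluster_state(cluster_name='my-cluster')
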
diff --git a/airflow/providers/amazon/aws/operators/ecs.py b/airflow/providers/amazon/aws/operators/ecs.py
index e04fbc3040..2b5ee08d97 100644
--- a/airflow/providers/amazon/aws/operators/ecs.py
+++ b/airflow/providers/amazon/aws/operators/ecs.py
@@ -17,160 +17,257 @@
 # under the License.
 import re
 import sys
-import time
-from collections import deque
-from datetime import datetime, timedelta
-from logging import Logger
-from threading import Event, Thread
-from typing import Dict, Generator, Optional, Sequence
+import warnings
+from datetime import timedelta
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence
 
-from botocore.exceptions import ClientError, ConnectionClosedError
-from botocore.waiter import Waiter
+import boto3
 
+from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, XCom
 from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
+
+# TODO: Remove the following import when EcsProtocol and EcsTaskLogFetcher deprecations are removed.
+from airflow.providers.amazon.aws.hooks import ecs
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-from airflow.typing_compat import Protocol, runtime_checkable
+from airflow.providers.amazon.aws.hooks.ecs import (
+    EcsClusterStates,
+    EcsHook,
+    EcsTaskDefinitionStates,
+    should_retry_eni,
+)
+from airflow.providers.amazon.aws.sensors.ecs import EcsClusterStateSensor, EcsTaskDefinitionStateSensor
 from airflow.utils.session import provide_session
 
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
 
-def should_retry(exception: Exception):
-    """Check if exception is related to ECS resource quota (CPU, MEM)."""
-    if isinstance(exception, EcsOperatorError):
-        return any(
-            quota_reason in failure['reason']
-            for quota_reason in ['RESOURCE:MEMORY', 'RESOURCE:CPU']
-            for failure in exception.failures
-        )
-    return False
+DEFAULT_CONN_ID = 'aws_default'
 
 
-def should_retry_eni(exception: Exception):
-    """Check if exception is related to ENI (Elastic Network Interfaces)."""
-    if isinstance(exception, EcsTaskFailToStart):
-        return any(
-            eni_reason in exception.message
-            for eni_reason in ['network interface provisioning', 'ResourceInitializationError']
-        )
-    return False
+class EcsBaseOperator(BaseOperator):
+    """This is the base operator for all Elastic Container Service operators."""
+
+    def __init__(self, **kwargs):
+        self.aws_conn_id = kwargs.get('aws_conn_id', DEFAULT_CONN_ID)
+        self.region = kwargs.get('region')
+        super().__init__(**kwargs)
+
+    @cached_property
+    def hook(self) -> EcsHook:
+        """Create and return an EcsHook."""
+        return EcsHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
 
+    @cached_property
+    def client(self) -> boto3.client:
+        """Create and return the EcsHook's client."""
+        return self.hook.conn
 
-@runtime_checkable
-class EcsProtocol(Protocol):
+    def execute(self, context: 'Context'):
+        """Must overwrite in child classes."""
+        raise NotImplementedError('Please implement execute() in subclass')
+
+
+class EcsCreateClusterOperator(EcsBaseOperator):
     """
-    A structured Protocol for ``boto3.client('ecs')``. This is used for type hints on
-    :py:meth:`.EcsOperator.client`.
+    Creates an AWS ECS cluster.
 
     .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EcsCreateClusterOperator`
 
-        - https://mypy.readthedocs.io/en/latest/protocols.html
-        - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html
+    :param cluster_name: The name of your cluster. If you don't specify a name for your
+        cluster, you create a cluster that's named default.
+    :param create_cluster_kwargs: Extra arguments for Cluster Creation.
+    :param wait_for_completion: If True, waits for creation of the cluster to complete. (default: True)
     """
 
-    def run_task(self, **kwargs) -> Dict:
-        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task"""  # noqa: E501
-        ...
+    template_fields: Sequence[str] = ('cluster_name', 'create_cluster_kwargs', 'wait_for_completion')
 
-    def get_waiter(self, x: str) -> Waiter:
-        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.get_waiter"""  # noqa: E501
-        ...
-
-    def describe_tasks(self, cluster: str, tasks) -> Dict:
-        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_tasks"""  # noqa: E501
-        ...
+    def __init__(
+        self,
+        *,
+        cluster_name: str,
+        create_cluster_kwargs: Optional[Dict] = None,
+        wait_for_completion: bool = True,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.cluster_name = cluster_name
+        self.create_cluster_kwargs = create_cluster_kwargs or {}
+        self.wait_for_completion = wait_for_completion
 
-    def stop_task(self, cluster, task, reason: str) -> Dict:
-        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.stop_task"""  # noqa: E501
-        ...
+    def execute(self, context: 'Context'):
+        self.log.info(
+            'Creating cluster %s using the following values: %s',
+            self.cluster_name,
+            self.create_cluster_kwargs,
+        )
+        result = self.client.create_cluster(clusterName=self.cluster_name, **self.create_cluster_kwargs)
 
-    def describe_task_definition(self, taskDefinition: str) -> Dict:
-        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.describe_task_definition"""  # noqa: E501
-        ...
+        if self.wait_for_completion:
+            while not EcsClusterStateSensor(
+                task_id='await_cluster',
+                cluster_name=self.cluster_name,
+            ).poke(context):
+                # The sensor has a built-in delay and will try again until
+                # the cluster is ready or has reached a failed state.
+                pass
 
-    def list_tasks(self, cluster: str, launchType: str, desiredStatus: str, family: str) -> Dict:
-        """https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.list_tasks"""  # noqa: E501
-        ...
+        return result['cluster']
 
 
-class EcsTaskLogFetcher(Thread):
+class EcsDeleteClusterOperator(EcsBaseOperator):
     """
-    Fetches Cloudwatch log events with specific interval as a thread
-    and sends the log events to the info channel of the provided logger.
+    Deletes an AWS ECS cluster.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EcsDeleteClusterOperator`
+
+    :param cluster_name: The short name or full Amazon Resource Name (ARN) of the cluster to delete.
+    :param wait_for_completion: If True, waits for deletion of the cluster to complete. (default: True)
     """
 
+    template_fields: Sequence[str] = ('cluster_name', 'wait_for_completion')
+
     def __init__(
         self,
         *,
-        aws_conn_id: Optional[str] = 'aws_default',
-        region_name: Optional[str] = None,
-        log_group: str,
-        log_stream_name: str,
-        fetch_interval: timedelta,
-        logger: Logger,
-    ):
-        super().__init__()
-        self._event = Event()
+        cluster_name: str,
+        wait_for_completion: bool = True,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.cluster_name = cluster_name
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: 'Context'):
+        self.log.info('Deleting cluster %s.', self.cluster_name)
+        result = self.client.delete_cluster(cluster=self.cluster_name)
+
+        if self.wait_for_completion:
+            while not EcsClusterStateSensor(
+                task_id='await_cluster_delete',
+                cluster_name=self.cluster_name,
+                target_state=EcsClusterStates.INACTIVE,
+                failure_states={EcsClusterStates.FAILED},
+            ).poke(context):
+                # The sensor has a built-in delay and will try again until
+                # the cluster is deleted or reaches a failed state.
+                pass
+
+        return result['cluster']
+
+
+class EcsDeregisterTaskDefinitionOperator(EcsBaseOperator):
+    """
+    Deregister a task definition on AWS ECS.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EcsDeregisterTaskDefinitionOperator`
+
+    :param task_definition: The family and revision (family:revision) or full Amazon Resource Name (ARN)
+        of the task definition to deregister. If you use a family name, you must specify a revision.
+    :param wait_for_completion: If True, waits for deregistration of the task definition to complete. (default: True)
+    """
+
+    template_fields: Sequence[str] = ('task_definition', 'wait_for_completion')
+
+    def __init__(self, *, task_definition: str, wait_for_completion: bool = True, **kwargs):
+        super().__init__(**kwargs)
+        self.task_definition = task_definition
+        self.wait_for_completion = wait_for_completion
+
+    def execute(self, context: 'Context'):
+        self.log.info('Deregistering task definition %s.', self.task_definition)
+        result = self.client.deregister_task_definition(taskDefinition=self.task_definition)
 
-        self.fetch_interval = fetch_interval
+        if self.wait_for_completion:
+            while not EcsTaskDefinitionStateSensor(
+                task_id='await_deregister_task_definition',
+                task_definition=self.task_definition,
+                target_state=EcsTaskDefinitionStates.INACTIVE,
+            ).poke(context):
+                # The sensor has a built-in delay and will try again until the
+                # task definition is deregistered or reaches a failed state.
+                pass
 
-        self.logger = logger
-        self.log_group = log_group
-        self.log_stream_name = log_stream_name
+        return result['taskDefinition']['taskDefinitionArn']
 
-        self.hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)
 
-    def run(self) -> None:
-        logs_to_skip = 0
-        while not self.is_stopped():
-            time.sleep(self.fetch_interval.total_seconds())
-            log_events = self._get_log_events(logs_to_skip)
-            for log_event in log_events:
-                self.logger.info(self._event_to_str(log_event))
-                logs_to_skip += 1
+class EcsRegisterTaskDefinitionOperator(EcsBaseOperator):
+    """
+    Register a task definition on AWS ECS.
 
-    def _get_log_events(self, skip: int = 0) -> Generator:
-        try:
-            yield from self.hook.get_log_events(self.log_group, self.log_stream_name, skip=skip)
-        except ClientError as error:
-            if error.response['Error']['Code'] != 'ResourceNotFoundException':
-                self.logger.warning('Error on retrieving Cloudwatch log events', error)
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:EcsRegisterTaskDefinitionOperator`
 
-            yield from ()
-        except ConnectionClosedError as error:
-            self.logger.warning('ConnectionClosedError on retrieving Cloudwatch log events', error)
-            yield from ()
+    :param family: The family name of a task definition to create.
+    :param container_definitions: A list of container definitions in JSON format that describe
+        the different containers that make up your task.
+    :param register_task_kwargs: Extra arguments for Register Task Definition.
+    :param wait_for_completion: If True, waits for registration of the task definition to complete. (default: True)
+    """
 
-    def _event_to_str(self, event: dict) -> str:
-        event_dt = datetime.utcfromtimestamp(event['timestamp'] / 1000.0)
-        formatted_event_dt = event_dt.strftime('%Y-%m-%d %H:%M:%S,%f')[:-3]
-        message = event['message']
-        return f'[{formatted_event_dt}] {message}'
+    template_fields: Sequence[str] = (
+        'family',
+        'container_definitions',
+        'register_task_kwargs',
+        'wait_for_completion',
+    )
 
-    def get_last_log_messages(self, number_messages) -> list:
-        return [log['message'] for log in deque(self._get_log_events(), maxlen=number_messages)]
+    def __init__(
+        self,
+        *,
+        family: str,
+        container_definitions: List[Dict],
+        register_task_kwargs: Optional[Dict] = None,
+        wait_for_completion: bool = True,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.family = family
+        self.container_definitions = container_definitions
+        self.register_task_kwargs = register_task_kwargs or {}
+        self.wait_for_completion = wait_for_completion
 
-    def get_last_log_message(self) -> Optional[str]:
-        try:
-            return self.get_last_log_messages(1)[0]
-        except IndexError:
-            return None
+    def execute(self, context: 'Context'):
+        self.log.info(
+            'Registering task definition %s using the following values: %s',
+            self.family,
+            self.register_task_kwargs,
+        )
+        self.log.info('Using container definition %s', self.container_definitions)
+        response = self.client.register_task_definition(
+            family=self.family,
+            containerDefinitions=self.container_definitions,
+            **self.register_task_kwargs,
+        )
+        task_arn = response['taskDefinition']['taskDefinitionArn']
 
-    def is_stopped(self) -> bool:
-        return self._event.is_set()
+        if self.wait_for_completion:
+            while not EcsTaskDefinitionStateSensor(
+                task_id='await_register_task_definition', task_definition=task_arn
+            ).poke(context):
+                # The sensor has a built-in delay and will try again until
+                # the task definition is registered or reaches a failed state.
+                pass
 
-    def stop(self):
-        self._event.set()
+        context['ti'].xcom_push(key='task_definition_arn', value=task_arn)
+        return task_arn
 
 
-class EcsOperator(BaseOperator):
+class EcsRunTaskOperator(EcsBaseOperator):
     """
     Execute a task on AWS ECS (Elastic Container Service)
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
-        :ref:`howto/operator:EcsOperator`
+        :ref:`howto/operator:EcsRunTaskOperator`
 
     :param task_definition: the task definition name on Elastic Container Service
     :param cluster: the cluster name on Elastic Container Service
@@ -215,20 +312,35 @@ class EcsOperator(BaseOperator):
     :param number_logs_exception: Number of lines from the last Cloudwatch logs to return in the
         AirflowException if an ECS task is stopped (to receive Airflow alerts with the logs of what
         failed in the code running in ECS).
+    :param wait_for_completion: If True, waits for the task to complete. (default: True)
     """
 
     ui_color = '#f0ede4'
     template_fields: Sequence[str] = (
-        'cluster',
         'task_definition',
+        'cluster',
         'overrides',
+        'launch_type',
+        'capacity_provider_strategy',
+        'group',
+        'placement_constraints',
+        'placement_strategy',
+        'platform_version',
         'network_configuration',
+        'tags',
+        'awslogs_group',
+        'awslogs_region',
+        'awslogs_stream_prefix',
+        'awslogs_fetch_interval',
+        'propagate_tags',
+        'reattach',
+        'number_logs_exception',
+        'wait_for_completion',
     )
     template_fields_renderers = {
         "overrides": "json",
         "network_configuration": "json",
         "tags": "json",
-        "quota_retry": "json",
     }
     REATTACH_XCOM_KEY = "ecs_task_arn"
     REATTACH_XCOM_TASK_ID_TEMPLATE = "{task_id}_task_arn"
@@ -239,8 +351,6 @@ class EcsOperator(BaseOperator):
         task_definition: str,
         cluster: str,
         overrides: dict,
-        aws_conn_id: Optional[str] = None,
-        region_name: Optional[str] = None,
         launch_type: str = 'EC2',
         capacity_provider_strategy: Optional[list] = None,
         group: Optional[str] = None,
@@ -257,12 +367,11 @@ class EcsOperator(BaseOperator):
         quota_retry: Optional[dict] = None,
         reattach: bool = False,
         number_logs_exception: int = 10,
+        wait_for_completion: bool = True,
         **kwargs,
     ):
         super().__init__(**kwargs)
 
-        self.aws_conn_id = aws_conn_id
-        self.region_name = region_name
         self.task_definition = task_definition
         self.cluster = cluster
         self.overrides = overrides
@@ -284,13 +393,12 @@ class EcsOperator(BaseOperator):
         self.number_logs_exception = number_logs_exception
 
         if self.awslogs_region is None:
-            self.awslogs_region = region_name
+            self.awslogs_region = self.region
 
-        self.hook: Optional[AwsBaseHook] = None
-        self.client: Optional[EcsProtocol] = None
         self.arn: Optional[str] = None
         self.retry_args = quota_retry
         self.task_log_fetcher: Optional[EcsTaskLogFetcher] = None
+        self.wait_for_completion = wait_for_completion
 
     @provide_session
     def execute(self, context, session=None):
@@ -299,8 +407,6 @@ class EcsOperator(BaseOperator):
         )
         self.log.info('EcsOperator overrides: %s', self.overrides)
 
-        self.client = self.get_hook().get_conn()
-
         if self.reattach:
             self._try_reattach_task(context)
 
@@ -330,13 +436,15 @@ class EcsOperator(BaseOperator):
             self.task_log_fetcher.start()
 
             try:
-                self._wait_for_task_ended()
+                if self.wait_for_completion:
+                    self._wait_for_task_ended()
             finally:
                 self.task_log_fetcher.stop()
 
             self.task_log_fetcher.join()
         else:
-            self._wait_for_task_ended()
+            if self.wait_for_completion:
+                self._wait_for_task_ended()
 
         self._check_success_task()
 
@@ -383,21 +491,7 @@ class EcsOperator(BaseOperator):
 
         if self.reattach:
             # Save the task ARN in XCom to be able to reattach it if needed
-            self._xcom_set(
-                context,
-                key=self.REATTACH_XCOM_KEY,
-                value=self.arn,
-                task_id=self.REATTACH_XCOM_TASK_ID_TEMPLATE.format(task_id=self.task_id),
-            )
-
-    def _xcom_set(self, context, key, value, task_id):
-        XCom.set(
-            key=key,
-            value=value,
-            task_id=task_id,
-            dag_id=self.dag_id,
-            run_id=context["run_id"],
-        )
+            self.xcom_push(context, key=self.REATTACH_XCOM_KEY, value=self.arn)
 
     def _try_reattach_task(self, context):
         task_def_resp = self.client.describe_task_definition(taskDefinition=self.task_definition)
@@ -434,7 +528,8 @@ class EcsOperator(BaseOperator):
     def _aws_logs_enabled(self):
         return self.awslogs_group and self.awslogs_stream_prefix
 
-    def _get_task_log_fetcher(self) -> EcsTaskLogFetcher:
+    # TODO: When the deprecation wrapper below is removed, please fix the following return type hint.
+    def _get_task_log_fetcher(self) -> ecs.EcsTaskLogFetcher:
         if not self.awslogs_group:
             raise ValueError("must specify awslogs_group to fetch task logs")
         log_stream_name = f"{self.awslogs_stream_prefix}/{self.ecs_task_id}"
@@ -498,14 +593,6 @@ class EcsOperator(BaseOperator):
                         f"{container.get('reason', '').lower()}"
                     )
 
-    def get_hook(self) -> AwsBaseHook:
-        """Create and return an AwsHook."""
-        if self.hook:
-            return self.hook
-
-        self.hook = AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type='ecs', region_name=self.region_name)
-        return self.hook
-
     def on_kill(self) -> None:
         if not self.client or not self.arn:
             return
@@ -517,3 +604,53 @@ class EcsOperator(BaseOperator):
             cluster=self.cluster, task=self.arn, reason='Task killed by the user'
         )
         self.log.info(response)
+
+
+class EcsOperator(EcsRunTaskOperator):
+    """
+    This operator is deprecated.
+    Please use :class:`airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator`.
+    """
+
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "This operator is deprecated. "
+            "Please use `airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator`.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(*args, **kwargs)
+
+
+class EcsTaskLogFetcher(ecs.EcsTaskLogFetcher):
+    """
+    This class is deprecated.
+    Please use :class:`airflow.providers.amazon.aws.hooks.ecs.EcsTaskLogFetcher`.
+    """
+
+    # TODO: Note to deprecator, Be sure to fix the use of `ecs.EcsTaskLogFetcher`
+    #       in the Operators above when you remove this wrapper class.
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "This class is deprecated. "
+            "Please use `airflow.providers.amazon.aws.hooks.ecs.EcsTaskLogFetcher`.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        super().__init__(*args, **kwargs)
+
+
+class EcsProtocol(ecs.EcsProtocol):
+    """
+    This class is deprecated.
+    Please use :class:`airflow.providers.amazon.aws.hooks.ecs.EcsProtocol`.
+    """
+
+    # TODO: Note to deprecator, Be sure to fix the use of `ecs.EcsProtocol`
+    #       in the Operators above when you remove this wrapper class.
+    def __init__(self):
+        warnings.warn(
+            "This class is deprecated.  Please use `airflow.providers.amazon.aws.hooks.ecs.EcsProtocol`.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
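
EcsBaseOperator centralizes credential handling and client creation, so a new
ECS operator only needs to implement execute(). A hypothetical subclass, not
part of this commit, might look like:

    from airflow.providers.amazon.aws.operators.ecs import EcsBaseOperator

    class EcsListClustersOperator(EcsBaseOperator):
        def execute(self, context):
            # self.client is the boto3 ECS client created via self.hook
            return self.client.list_clusters()['clusterArns']
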
diff --git a/airflow/providers/amazon/aws/sensors/ecs.py b/airflow/providers/amazon/aws/sensors/ecs.py
new file mode 100644
index 0000000000..171e2b14eb
--- /dev/null
+++ b/airflow/providers/amazon/aws/sensors/ecs.py
@@ -0,0 +1,187 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from typing import TYPE_CHECKING, Optional, Sequence, Set
+
+import boto3
+
+from airflow import AirflowException
+from airflow.compat.functools import cached_property
+from airflow.providers.amazon.aws.hooks.ecs import (
+    EcsClusterStates,
+    EcsHook,
+    EcsTaskDefinitionStates,
+    EcsTaskStates,
+)
+from airflow.sensors.base import BaseSensorOperator
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+DEFAULT_CONN_ID: str = 'aws_default'
+
+
+def _check_failed(current_state, target_state, failure_states):
+    if (current_state != target_state) and (current_state in failure_states):
+        raise AirflowException(
+            f'Terminal state reached. Current state: {current_state}, Expected state: {target_state}'
+        )
+
+
+class EcsBaseSensor(BaseSensorOperator):
+    """Contains general sensor behavior for Elastic Container Service."""
+
+    def __init__(self, **kwargs):
+        self.aws_conn_id = kwargs.get('aws_conn_id', DEFAULT_CONN_ID)
+        self.region = kwargs.get('region')
+        super().__init__(**kwargs)
+
+    @cached_property
+    def hook(self) -> EcsHook:
+        """Create and return an EcsHook."""
+        return EcsHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+    @cached_property
+    def client(self) -> boto3.client:
+        """Create and return an EcsHook client."""
+        return self.hook.conn
+
+
+class EcsClusterStateSensor(EcsBaseSensor):
+    """
+    Polls the cluster state until it reaches the target state.  Raises an
+    AirflowException with the failure reason if a failure state is reached.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/sensor:EcsClusterStateSensor`
+
+    :param cluster_name: The name of your cluster.
+    :param target_state: Success state to watch for. (Default: "ACTIVE")
+    :param failure_states: Fail if any of these states are reached before the
+         Success State. (Default: "FAILED" or "INACTIVE")
+    """
+
+    template_fields: Sequence[str] = ('cluster_name', 'target_state', 'failure_states')
+
+    def __init__(
+        self,
+        *,
+        cluster_name: str,
+        target_state: Optional[EcsClusterStates] = EcsClusterStates.ACTIVE,
+        failure_states: Optional[Set[EcsClusterStates]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster_name = cluster_name
+        self.target_state = target_state
+        self.failure_states = failure_states or {EcsClusterStates.FAILED, EcsClusterStates.INACTIVE}
+
+    def poke(self, context: 'Context'):
+        cluster_state = EcsClusterStates(self.hook.get_cluster_state(cluster_name=self.cluster_name))
+
+        self.log.info("Cluster state: %s, waiting for: %s", cluster_state, self.target_state)
+        _check_failed(cluster_state, self.target_state, self.failure_states)
+
+        return cluster_state == self.target_state
+
+
+class EcsTaskDefinitionStateSensor(EcsBaseSensor):
+    """
+    Polls the task definition state until it reaches the target state.  Raises an
+    AirflowException with the failure reason if a failure state is reached.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/sensor:EcsTaskDefinitionStateSensor`
+
+    :param task_definition: The family for the latest ACTIVE revision, family and
+         revision (family:revision) for a specific revision in the family, or the full
+         Amazon Resource Name (ARN) of the task definition.
+    :param target_state: Success state to watch for. (Default: "ACTIVE")
+    """
+
+    template_fields: Sequence[str] = ('task_definition', 'target_state', 'failure_states')
+
+    def __init__(
+        self,
+        *,
+        task_definition: str,
+        target_state: Optional[EcsTaskDefinitionStates] = EcsTaskDefinitionStates.ACTIVE,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.task_definition = task_definition
+        self.target_state = target_state
+        # There are only two possible states, so set failure_state to whatever is not the target_state
+        self.failure_states = {
+            (
+                EcsTaskDefinitionStates.INACTIVE
+                if target_state == EcsTaskDefinitionStates.ACTIVE
+                else EcsTaskDefinitionStates.ACTIVE
+            )
+        }
+
+    def poke(self, context: 'Context'):
+        task_definition_state = EcsTaskDefinitionStates(
+            self.hook.get_task_definition_state(task_definition=self.task_definition)
+        )
+
+        self.log.info("Task Definition state: %s, waiting for: %s", task_definition_state, self.target_state)
+        _check_failed(task_definition_state, self.target_state, self.failure_states)
+        return task_definition_state == self.target_state
+
+
+class EcsTaskStateSensor(EcsBaseSensor):
+    """
+    Polls the task state until it reaches the target state.  Raises an
+    AirflowException with the failure reason if a failure state is reached.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/sensor:EcsTaskStateSensor`
+
+    :param cluster: The short name or full Amazon Resource Name (ARN) of the cluster that hosts the task.
+    :param task: The task ID or full ARN of the task to poll.
+    :param target_state: Success state to watch for. (Default: "RUNNING")
+    :param failure_states: Fail if any of these states are reached before
+         the Success State. (Default: "STOPPED")
+    """
+
+    template_fields: Sequence[str] = ('cluster', 'task', 'target_state', 'failure_states')
+
+    def __init__(
+        self,
+        *,
+        cluster: str,
+        task: str,
+        target_state: Optional[EcsTaskStates] = EcsTaskStates.RUNNING,
+        failure_states: Optional[Set[EcsTaskStates]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.cluster = cluster
+        self.task = task
+        self.target_state = target_state
+        self.failure_states = failure_states or {EcsTaskStates.STOPPED}
+
+    def poke(self, context: 'Context'):
+        task_state = EcsTaskStates(self.hook.get_task_state(cluster=self.cluster, task=self.task))
+
+        self.log.info("Task state: %s, waiting for: %s", task_state, self.target_state)
+        _check_failed(task_state, self.target_state, self.failure_states)
+        return task_state == self.target_state
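
The cluster and task sensors accept overrides for both the success and
failure states; a minimal sketch (the cluster name is illustrative):

    EcsClusterStateSensor(
        task_id='await_cluster_active',
        cluster_name='my-cluster',
        target_state=EcsClusterStates.ACTIVE,
        failure_states={EcsClusterStates.FAILED},
    )
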
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index af9a7f4694..d8d2db5daf 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -329,6 +329,9 @@ sensors:
   - integration-name: Amazon EC2
     python-modules:
       - airflow.providers.amazon.aws.sensors.ec2
+  - integration-name: Amazon ECS
+    python-modules:
+      - airflow.providers.amazon.aws.sensors.ecs
   - integration-name: Amazon Elastic Kubernetes Service (EKS)
     python-modules:
       - airflow.providers.amazon.aws.sensors.eks
@@ -394,6 +397,9 @@ hooks:
   - integration-name: Amazon EC2
     python-modules:
       - airflow.providers.amazon.aws.hooks.ec2
+  - integration-name: Amazon ECS
+    python-modules:
+      - airflow.providers.amazon.aws.hooks.ecs
   - integration-name: Amazon ElastiCache
     python-modules:
       - airflow.providers.amazon.aws.hooks.elasticache_replication_group
diff --git a/docs/apache-airflow-providers-amazon/operators/ecs.rst b/docs/apache-airflow-providers-amazon/operators/ecs.rst
index b4dae28001..8ea7bc6247 100644
--- a/docs/apache-airflow-providers-amazon/operators/ecs.rst
+++ b/docs/apache-airflow-providers-amazon/operators/ecs.rst
@@ -33,13 +33,78 @@ Prerequisite Tasks
 Operators
 ---------
 
-.. _howto/operator:EcsOperator:
+.. _howto/operator:EcsCreateClusterOperator:
 
-Run a task definition
+Create an AWS ECS Cluster
+=========================
+
+To create an Amazon ECS cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.ecs.EcsCreateClusterOperator`.
+
+All optional parameters to be passed to the Create Cluster API should be
+passed in the ``create_cluster_kwargs`` dict.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_ecs_create_cluster]
+    :end-before: [END howto_operator_ecs_create_cluster]
+
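+A minimal sketch of passing extra Create Cluster API arguments through
+``create_cluster_kwargs`` (the capacity provider shown is illustrative):
+
+.. code-block:: python
+
+    EcsCreateClusterOperator(
+        task_id="create_cluster",
+        cluster_name="my-cluster",
+        create_cluster_kwargs={"capacityProviders": ["FARGATE"]},
+    )
+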
+.. _howto/operator:EcsDeleteClusterOperator:
+
+Delete an AWS ECS Cluster
+=========================
+
+To delete an Amazon ECS cluster you can use
+:class:`~airflow.providers.amazon.aws.operators.ecs.EcsDeleteClusterOperator`.
+
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_ecs_delete_cluster]
+    :end-before: [END howto_operator_ecs_delete_cluster]
+
+.. _howto/operator:EcsRegisterTaskDefinitionOperator:
+
+Register a Task Definition
+==========================
+
+To register a task definition you can use
+:class:`~airflow.providers.amazon.aws.operators.ecs.EcsRegisterTaskDefinitionOperator`.
+
+All optional parameters to be passed to the Register Task Definition API should be
+passed in the ``register_task_kwargs`` dict.
+
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_ecs_register_task_definition]
+    :end-before: [END howto_operator_ecs_register_task_definition]
+
+.. _howto/operator:EcsDeregisterTaskDefinitionOperator:
+
+Deregister a Task Definition
+============================
+
+To deregister a task definition you can use
+:class:`~airflow.providers.amazon.aws.operators.ecs.EcsDeregisterTaskDefinitionOperator`.
+
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_ecs_deregister_task_definition]
+    :end-before: [END howto_operator_ecs_deregister_task_definition]
+
+.. _howto/operator:EcsRunTaskOperator:
+
+Run a Task Definition
 =====================
 
 To run a Task Definition defined in an Amazon ECS cluster you can use
-:class:`~airflow.providers.amazon.aws.operators.ecs.EcsOperator`.
+:class:`~airflow.providers.amazon.aws.operators.ecs.EcsRunTaskOperator`.
 
 You need to have created your ECS Cluster, and have created a Task Definition before you can use this Operator.
 The Task Definition contains details of the containerized application you want to run.
@@ -59,8 +124,8 @@ The parameters you need to configure for this Operator will depend upon which ``
 .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
     :language: python
     :dedent: 4
-    :start-after: [START howto_operator_ecs]
-    :end-before: [END howto_operator_ecs]
+    :start-after: [START howto_operator_ecs_run_task]
+    :end-before: [END howto_operator_ecs_run_task]
 
 
 .. exampleinclude:: /../../tests/system/providers/amazon/aws/example_ecs_fargate.py
@@ -129,6 +194,67 @@ If you plan on streaming Apache Airflow logs into AWS CloudWatch, you need to en
                         ]
                 )
 
+Sensors
+-------
+
+.. _howto/sensor:EcsClusterStateSensor:
+
+AWS ECS Cluster State Sensor
+============================
+
+To poll the cluster state until it reaches the target state you can use
+:class:`~airflow.providers.amazon.aws.sensors.ecs.EcsClusterStateSensor`.
+
+Defaults to EcsClusterStates.ACTIVE as the success state, with
+EcsClusterStates.FAILED and EcsClusterStates.INACTIVE as failure states;
+all of these can be overridden with provided values.  Raises an
+AirflowException with the failure reason if a failure state is reached
+before the target state.
+
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_ecs_cluster_state]
+    :end-before: [END howto_sensor_ecs_cluster_state]
+
+.. _howto/sensor:EcsTaskDefinitionStateSensor:
+
+AWS ECS Task Definition State Sensor
+====================================
+
+To poll the task definition state until it reaches the target state you can use
+:class:`~airflow.providers.amazon.aws.sensors.ecs.EcsTaskDefinitionStateSensor`.
+
+Valid states are either EcsTaskDefinitionStates.ACTIVE or EcsTaskDefinitionStates.INACTIVE.
+Defaults to EcsTaskDefinitionStates.ACTIVE as the success state, but accepts a parameter
+to change that.  Raises an AirflowException with the failure reason if the failed state
+is reached before the target state.
+
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_ecs_task_definition_state]
+    :end-before: [END howto_sensor_ecs_task_definition_state]
+
+.. _howto/sensor:EcsTaskStateSensor:
+
+AWS ECS Task State Sensor
+=========================
+
+To poll the task state until it reaches the target state you can use
+:class:`~airflow.providers.amazon.aws.sensors.ecs.EcsTaskStateSensor`.
+
+Defaults to EcsTaskStates.RUNNING as the success state, with
+EcsTaskStates.STOPPED as the failure state; both can be overridden with
+provided values.  Raises an AirflowException with the failure reason if a
+failure state is reached before the target state.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_ecs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_ecs_task_state]
+    :end-before: [END howto_sensor_ecs_task_state]
 
 Reference
 ---------
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index a06292dc9f..035484f82b 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -730,6 +730,7 @@ delim
 dep
 deploymentUrl
 deps
+deregister
 desc
 deserialization
 deserialize
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 1e6feb5d93..b610942031 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -409,6 +409,8 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest):
         'airflow.providers.amazon.aws.sensors.rds.RdsBaseSensor',
         'airflow.providers.amazon.aws.sensors.sagemaker.SageMakerBaseSensor',
         'airflow.providers.amazon.aws.operators.appflow.AppflowBaseOperator',
+        'airflow.providers.amazon.aws.operators.ecs.EcsBaseOperator',
+        'airflow.providers.amazon.aws.sensors.ecs.EcsBaseSensor',
     }
 
     MISSING_EXAMPLES_FOR_CLASSES = {
diff --git a/tests/providers/amazon/aws/hooks/test_ecs.py b/tests/providers/amazon/aws/hooks/test_ecs.py
new file mode 100644
index 0000000000..fb37b7161a
--- /dev/null
+++ b/tests/providers/amazon/aws/hooks/test_ecs.py
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from datetime import timedelta
+from unittest import mock
+
+import pytest
+from botocore.exceptions import ClientError
+
+from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
+from airflow.providers.amazon.aws.hooks.ecs import EcsHook, EcsTaskLogFetcher, should_retry, should_retry_eni
+
+try:
+    from moto import mock_ecs
+except ImportError:
+    mock_ecs = None
+
+DEFAULT_CONN_ID: str = 'aws_default'
+REGION: str = 'us-east-1'
+
+
+@pytest.fixture
+def mock_conn():
+    with mock.patch.object(EcsHook, 'conn') as _conn:
+        yield _conn
+
+
+@pytest.mark.skipif(mock_ecs is None, reason="mock_ecs package not present")
+class TestEcsHooks:
+    def test_hook(self) -> None:
+        hook = EcsHook(region_name=REGION)
+        assert hook.conn is not None
+        assert hook.aws_conn_id == DEFAULT_CONN_ID
+        assert hook.region_name == REGION
+
+    def test_get_cluster_state(self, mock_conn) -> None:
+        mock_conn.describe_clusters.return_value = {'clusters': [{'status': 'ACTIVE'}]}
+        assert EcsHook().get_cluster_state(cluster_name='cluster_name') == 'ACTIVE'
+
+    def test_get_task_definition_state(self, mock_conn) -> None:
+        mock_conn.describe_task_definition.return_value = {'taskDefinition': {'status': 'ACTIVE'}}
+        assert EcsHook().get_task_definition_state(task_definition='task_name') == 'ACTIVE'
+
+    def test_get_task_state(self, mock_conn) -> None:
+        mock_conn.describe_tasks.return_value = {'tasks': [{'lastStatus': 'ACTIVE'}]}
+        assert EcsHook().get_task_state(cluster='cluster_name', task='task_name') == 'ACTIVE'
+
+
+class TestShouldRetry(unittest.TestCase):
+    def test_return_true_on_valid_reason(self):
+        self.assertTrue(should_retry(EcsOperatorError([{'reason': 'RESOURCE:MEMORY'}], 'Foo')))
+
+    def test_return_false_on_invalid_reason(self):
+        self.assertFalse(should_retry(EcsOperatorError([{'reason': 'CLUSTER_NOT_FOUND'}], 'Foo')))
+
+
+class TestShouldRetryEni(unittest.TestCase):
+    def test_return_true_on_valid_reason(self):
+        self.assertTrue(
+            should_retry_eni(
+                EcsTaskFailToStart(
+                    "The task failed to start due to: "
+                    "Timeout waiting for network interface provisioning to complete."
+                )
+            )
+        )
+
+    def test_return_false_on_invalid_reason(self):
+        self.assertFalse(
+            should_retry_eni(
+                EcsTaskFailToStart(
+                    "The task failed to start due to: "
+                    "CannotPullContainerError: "
+                    "ref pull has been retried 5 time(s): failed to resolve reference"
+                )
+            )
+        )
+
+
+class TestEcsTaskLogFetcher(unittest.TestCase):
+    @mock.patch('logging.Logger')
+    def set_up_log_fetcher(self, logger_mock):
+        self.logger_mock = logger_mock
+
+        self.log_fetcher = EcsTaskLogFetcher(
+            log_group="test_log_group",
+            log_stream_name="test_log_stream_name",
+            fetch_interval=timedelta(milliseconds=1),
+            logger=logger_mock,
+        )
+
+    def setUp(self):
+        self.set_up_log_fetcher()
+
+    @mock.patch(
+        'threading.Event.is_set',
+        side_effect=(False, False, False, True),
+    )
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        side_effect=(
+            iter(
+                [
+                    {'timestamp': 1617400267123, 'message': 'First'},
+                    {'timestamp': 1617400367456, 'message': 'Second'},
+                ]
+            ),
+            iter(
+                [
+                    {'timestamp': 1617400467789, 'message': 'Third'},
+                ]
+            ),
+            iter([]),
+        ),
+    )
+    def test_run(self, get_log_events_mock, event_is_set_mock):
+
+        self.log_fetcher.run()
+
+        self.logger_mock.info.assert_has_calls(
+            [
+                mock.call('[2021-04-02 21:51:07,123] First'),
+                mock.call('[2021-04-02 21:52:47,456] Second'),
+                mock.call('[2021-04-02 21:54:27,789] Third'),
+            ]
+        )
+
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        side_effect=ClientError({"Error": {"Code": "ResourceNotFoundException"}}, None),
+    )
+    def test_get_log_events_with_expected_error(self, get_log_events_mock):
+        with pytest.raises(StopIteration):
+            next(self.log_fetcher._get_log_events())
+
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        side_effect=Exception(),
+    )
+    def test_get_log_events_with_unexpected_error(self, get_log_events_mock):
+        with pytest.raises(Exception):
+            next(self.log_fetcher._get_log_events())
+
+    def test_event_to_str(self):
+        events = [
+            {'timestamp': 1617400267123, 'message': 'First'},
+            {'timestamp': 1617400367456, 'message': 'Second'},
+            {'timestamp': 1617400467789, 'message': 'Third'},
+        ]
+        assert [self.log_fetcher._event_to_str(event) for event in events] == (
+            [
+                '[2021-04-02 21:51:07,123] First',
+                '[2021-04-02 21:52:47,456] Second',
+                '[2021-04-02 21:54:27,789] Third',
+            ]
+        )
+
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        return_value=(),
+    )
+    def test_get_last_log_message_with_no_log_events(self, mock_log_events):
+        assert self.log_fetcher.get_last_log_message() is None
+
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        return_value=iter(
+            [
+                {'timestamp': 1617400267123, 'message': 'First'},
+                {'timestamp': 1617400367456, 'message': 'Second'},
+            ]
+        ),
+    )
+    def test_get_last_log_message_with_log_events(self, mock_log_events):
+        assert self.log_fetcher.get_last_log_message() == 'Second'
+
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        return_value=iter(
+            [
+                {'timestamp': 1617400267123, 'message': 'First'},
+                {'timestamp': 1617400367456, 'message': 'Second'},
+                {'timestamp': 1617400367458, 'message': 'Third'},
+            ]
+        ),
+    )
+    def test_get_last_log_messages_with_log_events(self, mock_log_events):
+        assert self.log_fetcher.get_last_log_messages(2) == ['Second', 'Third']
+
+    @mock.patch(
+        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
+        return_value=(),
+    )
+    def test_get_last_log_messages_with_no_log_events(self, mock_log_events):
+        assert self.log_fetcher.get_last_log_messages(2) == []
diff --git a/tests/providers/amazon/aws/operators/test_ecs.py b/tests/providers/amazon/aws/operators/test_ecs.py
index 701f4d7853..ecdd4d779e 100644
--- a/tests/providers/amazon/aws/operators/test_ecs.py
+++ b/tests/providers/amazon/aws/operators/test_ecs.py
@@ -20,58 +20,80 @@
 import sys
 import unittest
 from copy import deepcopy
-from datetime import timedelta
 from unittest import mock
 
 import pytest
-from botocore.exceptions import ClientError
 from parameterized import parameterized
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
+from airflow.providers.amazon.aws.hooks.ecs import EcsHook
 from airflow.providers.amazon.aws.operators.ecs import (
-    EcsOperator,
+    EcsBaseOperator,
+    EcsCreateClusterOperator,
+    EcsDeleteClusterOperator,
+    EcsDeregisterTaskDefinitionOperator,
+    EcsRegisterTaskDefinitionOperator,
+    EcsRunTaskOperator,
     EcsTaskLogFetcher,
-    should_retry,
-    should_retry_eni,
 )
-
-# fmt: off
+from airflow.providers.amazon.aws.sensors.ecs import EcsClusterStateSensor, EcsTaskDefinitionStateSensor
+
+try:
+    from moto import mock_ecs
+except ImportError:
+    mock_ecs = None
+
+CLUSTER_NAME = 'test_cluster'
+CONTAINER_NAME = 'e1ed7aac-d9b2-4315-8726-d2432bf11868'
+TASK_ID = 'd8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
+TASK_DEFINITION_NAME = 'td_name'
+TASK_DEFINITION_CONFIG = {
+    'family': 'family_name',
+    'register_task_kwargs': {
+        'cpu': '256',
+        'memory': '512',
+        'networkMode': 'awsvpc',
+    },
+    'container_definitions': [
+        {
+            'name': CONTAINER_NAME,
+            'image': 'ubuntu',
+            'workingDirectory': '/usr/bin',
+            'entryPoint': ['sh', '-c'],
+            'command': ['ls'],
+        }
+    ],
+}
 RESPONSE_WITHOUT_FAILURES = {
     "failures": [],
     "tasks": [
         {
             "containers": [
                 {
-                    "containerArn":
-                        "arn:aws:ecs:us-east-1:012345678910:container/e1ed7aac-d9b2-4315-8726-d2432bf11868",
+                    "containerArn": f"arn:aws:ecs:us-east-1:012345678910:container/{CONTAINER_NAME}",
                     "lastStatus": "PENDING",
                     "name": "wordpress",
-                    "taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55"
+                    "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
                 }
             ],
             "desiredStatus": "RUNNING",
             "lastStatus": "PENDING",
-            "taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55",
-            "taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11"
+            "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
+            "taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11",
         }
-    ]
+    ],
 }
-# fmt: on
-
 
-class TestEcsOperator(unittest.TestCase):
-    @mock.patch('airflow.providers.amazon.aws.operators.ecs.AwsBaseHook')
-    def set_up_operator(self, aws_hook_mock, **kwargs):
-        self.aws_hook_mock = aws_hook_mock
 
+@pytest.mark.skipif(mock_ecs is None, reason="mock_ecs package not present")
+class TestEcsRunTaskOperator(unittest.TestCase):
+    def set_up_operator(self, **kwargs):
         self.ecs_operator_args = {
             'task_id': 'task',
             'task_definition': 't',
             'cluster': 'c',
             'overrides': {},
-            'aws_conn_id': None,
-            'region_name': 'eu-west-1',
             'group': 'group',
             'placement_constraints': [
                 {'expression': 'attribute:ecs.instance-type =~ t2.*', 'type': 'memberOf'}
@@ -82,29 +104,38 @@ class TestEcsOperator(unittest.TestCase):
             },
             'propagate_tags': 'TASK_DEFINITION',
         }
-        self.ecs = EcsOperator(**self.ecs_operator_args, **kwargs)
-        self.ecs.get_hook()
+        self.ecs = EcsRunTaskOperator(**self.ecs_operator_args, **kwargs)
 
     def setUp(self):
         self.set_up_operator()
         self.mock_context = mock.MagicMock()
 
     def test_init(self):
-        assert self.ecs.region_name == 'eu-west-1'
         assert self.ecs.task_definition == 't'
-        assert self.ecs.aws_conn_id is None
         assert self.ecs.cluster == 'c'
         assert self.ecs.overrides == {}
-        self.ecs.get_hook()
-        assert self.ecs.hook == self.aws_hook_mock.return_value
-        self.aws_hook_mock.assert_called_once()
 
     def test_template_fields_overrides(self):
         assert self.ecs.template_fields == (
-            'cluster',
             'task_definition',
+            'cluster',
             'overrides',
+            'launch_type',
+            'capacity_provider_strategy',
+            'group',
+            'placement_constraints',
+            'placement_strategy',
+            'platform_version',
             'network_configuration',
+            'tags',
+            'awslogs_group',
+            'awslogs_region',
+            'awslogs_stream_prefix',
+            'awslogs_fetch_interval',
+            'propagate_tags',
+            'reattach',
+            'number_logs_exception',
+            'wait_for_completion',
         )
 
     @parameterized.expand(
@@ -180,8 +211,9 @@ class TestEcsOperator(unittest.TestCase):
             ],
         ]
     )
-    @mock.patch.object(EcsOperator, '_wait_for_task_ended')
-    @mock.patch.object(EcsOperator, '_check_success_task')
+    @mock.patch.object(EcsRunTaskOperator, '_wait_for_task_ended')
+    @mock.patch.object(EcsRunTaskOperator, '_check_success_task')
+    @mock.patch.object(EcsBaseOperator, 'client')
     def test_execute_without_failures(
         self,
         launch_type,
@@ -189,6 +221,7 @@ class TestEcsOperator(unittest.TestCase):
         platform_version,
         tags,
         expected_args,
+        client_mock,
         check_mock,
         wait_mock,
     ):
@@ -199,13 +232,10 @@ class TestEcsOperator(unittest.TestCase):
             platform_version=platform_version,
             tags=tags,
         )
-        client_mock = self.aws_hook_mock.return_value.get_conn.return_value
         client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES
 
         self.ecs.execute(None)
 
-        self.aws_hook_mock.return_value.get_conn.assert_called_once()
-
         client_mock.run_task.assert_called_once_with(
             cluster='c',
             overrides={},
@@ -223,11 +253,11 @@ class TestEcsOperator(unittest.TestCase):
 
         wait_mock.assert_called_once_with()
         check_mock.assert_called_once_with()
-        assert self.ecs.arn == 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
-        assert self.ecs.ecs_task_id == 'd8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
+        assert self.ecs.arn == f'arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}'
+        assert self.ecs.ecs_task_id == TASK_ID
 
-    def test_execute_with_failures(self):
-        client_mock = self.aws_hook_mock.return_value.get_conn.return_value
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_execute_with_failures(self, client_mock):
         resp_failures = deepcopy(RESPONSE_WITHOUT_FAILURES)
         resp_failures['failures'].append('dummy error')
         client_mock.run_task.return_value = resp_failures
@@ -235,7 +265,6 @@ class TestEcsOperator(unittest.TestCase):
         with pytest.raises(EcsOperatorError):
             self.ecs.execute(None)
 
-        self.aws_hook_mock.return_value.get_conn.assert_called_once()
         client_mock.run_task.assert_called_once_with(
             cluster='c',
             launchType='EC2',
@@ -254,20 +283,18 @@ class TestEcsOperator(unittest.TestCase):
             propagateTags='TASK_DEFINITION',
         )
 
-    def test_wait_end_tasks(self):
-        client_mock = mock.Mock()
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_wait_end_tasks(self, client_mock):
         self.ecs.arn = 'arn'
-        self.ecs.client = client_mock
 
         self.ecs._wait_for_task_ended()
         client_mock.get_waiter.assert_called_once_with('tasks_stopped')
         client_mock.get_waiter.return_value.wait.assert_called_once_with(cluster='c', tasks=['arn'])
         assert sys.maxsize == client_mock.get_waiter.return_value.config.max_attempts
 
-    def test_check_success_tasks_raises_failed_to_start(self):
-        client_mock = mock.Mock()
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_check_success_tasks_raises_failed_to_start(self, client_mock):
         self.ecs.arn = 'arn'
-        self.ecs.client = client_mock
 
         client_mock.describe_tasks.return_value = {
             'tasks': [
@@ -285,20 +312,17 @@ class TestEcsOperator(unittest.TestCase):
         assert str(ctx.value) == "The task failed to start due to: Task failed to start"
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_tasks_raises_cloudwatch_logs(self):
-        client_mock = mock.Mock()
+    @mock.patch.object(EcsBaseOperator, 'client')
+    @mock.patch('airflow.providers.amazon.aws.hooks.ecs.EcsTaskLogFetcher')
+    def test_check_success_tasks_raises_cloudwatch_logs(self, log_fetcher_mock, client_mock):
         self.ecs.arn = 'arn'
-        self.ecs.client = client_mock
+        self.ecs.task_log_fetcher = log_fetcher_mock
 
+        log_fetcher_mock.get_last_log_messages.return_value = ["1", "2", "3", "4", "5"]
         client_mock.describe_tasks.return_value = {
             'tasks': [{'containers': [{'name': 'foo', 'lastStatus': 'STOPPED', 'exitCode': 1}]}]
         }
 
-        task_log_fetcher = mock.Mock()
-        self.ecs.task_log_fetcher = task_log_fetcher
-
-        task_log_fetcher.get_last_log_messages.return_value = ["1", "2", "3", "4", "5"]
-
         with pytest.raises(Exception) as ctx:
             self.ecs._check_success_task()
 
@@ -307,30 +331,26 @@ class TestEcsOperator(unittest.TestCase):
         )
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_tasks_raises_cloudwatch_logs_empty(self):
-        client_mock = mock.Mock()
+    @mock.patch.object(EcsBaseOperator, 'client')
+    @mock.patch('airflow.providers.amazon.aws.hooks.ecs.EcsTaskLogFetcher')
+    def test_check_success_tasks_raises_cloudwatch_logs_empty(self, log_fetcher_mock, client_mock):
         self.ecs.arn = 'arn'
-        self.ecs.client = client_mock
+        self.ecs.task_log_fetcher = log_fetcher_mock
 
+        log_fetcher_mock.get_last_log_messages.return_value = []
         client_mock.describe_tasks.return_value = {
             'tasks': [{'containers': [{'name': 'foo', 'lastStatus': 'STOPPED', 'exitCode': 1}]}]
         }
 
-        task_log_fetcher = mock.Mock()
-        self.ecs.task_log_fetcher = task_log_fetcher
-
-        task_log_fetcher.get_last_log_messages.return_value = []
-
         with pytest.raises(Exception) as ctx:
             self.ecs._check_success_task()
 
         assert str(ctx.value) == "This task is not in success state - last 10 logs from Cloudwatch:\n"
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_tasks_raises_logs_disabled(self):
-        client_mock = mock.Mock()
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_check_success_tasks_raises_logs_disabled(self, client_mock):
         self.ecs.arn = 'arn'
-        self.ecs.client = client_mock
 
         client_mock.describe_tasks.return_value = {
             'tasks': [{'containers': [{'name': 'foo', 'lastStatus': 'STOPPED', 'exitCode': 1}]}]
@@ -345,10 +365,9 @@ class TestEcsOperator(unittest.TestCase):
         assert "'exitCode': 1" in str(ctx.value)
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_tasks_handles_initialization_failure(self):
-        client_mock = mock.Mock()
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_check_success_tasks_handles_initialization_failure(self, client_mock):
         self.ecs.arn = 'arn'
-        self.ecs.client = client_mock
 
         # exitCode is missing during some container initialization failures
         client_mock.describe_tasks.return_value = {
@@ -365,9 +384,8 @@ class TestEcsOperator(unittest.TestCase):
         assert "exitCode" not in str(ctx.value)
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_tasks_raises_pending(self):
-        client_mock = mock.Mock()
-        self.ecs.client = client_mock
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_check_success_tasks_raises_pending(self, client_mock):
         self.ecs.arn = 'arn'
         client_mock.describe_tasks.return_value = {
             'tasks': [{'containers': [{'name': 'container-name', 'lastStatus': 'PENDING'}]}]
@@ -380,9 +398,8 @@ class TestEcsOperator(unittest.TestCase):
         assert "'lastStatus': 'PENDING'" in str(ctx.value)
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_tasks_raises_multiple(self):
-        client_mock = mock.Mock()
-        self.ecs.client = client_mock
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_check_success_tasks_raises_multiple(self, client_mock):
         self.ecs.arn = 'arn'
         client_mock.describe_tasks.return_value = {
             'tasks': [
@@ -397,9 +414,8 @@ class TestEcsOperator(unittest.TestCase):
         self.ecs._check_success_task()
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_host_terminated_raises(self):
-        client_mock = mock.Mock()
-        self.ecs.client = client_mock
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_host_terminated_raises(self, client_mock):
         self.ecs.arn = 'arn'
         client_mock.describe_tasks.return_value = {
             'tasks': [
@@ -407,16 +423,16 @@ class TestEcsOperator(unittest.TestCase):
                     'stoppedReason': 'Host EC2 (instance i-1234567890abcdef) terminated.',
                     "containers": [
                         {
-                            "containerArn": "arn:aws:ecs:us-east-1:012345678910:container/e1ed7aac-d9b2-4315-8726-d2432bf11868",  # noqa: E501
+                            "containerArn": f"arn:aws:ecs:us-east-1:012345678910:container/{CONTAINER_NAME}",
                             "lastStatus": "RUNNING",
                             "name": "wordpress",
-                            "taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55",  # noqa: E501
+                            "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
                         }
                     ],
                     "desiredStatus": "STOPPED",
                     "lastStatus": "STOPPED",
-                    "taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55",  # noqa: E501
-                    "taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11",  # noqa: E501
+                    "taskArn": f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
+                    "taskDefinitionArn": "arn:aws:ecs:us-east-1:012345678910:task-definition/hello_world:11",
                 }
             ]
         }
@@ -429,9 +445,8 @@ class TestEcsOperator(unittest.TestCase):
         assert ") terminated" in str(ctx.value)
         client_mock.describe_tasks.assert_called_once_with(cluster='c', tasks=['arn'])
 
-    def test_check_success_task_not_raises(self):
-        client_mock = mock.Mock()
-        self.ecs.client = client_mock
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_check_success_task_not_raises(self, client_mock):
         self.ecs.arn = 'arn'
         client_mock.describe_tasks.return_value = {
             'tasks': [{'containers': [{'name': 'container-name', 'lastStatus': 'STOPPED', 'exitCode': 0}]}]
@@ -447,33 +462,40 @@ class TestEcsOperator(unittest.TestCase):
             ['', {'testTagKey': 'testTagValue'}],
         ]
     )
-    @mock.patch.object(EcsOperator, "_xcom_del")
+    @mock.patch.object(EcsRunTaskOperator, "_xcom_del")
     @mock.patch.object(
-        EcsOperator,
+        EcsRunTaskOperator,
         "xcom_pull",
-        return_value="arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55",
+        return_value=f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}",
     )
-    @mock.patch.object(EcsOperator, '_wait_for_task_ended')
-    @mock.patch.object(EcsOperator, '_check_success_task')
-    @mock.patch.object(EcsOperator, '_start_task')
+    @mock.patch.object(EcsRunTaskOperator, '_wait_for_task_ended')
+    @mock.patch.object(EcsRunTaskOperator, '_check_success_task')
+    @mock.patch.object(EcsRunTaskOperator, '_start_task')
+    @mock.patch.object(EcsBaseOperator, 'client')
     def test_reattach_successful(
-        self, launch_type, tags, start_mock, check_mock, wait_mock, xcom_pull_mock, xcom_del_mock
+        self,
+        launch_type,
+        tags,
+        client_mock,
+        start_mock,
+        check_mock,
+        wait_mock,
+        xcom_pull_mock,
+        xcom_del_mock,
     ):
 
         self.set_up_operator(launch_type=launch_type, tags=tags)
-        client_mock = self.aws_hook_mock.return_value.get_conn.return_value
         client_mock.describe_task_definition.return_value = {'taskDefinition': {'family': 'f'}}
         client_mock.list_tasks.return_value = {
             'taskArns': [
                 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b54',
-                'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55',
+                f'arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}',
             ]
         }
 
         self.ecs.reattach = True
         self.ecs.execute(self.mock_context)
 
-        self.aws_hook_mock.return_value.get_conn.assert_called_once()
         extend_args = {}
         if launch_type:
             extend_args['launchType'] = launch_type
@@ -495,8 +517,8 @@ class TestEcsOperator(unittest.TestCase):
         wait_mock.assert_called_once_with()
         check_mock.assert_called_once_with()
         xcom_del_mock.assert_called_once()
-        assert self.ecs.arn == 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
-        assert self.ecs.ecs_task_id == 'd8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
+        assert self.ecs.arn == f'arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}'
+        assert self.ecs.ecs_task_id == TASK_ID
 
     @parameterized.expand(
         [
@@ -506,17 +528,23 @@ class TestEcsOperator(unittest.TestCase):
             ['', {'testTagKey': 'testTagValue'}],
         ]
     )
-    @mock.patch.object(EcsOperator, '_xcom_del')
-    @mock.patch.object(EcsOperator, '_xcom_set')
-    @mock.patch.object(EcsOperator, '_try_reattach_task')
-    @mock.patch.object(EcsOperator, '_wait_for_task_ended')
-    @mock.patch.object(EcsOperator, '_check_success_task')
+    @mock.patch.object(EcsRunTaskOperator, '_xcom_del')
+    @mock.patch.object(EcsRunTaskOperator, '_try_reattach_task')
+    @mock.patch.object(EcsRunTaskOperator, '_wait_for_task_ended')
+    @mock.patch.object(EcsRunTaskOperator, '_check_success_task')
+    @mock.patch.object(EcsBaseOperator, 'client')
     def test_reattach_save_task_arn_xcom(
-        self, launch_type, tags, check_mock, wait_mock, reattach_mock, xcom_set_mock, xcom_del_mock
+        self,
+        launch_type,
+        tags,
+        client_mock,
+        check_mock,
+        wait_mock,
+        reattach_mock,
+        xcom_del_mock,
     ):
 
         self.set_up_operator(launch_type=launch_type, tags=tags)
-        client_mock = self.aws_hook_mock.return_value.get_conn.return_value
         client_mock.describe_task_definition.return_value = {'taskDefinition': {'family': 'f'}}
         client_mock.list_tasks.return_value = {'taskArns': []}
         client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES
@@ -524,7 +552,6 @@ class TestEcsOperator(unittest.TestCase):
         self.ecs.reattach = True
         self.ecs.execute(self.mock_context)
 
-        self.aws_hook_mock.return_value.get_conn.assert_called_once()
         extend_args = {}
         if launch_type:
             extend_args['launchType'] = launch_type
@@ -535,185 +562,169 @@ class TestEcsOperator(unittest.TestCase):
 
         reattach_mock.assert_called_once()
         client_mock.run_task.assert_called_once()
-        xcom_set_mock.assert_called_once_with(
-            self.mock_context,
-            key=self.ecs.REATTACH_XCOM_KEY,
-            task_id=self.ecs.REATTACH_XCOM_TASK_ID_TEMPLATE.format(task_id=self.ecs.task_id),
-            value="arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55",
-        )
         wait_mock.assert_called_once_with()
         check_mock.assert_called_once_with()
         xcom_del_mock.assert_called_once()
-        assert self.ecs.arn == 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
-        assert self.ecs.ecs_task_id == 'd8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
+        assert self.ecs.arn == f'arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}'
+        assert self.ecs.ecs_task_id == TASK_ID
 
-    def test_execute_xcom_with_log(self):
+    @mock.patch.object(EcsBaseOperator, 'client')
+    @mock.patch('airflow.providers.amazon.aws.hooks.ecs.EcsTaskLogFetcher')
+    def test_execute_xcom_with_log(self, log_fetcher_mock, client_mock):
         self.ecs.do_xcom_push = True
-        self.ecs.task_log_fetcher = mock.Mock()
-        self.ecs.task_log_fetcher.get_last_log_message.return_value = "Log output"
+        self.ecs.task_log_fetcher = log_fetcher_mock
+
+        log_fetcher_mock.get_last_log_message.return_value = "Log output"
+
         assert self.ecs.execute(None) == "Log output"
 
-    def test_execute_xcom_with_no_log(self):
+    @mock.patch.object(EcsBaseOperator, 'client')
+    @mock.patch('airflow.providers.amazon.aws.hooks.ecs.EcsTaskLogFetcher')
+    def test_execute_xcom_with_no_log(self, log_fetcher_mock, client_mock):
         self.ecs.do_xcom_push = True
-        self.ecs.task_log_fetcher = mock.Mock()
-        self.ecs.task_log_fetcher.get_last_log_message.return_value = None
+        self.ecs.task_log_fetcher = log_fetcher_mock
+
+        log_fetcher_mock.get_last_log_message.return_value = None
+
         assert self.ecs.execute(None) is None
 
-    def test_execute_xcom_with_no_log_fetcher(self):
+    @mock.patch.object(EcsBaseOperator, 'client')
+    def test_execute_xcom_with_no_log_fetcher(self, client_mock):
         self.ecs.do_xcom_push = True
         assert self.ecs.execute(None) is None
 
-    def test_execute_xcom_disabled(self):
+    @mock.patch.object(EcsBaseOperator, 'client')
+    @mock.patch.object(EcsTaskLogFetcher, 'get_last_log_message', return_value='Log output')
+    def test_execute_xcom_disabled(self, log_fetcher_mock, client_mock):
         self.ecs.do_xcom_push = False
-        self.ecs.task_log_fetcher = mock.Mock()
-        self.ecs.task_log_fetcher.get_last_log_message.return_value = "Log output"
         assert self.ecs.execute(None) is None
 
 
-class TestShouldRetry(unittest.TestCase):
-    def test_return_true_on_valid_reason(self):
-        self.assertTrue(should_retry(EcsOperatorError([{'reason': 'RESOURCE:MEMORY'}], 'Foo')))
-
-    def test_return_false_on_invalid_reason(self):
-        self.assertFalse(should_retry(EcsOperatorError([{'reason': 'CLUSTER_NOT_FOUND'}], 'Foo')))
-
-
-class TestShouldRetryEni(unittest.TestCase):
-    def test_return_true_on_valid_reason(self):
-        self.assertTrue(
-            should_retry_eni(
-                EcsTaskFailToStart(
-                    "The task failed to start due to: "
-                    "Timeout waiting for network interface provisioning to complete."
-                )
-            )
-        )
-
-    def test_return_false_on_invalid_reason(self):
-        self.assertFalse(
-            should_retry_eni(
-                EcsTaskFailToStart(
-                    "The task failed to start due to: "
-                    "CannotPullContainerError: "
-                    "ref pull has been retried 5 time(s): failed to resolve reference"
-                )
-            )
-        )
-
-
-class TestEcsTaskLogFetcher(unittest.TestCase):
-    @mock.patch('logging.Logger')
-    def set_up_log_fetcher(self, logger_mock):
-        self.logger_mock = logger_mock
-
-        self.log_fetcher = EcsTaskLogFetcher(
-            log_group="test_log_group",
-            log_stream_name="test_log_stream_name",
-            fetch_interval=timedelta(milliseconds=1),
-            logger=logger_mock,
+@pytest.mark.skipif(mock_ecs is None, reason="mock_ecs package not present")
+class TestEcsCreateClusterOperator(unittest.TestCase):
+    @mock.patch.object(EcsClusterStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute(self, mock_conn, mock_sensor):
+        op = EcsCreateClusterOperator(task_id='task', cluster_name=CLUSTER_NAME)
+        result = op.execute(None)
+
+        mock_sensor.assert_called_once()
+        mock_conn.create_cluster.assert_called_once_with(clusterName=CLUSTER_NAME)
+        assert result is not None
+
+    @mock.patch.object(EcsClusterStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute_without_wait(self, mock_conn, mock_sensor):
+        op = EcsCreateClusterOperator(task_id='task', cluster_name=CLUSTER_NAME, wait_for_completion=False)
+        result = op.execute(None)
+
+        mock_sensor.assert_not_called()
+        mock_conn.create_cluster.assert_called_once_with(clusterName=CLUSTER_NAME)
+        assert result is not None
+
+
+@pytest.mark.skipif(mock_ecs is None, reason="mock_ecs package not present")
+class TestEcsDeleteClusterOperator(unittest.TestCase):
+    @mock.patch.object(EcsClusterStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute(self, mock_conn, mock_sensor):
+        op = EcsDeleteClusterOperator(task_id='task', cluster_name=CLUSTER_NAME)
+        result = op.execute(None)
+
+        mock_conn.delete_cluster.assert_called_once_with(cluster=CLUSTER_NAME)
+        mock_sensor.assert_called_once()
+        assert result is not None
+
+    @mock.patch.object(EcsClusterStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute_without_wait(self, mock_conn, mock_sensor):
+        op = EcsDeleteClusterOperator(task_id='task', cluster_name=CLUSTER_NAME, wait_for_completion=False)
+        result = op.execute(None)
+
+        mock_sensor.assert_not_called()
+        mock_conn.delete_cluster.assert_called_once_with(cluster=CLUSTER_NAME)
+        assert result is not None
+
+
+@pytest.mark.skipif(mock_ecs is None, reason="mock_ecs package not present")
+class TestEcsDeregisterTaskDefinitionOperator(unittest.TestCase):
+    @mock.patch.object(EcsTaskDefinitionStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute(self, mock_conn, mock_sensor):
+        op = EcsDeregisterTaskDefinitionOperator(task_id='task', task_definition=TASK_DEFINITION_NAME)
+        result = op.execute(None)
+
+        mock_conn.deregister_task_definition.assert_called_once_with(taskDefinition=TASK_DEFINITION_NAME)
+        mock_sensor.assert_called_once()
+        assert result is not None
+
+    @mock.patch.object(EcsTaskDefinitionStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute_without_wait(self, mock_conn, mock_sensor):
+        op = EcsDeregisterTaskDefinitionOperator(
+            task_id='task', task_definition=TASK_DEFINITION_NAME, wait_for_completion=False
         )
+        result = op.execute(None)
+
+        mock_sensor.assert_not_called()
+        mock_conn.deregister_task_definition.assert_called_once_with(taskDefinition=TASK_DEFINITION_NAME)
+        assert result is not None
+
+
+@pytest.mark.skipif(mock_ecs is None, reason="mock_ecs package not present")
+class TestEcsRegisterTaskDefinitionOperator(unittest.TestCase):
+    @mock.patch.object(EcsTaskDefinitionStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute(self, mock_conn, mock_sensor):
+        mock_context = mock.MagicMock()
+        expected_task_definition_config = {
+            'family': 'family_name',
+            'containerDefinitions': [
+                {
+                    'name': CONTAINER_NAME,
+                    'image': 'ubuntu',
+                    'workingDirectory': '/usr/bin',
+                    'entryPoint': ['sh', '-c'],
+                    'command': ['ls'],
+                }
+            ],
+            'cpu': '256',
+            'memory': '512',
+            'networkMode': 'awsvpc',
+        }
 
-    def setUp(self):
-        self.set_up_log_fetcher()
-
-    @mock.patch(
-        'threading.Event.is_set',
-        side_effect=(False, False, False, True),
-    )
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        side_effect=(
-            iter(
-                [
-                    {'timestamp': 1617400267123, 'message': 'First'},
-                    {'timestamp': 1617400367456, 'message': 'Second'},
-                ]
-            ),
-            iter(
-                [
-                    {'timestamp': 1617400467789, 'message': 'Third'},
-                ]
-            ),
-            iter([]),
-        ),
-    )
-    def test_run(self, get_log_events_mock, event_is_set_mock):
-
-        self.log_fetcher.run()
+        op = EcsRegisterTaskDefinitionOperator(task_id='task', **TASK_DEFINITION_CONFIG)
+        result = op.execute(mock_context)
 
-        self.logger_mock.info.assert_has_calls(
-            [
-                mock.call('[2021-04-02 21:51:07,123] First'),
-                mock.call('[2021-04-02 21:52:47,456] Second'),
-                mock.call('[2021-04-02 21:54:27,789] Third'),
-            ]
-        )
+        mock_conn.register_task_definition.assert_called_once_with(**expected_task_definition_config)
+        mock_sensor.assert_called_once()
+        assert result is not None
 
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        side_effect=ClientError({"Error": {"Code": "ResourceNotFoundException"}}, None),
-    )
-    def test_get_log_events_with_expected_error(self, get_log_events_mock):
-        with pytest.raises(StopIteration):
-            next(self.log_fetcher._get_log_events())
+    @mock.patch.object(EcsTaskDefinitionStateSensor, 'poke')
+    @mock.patch.object(EcsHook, 'conn')
+    def test_execute_without_wait(self, mock_conn, mock_sensor):
+        mock_context = mock.MagicMock()
+        expected_task_definition_config = {
+            'family': 'family_name',
+            'containerDefinitions': [
+                {
+                    'name': CONTAINER_NAME,
+                    'image': 'ubuntu',
+                    'workingDirectory': '/usr/bin',
+                    'entryPoint': ['sh', '-c'],
+                    'command': ['ls'],
+                }
+            ],
+            'cpu': '256',
+            'memory': '512',
+            'networkMode': 'awsvpc',
+        }
 
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        side_effect=Exception(),
-    )
-    def test_get_log_events_with_unexpected_error(self, get_log_events_mock):
-        with pytest.raises(Exception):
-            next(self.log_fetcher._get_log_events())
-
-    def test_event_to_str(self):
-        events = [
-            {'timestamp': 1617400267123, 'message': 'First'},
-            {'timestamp': 1617400367456, 'message': 'Second'},
-            {'timestamp': 1617400467789, 'message': 'Third'},
-        ]
-        assert [self.log_fetcher._event_to_str(event) for event in events] == (
-            [
-                '[2021-04-02 21:51:07,123] First',
-                '[2021-04-02 21:52:47,456] Second',
-                '[2021-04-02 21:54:27,789] Third',
-            ]
+        op = EcsRegisterTaskDefinitionOperator(
+            task_id='task', **TASK_DEFINITION_CONFIG, wait_for_completion=False
         )
+        result = op.execute(mock_context)
 
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        return_value=(),
-    )
-    def test_get_last_log_message_with_no_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_message() is None
-
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        return_value=iter(
-            [
-                {'timestamp': 1617400267123, 'message': 'First'},
-                {'timestamp': 1617400367456, 'message': 'Second'},
-            ]
-        ),
-    )
-    def test_get_last_log_message_with_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_message() == 'Second'
-
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        return_value=iter(
-            [
-                {'timestamp': 1617400267123, 'message': 'First'},
-                {'timestamp': 1617400367456, 'message': 'Second'},
-                {'timestamp': 1617400367458, 'message': 'Third'},
-            ]
-        ),
-    )
-    def test_get_last_log_messages_with_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_messages(2) == ['Second', 'Third']
-
-    @mock.patch(
-        'airflow.providers.amazon.aws.hooks.logs.AwsLogsHook.get_log_events',
-        return_value=(),
-    )
-    def test_get_last_log_messages_with_no_log_events(self, mock_log_events):
-        assert self.log_fetcher.get_last_log_messages(2) == []
+        mock_sensor.assert_not_called()
+        mock_conn.register_task_definition.assert_called_once_with(**expected_task_definition_config)
+        assert result is not None
diff --git a/tests/providers/amazon/aws/operators/test_ecs_system.py b/tests/providers/amazon/aws/operators/test_ecs_system.py
deleted file mode 100644
index a5460b981a..0000000000
--- a/tests/providers/amazon/aws/operators/test_ecs_system.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import pytest
-
-from tests.test_utils.amazon_system_helpers import AWS_DAG_FOLDER, AmazonSystemTest
-
-
-@pytest.mark.backend("postgres", "mysql")
-class EcsSystemTest(AmazonSystemTest):
-    """
-    ECS System Test to run and test example ECS dags
-
-    Required variables.env file content (from your account):
-        # Auto-export all variables
-        set -a
-
-        # aws parameters
-        REGION_NAME="eu-west-1"
-        REGISTRY_ID="123456789012"
-        IMAGE="alpine:3.9"
-        SUBNET_ID="subnet-068e9654a3c357a"
-        SECURITY_GROUP_ID="sg-054dc69874a651"
-        EXECUTION_ROLE_ARN="arn:aws:iam::123456789012:role/FooBarRole"
-
-        # remove all created/existing resources flag
-        # comment out to keep resources or use empty string
-        # REMOVE_RESOURCES="True"
-    """
-
-    # should be same as in the example dag
-    aws_conn_id = "aws_ecs"
-    cluster = "c"
-    task_definition = "hello-world"
-    container = "hello-world-container"
-    awslogs_group = "/ecs/hello-world"
-    awslogs_stream_prefix = "prefix_b"  # only prefix without container name
-
-    @classmethod
-    def setup_class(cls):
-        cls.create_connection(
-            aws_conn_id=cls.aws_conn_id,
-            region=cls._region_name(),
-        )
-
-        # create ecs cluster if it does not exist
-        cls.create_ecs_cluster(
-            aws_conn_id=cls.aws_conn_id,
-            cluster_name=cls.cluster,
-        )
-
-        # create task_definition if it does not exist
-        task_definition_exists = cls.is_ecs_task_definition_exists(
-            aws_conn_id=cls.aws_conn_id,
-            task_definition=cls.task_definition,
-        )
-        if not task_definition_exists:
-            cls.create_ecs_task_definition(
-                aws_conn_id=cls.aws_conn_id,
-                task_definition=cls.task_definition,
-                container=cls.container,
-                image=cls._image(),
-                execution_role_arn=cls._execution_role_arn(),
-                awslogs_group=cls.awslogs_group,
-                awslogs_region=cls._region_name(),
-                awslogs_stream_prefix=cls.awslogs_stream_prefix,
-            )
-
-    @classmethod
-    def teardown_class(cls):
-        # remove all created/existing resources in tear down
-        if cls._remove_resources():
-            cls.delete_ecs_cluster(
-                aws_conn_id=cls.aws_conn_id,
-                cluster_name=cls.cluster,
-            )
-            cls.delete_ecs_task_definition(
-                aws_conn_id=cls.aws_conn_id,
-                task_definition=cls.task_definition,
-            )
-
-    def test_run_example_dag_ecs_fargate_dag(self):
-        self.run_dag("ecs_fargate_dag", AWS_DAG_FOLDER)