You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by on...@apache.org on 2023/06/14 17:59:27 UTC
[airflow] branch main updated: Deferrable mode for EksCreateFargateProfileOperator and EksDeleteFargateProfileOperator (#31657)
This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 212a37fbec Deferrable mode for EksCreateFargateProfileOperator and EksDeleteFargateProfileOperator (#31657)
212a37fbec is described below
commit 212a37fbeccebce2b8fb14535234a9bdc4b93708
Author: Syed Hussaain <10...@users.noreply.github.com>
AuthorDate: Wed Jun 14 10:59:18 2023 -0700
Deferrable mode for EksCreateFargateProfileOperator and EksDeleteFargateProfileOperator (#31657)
* Remove hook caching in Trigger
Raise Error from Trigger rather than a TriggerEvent in case of failure
Other minor fixes
* Add EKS trigger to provider.yaml
* Add comment on operator timeout
* Fix integration name for EKS for triggers in provider.yaml
---
airflow/providers/amazon/aws/operators/eks.py | 84 ++++++-
airflow/providers/amazon/aws/triggers/eks.py | 160 ++++++++++++
airflow/providers/amazon/provider.yaml | 3 +
tests/providers/amazon/aws/operators/test_eks.py | 39 ++-
tests/providers/amazon/aws/triggers/test_eks.py | 299 +++++++++++++++++++++++
5 files changed, 577 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py
index 891b4d727c..8131be4f65 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import warnings
from ast import literal_eval
+from datetime import timedelta
from typing import TYPE_CHECKING, Any, List, Sequence, cast
from botocore.exceptions import ClientError, WaiterError
@@ -26,6 +27,10 @@ from botocore.exceptions import ClientError, WaiterError
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.providers.amazon.aws.triggers.eks import (
+ EksCreateFargateProfileTrigger,
+ EksDeleteFargateProfileTrigger,
+)
try:
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
@@ -353,6 +358,11 @@ class EksCreateFargateProfileOperator(BaseOperator):
maintained on each worker node).
:param region: Which AWS region the connection should use. (templated)
If this is None or empty then the default boto3 behaviour is used.
+ :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check profile status
+ :param waiter_max_attempts: The maximum number of attempts to check the status of the profile.
+ :param deferrable: If True, the operator will wait asynchronously for the profile to be created.
+ This implies waiting for completion. This mode requires aiobotocore module to be installed.
+ (default: False)
"""
template_fields: Sequence[str] = (
@@ -371,11 +381,14 @@ class EksCreateFargateProfileOperator(BaseOperator):
cluster_name: str,
pod_execution_role_arn: str,
selectors: list,
- fargate_profile_name: str | None = DEFAULT_FARGATE_PROFILE_NAME,
+ fargate_profile_name: str = DEFAULT_FARGATE_PROFILE_NAME,
create_fargate_profile_kwargs: dict | None = None,
wait_for_completion: bool = False,
aws_conn_id: str = DEFAULT_CONN_ID,
region: str | None = None,
+ waiter_delay: int = 10,
+ waiter_max_attempts: int = 60,
+ deferrable: bool = False,
**kwargs,
) -> None:
self.cluster_name = cluster_name
@@ -386,6 +399,9 @@ class EksCreateFargateProfileOperator(BaseOperator):
self.wait_for_completion = wait_for_completion
self.aws_conn_id = aws_conn_id
self.region = region
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
super().__init__(**kwargs)
def execute(self, context: Context):
@@ -401,13 +417,35 @@ class EksCreateFargateProfileOperator(BaseOperator):
selectors=self.selectors,
**self.create_fargate_profile_kwargs,
)
-
- if self.wait_for_completion:
+ if self.deferrable:
+ self.defer(
+ trigger=EksCreateFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ aws_conn_id=self.aws_conn_id,
+ poll_interval=self.waiter_delay,
+ max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ # timeout is set to ensure that if a trigger dies, the timeout does not restart
+ # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
+ timeout=timedelta(seconds=(self.waiter_max_attempts * self.waiter_delay + 60)),
+ )
+ elif self.wait_for_completion:
self.log.info("Waiting for Fargate profile to provision. This will take some time.")
eks_hook.conn.get_waiter("fargate_profile_active").wait(
- clusterName=self.cluster_name, fargateProfileName=self.fargate_profile_name
+ clusterName=self.cluster_name,
+ fargateProfileName=self.fargate_profile_name,
+ WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
)
+ def execute_complete(self, context, event=None):
+ if event["status"] != "success":
+ raise AirflowException(f"Error creating Fargate profile: {event}")
+ else:
+ self.log.info("Fargate profile created successfully")
+ return
+
class EksDeleteClusterOperator(BaseOperator):
"""
@@ -587,6 +625,11 @@ class EksDeleteFargateProfileOperator(BaseOperator):
maintained on each worker node).
:param region: Which AWS region the connection should use. (templated)
If this is None or empty then the default boto3 behaviour is used.
+ :param waiter_delay: Time (in seconds) to wait between two consecutive calls to check profile status
+ :param waiter_max_attempts: The maximum number of attempts to check the status of the profile.
+ :param deferrable: If True, the operator will wait asynchronously for the profile to be deleted.
+ This implies waiting for completion. This mode requires aiobotocore module to be installed.
+ (default: False)
"""
template_fields: Sequence[str] = (
@@ -604,6 +647,9 @@ class EksDeleteFargateProfileOperator(BaseOperator):
wait_for_completion: bool = False,
aws_conn_id: str = DEFAULT_CONN_ID,
region: str | None = None,
+ waiter_delay: int = 30,
+ waiter_max_attempts: int = 60,
+ deferrable: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -612,6 +658,9 @@ class EksDeleteFargateProfileOperator(BaseOperator):
self.wait_for_completion = wait_for_completion
self.aws_conn_id = aws_conn_id
self.region = region
+ self.waiter_delay = waiter_delay
+ self.waiter_max_attempts = waiter_max_attempts
+ self.deferrable = deferrable
def execute(self, context: Context):
eks_hook = EksHook(
@@ -622,12 +671,35 @@ class EksDeleteFargateProfileOperator(BaseOperator):
eks_hook.delete_fargate_profile(
clusterName=self.cluster_name, fargateProfileName=self.fargate_profile_name
)
- if self.wait_for_completion:
+ if self.deferrable:
+ self.defer(
+ trigger=EksDeleteFargateProfileTrigger(
+ cluster_name=self.cluster_name,
+ fargate_profile_name=self.fargate_profile_name,
+ aws_conn_id=self.aws_conn_id,
+ poll_interval=self.waiter_delay,
+ max_attempts=self.waiter_max_attempts,
+ ),
+ method_name="execute_complete",
+ # timeout is set to ensure that if a trigger dies, the timeout does not restart
+ # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent)
+ timeout=timedelta(seconds=(self.waiter_max_attempts * self.waiter_delay + 60)),
+ )
+ elif self.wait_for_completion:
self.log.info("Waiting for Fargate profile to delete. This will take some time.")
eks_hook.conn.get_waiter("fargate_profile_deleted").wait(
- clusterName=self.cluster_name, fargateProfileName=self.fargate_profile_name
+ clusterName=self.cluster_name,
+ fargateProfileName=self.fargate_profile_name,
+ WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts},
)
+ def execute_complete(self, context, event=None):
+ if event["status"] != "success":
+ raise AirflowException(f"Error deleting Fargate profile: {event}")
+ else:
+ self.log.info("Fargate profile deleted successfully")
+ return
+
class EksPodOperator(KubernetesPodOperator):
"""
diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py
new file mode 100644
index 0000000000..dddab74b30
--- /dev/null
+++ b/airflow/providers/amazon/aws/triggers/eks.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+from typing import Any
+
+from botocore.exceptions import WaiterError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class EksCreateFargateProfileTrigger(BaseTrigger):
+ """
+ Trigger for EksCreateFargateProfileOperator.
+ The trigger will asynchronously wait for the fargate profile to be created.
+
+ :param cluster_name: The name of the EKS cluster
+ :param fargate_profile_name: The name of the fargate profile
+ :param poll_interval: The amount of time in seconds to wait between attempts.
+ :param max_attempts: The maximum number of attempts to be made.
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ """
+
+ def __init__(
+ self,
+ cluster_name: str,
+ fargate_profile_name: str,
+ poll_interval: int,
+ max_attempts: int,
+ aws_conn_id: str,
+ ):
+ self.cluster_name = cluster_name
+ self.fargate_profile_name = fargate_profile_name
+ self.poll_interval = poll_interval
+ self.max_attempts = max_attempts
+ self.aws_conn_id = aws_conn_id
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ return (
+ self.__class__.__module__ + "." + self.__class__.__qualname__,
+ {
+ "cluster_name": self.cluster_name,
+ "fargate_profile_name": self.fargate_profile_name,
+ "poll_interval": str(self.poll_interval),
+ "max_attempts": str(self.max_attempts),
+ "aws_conn_id": self.aws_conn_id,
+ },
+ )
+
+ async def run(self):
+ self.hook = EksHook(aws_conn_id=self.aws_conn_id)
+ async with self.hook.async_conn as client:
+ attempt = 0
+ waiter = client.get_waiter("fargate_profile_active")
+ while attempt < int(self.max_attempts):
+ attempt += 1
+ try:
+ await waiter.wait(
+ clusterName=self.cluster_name,
+ fargateProfileName=self.fargate_profile_name,
+ WaiterConfig={"Delay": int(self.poll_interval), "MaxAttempts": 1},
+ )
+ break
+ except WaiterError as error:
+ if "terminal failure" in str(error):
+ raise AirflowException(f"Create Fargate Profile failed: {error}")
+ self.log.info(
+ "Status of fargate profile is %s", error.last_response["fargateProfile"]["status"]
+ )
+ await asyncio.sleep(int(self.poll_interval))
+ if attempt >= int(self.max_attempts):
+ raise AirflowException(
+ f"Create Fargate Profile failed - max attempts reached: {self.max_attempts}"
+ )
+ else:
+ yield TriggerEvent({"status": "success", "message": "Fargate Profile Created"})
+
+
+class EksDeleteFargateProfileTrigger(BaseTrigger):
+ """
+ Trigger for EksDeleteFargateProfileOperator.
+ The trigger will asynchronously wait for the fargate profile to be deleted.
+
+ :param cluster_name: The name of the EKS cluster
+ :param fargate_profile_name: The name of the fargate profile
+ :param poll_interval: The amount of time in seconds to wait between attempts.
+ :param max_attempts: The maximum number of attempts to be made.
+ :param aws_conn_id: The Airflow connection used for AWS credentials.
+ """
+
+ def __init__(
+ self,
+ cluster_name: str,
+ fargate_profile_name: str,
+ poll_interval: int,
+ max_attempts: int,
+ aws_conn_id: str,
+ ):
+ self.cluster_name = cluster_name
+ self.fargate_profile_name = fargate_profile_name
+ self.poll_interval = poll_interval
+ self.max_attempts = max_attempts
+ self.aws_conn_id = aws_conn_id
+
+ def serialize(self) -> tuple[str, dict[str, Any]]:
+ return (
+ self.__class__.__module__ + "." + self.__class__.__qualname__,
+ {
+ "cluster_name": self.cluster_name,
+ "fargate_profile_name": self.fargate_profile_name,
+ "poll_interval": str(self.poll_interval),
+ "max_attempts": str(self.max_attempts),
+ "aws_conn_id": self.aws_conn_id,
+ },
+ )
+
+ async def run(self):
+ self.hook = EksHook(aws_conn_id=self.aws_conn_id)
+ async with self.hook.async_conn as client:
+ attempt = 0
+ waiter = client.get_waiter("fargate_profile_deleted")
+ while attempt < int(self.max_attempts):
+ attempt += 1
+ try:
+ await waiter.wait(
+ clusterName=self.cluster_name,
+ fargateProfileName=self.fargate_profile_name,
+ WaiterConfig={"Delay": int(self.poll_interval), "MaxAttempts": 1},
+ )
+ break
+ except WaiterError as error:
+ if "terminal failure" in str(error):
+ raise AirflowException(f"Delete Fargate Profile failed: {error}")
+ self.log.info(
+ "Status of fargate profile is %s", error.last_response["fargateProfile"]["status"]
+ )
+ await asyncio.sleep(int(self.poll_interval))
+ if attempt >= int(self.max_attempts):
+ raise AirflowException(
+ f"Delete Fargate Profile failed - max attempts reached: {self.max_attempts}"
+ )
+ else:
+ yield TriggerEvent({"status": "success", "message": "Fargate Profile Deleted"})
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 05924eebc9..51af2e4d24 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -528,6 +528,9 @@ triggers:
- integration-name: Amazon EMR
python-modules:
- airflow.providers.amazon.aws.triggers.emr
+ - integration-name: Amazon Elastic Kubernetes Service (EKS)
+ python-modules:
+ - airflow.providers.amazon.aws.triggers.eks
transfers:
- source-integration-name: Amazon DynamoDB
diff --git a/tests/providers/amazon/aws/operators/test_eks.py b/tests/providers/amazon/aws/operators/test_eks.py
index 37342bbdaa..089aef1704 100644
--- a/tests/providers/amazon/aws/operators/test_eks.py
+++ b/tests/providers/amazon/aws/operators/test_eks.py
@@ -23,6 +23,7 @@ from unittest import mock
import pytest
from botocore.waiter import Waiter
+from airflow.exceptions import TaskDeferred
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, EksHook
from airflow.providers.amazon.aws.operators.eks import (
EksCreateClusterOperator,
@@ -33,6 +34,10 @@ from airflow.providers.amazon.aws.operators.eks import (
EksDeleteNodegroupOperator,
EksPodOperator,
)
+from airflow.providers.amazon.aws.triggers.eks import (
+ EksCreateFargateProfileTrigger,
+ EksDeleteFargateProfileTrigger,
+)
from airflow.typing_compat import TypedDict
from tests.providers.amazon.aws.utils.eks_test_constants import (
NODEROLE_ARN,
@@ -369,10 +374,27 @@ class TestEksCreateFargateProfileOperator:
operator.execute({})
mock_create_fargate_profile.assert_called_with(**convert_keys(parameters))
mock_waiter.assert_called_with(
- mock.ANY, clusterName=CLUSTER_NAME, fargateProfileName=FARGATE_PROFILE_NAME
+ mock.ANY,
+ clusterName=CLUSTER_NAME,
+ fargateProfileName=FARGATE_PROFILE_NAME,
+ WaiterConfig={"Delay": 10, "MaxAttempts": 60},
)
assert_expected_waiter_type(mock_waiter, "FargateProfileActive")
+ @mock.patch.object(EksHook, "create_fargate_profile")
+ def test_create_fargate_profile_deferrable(self, _):
+ op_kwargs = {**self.create_fargate_profile_params}
+ operator = EksCreateFargateProfileOperator(
+ task_id=TASK_ID,
+ **op_kwargs,
+ deferrable=True,
+ )
+ with pytest.raises(TaskDeferred) as exc:
+ operator.execute({})
+ assert isinstance(
+ exc.value.trigger, EksCreateFargateProfileTrigger
+ ), "Trigger is not a EksCreateFargateProfileTrigger"
+
class TestEksCreateNodegroupOperator:
def setup_method(self) -> None:
@@ -532,10 +554,23 @@ class TestEksDeleteFargateProfileOperator:
clusterName=self.cluster_name, fargateProfileName=self.fargate_profile_name
)
mock_waiter.assert_called_with(
- mock.ANY, clusterName=CLUSTER_NAME, fargateProfileName=FARGATE_PROFILE_NAME
+ mock.ANY,
+ clusterName=CLUSTER_NAME,
+ fargateProfileName=FARGATE_PROFILE_NAME,
+ WaiterConfig={"Delay": 30, "MaxAttempts": 60},
)
assert_expected_waiter_type(mock_waiter, "FargateProfileDeleted")
+ @mock.patch.object(EksHook, "delete_fargate_profile")
+ def test_delete_fargate_profile_deferrable(self, _):
+ self.delete_fargate_profile_operator.deferrable = True
+
+ with pytest.raises(TaskDeferred) as exc:
+ self.delete_fargate_profile_operator.execute({})
+ assert isinstance(
+ exc.value.trigger, EksDeleteFargateProfileTrigger
+ ), "Trigger is not a EksDeleteFargateProfileTrigger"
+
class TestEksPodOperator:
@mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.execute")
diff --git a/tests/providers/amazon/aws/triggers/test_eks.py b/tests/providers/amazon/aws/triggers/test_eks.py
new file mode 100644
index 0000000000..abab121d24
--- /dev/null
+++ b/tests/providers/amazon/aws/triggers/test_eks.py
@@ -0,0 +1,299 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+from unittest.mock import AsyncMock
+
+import pytest
+from botocore.exceptions import WaiterError
+
+from airflow.exceptions import AirflowException
+from airflow.providers.amazon.aws.hooks.eks import EksHook
+from airflow.providers.amazon.aws.triggers.eks import (
+ EksCreateFargateProfileTrigger,
+ EksDeleteFargateProfileTrigger,
+)
+from airflow.triggers.base import TriggerEvent
+
+TEST_CLUSTER_IDENTIFIER = "test-cluster"
+TEST_FARGATE_PROFILE_NAME = "test-fargate-profile"
+TEST_POLL_INTERVAL = 10
+TEST_MAX_ATTEMPTS = 10
+TEST_AWS_CONN_ID = "test-aws-id"
+
+
+class TestEksCreateFargateProfileTrigger:
+ def test_eks_create_fargate_profile_serialize(self):
+ eks_create_fargate_profile_trigger = EksCreateFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ class_path, args = eks_create_fargate_profile_trigger.serialize()
+ assert class_path == "airflow.providers.amazon.aws.triggers.eks.EksCreateFargateProfileTrigger"
+ assert args["cluster_name"] == TEST_CLUSTER_IDENTIFIER
+ assert args["fargate_profile_name"] == TEST_FARGATE_PROFILE_NAME
+ assert args["aws_conn_id"] == TEST_AWS_CONN_ID
+ assert args["poll_interval"] == str(TEST_POLL_INTERVAL)
+ assert args["max_attempts"] == str(TEST_MAX_ATTEMPTS)
+
+ @pytest.mark.asyncio
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_fargate_profile_trigger_run(self, mock_async_conn):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ a_mock.get_waiter().wait = AsyncMock()
+
+ eks_create_fargate_profile_trigger = EksCreateFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ generator = eks_create_fargate_profile_trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent({"status": "success", "message": "Fargate Profile Created"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_fargate_profile_trigger_run_multiple_attempts(
+ self, mock_async_conn, mock_sleep
+ ):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"fargateProfile": {"status": "CREATING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, True])
+ mock_sleep.return_value = True
+
+ eks_create_fargate_profile_trigger = EksCreateFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ generator = eks_create_fargate_profile_trigger.run()
+ response = await generator.asend(None)
+
+ assert a_mock.get_waiter().wait.call_count == 3
+ assert response == TriggerEvent({"status": "success", "message": "Fargate Profile Created"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_fargate_profile_trigger_run_attempts_exceeded(
+ self, mock_async_conn, mock_sleep
+ ):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"fargateProfile": {"status": "CREATING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, True])
+ mock_sleep.return_value = True
+
+ eks_create_fargate_profile_trigger = EksCreateFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=2,
+ )
+ with pytest.raises(AirflowException) as exc:
+ generator = eks_create_fargate_profile_trigger.run()
+ await generator.asend(None)
+ assert "Create Fargate Profile failed - max attempts reached:" in str(exc.value)
+ assert a_mock.get_waiter().wait.call_count == 2
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_create_fargate_profile_trigger_run_attempts_failed(self, mock_async_conn, mock_sleep):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error_creating = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"fargateProfile": {"status": "CREATING"}},
+ )
+ error_failed = WaiterError(
+ name="test_name",
+ reason="Waiter encountered a terminal failure state:",
+ last_response={"fargateProfile": {"status": "CREATE_FAILED"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error_creating, error_creating, error_failed])
+ mock_sleep.return_value = True
+
+ eks_create_fargate_profile_trigger = EksCreateFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ with pytest.raises(AirflowException) as exc:
+ generator = eks_create_fargate_profile_trigger.run()
+ await generator.asend(None)
+ assert f"Create Fargate Profile failed: {error_failed}" in str(exc.value)
+ assert a_mock.get_waiter().wait.call_count == 3
+
+
+class TestEksDeleteFargateProfileTrigger:
+ def test_eks_delete_fargate_profile_serialize(self):
+ eks_delete_fargate_profile_trigger = EksDeleteFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ class_path, args = eks_delete_fargate_profile_trigger.serialize()
+ assert class_path == "airflow.providers.amazon.aws.triggers.eks.EksDeleteFargateProfileTrigger"
+ assert args["cluster_name"] == TEST_CLUSTER_IDENTIFIER
+ assert args["fargate_profile_name"] == TEST_FARGATE_PROFILE_NAME
+ assert args["aws_conn_id"] == TEST_AWS_CONN_ID
+ assert args["poll_interval"] == str(TEST_POLL_INTERVAL)
+ assert args["max_attempts"] == str(TEST_MAX_ATTEMPTS)
+
+ @pytest.mark.asyncio
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_delete_fargate_profile_trigger_run(self, mock_async_conn):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ a_mock.get_waiter().wait = AsyncMock()
+
+ eks_delete_fargate_profile_trigger = EksDeleteFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ generator = eks_delete_fargate_profile_trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent({"status": "success", "message": "Fargate Profile Deleted"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_delete_fargate_profile_trigger_run_multiple_attempts(
+ self, mock_async_conn, mock_sleep
+ ):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"fargateProfile": {"status": "DELETING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, True])
+ mock_sleep.return_value = True
+
+ eks_delete_fargate_profile_trigger = EksDeleteFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+
+ generator = eks_delete_fargate_profile_trigger.run()
+ response = await generator.asend(None)
+ assert a_mock.get_waiter().wait.call_count == 3
+ assert response == TriggerEvent({"status": "success", "message": "Fargate Profile Deleted"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_delete_fargate_profile_trigger_run_attempts_exceeded(
+ self, mock_async_conn, mock_sleep
+ ):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"fargateProfile": {"status": "DELETING"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error, error, error, True])
+ mock_sleep.return_value = True
+
+ eks_delete_fargate_profile_trigger = EksDeleteFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=2,
+ )
+ with pytest.raises(AirflowException) as exc:
+ generator = eks_delete_fargate_profile_trigger.run()
+ await generator.asend(None)
+ assert "Delete Fargate Profile failed - max attempts reached: 2" in str(exc.value)
+ assert a_mock.get_waiter().wait.call_count == 2
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch.object(EksHook, "async_conn")
+ async def test_eks_delete_fargate_profile_trigger_run_attempts_failed(self, mock_async_conn, mock_sleep):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+ error_creating = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"fargateProfile": {"status": "DELETING"}},
+ )
+ error_failed = WaiterError(
+ name="test_name",
+ reason="Waiter encountered a terminal failure state:",
+ last_response={"fargateProfile": {"status": "DELETE_FAILED"}},
+ )
+ a_mock.get_waiter().wait = AsyncMock(side_effect=[error_creating, error_creating, error_failed])
+ mock_sleep.return_value = True
+
+ eks_delete_fargate_profile_trigger = EksDeleteFargateProfileTrigger(
+ cluster_name=TEST_CLUSTER_IDENTIFIER,
+ fargate_profile_name=TEST_FARGATE_PROFILE_NAME,
+ aws_conn_id=TEST_AWS_CONN_ID,
+ poll_interval=TEST_POLL_INTERVAL,
+ max_attempts=TEST_MAX_ATTEMPTS,
+ )
+ with pytest.raises(AirflowException) as exc:
+ generator = eks_delete_fargate_profile_trigger.run()
+ await generator.asend(None)
+ assert f"Delete Fargate Profile failed: {error_failed}" in str(exc.value)
+ assert a_mock.get_waiter().wait.call_count == 3