You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/26 22:07:21 UTC

[GitHub] [airflow] zachliu commented on a diff in pull request #25413: Refactor monolithic ECS Operator into Operators, Sensors, and a Hook

zachliu commented on code in PR #25413:
URL: https://github.com/apache/airflow/pull/25413#discussion_r956473322


##########
airflow/providers/amazon/aws/operators/ecs.py:
##########
@@ -17,160 +17,257 @@
 # under the License.
 import re
 import sys
-import time
-from collections import deque
-from datetime import datetime, timedelta
-from logging import Logger
-from threading import Event, Thread
-from typing import Dict, Generator, Optional, Sequence
+import warnings
+from datetime import timedelta
+from typing import TYPE_CHECKING, Dict, List, Optional, Sequence
 
-from botocore.exceptions import ClientError, ConnectionClosedError
-from botocore.waiter import Waiter
+import boto3
 
+from airflow.compat.functools import cached_property
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, XCom
 from airflow.providers.amazon.aws.exceptions import EcsOperatorError, EcsTaskFailToStart
+
+# TODO: Remove the following import when EcsProtocol and EcsTaskLogFetcher deprecations are removed.
+from airflow.providers.amazon.aws.hooks import ecs
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook
-from airflow.typing_compat import Protocol, runtime_checkable
+from airflow.providers.amazon.aws.hooks.ecs import (
+    EcsClusterStates,
+    EcsHook,
+    EcsTaskDefinitionStates,
+    should_retry_eni,
+)
+from airflow.providers.amazon.aws.sensors.ecs import EcsClusterStateSensor, EcsTaskDefinitionStateSensor
 from airflow.utils.session import provide_session
 
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
 
-def should_retry(exception: Exception):
-    """Check if exception is related to ECS resource quota (CPU, MEM)."""
-    if isinstance(exception, EcsOperatorError):
-        return any(
-            quota_reason in failure['reason']
-            for quota_reason in ['RESOURCE:MEMORY', 'RESOURCE:CPU']
-            for failure in exception.failures
-        )
-    return False
+DEFAULT_CONN_ID = 'aws_default'
 
 
-def should_retry_eni(exception: Exception):
-    """Check if exception is related to ENI (Elastic Network Interfaces)."""
-    if isinstance(exception, EcsTaskFailToStart):
-        return any(
-            eni_reason in exception.message
-            for eni_reason in ['network interface provisioning', 'ResourceInitializationError']
-        )
-    return False
+class EcsBaseOperator(BaseOperator):
+    """This is the base operator for all Elastic Container Service operators."""
+
+    def __init__(self, **kwargs):
+        self.aws_conn_id = kwargs.get('aws_conn_id', DEFAULT_CONN_ID)
+        self.region = kwargs.get('region')
+        super().__init__(**kwargs)

Review Comment:
   Oh man, you passed all kwargs, including `aws_conn_id`, through to `BaseOperator`.
   Now 80% of my DAGs (I use `EcsOperator` heavily) are failing with:
   ```
   Broken DAG: [/usr/local/airflow/dags/my_dag.py] Traceback (most recent call last):
     File "/usr/local/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 390, in apply_defaults
       result = func(self, **kwargs, default_args=default_args)
     File "/usr/local/airflow/.local/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 743, in __init__
       raise AirflowException(
   airflow.exceptions.AirflowException: Invalid arguments were passed to EcsOperator (task_id: my-task). Invalid arguments were:
   **kwargs: {'aws_conn_id': 'aws_default'}
   ``` 
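   Is there a fix? It looks like `kwargs.get('aws_conn_id', ...)` only reads the value and leaves the key in `kwargs`, so it is still forwarded to `super().__init__(**kwargs)`, which rejects it as an invalid argument.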



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org