Posted to commits@airflow.apache.org by "Holden Karau's magical unicorn (JIRA)" <ji...@apache.org> on 2018/09/23 07:00:00 UTC

[jira] [Assigned] (AIRFLOW-3046) ECS Operator mistakenly reports success when task is killed due to EC2 host termination

     [ https://issues.apache.org/jira/browse/AIRFLOW-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Holden Karau's magical unicorn reassigned AIRFLOW-3046:
-------------------------------------------------------

    Assignee: Holden Karau's magical unicorn

> ECS Operator mistakenly reports success when task is killed due to EC2 host termination
> ---------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3046
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3046
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib, operators
>            Reporter: Dan MacTough
>            Assignee: Holden Karau's magical unicorn
>            Priority: Major
>
> We have ECS clusters made up of EC2 spot fleets. Among other things, this means hosts can be terminated on short notice. When this happens, all tasks (and associated containers) get terminated, as well.
> We expect that when that happens for Airflow task instances using the ECS Operator, those instances will be marked as failures and retried.
> Instead, they are marked as successful.
> As a result, the immediate downstream task fails, causing the scheduled DAG run to fail.
> Here's an example of the Airflow log output when this happens:
> {noformat}
> [2018-09-12 01:02:02,712] {ecs_operator.py:112} INFO - ECS Task stopped, check status: {'tasks': [{'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc', 'clusterArn': 'arn:aws:ecs:us-east-1:111111111111:cluster/processing', 'taskDefinitionArn': 'arn:aws:ecs:us-east-1:111111111111:task-definition/foobar-testing_dataEngineering_rd:76', 'containerInstanceArn': 'arn:aws:ecs:us-east-1:111111111111:container-instance/7431f0a6-8fc5-4eff-8196-32f77d286a61', 'overrides': {'containerOverrides': [{'name': 'foobar-testing', 'command': ['./bin/generate-features.sh', '2018-09-11']}]}, 'lastStatus': 'STOPPED', 'desiredStatus': 'STOPPED', 'cpu': '4096', 'memory': '60000', 'containers': [{'containerArn': 'arn:aws:ecs:us-east-1:111111111111:container/0d5cc553-f894-4f9a-b17c-9f80f7ce8d0a', 'taskArn': 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc', 'name': 'foobar-testing', 'lastStatus': 'RUNNING', 'networkBindings': [], 'networkInterfaces': [], 'healthStatus': 'UNKNOWN'}], 'startedBy': 'Airflow', 'version': 3, 'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.', 'connectivity': 'CONNECTED', 'connectivityAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()), 'pullStartedAt': datetime.datetime(2018, 9, 12, 0, 6, 32, 748000, tzinfo=tzlocal()), 'pullStoppedAt': datetime.datetime(2018, 9, 12, 0, 6, 59, 748000, tzinfo=tzlocal()), 'createdAt': datetime.datetime(2018, 9, 12, 0, 6, 30, 245000, tzinfo=tzlocal()), 'startedAt': datetime.datetime(2018, 9, 12, 0, 7, 0, 748000, tzinfo=tzlocal()), 'stoppingAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'stoppedAt': datetime.datetime(2018, 9, 12, 1, 2, 0, 91000, tzinfo=tzlocal()), 'group': 'family:foobar-testing_dataEngineering_rd', 'launchType': 'EC2', 'attachments': [], 'healthStatus': 'UNKNOWN'}], 'failures': [], 'ResponseMetadata': {'RequestId': '758c791f-b627-11e8-83f7-2b76f4796ed2', 'HTTPStatusCode': 200, 'HTTPHeaders': {'server': 'Server', 'date': 'Wed, 12 Sep 2018 01:02:02 GMT', 'content-type': 'application/x-amz-json-1.1', 'content-length': '1412', 'connection': 'keep-alive', 'x-amzn-requestid': '758c791f-b627-11e8-83f7-2b76f4796ed2'}, 'RetryAttempts': 0}}{noformat}
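> To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only, not code from the operator) that reduces the response above to the fields a success check can inspect. The only signal of the kill is the task-level {{stoppedReason}}; the container never reached {{STOPPED}}, so it carries no {{exitCode}}, and {{failures}} is empty:
> {code}
> # Illustrative sketch only: a trimmed-down copy of the describe_tasks response
> # logged above, reduced to the fields a success check can look at.
> response = {
>     'tasks': [{
>         'lastStatus': 'STOPPED',
>         'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
>         'containers': [{
>             'name': 'foobar-testing',
>             'lastStatus': 'RUNNING',  # the container never reached STOPPED
>         }],
>     }],
>     'failures': [],
> }
>
> task = response['tasks'][0]
> container = task['containers'][0]
>
> assert response['failures'] == []                     # the failures check passes
> assert container['lastStatus'] != 'STOPPED'           # so the exit-code check never runs
> assert 'exitCode' not in container                    # and there is no exit code anyway
> assert 'terminated' in task['stoppedReason'].lower()  # only this field reveals the kill
> {code}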
> I believe the function that checks whether the task is successful needs at least one more check. 
> We are currently running a modified version of the ECS Operator that contains the following {{_check_success_task}} function to address this failure condition:
> {code}
>     def _check_success_task(self):
>         response = self.client.describe_tasks(
>             cluster=self.cluster,
>             tasks=[self.arn]
>         )
>         self.log.info('ECS Task stopped, check status: %s', response)
>         if len(response.get('failures', [])) > 0:
>             raise AirflowException(response)
>         for task in response['tasks']:
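>             # New check: a task-level stoppedReason mentioning 'terminated' means the
>             # EC2 host was killed (e.g. spot reclamation), so fail the task instance.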
>             if 'terminated' in task.get('stoppedReason', '').lower():
>                 raise AirflowException('The task was stopped because the host instance terminated: {}'.format(
>                     task.get('stoppedReason', '')))
>             containers = task['containers']
>             for container in containers:
>                 if container.get('lastStatus') == 'STOPPED' and \
>                         container['exitCode'] != 0:
>                     raise AirflowException(
>                         'This task is not in success state {}'.format(task))
>                 elif container.get('lastStatus') == 'PENDING':
>                     raise AirflowException(
>                         'This task is still pending {}'.format(task))
>                 elif 'error' in container.get('reason', '').lower():
>                     raise AirflowException(
>                         'This container encountered an error during launch: {}'.
>                         format(container.get('reason', '').lower()))
> {code}
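> A minimal sketch of how this check could be exercised against a stubbed client. The import path, constructor arguments, and the direct assignment of {{client}} and {{arn}} (which the real operator populates inside {{execute}}) are assumptions for illustration, not part of the patch above:
> {code}
> # Sketch only: drive the patched _check_success_task with a canned
> # describe_tasks response that mimics the host-termination case above.
> from unittest import mock
>
> from airflow.contrib.operators.ecs_operator import ECSOperator
> from airflow.exceptions import AirflowException
>
> stopped_by_host_termination = {
>     'tasks': [{
>         'stoppedReason': 'Host EC2 (instance i-02cf23bbd5ae26194) terminated.',
>         'containers': [{'name': 'foobar-testing', 'lastStatus': 'RUNNING'}],
>     }],
>     'failures': [],
> }
>
> operator = ECSOperator(
>     task_id='ecs_check_sketch',
>     task_definition='foobar-testing_dataEngineering_rd',
>     cluster='processing',
>     overrides={},
> )
> operator.client = mock.Mock()
> operator.client.describe_tasks.return_value = stopped_by_host_termination
> operator.arn = 'arn:aws:ecs:us-east-1:111111111111:task/32d43a1d-fbc7-4659-815d-9133bde11cdc'
>
> try:
>     operator._check_success_task()
> except AirflowException as exc:
>     print('Host termination now surfaces as a failure: {}'.format(exc))
> {code}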



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)