Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/20 12:41:21 UTC

[GitHub] [airflow] r4um opened a new issue, #26520: Task sync fails when using celery executor with AWS SQS broker.

r4um opened a new issue, #26520:
URL: https://github.com/apache/airflow/issues/26520

   ### Apache Airflow version
   
   Other Airflow 2 version
   
   ### What happened
   
   The change in https://github.com/apache/airflow/pull/3830 introduced multiprocessing while syncing task status. When using the Celery executor with an AWS SQS broker, DAG runs fail with the following backtrace in the scheduler logs.
   
   ```
   [2022-02-01 12:52:52,595] {celery_executor.py:299} ERROR - Error sending Celery task: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed:
   b''
   Celery Task ID: TaskInstanceKey(dag_id='shared-dev_jobflow-airflow_k8s_job_multiple_tasks', task_id='t4', run_id='manual__2022-02-01T12:52:40.329850+00:00', try_number=1)
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 480, in _parse_xml_string_to_dom
       root = parser.close()
     File "<string>", line None
   xml.etree.ElementTree.ParseError: no element found: line 1, column 0
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
       result = task_to_run.apply_async(args=[command], queue=queue)
     File "/usr/local/lib/python3.7/site-packages/celery/app/task.py", line 579, in apply_async
       **options
     File "/usr/local/lib/python3.7/site-packages/celery/app/base.py", line 788, in send_task
       amqp.send_task_message(P, name, message, **options)
     File "/usr/local/lib/python3.7/site-packages/celery/app/amqp.py", line 519, in send_task_message
       **properties
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 180, in publish
       exchange_name, declare, timeout
     File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 524, in _ensured
       return fun(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in _publish
       [maybe_declare(entity) for entity in declare]
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 193, in <listcomp>
       [maybe_declare(entity) for entity in declare]
     File "/usr/local/lib/python3.7/site-packages/kombu/messaging.py", line 99, in maybe_declare
       return maybe_declare(entity, self.channel, retry, **retry_policy)
     File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 110, in maybe_declare
       return _maybe_declare(entity, channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/common.py", line 150, in _maybe_declare
       entity.declare(channel=channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 606, in declare
       self._create_queue(nowait=nowait, channel=channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 615, in _create_queue
       self.queue_declare(nowait=nowait, passive=False, channel=channel)
     File "/usr/local/lib/python3.7/site-packages/kombu/entity.py", line 650, in queue_declare
       nowait=nowait,
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 528, in queue_declare
       return queue_declare_ok_t(queue, self._size(queue), 0)
     File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 607, in _size
       AttributeNames=['ApproximateNumberOfMessages'])
     File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 388, in _api_call
       return self._make_api_call(operation_name, kwargs)
     File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 695, in _make_api_call
       operation_model, request_dict, request_context)
     File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 714, in _make_request
       return self._endpoint.make_request(operation_model, request_dict)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request
       return self._send_request(request_dict, operation_model)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 135, in _send_request
       request, operation_model, context)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 166, in _get_response
       request, operation_model)
     File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 217, in _do_get_response
       response_dict, operation_model.output_shape)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 245, in parse
       parsed = self._do_parse(response, shape)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 551, in _do_parse
       return self._parse_body_as_xml(response, shape, inject_metadata=True)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 555, in _parse_body_as_xml
       root = self._parse_xml_string_to_dom(xml_contents)
     File "/usr/local/lib/python3.7/site-packages/botocore/parsers.py", line 485, in _parse_xml_string_to_dom
       (e, xml_string))
   botocore.parsers.ResponseParserError: Unable to parse response (no element found: line 1, column 0), invalid XML received. Further retries may succeed: 
   b''
   
   [2022-02-01 12:52:52,821] {scheduler_job.py:572} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2022-02-01 12:52:52,825] {taskinstance.py:1705} ERROR - Executor reports task instance <TaskInstance: shared-dev_jobflow-airflow_k8s_job_multiple_tasks.t4 manual__2022-02-01T12:52:40.329850+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
   [2022-02-01 12:52:52,834] {taskinstance.py:1280} INFO - Marking task as FAILED. dag_id=shared-dev_jobflow-airflow_k8s_job_multiple_tasks, task_id=t4, execution_date=20220201T125240, start_date=, end_date=20220201T125252
   ```    
   
   This is likely because boto3 session and resource objects are not thread safe [1]. With `sync_parallelism` set to `1`, the problem does not occur.
   
   [1] https://boto3.amazonaws.com/v1/documentation/api/1.14.31/guide/session.html#multithreading-or-multiprocessing-with-sessions
   
   ### What you think should happen instead
   
   _No response_
   
   ### How to reproduce
   
   Use the Celery executor with AWS SQS as the Celery broker. Launch a DAG with multiple tasks and dependencies among them, e.g. 5 tasks with the following dependencies:
   
   ```
   t3 << [t1, t2]
   t4 << [t3]
   t5 << [t3]
   ```
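
To see which of these tasks become runnable at the same time, the dependencies above can be grouped into waves. This is a rough sketch using only the standard library `graphlib` module (Python 3.9+), not the Airflow API; `ready_batches` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task to the set of tasks it depends on, mirroring
# t3 << [t1, t2], t4 << [t3], t5 << [t3].
dependencies = {
    "t3": {"t1", "t2"},
    "t4": {"t3"},
    "t5": {"t3"},
}


def ready_batches(deps):
    """Group tasks into waves whose members become runnable together."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = ready_batches(dependencies)
print(batches)  # [['t1', 't2'], ['t3'], ['t4', 't5']]
```

The waves with more than one task (`t1`/`t2` and `t4`/`t5`) are the points where several tasks can be queued together, which is presumably where parallel sending comes into play.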
   
   Task type doesn't matter. In our case, it is the KubernetesPodOperator.
   
   ### Operating System
   
   CentOS Linux 7 (Core)
   
   ### Versions of Apache Airflow Providers
   
   airflow version 2.2.2
   Python version 3.7.9
   Using airflow constraints for 2.2.2 while installing packages.
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Deployed on AWS EKS via containers.
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boring-cyborg[bot] commented on issue #26520: Task sync fails when using celery executor with AWS SQS broker.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #26520:
URL: https://github.com/apache/airflow/issues/26520#issuecomment-1252297996

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] potiuk closed issue #26520: Task sync fails when using celery executor with AWS SQS broker.

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #26520: Task sync fails when using celery executor with AWS SQS broker. 
URL: https://github.com/apache/airflow/issues/26520

