Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/13 16:39:51 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #18208: Celery: Sort adopted tasks in _check_for_stalled_adopted_tasks method

ephraimbuddy opened a new pull request #18208:
URL: https://github.com/apache/airflow/pull/18208


   This PR sorts `adopted_task_timeouts` by timeout in
   `_check_for_stalled_adopted_tasks` so that stalled adopted tasks are cleared correctly.
   
   Closes: https://github.com/apache/airflow/issues/17381
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy merged pull request #18208: Celery: Sort adopted tasks in _check_for_stalled_adopted_tasks method

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #18208:
URL: https://github.com/apache/airflow/pull/18208


   





[GitHub] [airflow] github-actions[bot] commented on pull request #18208: Celery: Sort adopted tasks in _check_for_stalled_adopted_tasks method

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #18208:
URL: https://github.com/apache/airflow/pull/18208#issuecomment-919294988


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #18208: Celery: Sort adopted tasks in _check_for_stalled_adopted_tasks method

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #18208:
URL: https://github.com/apache/airflow/pull/18208#discussion_r708876286



##########
File path: airflow/executors/celery_executor.py
##########
@@ -341,11 +341,13 @@ def _check_for_stalled_adopted_tasks(self):
         just resend them. We do that by clearing the state and letting the
         normal scheduler loop deal with that
         """
-        now = utcnow()
+        sorted_adopted_task_timeouts = OrderedDict(
+            sorted(self.adopted_task_timeouts.items(), key=lambda k: k[1])
+        )

Review comment:
       We probably don’t even need `OrderedDict`, since all we do is call `items()` later anyway.
   
   This should be good enough:
   
   ```
   sorted_adopted_task_timeouts = sorted(self.adopted_task_timeouts.items(), key=lambda k: k[1])
   
   for key, stalled_after in sorted_adopted_task_timeouts:
       ...
   ```
   
   Also I feel having `utcnow()` outside of the loop is slightly better? It saves some function calls, unless millisecond accuracy is actually important here. But that’s minor either way.
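
A minimal, self-contained sketch of the two suggestions above: sort the `(key, timeout)` pairs once with plain `sorted()`, and take the current time once before the loop. The function name `find_stalled`, the sample task keys, and the early `break` are hypothetical and exist only for illustration; this is not the code in `celery_executor.py`.

```python
from datetime import datetime, timedelta, timezone


def find_stalled(adopted_task_timeouts, now):
    """Return keys whose timeout is at or before `now`, earliest first.

    `find_stalled` is a hypothetical helper for illustration only.
    """
    stalled = []
    # sorted() on items() returns a list of (key, timeout) tuples ordered
    # by timeout; no OrderedDict is needed just to iterate over the pairs.
    for key, stalled_after in sorted(adopted_task_timeouts.items(), key=lambda item: item[1]):
        if stalled_after > now:
            # Assumption: once one timeout lies in the future, all later
            # ones do too, which only holds because the pairs are sorted.
            break
        stalled.append(key)
    return stalled


# `now` is computed once, outside the loop, as the review suggests.
now = datetime(2021, 9, 13, 16, 0, tzinfo=timezone.utc)
timeouts = {
    "task_a": now + timedelta(minutes=5),   # not stalled yet
    "task_b": now - timedelta(minutes=10),  # stalled
    "task_c": now - timedelta(minutes=1),   # stalled
}
stalled = find_stalled(timeouts, now)
print(stalled)  # ['task_b', 'task_c']
```

Without the sort, the dictionary's insertion order would put `task_a` first, and a loop that stops at the first future timeout would report nothing as stalled.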







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #18208: Celery: Sort adopted tasks in _check_for_stalled_adopted_tasks method

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #18208:
URL: https://github.com/apache/airflow/pull/18208#discussion_r708939929



##########
File path: airflow/executors/celery_executor.py
##########
@@ -341,11 +341,13 @@ def _check_for_stalled_adopted_tasks(self):
         just resend them. We do that by clearing the state and letting the
         normal scheduler loop deal with that
         """
-        now = utcnow()
+        sorted_adopted_task_timeouts = OrderedDict(
+            sorted(self.adopted_task_timeouts.items(), key=lambda k: k[1])
+        )

Review comment:
       You're right



