Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/04 22:44:18 UTC

[GitHub] [airflow] jedcunningham opened a new pull request #14085: Ensure executors end method is called

jedcunningham opened a new pull request #14085:
URL: https://github.com/apache/airflow/pull/14085


   closes: #11982 
   
   SchedulerJob doesn't actually call the executor's `end` method when the scheduler receives SIGINT/SIGTERM or when an exception occurs (e.g. database connectivity issues).
   
   If you are using KubernetesExecutor, this results in the scheduler not actually exiting.
   
   In contrast, if you are using LocalExecutor, this results in the scheduler not handling the tasks it is currently running. By ensuring the executor's `end` method is called, LocalExecutor will now wait until the running tasks complete, or until a second SIGINT/SIGTERM or exception occurs. I believe we need to document this behavior change somewhere; I just haven't looked into where yet.
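A minimal sketch of the idea, assuming hypothetical `FakeExecutor` and `execute` names (this is not Airflow's actual `SchedulerJob._execute`): with `end()` in a `finally` block, the executor is shut down whether the loop returns normally, raises an ordinary exception, or is interrupted by a `BaseException` such as `KeyboardInterrupt` (Python's default reaction to SIGINT) or `SystemExit`.

```python
import logging
from typing import Callable

log = logging.getLogger(__name__)


class FakeExecutor:
    """Hypothetical stand-in for an executor; it only records whether end() ran."""

    def __init__(self) -> None:
        self.ended = False

    def end(self) -> None:
        self.ended = True


def execute(executor: FakeExecutor, run_loop: Callable[[], None]) -> None:
    """Run a (hypothetical) scheduler loop and always shut the executor down."""
    try:
        run_loop()
    except Exception:
        log.exception("Exception when executing the scheduler loop")
        raise  # propagate so the caller can mark the job as failed
    finally:
        # Runs on normal return, on Exception, and on BaseException subclasses
        # such as KeyboardInterrupt and SystemExit, which `except Exception`
        # does not catch.
        executor.end()


if __name__ == "__main__":
    def interrupted_loop() -> None:
        raise KeyboardInterrupt  # simulates Ctrl-C arriving mid-loop

    executor = FakeExecutor()
    try:
        execute(executor, interrupted_loop)
    except KeyboardInterrupt:
        pass
    print(executor.ended)  # True
```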


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#issuecomment-773711742


   [The Workflow run](https://github.com/apache/airflow/actions/runs/538969804) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] jedcunningham commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r590703795



##########
File path: airflow/executors/local_executor.py
##########
@@ -383,6 +383,10 @@ def end(self) -> None:
             raise AirflowException(NOT_STARTED_MESSAGE)
         if not self.manager:
             raise AirflowException(NOT_STARTED_MESSAGE)
+        self.log.info(
+            "Shutting down LocalExecutor"
+            " - this will wait for running tasks to finish, or signal again if you don't want to wait."
+        )

Review comment:
       Good call, thanks. I left the beginning of the message alone, though.







[GitHub] [airflow] jedcunningham commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r590744829



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1295,13 +1293,19 @@ def _execute(self) -> None:
                 )
                 models.DAG.deactivate_stale_dags(execute_start_time)
 
-            self.executor.end()
-
             settings.Session.remove()  # type: ignore
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
+            raise

Review comment:
       Noticed this was missing too. Without it, the exception is swallowed and the SchedulerJob is always marked as successful:
   
   https://github.com/apache/airflow/blob/923bde2b917099135adfe470a5453f663131fd5f/airflow/jobs/base_job.py#L236-L249
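To illustrate why the re-raise matters, here is a simplified, hypothetical job runner (the `Job` class and its state strings are illustrative, not the code at the link above): the state only becomes failed when an exception escapes `_execute()`, so logging and swallowing the error leaves the job looking successful.

```python
class Job:
    """Hypothetical simplified job: run() marks failure only if _execute() raises."""

    def __init__(self, reraise: bool) -> None:
        self.reraise = reraise
        self.state = "running"

    def _execute(self) -> None:
        try:
            raise RuntimeError("scheduler loop crashed")
        except Exception:
            print("Exception when executing the scheduler loop")
            if self.reraise:
                raise  # the fix: let run() see the failure
            # without the re-raise, the error is logged and then swallowed

    def run(self) -> None:
        try:
            self._execute()
        except Exception:
            self.state = "failed"
            raise
        self.state = "success"


if __name__ == "__main__":
    swallowed = Job(reraise=False)
    swallowed.run()
    print(swallowed.state)  # success, even though the loop crashed
```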







[GitHub] [airflow] jedcunningham commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r570601695



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1292,13 +1292,12 @@ def _execute(self) -> None:
                 )
                 models.DAG.deactivate_stale_dags(execute_start_time)
 
-            self.executor.end()
-
             settings.Session.remove()  # type: ignore
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
         finally:
             self.processor_agent.end()

Review comment:
       I think we can drop the `self.processor_agent.end()` in `_exit_gracefully`, as it would be called here anyway. Thoughts?







[GitHub] [airflow] kaxil merged pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #14085:
URL: https://github.com/apache/airflow/pull/14085


   





[GitHub] [airflow] jedcunningham commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r571283551



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1292,13 +1292,12 @@ def _execute(self) -> None:
                 )
                 models.DAG.deactivate_stale_dags(execute_start_time)
 
-            self.executor.end()
-
             settings.Session.remove()  # type: ignore
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
         finally:
             self.processor_agent.end()

Review comment:
       Leaving it in, as we may not have gotten far enough for the `finally` block to clean up.
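A hypothetical sketch of the situation being guarded against (the `Agent` class and `execute` function are illustrative, not Airflow code): if a signal handler's `sys.exit()` fires after the agent has started but before the `try` block is entered, the `finally` clause never runs, so only the handler's own `end()` call would clean up.

```python
class Agent:
    """Hypothetical stand-in for the DAG file processor agent."""

    def __init__(self) -> None:
        self.started = False
        self.ended = False

    def start(self) -> None:
        self.started = True

    def end(self) -> None:
        self.ended = True


def execute(agent: Agent, interrupted_during_setup: bool) -> None:
    agent.start()
    if interrupted_during_setup:
        # Stands in for a signal handler calling sys.exit() at this point:
        # the try/finally below has not been entered yet.
        raise SystemExit(0)
    try:
        pass  # the scheduler loop would run here
    finally:
        agent.end()


if __name__ == "__main__":
    agent = Agent()
    try:
        execute(agent, interrupted_during_setup=True)
    except SystemExit:
        pass
    print(agent.started, agent.ended)  # True False
```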







[GitHub] [airflow] dimberman commented on pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#issuecomment-774342376


   @jedcunningham this LGTM. With regard to documentation, @kaxil, do we need to document this? It seems like it fixes a bug rather than modifying functionality.





[GitHub] [airflow] github-actions[bot] commented on pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#issuecomment-774247460


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] jedcunningham commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r590704615



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1292,13 +1290,12 @@ def _execute(self) -> None:
                 )
                 models.DAG.deactivate_stale_dags(execute_start_time)
 
-            self.executor.end()
-
             settings.Session.remove()  # type: ignore
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
         finally:
             self.processor_agent.end()

Review comment:
       Good point. I tried an exit stack but wasn't able to get it to log any exceptions raised in the `end` methods. I fell back to try/except, but I'm open to ideas.
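One way such a try/except fallback could look, as a hypothetical sketch (the `end_all` helper and the stub classes are illustrative; the PR's actual code may differ): each `end()` call gets its own `try`, so a failure in one component is logged and the next component is still shut down.

```python
import logging

log = logging.getLogger(__name__)


def end_all(*components) -> list:
    """Call end() on every component, logging (not propagating) any failure.

    Returns the caught exceptions in call order.
    """
    errors = []
    for component in components:
        try:
            component.end()
        except Exception as err:
            log.exception("Exception when ending %s", type(component).__name__)
            errors.append(err)
    return errors


class BrokenAgent:
    """Hypothetical processor agent whose shutdown fails."""

    def end(self) -> None:
        raise RuntimeError("agent failed to stop")


class Executor:
    """Hypothetical executor that records whether it was ended."""

    def __init__(self) -> None:
        self.ended = False

    def end(self) -> None:
        self.ended = True


if __name__ == "__main__":
    executor = Executor()
    end_all(BrokenAgent(), executor)
    print(executor.ended)  # True, despite the agent failing first
```

The trade-off is that failures are only logged; the caller would have to inspect the returned list (or re-raise) if a failed shutdown should change the job's outcome.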







[GitHub] [airflow] jedcunningham commented on pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#issuecomment-774321409


   @dimberman, take another look.





[GitHub] [airflow] dstandish commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r574227085



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1292,13 +1290,12 @@ def _execute(self) -> None:
                 )
                 models.DAG.deactivate_stale_dags(execute_start_time)
 
-            self.executor.end()
-
             settings.Session.remove()  # type: ignore
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
         finally:
             self.processor_agent.end()

Review comment:
       What if `self.processor_agent.end()` fails? Will `self.executor.end()` still be reached? Maybe you need something like an exit stack.
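A sketch of the exit-stack idea using the standard library's `contextlib.ExitStack` (the `Component` stub and `shutdown` function are hypothetical): callbacks run in last-in, first-out order, and a later callback still runs when an earlier one raises; the exception then propagates once the `with` block exits. Registering `executor.end` first therefore makes it run after `processor_agent.end`. Note that `ExitStack` by itself does not log anything; it only guarantees the calls and re-raises.

```python
from contextlib import ExitStack


class Component:
    """Hypothetical component whose end() may fail."""

    def __init__(self, name: str, fail: bool = False) -> None:
        self.name = name
        self.fail = fail
        self.ended = False

    def end(self) -> None:
        self.ended = True
        if self.fail:
            raise RuntimeError(f"{self.name}.end() failed")


def shutdown(processor_agent: Component, executor: Component) -> None:
    with ExitStack() as stack:
        stack.callback(executor.end)         # registered first, so it runs last
        stack.callback(processor_agent.end)  # registered last, so it runs first
        # the scheduler loop would run inside this block


if __name__ == "__main__":
    agent = Component("processor_agent", fail=True)
    executor = Component("executor")
    try:
        shutdown(agent, executor)
    except RuntimeError as err:
        print(err)  # processor_agent.end() failed
    print(executor.ended)  # True
```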

##########
File path: airflow/executors/local_executor.py
##########
@@ -383,6 +383,10 @@ def end(self) -> None:
             raise AirflowException(NOT_STARTED_MESSAGE)
         if not self.manager:
             raise AirflowException(NOT_STARTED_MESSAGE)
+        self.log.info(
+            "Shutting down LocalExecutor"
+            " - this will wait for running tasks to finish, or signal again if you don't want to wait."
+        )

Review comment:
       ```suggestion
           self.log.info(
               "Attempting to shut down LocalExecutor"
               "; waiting for running tasks to finish.  Signal again if you don't want to wait."
           )
   ```
   
   The main point here is to clarify grammatically that it's not `_this_ will wait or signal` but `_this_ will wait and _you_ must signal`.







[GitHub] [airflow] dstandish commented on a change in pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#discussion_r574226484



##########
File path: airflow/executors/local_executor.py
##########
@@ -383,6 +383,10 @@ def end(self) -> None:
             raise AirflowException(NOT_STARTED_MESSAGE)
         if not self.manager:
             raise AirflowException(NOT_STARTED_MESSAGE)
+        self.log.info(
+            "Shutting down LocalExecutor"
+            " - this will wait for running tasks to finish, or signal again if you don't want to wait."
+        )

Review comment:
       ```suggestion
           self.log.info(
               "Attempting to shut down LocalExecutor"
               "; waiting for running tasks to finish.  Signal again if you don't want to wait."
           )
   ```
   
   The main point here is to clarify grammatically that it's not `this will (wait or signal)` but `(this will wait) and (you must signal if...)`.







[GitHub] [airflow] jedcunningham commented on pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#issuecomment-794498196


   I noticed this morning that the behavior differs depending on whether only the parent process is signaled (e.g. `kill PID`) or the whole process group is (e.g. Ctrl-C in bash, or dumb-init).
   
   With this change, when the whole process group is signaled, `LocalExecutor.end()` blows up with a `BrokenPipeError`. However, even without this change (where `LocalExecutor.end()` isn't called), you can get an `EOFError` on the task_queue if a task is running. I'm not sure this change makes it "worse", just different, and in both cases the task process is left behind.
   
   This makes me think there are some dark corners in LocalExecutor and I feel I've drifted pretty far from the original "KubernetesExecutor doesn't exit on SIGTERM" issue. Thoughts? Input?
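A POSIX-only sketch of the difference described above, using `sh` and `sleep` as hypothetical stand-ins for the scheduler and a task process (this is not Airflow code): `os.kill` on the parent's PID signals only the parent, like `kill PID`, while `os.killpg` signals every process in the group, which is closer to Ctrl-C in a terminal or dumb-init forwarding a signal. After the parent alone is killed, the child keeps running.

```python
import os
import signal
import subprocess
from typing import Tuple


def spawn_parent_with_child() -> Tuple[subprocess.Popen, int]:
    """Start `sh` as a process-group leader that backgrounds `sleep` and waits.

    Returns the parent process and the PID of its background child.
    """
    parent = subprocess.Popen(
        ["sh", "-c", "sleep 30 & echo $!; wait"],
        stdout=subprocess.PIPE,
        text=True,
        start_new_session=True,  # new session, so the group ID equals parent.pid
    )
    child_pid = int(parent.stdout.readline())
    return parent, child_pid


def is_alive(pid: int) -> bool:
    """Signal 0 only checks that the PID exists; nothing is delivered."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    return True


if __name__ == "__main__":
    parent, child = spawn_parent_with_child()
    try:
        os.kill(parent.pid, signal.SIGTERM)  # like `kill PID`: parent only
        parent.wait(timeout=5)
        print("child still running:", is_alive(child))
    finally:
        os.killpg(parent.pid, signal.SIGTERM)  # whole group: the orphaned child too
        parent.stdout.close()
```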





[GitHub] [airflow] kaxil commented on pull request #14085: Ensure executors end method is called

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #14085:
URL: https://github.com/apache/airflow/pull/14085#issuecomment-773983591


   Can you rebase on master, please? The static check failure should be fixed there.

