Posted to commits@airflow.apache.org by ka...@apache.org on 2021/04/11 11:56:55 UTC

[airflow] branch master updated: Ensure executors end method is called (#14085)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 16902d0  Ensure executors end method is called (#14085)
16902d0 is described below

commit 16902d0437623c1aecf8f40e96228e20f85941f8
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Sun Apr 11 05:56:44 2021 -0600

    Ensure executors end method is called (#14085)
    
    closes: #11982
    
    SchedulerJob doesn't actually call the executor's `end` method when the scheduler receives SIGINT/SIGTERM or when there is an exception (e.g. database connectivity issues).
    
    If you are using KubernetesExecutor, this results in the scheduler not actually exiting.
    
    In contrast, if you are using LocalExecutor, this results in the scheduler not dealing with the tasks it is currently running. By ensuring the executor's `end` method is called, LocalExecutor will now wait until its running tasks complete, or until a second SIGINT/SIGTERM/exception occurs. I believe we need to document that behavior change somewhere; I just haven't looked into where yet. (A sketch of the resulting cleanup pattern follows the diffstat below.)
---
 airflow/executors/local_executor.py |  4 ++++
 airflow/jobs/scheduler_job.py       | 22 +++++++++++++---------
 tests/jobs/test_scheduler_job.py    | 31 +++++++++++++++++++++++++++++++
 3 files changed, 48 insertions(+), 9 deletions(-)
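
A minimal, self-contained sketch of the cleanup guarantee this commit introduces (hypothetical names, not the actual SchedulerJob code): each cleanup step runs in its own try/except inside the finally block, so a failure in one step cannot skip the other, and the original exception from the loop still propagates.

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("scheduler-sketch")

    class Component:
        """Stand-in for an executor or a DAG file processor agent."""

        def __init__(self, name):
            self.name = name

        def end(self):
            log.info("%s.end called", self.name)

    def run_scheduler(executor, agent):
        try:
            # Simulate the scheduler loop dying, e.g. on a lost DB connection.
            raise RuntimeError("simulated scheduler loop failure")
        except Exception:
            log.exception("Exception when executing the scheduler loop")
            raise
        finally:
            # Each cleanup gets its own handler so one failure cannot
            # prevent the other cleanup from running.
            try:
                executor.end()
            except Exception:
                log.exception("Exception when executing executor.end")
            try:
                agent.end()
            except Exception:
                log.exception("Exception when executing agent.end")

    try:
        run_scheduler(Component("executor"), Component("processor_agent"))
    except RuntimeError:
        log.info("original exception propagated after both cleanups ran")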

diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index e8049af..a29342f 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -383,6 +383,10 @@ class LocalExecutor(BaseExecutor):
             raise AirflowException(NOT_STARTED_MESSAGE)
         if not self.manager:
             raise AirflowException(NOT_STARTED_MESSAGE)
+        self.log.info(
+            "Shutting down LocalExecutor"
+            "; waiting for running tasks to finish.  Signal again if you don't want to wait."
+        )
         self.impl.end()
         self.manager.shutdown()
 
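The log message added above describes the operator-facing contract: the first SIGINT/SIGTERM makes LocalExecutor wait for its running tasks, and a second signal abandons the wait. A rough, standalone sketch of that double-signal pattern (a hypothetical handler, not Airflow's actual signal handling):

    import signal
    import sys
    import time

    _already_signaled = False

    def _handle_signal(signum, frame):
        global _already_signaled
        if _already_signaled:
            sys.exit(1)  # second signal: stop waiting and exit immediately
        _already_signaled = True
        print("Shutting down; waiting for running tasks to finish. "
              "Signal again if you don't want to wait.")

    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    # Simulate waiting on running tasks: interrupt once to see the
    # message, twice to exit immediately.
    for _ in range(60):
        time.sleep(1)
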
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 0e22180..93d6426 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -31,7 +31,7 @@ from collections import defaultdict
 from contextlib import redirect_stderr, redirect_stdout, suppress
 from datetime import timedelta
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
+from typing import Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
 
 from setproctitle import setproctitle
 from sqlalchemy import and_, func, not_, or_, tuple_
@@ -678,10 +678,6 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
     If so, it creates appropriate TaskInstances and sends run commands to the
     executor. It does this for each task in each DAG and repeats.
 
-    :param dag_id: if specified, only schedule tasks with this DAG ID
-    :type dag_id: str
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
-    :type dag_ids: list[str]
     :param subdir: directory containing Python files with Airflow DAG
         definitions, or a specific path to a file
     :type subdir: str
@@ -698,6 +694,8 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
     :param do_pickle: once a DAG object is obtained by executing the Python
         file, whether to serialize the DAG object to the DB
     :type do_pickle: bool
+    :param log: override the default Logger
+    :type log: logging.Logger
     """
 
     __mapper_args__ = {'polymorphic_identity': 'SchedulerJob'}
@@ -710,7 +708,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
         num_times_parse_dags: int = -1,
         processor_poll_interval: float = conf.getfloat('scheduler', 'processor_poll_interval'),
         do_pickle: bool = False,
-        log: Any = None,
+        log: logging.Logger = None,
         *args,
         **kwargs,
     ):
@@ -1299,13 +1297,19 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 )
                 models.DAG.deactivate_stale_dags(execute_start_time)
 
-            self.executor.end()
-
             settings.Session.remove()  # type: ignore
         except Exception:  # pylint: disable=broad-except
             self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
+            raise
         finally:
-            self.processor_agent.end()
+            try:
+                self.executor.end()
+            except Exception:  # pylint: disable=broad-except
+                self.log.exception("Exception when executing Executor.end")
+            try:
+                self.processor_agent.end()
+            except Exception:  # pylint: disable=broad-except
+                self.log.exception("Exception when executing DagFileProcessorAgent.end")
             self.log.info("Exited execute loop")
 
     @staticmethod
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index b7baacd..875f3e8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2009,6 +2009,37 @@ class TestSchedulerJob(unittest.TestCase):
             assert ti.start_date == ti.end_date
             assert ti.duration is not None
 
+    @mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent')
+    def test_executor_end_called(self, mock_processor_agent):
+        """
+        Test to make sure executor.end gets called with a successful scheduler loop run
+        """
+        self.scheduler_job = SchedulerJob(subdir=os.devnull, num_runs=1)
+        self.scheduler_job.executor = mock.MagicMock(slots_available=8)
+
+        self.scheduler_job.run()
+
+        self.scheduler_job.executor.end.assert_called_once()
+        self.scheduler_job.processor_agent.end.assert_called_once()
+
+    @mock.patch('airflow.jobs.scheduler_job.DagFileProcessorAgent')
+    def test_cleanup_methods_all_called(self, mock_processor_agent):
+        """
+        Test to make sure all cleanup methods are called when the scheduler loop has an exception
+        """
+        self.scheduler_job = SchedulerJob(subdir=os.devnull, num_runs=1)
+        self.scheduler_job.executor = mock.MagicMock(slots_available=8)
+        self.scheduler_job._run_scheduler_loop = mock.MagicMock(side_effect=Exception("oops"))
+        mock_processor_agent.return_value.end.side_effect = Exception("double oops")
+        self.scheduler_job.executor.end = mock.MagicMock(side_effect=Exception("triple oops"))
+
+        with self.assertRaises(Exception):
+            self.scheduler_job.run()
+
+        self.scheduler_job.processor_agent.end.assert_called_once()
+        self.scheduler_job.executor.end.assert_called_once()
+        mock_processor_agent.return_value.end.reset_mock(side_effect=True)
+
     def test_dagrun_timeout_verify_max_active_runs(self):
         """
         Test if a dagrun will not be scheduled if max_dag_runs
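
As an aside for readers unfamiliar with the mock mechanics the new tests rely on: `side_effect` makes a mock raise when called, and `reset_mock(side_effect=True)` clears it so later tests are unaffected. A self-contained illustration with plain `unittest.mock` (not Airflow code):

    from unittest import mock

    m = mock.MagicMock(side_effect=Exception("boom"))
    try:
        m()
    except Exception as exc:
        print(exc)  # prints: boom

    m.reset_mock(side_effect=True)  # clear the raising side effect
    print(m())  # now returns a MagicMock instead of raising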