You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/10/27 09:02:33 UTC

incubator-airflow git commit: [AIRFLOW-1641] Handle executor events in the scheduler

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f271d437a -> 2abead704


[AIRFLOW-1641] Handle executor events in the scheduler

While in Backfills we do handle the executor
state,
we do not in the Scheduler. In case there is an
unspecified
error (e.g. a timeout, airflow command failure)
tasks
can get stuck.

Closes #2715 from bolkedebruin/AIRFLOW-1641


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2abead70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2abead70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2abead70

Branch: refs/heads/master
Commit: 2abead7049806482047e29d123a109b444c00355
Parents: f271d43
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Oct 27 11:02:24 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Oct 27 11:02:24 2017 +0200

----------------------------------------------------------------------
 airflow/executors/base_executor.py    | 23 ++++++++++---
 airflow/jobs.py                       | 52 +++++++++++++++++++++++-------
 airflow/utils/dag_processing.py       | 35 +++-----------------
 tests/executors/test_base_executor.py | 40 +++++++++++++++++++++++
 tests/jobs.py                         | 45 ++++++++++++++++++++++++++
 5 files changed, 148 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2abead70/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 410a558..d96c10f 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -141,13 +141,26 @@ class BaseExecutor(LoggingMixin):
     def success(self, key):
         self.change_state(key, State.SUCCESS)
 
-    def get_event_buffer(self):
+    def get_event_buffer(self, dag_ids=None):
         """
-        Returns and flush the event buffer
+        Returns and flush the event buffer. In case dag_ids is specified
+        it will only return and flush events for the given dag_ids. Otherwise
+        it returns and flushes all
+
+        :param dag_ids: to dag_ids to return events for, if None returns all
+        :return: a dict of events
         """
-        d = self.event_buffer
-        self.event_buffer = {}
-        return d
+        cleared_events = dict()
+        if dag_ids is None:
+            cleared_events = self.event_buffer
+            self.event_buffer = dict()
+        else:
+            for key in list(self.event_buffer.keys()):
+                dag_id, _, _ = key
+                if dag_id in dag_ids:
+                    cleared_events[key] = self.event_buffer.pop(key)
+
+        return cleared_events
 
     def execute_async(self, key, command, queue=None):  # pragma: no cover
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2abead70/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2675bd3..892c5cb 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1290,6 +1290,7 @@ class SchedulerJob(BaseJob):
         TI = models.TaskInstance
         # actually enqueue them
         for task_instance in task_instances:
+            simple_dag = simple_dag_bag.get_dag(task_instance.dag_id)
             command = " ".join(TI.generate_command(
                 task_instance.dag_id,
                 task_instance.task_id,
@@ -1301,8 +1302,8 @@ class SchedulerJob(BaseJob):
                 ignore_task_deps=False,
                 ignore_ti_state=False,
                 pool=task_instance.pool,
-                file_path=simple_dag_bag.get_dag(task_instance.dag_id).full_filepath,
-                pickle_id=simple_dag_bag.get_dag(task_instance.dag_id).pickle_id))
+                file_path=simple_dag.full_filepath,
+                pickle_id=simple_dag.pickle_id))
 
             priority = task_instance.priority_weight
             queue = task_instance.queue
@@ -1412,20 +1413,49 @@ class SchedulerJob(BaseJob):
 
         models.DagStat.update([d.dag_id for d in dags])
 
-    def _process_executor_events(self):
+    @provide_session
+    def _process_executor_events(self, simple_dag_bag, session=None):
         """
         Respond to executor events.
-
-        :param executor: the executor that's running the task instances
-        :type executor: BaseExecutor
-        :return: None
         """
-        for key, executor_state in list(self.executor.get_event_buffer().items()):
+        # TODO: this shares quite a lot of code with _manage_executor_state
+
+        TI = models.TaskInstance
+        for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)
+                                   .items()):
             dag_id, task_id, execution_date = key
             self.log.info(
                 "Executor reports %s.%s execution_date=%s as %s",
-                dag_id, task_id, execution_date, executor_state
+                dag_id, task_id, execution_date, state
             )
+            if state == State.FAILED or state == State.SUCCESS:
+                qry = session.query(TI).filter(TI.dag_id == dag_id,
+                                               TI.task_id == task_id,
+                                               TI.execution_date == execution_date)
+                ti = qry.first()
+                if not ti:
+                    self.log.warning("TaskInstance %s went missing from the database", ti)
+                    continue
+
+                # TODO: should we fail RUNNING as well, as we do in Backfills?
+                if ti.state == State.QUEUED:
+                    msg = ("Executor reports task instance %s finished (%s) "
+                           "although the task says its %s. Was the task "
+                           "killed externally?".format(ti, state, ti.state))
+                    self.log.error(msg)
+                    try:
+                        simple_dag = simple_dag_bag.get_dag(dag_id)
+                        dagbag = models.DagBag(simple_dag.full_filepath)
+                        dag = dagbag.get_dag(dag_id)
+                        ti.task = dag.get_task(task_id)
+                        ti.handle_failure(msg)
+                    except Exception:
+                        self.log.error("Cannot load the dag bag to handle failure for %s"
+                                       ". Setting task to FAILED without callbacks or "
+                                       "retries. Do you have enough resources?", ti)
+                        ti.state = State.FAILED
+                        session.merge(ti)
+                        session.commit()
 
     def _log_file_processing_stats(self,
                                    known_file_paths,
@@ -1626,8 +1656,8 @@ class SchedulerJob(BaseJob):
                 processor_manager.wait_until_finished()
 
             # Send tasks for execution if available
+            simple_dag_bag = SimpleDagBag(simple_dags)
             if len(simple_dags) > 0:
-                simple_dag_bag = SimpleDagBag(simple_dags)
 
                 # Handle cases where a DAG run state is set (perhaps manually) to
                 # a non-running state. Handle task instances that belong to
@@ -1655,7 +1685,7 @@ class SchedulerJob(BaseJob):
             self.executor.heartbeat()
 
             # Process events from the executor
-            self._process_executor_events()
+            self._process_executor_events(simple_dag_bag)
 
             # Heartbeat the scheduler periodically
             time_since_last_heartbeat = (datetime.utcnow() -

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2abead70/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index b80f701..ebb5ca0 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -36,40 +36,13 @@ class SimpleDag(BaseDag):
     required for instantiating and scheduling its associated tasks.
     """
 
-    def __init__(self,
-                 dag_id,
-                 task_ids,
-                 full_filepath,
-                 concurrency,
-                 is_paused,
-                 pickle_id,
-                 task_special_args):
-        """
-        :param dag_id: ID of the DAG
-        :type dag_id: unicode
-        :param task_ids: task IDs associated with the DAG
-        :type task_ids: list[unicode]
-        :param full_filepath: path to the file containing the DAG e.g.
-        /a/b/c.py
-        :type full_filepath: unicode
-        :param concurrency: No more than these many tasks from the
-        dag should run concurrently
-        :type concurrency: int
-        :param is_paused: Whether or not this DAG is paused. Tasks from paused
-        DAGs are not scheduled
-        :type is_paused: bool
+    def __init__(self, dag, pickle_id=None):
+        """
+        :param dag: the DAG
+        :type dag: DAG
         :param pickle_id: ID associated with the pickled version of this DAG.
         :type pickle_id: unicode
         """
-        self._dag_id = dag_id
-        self._task_ids = task_ids
-        self._full_filepath = full_filepath
-        self._is_paused = is_paused
-        self._concurrency = concurrency
-        self._pickle_id = pickle_id
-        self._task_special_args = task_special_args
-
-    def __init__(self, dag, pickle_id=None):
         self._dag_id = dag.dag_id
         self._task_ids = [task.task_id for task in dag.tasks]
         self._full_filepath = dag.full_filepath

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2abead70/tests/executors/test_base_executor.py
----------------------------------------------------------------------
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
new file mode 100644
index 0000000..fa6123a
--- /dev/null
+++ b/tests/executors/test_base_executor.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.executors.base_executor import BaseExecutor
+from airflow.utils.state import State
+
+from datetime import datetime
+
+
+class BaseExecutorTest(unittest.TestCase):
+    def test_get_event_buffer(self):
+        executor = BaseExecutor()
+
+        date = datetime.utcnow()
+
+        key1 = ("my_dag1", "my_task1", date)
+        key2 = ("my_dag2", "my_task1", date)
+        key3 = ("my_dag2", "my_task2", date)
+        state = State.SUCCESS
+        executor.event_buffer[key1] = state
+        executor.event_buffer[key2] = state
+        executor.event_buffer[key3] = state
+
+        self.assertEqual(len(executor.get_event_buffer(("my_dag1",))), 1)
+        self.assertEqual(len(executor.get_event_buffer()), 2)
+        self.assertEqual(len(executor.event_buffer), 0)
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2abead70/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 88589d8..119e1b4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -935,6 +935,51 @@ class SchedulerJobTest(unittest.TestCase):
     def _make_simple_dag_bag(self, dags):
         return SimpleDagBag([SimpleDag(dag) for dag in dags])
 
+    def test_process_executor_events(self):
+        dag_id = "test_process_executor_events"
+        dag_id2 = "test_process_executor_events_2"
+        task_id_1 = 'dummy_task'
+
+        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE)
+        dag2 = DAG(dag_id=dag_id2, start_date=DEFAULT_DATE)
+        task1 = DummyOperator(dag=dag, task_id=task_id_1)
+        task2 = DummyOperator(dag=dag2, task_id=task_id_1)
+
+        dagbag1 = self._make_simple_dag_bag([dag])
+        dagbag2 = self._make_simple_dag_bag([dag2])
+
+        scheduler = SchedulerJob(**self.default_scheduler_args)
+        session = settings.Session()
+
+        ti1 = TI(task1, DEFAULT_DATE)
+        ti1.state = State.QUEUED
+        session.merge(ti1)
+        session.commit()
+
+        executor = TestExecutor()
+        executor.event_buffer[ti1.key] = State.FAILED
+
+        scheduler.executor = executor
+
+        # dag bag does not contain dag_id
+        scheduler._process_executor_events(simple_dag_bag=dagbag2)
+        ti1.refresh_from_db()
+        self.assertEqual(ti1.state, State.QUEUED)
+
+        # dag bag does contain dag_id
+        scheduler._process_executor_events(simple_dag_bag=dagbag1)
+        ti1.refresh_from_db()
+        self.assertEqual(ti1.state, State.FAILED)
+
+        ti1.state = State.SUCCESS
+        session.merge(ti1)
+        session.commit()
+        executor.event_buffer[ti1.key] = State.SUCCESS
+
+        scheduler._process_executor_events(simple_dag_bag=dagbag1)
+        ti1.refresh_from_db()
+        self.assertEqual(ti1.state, State.SUCCESS)
+
     def test_execute_task_instances_is_paused_wont_execute(self):
         dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
         task_id_1 = 'dummy_task'