Posted to commits@airflow.apache.org by da...@apache.org on 2016/05/09 23:52:39 UTC

[4/5] incubator-airflow git commit: Handle queued tasks from multiple jobs/executors

Handle queued tasks from multiple jobs/executors

When the Scheduler is run with `--num-runs`, there can be multiple
Schedulers and Executors all trying to run tasks. For queued tasks, the
Scheduler was previously only trying to run tasks that it itself had
queued, but that doesn't work if the Scheduler is restarting. This PR
reverts that behavior and adds two types of "best effort" executions:
before running a TI, executors check whether it is already running, and
before ending, executors call sync() one last time.
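
As an illustration of the first safeguard: before sending a queued command
to the workers, the executor re-reads the task instance from the database
and skips it if some other job already started it. A minimal standalone
sketch of that guard (SimpleTaskInstance and maybe_execute are hypothetical
stand-ins, not Airflow's real model or API; RUNNING stands in for
State.RUNNING):

    RUNNING = 'running'

    class SimpleTaskInstance(object):
        def __init__(self, key, state=None):
            self.key = key
            self.state = state

        def refresh_from_db(self):
            # In Airflow this re-reads the row from the metadata DB,
            # picking up state changes made by other jobs/executors.
            pass

    def maybe_execute(executor, key, command, queue, ti):
        """Run the queued command only if no other job has started it."""
        ti.refresh_from_db()
        if ti.state != RUNNING:
            executor.running[key] = command
            executor.execute_async(key, command=command, queue=queue)
        else:
            # Best effort only: a collision is still possible in the
            # window between refresh_from_db() and execute_async().
            print('Task already running, not sending: {}'.format(key))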


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/edc718b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/edc718b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/edc718b4

Branch: refs/heads/airbnb_rb1.7.1_3
Commit: edc718b43941fdd685d332a5e44e3f1363f04e89
Parents: ff3a855
Author: jlowin <jl...@users.noreply.github.com>
Authored: Wed Apr 13 19:18:39 2016 -0400
Committer: Dan Davydov <da...@airbnb.com>
Committed: Mon May 9 16:26:26 2016 -0700

----------------------------------------------------------------------
 airflow/executors/base_executor.py   | 33 ++++++++++++++++------
 airflow/executors/celery_executor.py |  1 +
 airflow/executors/local_executor.py  |  2 +-
 airflow/jobs.py                      | 46 ++++++-------------------------
 tests/dags/test_issue_1225.py        | 16 ++++++++++-
 tests/jobs.py                        | 20 +++++++++++---
 6 files changed, 66 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 0307583..2e88fa9 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -30,10 +30,11 @@ class BaseExecutor(LoggingMixin):
         """
         pass
 
-    def queue_command(self, key, command, priority=1, queue=None):
+    def queue_command(self, task_instance, command, priority=1, queue=None):
+        key = task_instance.key
         if key not in self.queued_tasks and key not in self.running:
             self.logger.info("Adding to queue: {}".format(command))
-            self.queued_tasks[key] = (command, priority, queue)
+            self.queued_tasks[key] = (command, priority, queue, task_instance)
 
     def queue_task_instance(
             self,
@@ -54,7 +55,7 @@ class BaseExecutor(LoggingMixin):
             pool=pool,
             pickle_id=pickle_id)
         self.queue_command(
-            task_instance.key,
+            task_instance,
             command,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
@@ -67,9 +68,6 @@ class BaseExecutor(LoggingMixin):
         pass
 
     def heartbeat(self):
-        # Calling child class sync method
-        self.logger.debug("Calling the {} sync method".format(self.__class__))
-        self.sync()
 
         # Triggering new jobs
         if not self.parallelism:
@@ -86,10 +84,27 @@ class BaseExecutor(LoggingMixin):
             key=lambda x: x[1][1],
             reverse=True)
         for i in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, priority, queue) = sorted_queue.pop(0)
-            self.running[key] = command
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
+            # TODO(jlowin) without a way to know what Job ran which tasks,
+            # there is a danger that another Job started running a task
+            # that was also queued to this executor. This is the last chance
+            # to check if that happened. The most probable way is that a
+            # Scheduler tried to run a task that was originally queued by a
+            # Backfill. This fix reduces the probability of a collision but
+            # does NOT eliminate it.
             self.queued_tasks.pop(key)
-            self.execute_async(key, command=command, queue=queue)
+            ti.refresh_from_db()
+            if ti.state != State.RUNNING:
+                self.running[key] = command
+                self.execute_async(key, command=command, queue=queue)
+            else:
+                self.logger.debug(
+                    'Task is already running, not sending to '
+                    'executor: {}'.format(key))
+
+        # Calling child class sync method
+        self.logger.debug("Calling the {} sync method".format(self.__class__))
+        self.sync()
 
     def change_state(self, key, state):
         self.running.pop(key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 95f3daa..de56baf 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -95,3 +95,4 @@ class CeleryExecutor(BaseExecutor):
                     async.state not in celery_states.READY_STATES
                     for async in self.tasks.values()]):
                 time.sleep(5)
+        self.sync()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f13ee6d..24ef6c6 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -73,4 +73,4 @@ class LocalExecutor(BaseExecutor):
         [self.queue.put((None, None)) for w in self.workers]
         # Wait for commands to finish
         self.queue.join()
-
+        self.sync()
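
Both executor changes above implement the second safeguard: one final
sync() inside end(), so results that arrive while the executor is shutting
down are still reported. In sketch form (a hypothetical executor shape, not
Airflow's actual classes; wait_for_workers() stands in for LocalExecutor's
queue.join() or CeleryExecutor's polling loop):

    class SketchExecutor(object):
        def wait_for_workers(self):
            # Stand-in for blocking until in-flight commands finish.
            pass

        def sync(self):
            # Stand-in for collecting results and reporting task states.
            pass

        def end(self):
            self.wait_for_workers()
            # Final sync: tasks that completed while we were waiting would
            # otherwise never be reported, because heartbeat() (which
            # normally calls sync()) does not run again after end().
            self.sync()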

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ae38298..8a38086 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -238,7 +238,6 @@ class SchedulerJob(BaseJob):
 
         self.refresh_dags_every = refresh_dags_every
         self.do_pickle = do_pickle
-        self.queued_tis = set()
         super(SchedulerJob, self).__init__(*args, **kwargs)
 
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
@@ -557,47 +556,22 @@ class SchedulerJob(BaseJob):
 
         session.close()
 
-    def process_events(self, executor, dagbag):
-        """
-        Respond to executor events.
-
-        Used to identify queued tasks and schedule them for further processing.
-        """
-        for key, executor_state in list(executor.get_event_buffer().items()):
-            dag_id, task_id, execution_date = key
-            if dag_id not in dagbag.dags:
-                self.logger.error(
-                    'Executor reported a dag_id that was not found in the '
-                    'DagBag: {}'.format(dag_id))
-                continue
-            elif not dagbag.dags[dag_id].has_task(task_id):
-                self.logger.error(
-                    'Executor reported a task_id that was not found in the '
-                    'dag: {} in dag {}'.format(task_id, dag_id))
-                continue
-            task = dagbag.dags[dag_id].get_task(task_id)
-            ti = models.TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-
-            if executor_state == State.SUCCESS:
-                # collect queued tasks for prioritization
-                if ti.state == State.QUEUED:
-                    self.queued_tis.add(ti)
-            else:
-                # special instructions for failed executions could go here
-                pass
-
     @provide_session
     def prioritize_queued(self, session, executor, dagbag):
         # Prioritizing queued task instances
 
         pools = {p.pool: p for p in session.query(models.Pool).all()}
-
+        TI = models.TaskInstance
+        queued_tis = (
+            session.query(TI)
+            .filter(TI.state == State.QUEUED)
+            .all()
+        )
         self.logger.info(
-            "Prioritizing {} queued jobs".format(len(self.queued_tis)))
+            "Prioritizing {} queued jobs".format(len(queued_tis)))
         session.expunge_all()
         d = defaultdict(list)
-        for ti in self.queued_tis:
+        for ti in queued_tis:
             if ti.dag_id not in dagbag.dags:
                 self.logger.info(
                     "DAG no longer in dagbag, deleting {}".format(ti))
@@ -611,8 +585,6 @@ class SchedulerJob(BaseJob):
             else:
                 d[ti.pool].append(ti)
 
-        self.queued_tis.clear()
-
         dag_blacklist = set(dagbag.paused_dags())
         for pool, tis in list(d.items()):
             if not pool:
@@ -666,6 +638,7 @@ class SchedulerJob(BaseJob):
                     open_slots -= 1
                 else:
                     session.delete(ti)
+                    session.commit()
                     continue
                 ti.task = task
 
@@ -711,7 +684,6 @@ class SchedulerJob(BaseJob):
             try:
                 loop_start_dttm = datetime.now()
                 try:
-                    self.process_events(executor=executor, dagbag=dagbag)
                     self.prioritize_queued(executor=executor, dagbag=dagbag)
                 except Exception as e:
                     self.logger.exception(e)
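
The jobs.py hunk above replaces the scheduler's in-memory queued_tis set
with a query against the metadata database, so queued tasks survive a
scheduler restart and tasks queued by other jobs (e.g. a Backfill) are
picked up as well. Standalone, the query amounts to this sketch (assuming
the 1.7-era airflow.models and airflow.settings imports used elsewhere in
this diff):

    from airflow import models, settings
    from airflow.utils.state import State

    def all_queued_task_instances():
        """Every QUEUED TaskInstance, no matter which job queued it."""
        session = settings.Session()
        try:
            TI = models.TaskInstance
            return session.query(TI).filter(TI.state == State.QUEUED).all()
        finally:
            session.close()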

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 898cc04..ecfa646 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -23,6 +23,8 @@ from datetime import datetime
 from airflow.models import DAG
 from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
 from airflow.utils.trigger_rule import TriggerRule
+import time
+
 DEFAULT_DATE = datetime(2016, 1, 1)
 default_args = dict(
     start_date=DEFAULT_DATE,
@@ -31,6 +33,16 @@ default_args = dict(
 def fail():
     raise ValueError('Expected failure.')
 
+def delayed_fail():
+    """
+    Delayed failure to make sure that processes are running before the error
+    is raised.
+    
+    TODO handle more directly (without sleeping)
+    """
+    time.sleep(5)
+    raise ValueError('Expected failure.')
+
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
 dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args)
@@ -123,7 +135,9 @@ dag8 = DAG(
     end_date=DEFAULT_DATE,
     default_args=default_args)
 dag8_task1 = PythonOperator(
-    python_callable=fail,
+    # use delayed_fail because otherwise LocalExecutor will have a chance to
+    # complete the task
+    python_callable=delayed_fail,
     task_id='test_queued_task',
     dag=dag8,
     pool='test_queued_pool')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index bc815e8..6802aae 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,11 +23,13 @@ import unittest
 
 from airflow import AirflowException, settings
 from airflow.bin import cli
+from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
-from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.operators import DummyOperator
+from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.db import provide_session
 
 from airflow import configuration
 configuration.test_mode()
@@ -283,15 +285,25 @@ class SchedulerJobTest(unittest.TestCase):
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
 
-        scheduler = SchedulerJob(dag_id, num_runs=10)
+        scheduler = SchedulerJob(dag_id, num_runs=1)
         scheduler.run()
 
         task_1 = dag.tasks[0]
         logging.info("Trying to find task {}".format(task_1))
         ti = TI(task_1, dag.start_date)
         ti.refresh_from_db()
-        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.state, State.QUEUED)
 
+        # now we use a DIFFERENT scheduler and executor
+        # to simulate the num-runs CLI arg
+        scheduler2 = SchedulerJob(
+            dag_id,
+            num_runs=5,
+            executor=DEFAULT_EXECUTOR.__class__())
+        scheduler2.run()
+
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.FAILED)
         dag.clear()
 
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):