You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/05/09 23:52:39 UTC
[4/5] incubator-airflow git commit: Handle queued tasks from multiple
jobs/executors
Handle queued tasks from multiple jobs/executors
When the Scheduler is run with `--num-runs`, there can be multiple
Schedulers and Executors all trying to run tasks. For queued tasks,
the Scheduler was previously only trying to run tasks that it had itself
queued — but that doesn’t work if the Scheduler is restarting. This PR
reverts that behavior and adds two types of “best effort” executions:
before running a TI, executors check whether it is already running, and
before ending, executors call sync() one last time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/edc718b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/edc718b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/edc718b4
Branch: refs/heads/airbnb_rb1.7.1_3
Commit: edc718b43941fdd685d332a5e44e3f1363f04e89
Parents: ff3a855
Author: jlowin <jl...@users.noreply.github.com>
Authored: Wed Apr 13 19:18:39 2016 -0400
Committer: Dan Davydov <da...@airbnb.com>
Committed: Mon May 9 16:26:26 2016 -0700
----------------------------------------------------------------------
airflow/executors/base_executor.py | 33 ++++++++++++++++------
airflow/executors/celery_executor.py | 1 +
airflow/executors/local_executor.py | 2 +-
airflow/jobs.py | 46 ++++++-------------------------
tests/dags/test_issue_1225.py | 16 ++++++++++-
tests/jobs.py | 20 +++++++++++---
6 files changed, 66 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 0307583..2e88fa9 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -30,10 +30,11 @@ class BaseExecutor(LoggingMixin):
"""
pass
- def queue_command(self, key, command, priority=1, queue=None):
+ def queue_command(self, task_instance, command, priority=1, queue=None):
+ key = task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.logger.info("Adding to queue: {}".format(command))
- self.queued_tasks[key] = (command, priority, queue)
+ self.queued_tasks[key] = (command, priority, queue, task_instance)
def queue_task_instance(
self,
@@ -54,7 +55,7 @@ class BaseExecutor(LoggingMixin):
pool=pool,
pickle_id=pickle_id)
self.queue_command(
- task_instance.key,
+ task_instance,
command,
priority=task_instance.task.priority_weight_total,
queue=task_instance.task.queue)
@@ -67,9 +68,6 @@ class BaseExecutor(LoggingMixin):
pass
def heartbeat(self):
- # Calling child class sync method
- self.logger.debug("Calling the {} sync method".format(self.__class__))
- self.sync()
# Triggering new jobs
if not self.parallelism:
@@ -86,10 +84,27 @@ class BaseExecutor(LoggingMixin):
key=lambda x: x[1][1],
reverse=True)
for i in range(min((open_slots, len(self.queued_tasks)))):
- key, (command, priority, queue) = sorted_queue.pop(0)
- self.running[key] = command
+ key, (command, _, queue, ti) = sorted_queue.pop(0)
+ # TODO(jlowin) without a way to know what Job ran which tasks,
+ # there is a danger that another Job started running a task
+ # that was also queued to this executor. This is the last chance
+ # to check if that hapened. The most probable way is that a
+ # Scheduler tried to run a task that was originally queued by a
+ # Backfill. This fix reduces the probability of a collision but
+ # does NOT eliminate it.
self.queued_tasks.pop(key)
- self.execute_async(key, command=command, queue=queue)
+ ti.refresh_from_db()
+ if ti.state != State.RUNNING:
+ self.running[key] = command
+ self.execute_async(key, command=command, queue=queue)
+ else:
+ self.logger.debug(
+ 'Task is already running, not sending to '
+ 'executor: {}'.format(key))
+
+ # Calling child class sync method
+ self.logger.debug("Calling the {} sync method".format(self.__class__))
+ self.sync()
def change_state(self, key, state):
self.running.pop(key)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 95f3daa..de56baf 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -95,3 +95,4 @@ class CeleryExecutor(BaseExecutor):
async.state not in celery_states.READY_STATES
for async in self.tasks.values()]):
time.sleep(5)
+ self.sync()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f13ee6d..24ef6c6 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -73,4 +73,4 @@ class LocalExecutor(BaseExecutor):
[self.queue.put((None, None)) for w in self.workers]
# Wait for commands to finish
self.queue.join()
-
+ self.sync()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ae38298..8a38086 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -238,7 +238,6 @@ class SchedulerJob(BaseJob):
self.refresh_dags_every = refresh_dags_every
self.do_pickle = do_pickle
- self.queued_tis = set()
super(SchedulerJob, self).__init__(*args, **kwargs)
self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
@@ -557,47 +556,22 @@ class SchedulerJob(BaseJob):
session.close()
- def process_events(self, executor, dagbag):
- """
- Respond to executor events.
-
- Used to identify queued tasks and schedule them for further processing.
- """
- for key, executor_state in list(executor.get_event_buffer().items()):
- dag_id, task_id, execution_date = key
- if dag_id not in dagbag.dags:
- self.logger.error(
- 'Executor reported a dag_id that was not found in the '
- 'DagBag: {}'.format(dag_id))
- continue
- elif not dagbag.dags[dag_id].has_task(task_id):
- self.logger.error(
- 'Executor reported a task_id that was not found in the '
- 'dag: {} in dag {}'.format(task_id, dag_id))
- continue
- task = dagbag.dags[dag_id].get_task(task_id)
- ti = models.TaskInstance(task, execution_date)
- ti.refresh_from_db()
-
- if executor_state == State.SUCCESS:
- # collect queued tasks for prioritiztion
- if ti.state == State.QUEUED:
- self.queued_tis.add(ti)
- else:
- # special instructions for failed executions could go here
- pass
-
@provide_session
def prioritize_queued(self, session, executor, dagbag):
# Prioritizing queued task instances
pools = {p.pool: p for p in session.query(models.Pool).all()}
-
+ TI = models.TaskInstance
+ queued_tis = (
+ session.query(TI)
+ .filter(TI.state == State.QUEUED)
+ .all()
+ )
self.logger.info(
- "Prioritizing {} queued jobs".format(len(self.queued_tis)))
+ "Prioritizing {} queued jobs".format(len(queued_tis)))
session.expunge_all()
d = defaultdict(list)
- for ti in self.queued_tis:
+ for ti in queued_tis:
if ti.dag_id not in dagbag.dags:
self.logger.info(
"DAG no longer in dagbag, deleting {}".format(ti))
@@ -611,8 +585,6 @@ class SchedulerJob(BaseJob):
else:
d[ti.pool].append(ti)
- self.queued_tis.clear()
-
dag_blacklist = set(dagbag.paused_dags())
for pool, tis in list(d.items()):
if not pool:
@@ -666,6 +638,7 @@ class SchedulerJob(BaseJob):
open_slots -= 1
else:
session.delete(ti)
+ session.commit()
continue
ti.task = task
@@ -711,7 +684,6 @@ class SchedulerJob(BaseJob):
try:
loop_start_dttm = datetime.now()
try:
- self.process_events(executor=executor, dagbag=dagbag)
self.prioritize_queued(executor=executor, dagbag=dagbag)
except Exception as e:
self.logger.exception(e)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 898cc04..ecfa646 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -23,6 +23,8 @@ from datetime import datetime
from airflow.models import DAG
from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
+import time
+
DEFAULT_DATE = datetime(2016, 1, 1)
default_args = dict(
start_date=DEFAULT_DATE,
@@ -31,6 +33,16 @@ default_args = dict(
def fail():
raise ValueError('Expected failure.')
+def delayed_fail():
+ """
+ Delayed failure to make sure that processes are running before the error
+ is raised.
+
+ TODO handle more directly (without sleeping)
+ """
+ time.sleep(5)
+ raise ValueError('Expected failure.')
+
# DAG tests backfill with pooled tasks
# Previously backfill would queue the task but never run it
dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args)
@@ -123,7 +135,9 @@ dag8 = DAG(
end_date=DEFAULT_DATE,
default_args=default_args)
dag8_task1 = PythonOperator(
- python_callable=fail,
+ # use delayed_fail because otherwise LocalExecutor will have a chance to
+ # complete the task
+ python_callable=delayed_fail,
task_id='test_queued_task',
dag=dag8,
pool='test_queued_pool')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/edc718b4/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index bc815e8..6802aae 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,11 +23,13 @@ import unittest
from airflow import AirflowException, settings
from airflow.bin import cli
+from airflow.executors import DEFAULT_EXECUTOR
from airflow.jobs import BackfillJob, SchedulerJob
-from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.operators import DummyOperator
+from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
-from airflow.utils.db import provide_session
from airflow import configuration
configuration.test_mode()
@@ -283,15 +285,25 @@ class SchedulerJobTest(unittest.TestCase):
dag = self.dagbag.get_dag(dag_id)
dag.clear()
- scheduler = SchedulerJob(dag_id, num_runs=10)
+ scheduler = SchedulerJob(dag_id, num_runs=1)
scheduler.run()
task_1 = dag.tasks[0]
logging.info("Trying to find task {}".format(task_1))
ti = TI(task_1, dag.start_date)
ti.refresh_from_db()
- self.assertEqual(ti.state, State.FAILED)
+ self.assertEqual(ti.state, State.QUEUED)
+ # now we use a DIFFERENT scheduler and executor
+ # to simulate the num-runs CLI arg
+ scheduler2 = SchedulerJob(
+ dag_id,
+ num_runs=5,
+ executor=DEFAULT_EXECUTOR.__class__())
+ scheduler2.run()
+
+ ti.refresh_from_db()
+ self.assertEqual(ti.state, State.FAILED)
dag.clear()
def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):