You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/16 08:27:11 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-224] Collect orphaned
tasks and reschedule them
Repository: incubator-airflow
Updated Branches:
refs/heads/master 06e70e2d1 -> 949479cf7
[AIRFLOW-224] Collect orphaned tasks and reschedule them
Tasks can become orphaned if the scheduler is killed while it is
processing them, or if the message queue (MQ) is cleared before
a worker has picked them up. Tasks are now no longer set to the
scheduled state if they have not yet been sent to the executor.
In addition, a garbage collector checks for scheduled tasks that are
not known to the executor and reschedules them if needed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb892767
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb892767
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb892767
Branch: refs/heads/master
Commit: fb89276750d23fc19a2d57036bc59d3aef846a26
Parents: 8aa7160
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Jun 11 13:53:32 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 15 21:58:12 2016 +0200
----------------------------------------------------------------------
airflow/jobs.py | 62 +++++++++++++++++++++++++++---------
airflow/models.py | 8 +++--
tests/executor/__init__.py | 13 ++++++++
tests/executor/test_executor.py | 33 +++++++++++++++++++
tests/jobs.py | 47 +++++++++++++++++++++++++++
5 files changed, 146 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 3a2d97a..1e583ac 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -518,12 +518,13 @@ class SchedulerJob(BaseJob):
if run.execution_date > datetime.now():
continue
- # todo: run.task is transient but needs to be set
+ # todo: run.dag is transient but needs to be set
run.dag = dag
# todo: preferably the integrity check happens at dag collection time
run.verify_integrity(session=session)
run.update_state(session=session)
if run.state == State.RUNNING:
+ make_transient(run)
active_dag_runs.append(run)
for run in active_dag_runs:
@@ -546,20 +547,6 @@ class SchedulerJob(BaseJob):
if ti.is_runnable(flag_upstream_failed=True):
self.logger.debug('Queuing task: {}'.format(ti))
-
- ti.refresh_from_db(session=session, lock_for_update=True)
- # another scheduler could have picked this task
- # todo: UP_FOR_RETRY still could create a race condition
- if ti.state is State.SCHEDULED:
- session.commit()
- self.logger.debug("Task {} was picked up by another scheduler"
- .format(ti))
- continue
- elif ti.state is State.NONE:
- ti.state = State.SCHEDULED
- session.merge(ti)
-
- session.commit()
queue.put((ti.key, pickle_id))
session.close()
@@ -676,7 +663,28 @@ class SchedulerJob(BaseJob):
except Exception as e:
self.logger.exception(e)
+ @provide_session
+ def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
+ """
+ This function checks for a DagRun if there are any tasks
+ that have a scheduled state but are not known by the
+ executor. If it finds those it will reset the state to None
+ so they will get picked up again.
+ """
+ queued_tis = self.executor.queued_tasks
+
+ # also consider running as the state might not have changed in the db yet
+ running = self.executor.running
+ tis = dag_run.get_task_instances(state=State.SCHEDULED, session=session)
+ for ti in tis:
+ if ti.key not in queued_tis and ti.key not in running:
+ ti.state = State.NONE
+ self.logger.debug("Rescheduling orphaned task {}".format(ti))
+
+ session.commit()
+
def _execute(self):
+ session = settings.Session()
TI = models.TaskInstance
pessimistic_connection_handling()
@@ -687,6 +695,16 @@ class SchedulerJob(BaseJob):
dagbag = models.DagBag(self.subdir, sync_to_db=True)
executor = self.executor = dagbag.executor
executor.start()
+
+ # grab orphaned tasks and make sure to reset their state
+ active_runs = DagRun.find(
+ state=State.RUNNING,
+ external_trigger=False,
+ session=session
+ )
+ for dr in active_runs:
+ self._reset_state_for_orphaned_tasks(dr, session=session)
+
self.runs = 0
while not self.num_runs or self.num_runs > self.runs:
try:
@@ -738,8 +756,20 @@ class SchedulerJob(BaseJob):
dag = dagbag.dags[ti_key[0]]
task = dag.get_task(ti_key[1])
ti = TI(task, ti_key[2])
+ ti.refresh_from_db(session=session, lock_for_update=True)
+ if ti.state == State.SCHEDULED:
+ session.commit()
+ self.logger.debug("Task {} was picked up by another scheduler"
+ .format(ti))
+ continue
+ elif ti.state is State.NONE:
+ ti.state = State.SCHEDULED
+
self.executor.queue_task_instance(ti, pickle_id=pickle_id)
+ session.merge(ti)
+ session.commit()
+
for j in jobs:
j.join()
@@ -770,6 +800,8 @@ class SchedulerJob(BaseJob):
settings.Session.remove()
executor.end()
+ session.close()
+
def heartbeat_callback(self):
Stats.gauge('scheduler_heartbeat', 1, 1)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index b6b7987..6a10ee8 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3361,10 +3361,12 @@ class DagRun(Base):
@staticmethod
@provide_session
- def find(dag_id, run_id=None, execution_date=None,
+ def find(dag_id=None, run_id=None, execution_date=None,
state=None, external_trigger=None, session=None):
"""
Returns a set of dag runs for the given search criteria.
+ :param dag_id: the dag_id to find dag runs for
+ :type dag_id: string
:param run_id: defines the run id for this dag run
:type run_id: string
:param execution_date: the execution date
@@ -3378,7 +3380,9 @@ class DagRun(Base):
"""
DR = DagRun
- qry = session.query(DR).filter(DR.dag_id == dag_id)
+ qry = session.query(DR)
+ if dag_id:
+ qry = qry.filter(DR.dag_id == dag_id)
if run_id:
qry = qry.filter(DR.run_id == run_id)
if execution_date:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/tests/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executor/__init__.py b/tests/executor/__init__.py
new file mode 100644
index 0000000..a85b772
--- /dev/null
+++ b/tests/executor/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/tests/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executor/test_executor.py b/tests/executor/test_executor.py
new file mode 100644
index 0000000..2015d9c
--- /dev/null
+++ b/tests/executor/test_executor.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.executors.base_executor import BaseExecutor
+
+
+class TestExecutor(BaseExecutor):
+ """
+ TestExecutor is used for unit testing purposes.
+ """
+ def execute_async(self, key, command, queue=None):
+ self.logger.debug("{} running task instances".format(len(self.running)))
+ self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+
+ def heartbeat(self):
+ pass
+
+ def terminate(self):
+ pass
+
+ def end(self):
+ self.sync()
+
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 455716e..3618ce4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -31,6 +31,8 @@ from airflow.utils.db import provide_session
from airflow.utils.state import State
from airflow.utils.timeout import timeout
+from tests.executor.test_executor import TestExecutor
+
from airflow import configuration
configuration.test_mode()
@@ -45,6 +47,7 @@ except ImportError:
DEV_NULL = '/dev/null'
DEFAULT_DATE = datetime.datetime(2016, 1, 1)
+
class BackfillJobTest(unittest.TestCase):
def setUp(self):
@@ -732,3 +735,47 @@ class SchedulerJobTest(unittest.TestCase):
dr = scheduler.schedule_dag(dag)
self.assertIsNotNone(dr)
self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10))
+
+ def test_scheduler_reschedule(self):
+ """
+ Checks if tasks that are not taken up by the executor
+ get rescheduled
+ """
+ executor = TestExecutor()
+
+ dagbag = DagBag(executor=executor)
+ dagbag.dags.clear()
+ dagbag.executor = executor
+
+ dag = DAG(
+ dag_id='test_scheduler_reschedule',
+ start_date=DEFAULT_DATE)
+ dag_task1 = DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ owner='airflow')
+
+ dag.clear()
+ dag.is_subdag = False
+
+ session = settings.Session()
+ orm_dag = DagModel(dag_id=dag.dag_id)
+ orm_dag.is_paused = False
+ session.merge(orm_dag)
+ session.commit()
+
+ dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
+
+ @mock.patch('airflow.models.DagBag', return_value=dagbag)
+ @mock.patch('airflow.models.DagBag.collect_dags')
+ def do_schedule(function, function2):
+ scheduler = SchedulerJob(num_runs=1, executor=executor,)
+ scheduler.run()
+
+ do_schedule()
+ self.assertEquals(1, len(executor.queued_tasks))
+ executor.queued_tasks.clear()
+
+ do_schedule()
+ self.assertEquals(2, len(executor.queued_tasks))
+
[2/2] incubator-airflow git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-airflow
Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/949479cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/949479cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/949479cf
Branch: refs/heads/master
Commit: 949479cf7b5c298a31a54e7e2ebe71bf782b5d5d
Parents: fb89276 06e70e2
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Jun 16 10:26:59 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jun 16 10:26:59 2016 +0200
----------------------------------------------------------------------
airflow/bin/cli.py | 11 +++--
airflow/configuration.py | 7 ++-
airflow/contrib/hooks/__init__.py | 3 +-
airflow/contrib/hooks/fs_hook.py | 41 +++++++++++++++++
airflow/contrib/operators/__init__.py | 3 +-
airflow/contrib/operators/fs_operator.py | 57 ++++++++++++++++++++++++
airflow/utils/db.py | 4 ++
airflow/www/views.py | 1 +
tests/contrib/operators/__init__.py | 16 +++++++
tests/contrib/operators/fs_operator.py | 64 +++++++++++++++++++++++++++
tests/core.py | 50 ++++++++++-----------
11 files changed, 225 insertions(+), 32 deletions(-)
----------------------------------------------------------------------