You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/06/16 08:27:11 UTC

[1/2] incubator-airflow git commit: [AIRFLOW-224] Collect orphaned tasks and reschedule them

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 06e70e2d1 -> 949479cf7


[AIRFLOW-224] Collect orphaned tasks and reschedule them

Tasks can become orphaned if the scheduler is killed while it is
processing them, or if the message queue is cleared before a
worker has picked them up. Tasks are no longer set to the
scheduled state before they have been sent to the executor.
In addition, a garbage collector checks for scheduled tasks that
the executor does not know about and reschedules them if needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb892767
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb892767
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb892767

Branch: refs/heads/master
Commit: fb89276750d23fc19a2d57036bc59d3aef846a26
Parents: 8aa7160
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sat Jun 11 13:53:32 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 15 21:58:12 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py                 | 62 +++++++++++++++++++++++++++---------
 airflow/models.py               |  8 +++--
 tests/executor/__init__.py      | 13 ++++++++
 tests/executor/test_executor.py | 33 +++++++++++++++++++
 tests/jobs.py                   | 47 +++++++++++++++++++++++++++
 5 files changed, 146 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 3a2d97a..1e583ac 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -518,12 +518,13 @@ class SchedulerJob(BaseJob):
             if run.execution_date > datetime.now():
                 continue
 
-            # todo: run.task is transient but needs to be set
+            # todo: run.dag is transient but needs to be set
             run.dag = dag
             # todo: preferably the integrity check happens at dag collection time
             run.verify_integrity(session=session)
             run.update_state(session=session)
             if run.state == State.RUNNING:
+                make_transient(run)
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
@@ -546,20 +547,6 @@ class SchedulerJob(BaseJob):
 
                 if ti.is_runnable(flag_upstream_failed=True):
                     self.logger.debug('Queuing task: {}'.format(ti))
-
-                    ti.refresh_from_db(session=session, lock_for_update=True)
-                    # another scheduler could have picked this task
-                    # todo: UP_FOR_RETRY still could create a race condition
-                    if ti.state is State.SCHEDULED:
-                        session.commit()
-                        self.logger.debug("Task {} was picked up by another scheduler"
-                                          .format(ti))
-                        continue
-                    elif ti.state is State.NONE:
-                        ti.state = State.SCHEDULED
-                        session.merge(ti)
-
-                    session.commit()
                     queue.put((ti.key, pickle_id))
 
         session.close()
@@ -676,7 +663,28 @@ class SchedulerJob(BaseJob):
             except Exception as e:
                 self.logger.exception(e)
 
+    @provide_session
+    def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
+        """
+        This function checks for a DagRun if there are any tasks
+        that have a scheduled state but are not known by the
+        executor. If it finds those it will reset the state to None
+        so they will get picked up again.
+        """
+        queued_tis = self.executor.queued_tasks
+
+        # also consider running as the state might not have changed in the db yet
+        running = self.executor.running
+        tis = dag_run.get_task_instances(state=State.SCHEDULED, session=session)
+        for ti in tis:
+            if ti.key not in queued_tis and ti.key not in running:
+                ti.state = State.NONE
+                self.logger.debug("Rescheduling orphaned task {}".format(ti))
+
+        session.commit()
+
     def _execute(self):
+        session = settings.Session()
         TI = models.TaskInstance
 
         pessimistic_connection_handling()
@@ -687,6 +695,16 @@ class SchedulerJob(BaseJob):
         dagbag = models.DagBag(self.subdir, sync_to_db=True)
         executor = self.executor = dagbag.executor
         executor.start()
+
+        # grab orphaned tasks and make sure to reset their state
+        active_runs = DagRun.find(
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session
+        )
+        for dr in active_runs:
+            self._reset_state_for_orphaned_tasks(dr, session=session)
+
         self.runs = 0
         while not self.num_runs or self.num_runs > self.runs:
             try:
@@ -738,8 +756,20 @@ class SchedulerJob(BaseJob):
                         dag = dagbag.dags[ti_key[0]]
                         task = dag.get_task(ti_key[1])
                         ti = TI(task, ti_key[2])
+                        ti.refresh_from_db(session=session, lock_for_update=True)
+                        if ti.state == State.SCHEDULED:
+                            session.commit()
+                            self.logger.debug("Task {} was picked up by another scheduler"
+                                              .format(ti))
+                            continue
+                        elif ti.state is State.NONE:
+                            ti.state = State.SCHEDULED
+
                         self.executor.queue_task_instance(ti, pickle_id=pickle_id)
 
+                        session.merge(ti)
+                        session.commit()
+
                 for j in jobs:
                     j.join()
 
@@ -770,6 +800,8 @@ class SchedulerJob(BaseJob):
                 settings.Session.remove()
         executor.end()
 
+        session.close()
+
     def heartbeat_callback(self):
         Stats.gauge('scheduler_heartbeat', 1, 1)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index b6b7987..6a10ee8 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3361,10 +3361,12 @@ class DagRun(Base):
 
     @staticmethod
     @provide_session
-    def find(dag_id, run_id=None, execution_date=None,
+    def find(dag_id=None, run_id=None, execution_date=None,
              state=None, external_trigger=None, session=None):
         """
         Returns a set of dag runs for the given search criteria.
+        :param dag_id: the dag_id to find dag runs for
+        :type dag_id: integer, list
         :param run_id: defines the the run id for this dag run
         :type run_id: string
         :param execution_date: the execution date
@@ -3378,7 +3380,9 @@ class DagRun(Base):
         """
         DR = DagRun
 
-        qry = session.query(DR).filter(DR.dag_id == dag_id)
+        qry = session.query(DR)
+        if dag_id:
+            qry = qry.filter(DR.dag_id == dag_id)
         if run_id:
             qry = qry.filter(DR.run_id == run_id)
         if execution_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/tests/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/executor/__init__.py b/tests/executor/__init__.py
new file mode 100644
index 0000000..a85b772
--- /dev/null
+++ b/tests/executor/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/tests/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/executor/test_executor.py b/tests/executor/test_executor.py
new file mode 100644
index 0000000..2015d9c
--- /dev/null
+++ b/tests/executor/test_executor.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.executors.base_executor import BaseExecutor
+
+
+class TestExecutor(BaseExecutor):
+    """
+    TestExecutor is used for unit testing purposes.
+    """
+    def execute_async(self, key, command, queue=None):
+        self.logger.debug("{} running task instances".format(len(self.running)))
+        self.logger.debug("{} in queue".format(len(self.queued_tasks)))
+
+    def heartbeat(self):
+        pass
+
+    def terminate(self):
+        pass
+
+    def end(self):
+        self.sync()
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 455716e..3618ce4 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -31,6 +31,8 @@ from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 
+from tests.executor.test_executor import TestExecutor
+
 from airflow import configuration
 configuration.test_mode()
 
@@ -45,6 +47,7 @@ except ImportError:
 DEV_NULL = '/dev/null'
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 
+
 class BackfillJobTest(unittest.TestCase):
 
     def setUp(self):
@@ -732,3 +735,47 @@ class SchedulerJobTest(unittest.TestCase):
         dr = scheduler.schedule_dag(dag)
         self.assertIsNotNone(dr)
         self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10))
+
+    def test_scheduler_reschedule(self):
+        """
+        Checks if tasks that are not taken up by the executor
+        get rescheduled
+        """
+        executor = TestExecutor()
+
+        dagbag = DagBag(executor=executor)
+        dagbag.dags.clear()
+        dagbag.executor = executor
+
+        dag = DAG(
+            dag_id='test_scheduler_reschedule',
+            start_date=DEFAULT_DATE)
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        dag.clear()
+        dag.is_subdag = False
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        orm_dag.is_paused = False
+        session.merge(orm_dag)
+        session.commit()
+
+        dagbag.bag_dag(dag=dag, root_dag=dag, parent_dag=dag)
+
+        @mock.patch('airflow.models.DagBag', return_value=dagbag)
+        @mock.patch('airflow.models.DagBag.collect_dags')
+        def do_schedule(function, function2):
+            scheduler = SchedulerJob(num_runs=1, executor=executor,)
+            scheduler.run()
+
+        do_schedule()
+        self.assertEquals(1, len(executor.queued_tasks))
+        executor.queued_tasks.clear()
+
+        do_schedule()
+        self.assertEquals(2, len(executor.queued_tasks))
+


[2/2] incubator-airflow git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow

Posted by bo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/949479cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/949479cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/949479cf

Branch: refs/heads/master
Commit: 949479cf7b5c298a31a54e7e2ebe71bf782b5d5d
Parents: fb89276 06e70e2
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu Jun 16 10:26:59 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jun 16 10:26:59 2016 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py                       | 11 +++--
 airflow/configuration.py                 |  7 ++-
 airflow/contrib/hooks/__init__.py        |  3 +-
 airflow/contrib/hooks/fs_hook.py         | 41 +++++++++++++++++
 airflow/contrib/operators/__init__.py    |  3 +-
 airflow/contrib/operators/fs_operator.py | 57 ++++++++++++++++++++++++
 airflow/utils/db.py                      |  4 ++
 airflow/www/views.py                     |  1 +
 tests/contrib/operators/__init__.py      | 16 +++++++
 tests/contrib/operators/fs_operator.py   | 64 +++++++++++++++++++++++++++
 tests/core.py                            | 50 ++++++++++-----------
 11 files changed, 225 insertions(+), 32 deletions(-)
----------------------------------------------------------------------