You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by jo...@apache.org on 2012/12/07 17:11:48 UTC

[10/21] git commit: [#5127] ticket:192 taskd maintenance command

[#5127] ticket:192 taskd maintenance command


Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/80ee1bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/80ee1bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/80ee1bc8

Branch: refs/heads/cj/4691
Commit: 80ee1bc833dbb207a7da5b33f7dd075b2459ef80
Parents: b5d5fa3
Author: Igor Bondarenko <je...@gmail.com>
Authored: Wed Oct 17 13:55:10 2012 +0300
Committer: Dave Brondsema <db...@geek.net>
Committed: Thu Dec 6 22:46:32 2012 +0000

----------------------------------------------------------------------
 Allura/allura/command/taskd.py         |    7 +-
 Allura/allura/command/taskd_cleanup.py |  166 +++++++++++++++++++++++++++
 Allura/allura/tests/test_commands.py   |   82 +++++++++++++-
 Allura/development.ini                 |   15 ++-
 Allura/setup.py                        |    1 +
 5 files changed, 267 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/allura/command/taskd.py
----------------------------------------------------------------------
diff --git a/Allura/allura/command/taskd.py b/Allura/allura/command/taskd.py
index 947b6fd..d30b315 100644
--- a/Allura/allura/command/taskd.py
+++ b/Allura/allura/command/taskd.py
@@ -1,3 +1,4 @@
+import logging
 import os
 import time
 import Queue
@@ -15,6 +16,8 @@ import base
 
 faulthandler.enable()
 
+status_log = logging.getLogger('taskdstatus')
+
 
 class TaskdCommand(base.Command):
     summary = 'Task server'
@@ -48,7 +51,9 @@ class TaskdCommand(base.Command):
         self.keep_running = False
 
     def log_current_task(self, signum, frame):
-        base.log.info('taskd pid %s is currently handling task %s' % (os.getpid(), getattr(self, 'task', None)))
+        entry = 'taskd pid %s is currently handling task %s' % (os.getpid(), getattr(self, 'task', None))
+        status_log.info(entry)
+        base.log.info(entry)
 
     def worker(self):
         from allura import model as M

http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/allura/command/taskd_cleanup.py
----------------------------------------------------------------------
diff --git a/Allura/allura/command/taskd_cleanup.py b/Allura/allura/command/taskd_cleanup.py
new file mode 100644
index 0000000..3af29a7
--- /dev/null
+++ b/Allura/allura/command/taskd_cleanup.py
@@ -0,0 +1,166 @@
+import os
+import signal
+import socket
+import subprocess
+from ming.orm.ormsession import ThreadLocalORMSession
+
+from allura import model as M
+import base
+
+class TaskdCleanupCommand(base.Command):
+    summary = 'Tasks cleanup command'
+    parser = base.Command.standard_parser(verbose=True)
+    parser.add_option('-k', '--kill-stuck-taskd',
+            dest='kill', action='store_true',
+            help='automatically kill stuck taskd processes')
+    usage = '<ini file> [-k] <taskd status log file>'
+    min_args = 2
+    max_args = 2
+
+    def command(self):
+        self.basic_setup()
+        self.hostname = socket.gethostname()
+        self.taskd_status_log = self.args[1]
+        self.stuck_pids = []
+        self.error_tasks = []
+        self.suspicious_tasks = []
+
+        taskd_pids = self._taskd_pids()
+        base.log.info('Taskd processes on %s: %s' % (self.hostname, taskd_pids))
+
+        # find stuck taskd processes
+        base.log.info('Seeking for stuck taskd processes')
+        for pid in taskd_pids:
+            base.log.info('...sending USR1 to %s and watching status log' % (pid))
+            status = self._check_taskd_status(int(pid))
+            if status != 'OK':
+                base.log.info('...taskd pid %s has stuck' % pid)
+                self.stuck_pids.append(pid)
+                if self.options.kill:
+                    base.log.info('...-k is set. Killing %s' % pid)
+                    self._kill_stuck_taskd(pid)
+            else:
+                base.log.info('...%s' % status)
+
+        # find 'forsaken' tasks
+        base.log.info('Seeking for forsaken busy tasks')
+        tasks = [t for t in self._busy_tasks()
+                 if t not in self.error_tasks]  # skip seen tasks
+        base.log.info('Found %s busy tasks on %s' % (len(tasks), self.hostname))
+        for task in tasks:
+            base.log.info('Verifying task %s' % task)
+            pid = task.process.split()[-1]
+            if pid not in taskd_pids:
+                # 'forsaken' task
+                base.log.info('Task is forsaken '
+                    '(can\'t find taskd with given pid). '
+                    'Setting state to \'error\'')
+                task.state = 'error'
+                task.result = 'Can\'t find taskd with given pid'
+                self.error_tasks.append(task)
+            else:
+                # check if taskd with given pid really processing this task now:
+                base.log.info('Checking that taskd pid %s is really processing task %s' % (pid, task._id))
+                status = self._check_task(pid, task)
+                if status != 'OK':
+                    # maybe task moved quickly and now is complete
+                    # so we need to check such tasks later
+                    # and mark incomplete ones as 'error'
+                    self.suspicious_tasks.append(task)
+                    base.log.info('...NO. Adding task to suspisious list')
+                else:
+                    base.log.info('...OK')
+
+        # check suspicious task and mark incomplete as error
+        base.log.info('Checking suspicious list for incomplete tasks')
+        self._check_suspicious_tasks()
+        ThreadLocalORMSession.flush_all()
+        self.print_summary()
+
+    def print_summary(self):
+        base.log.info('-' * 80)
+        if self.stuck_pids:
+            base.log.info('Found stuck taskd: %s' % self.stuck_pids)
+            if self.options.kill:
+                base.log.info('...stuck taskd processes were killed')
+            else:
+                base.log.info('...to kill these processes run command with -k flag')
+        if self.error_tasks:
+            base.log.info('Tasks marked as \'error\': %s' % self.error_tasks)
+
+    def _busy_tasks(self, pid=None):
+        regex = '^%s ' % self.hostname
+        if pid is not None:
+            regex = '^%s pid %s' % (self.hostname, pid)
+        return M.MonQTask.query.find({
+            'state': 'busy',
+            'process': {'$regex': regex}
+        })
+
+    def _taskd_pids(self):
+        p = subprocess.Popen(['pgrep', '-f', '/paster taskd'],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        tasks = []
+        if p.returncode == 0:
+            # p.communicate() returns self-process too,
+            # so we need to skip last pid
+            tasks = [pid for pid in stdout.split('\n') if pid != ''][:-1]
+        return tasks
+
+    def _taskd_status(self, pid):
+        os.kill(int(pid), signal.SIGUSR1)
+        p = subprocess.Popen(['tail', '-n1', self.taskd_status_log],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+        stdout, stderr = p.communicate()
+        if p.returncode != 0:
+            base.log.error('Can\'t read taskd status log %s' % self.taskd_status_log)
+            exit(1)
+        return stdout
+
+    def _check_taskd_status(self, pid):
+        status = self._taskd_status(pid)
+        if ('taskd pid %s' % pid) not in status:
+            return 'STUCK'
+        return 'OK'
+
+    def _check_task(self, taskd_pid, task):
+        status = self._taskd_status(taskd_pid)
+        line = 'taskd pid %s is currently handling task %s' % (taskd_pid, task)
+        if line not in status:
+            return 'FAIL'
+        return 'OK'
+
+    def _kill_stuck_taskd(self, pid):
+        os.kill(int(pid), signal.SIGKILL)
+        # find all 'busy' tasks for this pid and mark them as 'error'
+        tasks = list(self._busy_tasks(pid=pid))
+        base.log.info('...taskd pid %s has assigned tasks: %s. '
+                'setting state to \'error\' for all of them' % (pid, tasks))
+        for task in tasks:
+            task.state = 'error'
+            task.result = 'Taskd has stuck with this task'
+            self.error_tasks.append(task)
+
+    def _complete_suspicious_tasks(self):
+        complete_tasks = M.MonQTask.query.find({
+            'state': 'complete',
+            '_id': {'$in': [t._id for t in self.suspicious_tasks]}
+        });
+        return [t._id for t in complete_tasks]
+
+    def _check_suspicious_tasks(self):
+        if not self.suspicious_tasks:
+            return
+        complete_tasks = self._complete_suspicious_tasks()
+        for task in self.suspicious_tasks:
+            base.log.info('Verifying task %s' % task)
+            if task._id not in complete_tasks:
+                base.log.info('...incomplete. Setting status to \'error\'')
+                task.state = 'error'
+                task.result = 'Forsaken task'
+                self.error_tasks.append(task)
+            else:
+                base.log.info('...complete')

http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/allura/tests/test_commands.py
----------------------------------------------------------------------
diff --git a/Allura/allura/tests/test_commands.py b/Allura/allura/tests/test_commands.py
index fadd57b..316ef01 100644
--- a/Allura/allura/tests/test_commands.py
+++ b/Allura/allura/tests/test_commands.py
@@ -5,7 +5,7 @@ from mock import Mock, call
 
 from alluratest.controller import setup_basic_test, setup_global_objects
 from allura.command import script, set_neighborhood_features, \
-                           create_neighborhood, show_models
+                           create_neighborhood, show_models, taskd_cleanup
 from allura import model as M
 from forgeblog import model as BM
 from allura.lib.exceptions import InvalidNBFeatureValueError
@@ -186,3 +186,83 @@ class TestEnsureIndexCommand(object):
             call.ensure_index([('foo', 1), ('baz', 1)], unique=True),
             call.ensure_index([('foo', 1), ('bar', 1)], background=True)
         ])
+
+
+class TestTaskdCleanupCommand(object):
+
+    def setUp(self):
+        self.cmd_class = taskd_cleanup.TaskdCleanupCommand
+        self.cmd_class._check_taskd_status = lambda x, p: 'OK'
+        self.cmd_class._check_task = lambda x, p, t: 'OK'
+        self.cmd_class._busy_tasks = lambda x: []
+        self.cmd_class._taskd_pids = lambda x: ['1111']
+        self.cmd_class._kill_stuck_taskd = Mock()
+        self.cmd_class._complete_suspicious_tasks = lambda x: []
+
+    def test_forsaken_tasks(self):
+        # forsaken task
+        task = Mock(state='busy', process='host pid 1111', result='')
+        self.cmd_class._busy_tasks = lambda x: [task]
+        self.cmd_class._taskd_pids = lambda x: ['2222']
+
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, 'fake.log'])
+        assert task.state == 'error', task.state
+        assert task.result == 'Can\'t find taskd with given pid', task.result
+        assert cmd.error_tasks == [task]
+
+        # task actually running taskd pid == task.process pid == 2222
+        task = Mock(state='busy', process='host pid 2222', result='')
+        self.cmd_class._busy_tasks = lambda x: [task]
+        self.cmd_class._taskd_pids = lambda x: ['2222']
+
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, 'fake.log'])
+        # nothing should change
+        assert task.state == 'busy', task.state
+        assert task.result == '', task.result
+        assert cmd.error_tasks == []
+
+    def test_stuck_taskd(self):
+        # does not stuck
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, 'fake.log'])
+        assert cmd.stuck_pids == [], cmd.stuck_pids
+
+        # stuck
+        self.cmd_class._check_taskd_status = lambda x, p: 'STUCK'
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, 'fake.log'])
+        assert cmd.stuck_pids == ['1111'], cmd.stuck_pids
+
+        # stuck with -k option
+        self.cmd_class._check_taskd_status = lambda x, p: 'STUCK'
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, '-k', 'fake.log'])
+        cmd._kill_stuck_taskd.assert_called_with('1111')
+        assert cmd.stuck_pids == ['1111'], cmd.stuck_pids
+
+    def test_suspicious_tasks(self):
+        # task1 is lost
+        task1 = Mock(state='busy', process='host pid 1111', result='', _id=1)
+        task2 = Mock(state='busy', process='host pid 1111', result='', _id=2)
+        self.cmd_class._busy_tasks = lambda x: [task1, task2]
+        self.cmd_class._check_task = lambda x, p, t: 'FAIL' if t._id == 1 else 'OK'
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, 'fake.log'])
+        assert cmd.suspicious_tasks == [task1], cmd.suspicious_tasks
+        assert cmd.error_tasks == [task1], cmd.error_tasks
+        assert task1.state == 'error'
+        assert task1.result == 'Forsaken task'
+
+        # task1 seems lost, but it just moved quickly
+        task1 = Mock(state='complete', process='host pid 1111', result='', _id=1)
+        task2 = Mock(state='busy', process='host pid 1111', result='', _id=2)
+        self.cmd_class._complete_suspicious_tasks = lambda x: [1]
+        self.cmd_class._busy_tasks = lambda x: [task1, task2]
+        self.cmd_class._check_task = lambda x, p, t: 'FAIL' if t._id == 1 else 'OK'
+        cmd = self.cmd_class('taskd_command')
+        cmd.run([test_config, 'fake.log'])
+        assert cmd.suspicious_tasks == [task1], cmd.suspicious_tasks
+        assert cmd.error_tasks == [], cmd.error_tasks
+        assert task1.state == 'complete'

http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/development.ini
----------------------------------------------------------------------
diff --git a/Allura/development.ini b/Allura/development.ini
index a543b30..19000a6 100644
--- a/Allura/development.ini
+++ b/Allura/development.ini
@@ -273,10 +273,10 @@ override_root = task
 # http://docs.python.org/lib/logging-config-fileformat.html
 
 [loggers]
-keys = root, allura, sqlalchemy, paste, amqp, pylons
+keys = root, allura, sqlalchemy, paste, amqp, pylons, taskdstatus
 
 [handlers]
-keys = console, stats
+keys = console, stats, taskdstatus
 
 [formatters]
 keys = generic, stats
@@ -314,6 +314,11 @@ level = INFO
 qualname = pylons
 handlers =
 
+[logger_taskdstatus]
+level = INFO
+qualname = taskdstatus
+handlers = taskdstatus
+
 # If you create additional handlers, add them as a key to [handlers]
 [handler_console]
 class = StreamHandler
@@ -327,6 +332,12 @@ args = ('rtstats.log', 'allura', 1)
 level = NOTSET
 formatter = stats
 
+[handler_taskdstatus]
+class = handlers.WatchedFileHandler
+args = ('/var/log/allura/taskd_status.log', 'a')
+level = NOTSET
+formatter = generic
+
 # If you create additional formatters, add them as a key to [formatters]
 [formatter_generic]
 format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s

http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/setup.py
----------------------------------------------------------------------
diff --git a/Allura/setup.py b/Allura/setup.py
index 3837535..916dc57 100644
--- a/Allura/setup.py
+++ b/Allura/setup.py
@@ -107,6 +107,7 @@ setup(
 
     [paste.paster_command]
     taskd = allura.command.taskd:TaskdCommand
+    taskd_cleanup = allura.command.taskd_cleanup:TaskdCleanupCommand
     task = allura.command.taskd:TaskCommand
     models = allura.command:ShowModelsCommand
     reindex = allura.command:ReindexCommand