You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by jo...@apache.org on 2012/12/07 17:11:48 UTC
[10/21] git commit: [#5127] ticket:192 taskd maintenance command
[#5127] ticket:192 taskd maintenance command
Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/80ee1bc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/80ee1bc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/80ee1bc8
Branch: refs/heads/cj/4691
Commit: 80ee1bc833dbb207a7da5b33f7dd075b2459ef80
Parents: b5d5fa3
Author: Igor Bondarenko <je...@gmail.com>
Authored: Wed Oct 17 13:55:10 2012 +0300
Committer: Dave Brondsema <db...@geek.net>
Committed: Thu Dec 6 22:46:32 2012 +0000
----------------------------------------------------------------------
Allura/allura/command/taskd.py | 7 +-
Allura/allura/command/taskd_cleanup.py | 166 +++++++++++++++++++++++++++
Allura/allura/tests/test_commands.py | 82 +++++++++++++-
Allura/development.ini | 15 ++-
Allura/setup.py | 1 +
5 files changed, 267 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/allura/command/taskd.py
----------------------------------------------------------------------
diff --git a/Allura/allura/command/taskd.py b/Allura/allura/command/taskd.py
index 947b6fd..d30b315 100644
--- a/Allura/allura/command/taskd.py
+++ b/Allura/allura/command/taskd.py
@@ -1,3 +1,4 @@
+import logging
import os
import time
import Queue
@@ -15,6 +16,8 @@ import base
faulthandler.enable()
# Dedicated logger for the "taskd pid ... is currently handling task ..."
# lines written by log_current_task. The taskd_cleanup command is meant to be
# pointed at the file this logger's handler writes to (see [logger_taskdstatus]
# and [handler_taskdstatus] in development.ini).
+status_log = logging.getLogger('taskdstatus')
+
class TaskdCommand(base.Command):
summary = 'Task server'
@@ -48,7 +51,9 @@ class TaskdCommand(base.Command):
self.keep_running = False
def log_current_task(self, signum, frame):
    """Report which task this taskd process is handling right now.

    Has a signal-handler signature (signum, frame); presumably it is installed
    as the SIGUSR1 handler, since taskd_cleanup sends SIGUSR1 and then looks
    for exactly this line in the status log. Keep the message format in sync
    with TaskdCleanupCommand._check_taskd_status / _check_task.

    The same line is written to the dedicated 'taskdstatus' logger and to the
    regular command log.
    """
    my_pid = os.getpid()
    current_task = getattr(self, 'task', None)
    message = 'taskd pid %s is currently handling task %s' % (
        my_pid, current_task)
    for logger in (status_log, base.log):
        logger.info(message)
def worker(self):
from allura import model as M
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/allura/command/taskd_cleanup.py
----------------------------------------------------------------------
diff --git a/Allura/allura/command/taskd_cleanup.py b/Allura/allura/command/taskd_cleanup.py
new file mode 100644
index 0000000..3af29a7
--- /dev/null
+++ b/Allura/allura/command/taskd_cleanup.py
@@ -0,0 +1,166 @@
import errno
import os
import re
import signal
import socket
import subprocess
import sys
import time

from ming.orm.ormsession import ThreadLocalORMSession

from allura import model as M
import base
+
class TaskdCleanupCommand(base.Command):
    """Find stuck taskd processes and forsaken 'busy' tasks on this host.

    Every taskd process on the host is sent SIGUSR1 and is expected to append
    a ``taskd pid <pid> is currently handling task <task>`` line to the taskd
    status log given on the command line. A taskd that does not answer within
    ``--status-timeout`` seconds is reported as stuck (and SIGKILLed with
    ``-k``). Busy tasks whose taskd pid is not running are marked 'error'.
    Busy tasks that their taskd does not claim to be processing are re-checked
    at the end, and marked 'error' only if they are still 'busy'.
    """
    summary = 'Tasks cleanup command'
    parser = base.Command.standard_parser(verbose=True)
    parser.add_option('-k', '--kill-stuck-taskd',
                      dest='kill', action='store_true',
                      help='automatically kill stuck taskd processes')
    parser.add_option('--status-timeout',
                      dest='status_timeout', type='float', default=5.0,
                      help='seconds to wait for a taskd to answer USR1 in '
                           'the status log (default: 5)')
    usage = '<ini file> [-k] <taskd status log file>'
    min_args = 2
    max_args = 2

    # seconds between reads of the status log while waiting for a reply
    STATUS_POLL_INTERVAL = 0.2

    def command(self):
        """Entry point: check taskd processes, then busy tasks, then flush."""
        self.basic_setup()
        self.hostname = socket.gethostname()
        self.taskd_status_log = self.args[1]
        self.stuck_pids = []
        self.error_tasks = []
        self.suspicious_tasks = []

        taskd_pids = self._taskd_pids()
        base.log.info('Taskd processes on %s: %s' % (self.hostname, taskd_pids))

        self._find_stuck_taskd(taskd_pids)
        self._find_forsaken_tasks(taskd_pids)

        # check suspicious tasks and mark the still-busy ones as error
        base.log.info('Checking suspicious list for incomplete tasks')
        self._check_suspicious_tasks()
        ThreadLocalORMSession.flush_all()
        self.print_summary()

    def _find_stuck_taskd(self, taskd_pids):
        """Record (and with -k, kill) taskd processes that don't answer USR1."""
        base.log.info('Seeking for stuck taskd processes')
        for pid in taskd_pids:
            base.log.info('...sending USR1 to %s and watching status log' % (pid))
            status = self._check_taskd_status(int(pid))
            if status != 'OK':
                base.log.info('...taskd pid %s has stuck' % pid)
                self.stuck_pids.append(pid)
                if self.options.kill:
                    base.log.info('...-k is set. Killing %s' % pid)
                    self._kill_stuck_taskd(pid)
            else:
                base.log.info('...%s' % status)

    def _find_forsaken_tasks(self, taskd_pids):
        """Mark busy tasks without a live taskd as 'error'; queue doubtful ones.

        :param taskd_pids: pids (strings) of taskd processes found on this host
        """
        base.log.info('Seeking for forsaken busy tasks')
        # Compare by _id: a fresh query need not hand back the very objects
        # that _kill_stuck_taskd has already marked as 'error'.
        seen_ids = set(t._id for t in self.error_tasks)
        tasks = [t for t in self._busy_tasks() if t._id not in seen_ids]
        base.log.info('Found %s busy tasks on %s' % (len(tasks), self.hostname))
        stuck_pids = set(self.stuck_pids)
        for task in tasks:
            base.log.info('Verifying task %s' % task)
            pid = task.process.split()[-1]
            if pid in stuck_pids and not self.options.kill:
                # The taskd is alive but unresponsive and already reported in
                # the summary; asking it again would only time out, and marking
                # its task 'error' while it still runs would be contradictory.
                base.log.info('...taskd pid %s is stuck. Skipping task' % pid)
                continue
            if pid not in taskd_pids or pid in stuck_pids:
                # 'forsaken' task: its taskd is not running (or was just killed)
                base.log.info('Task is forsaken '
                              '(can\'t find taskd with given pid). '
                              'Setting state to \'error\'')
                task.state = 'error'
                task.result = 'Can\'t find taskd with given pid'
                self.error_tasks.append(task)
                continue
            # check if taskd with given pid is really processing this task now
            base.log.info('Checking that taskd pid %s is really processing task %s' % (pid, task._id))
            status = self._check_task(pid, task)
            if status != 'OK':
                # maybe the task moved quickly and is finished by now,
                # so such tasks are re-checked at the end
                self.suspicious_tasks.append(task)
                base.log.info('...NO. Adding task to suspisious list')
            else:
                base.log.info('...OK')

    def print_summary(self):
        """Log stuck taskd pids and the tasks that were marked 'error'."""
        base.log.info('-' * 80)
        if self.stuck_pids:
            base.log.info('Found stuck taskd: %s' % self.stuck_pids)
            if self.options.kill:
                base.log.info('...stuck taskd processes were killed')
            else:
                base.log.info('...to kill these processes run command with -k flag')
        if self.error_tasks:
            base.log.info('Tasks marked as \'error\': %s' % self.error_tasks)

    def _busy_tasks(self, pid=None):
        """Query 'busy' MonQTasks whose process is on this host (and *pid*).

        The hostname is regex-escaped (hostnames contain '.'), and the pid is
        anchored so that pid 11 does not also match 'host pid 1111'.
        """
        host = re.escape(self.hostname)
        regex = '^%s ' % host
        if pid is not None:
            regex = '^%s pid %s$' % (host, pid)
        return M.MonQTask.query.find({
            'state': 'busy',
            'process': {'$regex': regex}
        })

    def _taskd_pids(self):
        """Return pids (as strings) of the taskd processes on this host.

        The trailing space in the pgrep pattern keeps 'paster taskd_cleanup'
        (i.e. this command) from matching. Our own pid is filtered out as well
        rather than blindly dropping the last pid pgrep printed.
        """
        p = subprocess.Popen(['pgrep', '-f', '/paster taskd '],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if p.returncode == 1:
            # pgrep exits with 1 when no process matched
            return []
        if p.returncode != 0:
            base.log.error('pgrep failed (exit code %s): %s'
                           % (p.returncode, stderr.strip()))
            return []
        own_pid = str(os.getpid())
        return [pid for pid in stdout.split() if pid != own_pid]

    def _status_log_error(self):
        """Log that the status log is unreadable and abort the command."""
        base.log.error('Can\'t read taskd status log %s' % self.taskd_status_log)
        sys.exit(1)

    def _status_log_size(self):
        """Return the current size in bytes of the taskd status log."""
        try:
            return os.path.getsize(self.taskd_status_log)
        except OSError:
            self._status_log_error()

    def _read_status_log(self, offset):
        """Return the status log text written after byte *offset*.

        If the file is now shorter than *offset* it was rotated or truncated,
        so it is read from the beginning instead.
        """
        try:
            with open(self.taskd_status_log) as f:
                f.seek(0, os.SEEK_END)
                size = f.tell()
                f.seek(offset if offset <= size else 0)
                return f.read()
        except (IOError, OSError):
            self._status_log_error()

    def _taskd_status(self, pid):
        """Send SIGUSR1 to taskd *pid* and return what was logged in reply.

        Only text appended to the status log after the signal is examined, so
        a line left there by an earlier check cannot make a stuck process look
        alive. The log is polled until a line from *pid* shows up or
        ``--status-timeout`` seconds pass. Returns '' if the process no longer
        exists.
        """
        offset = self._status_log_size()
        try:
            os.kill(int(pid), signal.SIGUSR1)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            base.log.info('...taskd pid %s is not running anymore' % pid)
            return ''
        marker = 'taskd pid %s ' % pid
        deadline = time.time() + max(self.options.status_timeout, 0)
        while True:
            appended = self._read_status_log(offset)
            if marker in appended or time.time() >= deadline:
                return appended
            time.sleep(self.STATUS_POLL_INTERVAL)

    def _check_taskd_status(self, pid):
        """Return 'OK' if taskd *pid* answered USR1, else 'STUCK'."""
        status = self._taskd_status(pid)
        # trailing space: pid 11 must not match 'taskd pid 1111 ...'
        if ('taskd pid %s ' % pid) not in status:
            return 'STUCK'
        return 'OK'

    def _check_task(self, taskd_pid, task):
        """Return 'OK' if taskd *taskd_pid* reports handling *task*, else 'FAIL'."""
        status = self._taskd_status(taskd_pid)
        line = 'taskd pid %s is currently handling task %s' % (taskd_pid, task)
        if line not in status:
            return 'FAIL'
        return 'OK'

    def _kill_stuck_taskd(self, pid):
        """SIGKILL taskd *pid* and mark all of its busy tasks as 'error'."""
        try:
            os.kill(int(pid), signal.SIGKILL)
        except OSError as e:
            if e.errno != errno.ESRCH:
                raise
            # already gone; its busy tasks are forsaken all the same
            base.log.info('...taskd pid %s is not running anymore' % pid)
        tasks = list(self._busy_tasks(pid=pid))
        base.log.info('...taskd pid %s has assigned tasks: %s. '
                      'setting state to \'error\' for all of them' % (pid, tasks))
        for task in tasks:
            task.state = 'error'
            task.result = 'Taskd has stuck with this task'
            self.error_tasks.append(task)

    def _complete_suspicious_tasks(self):
        """Return _ids of suspicious tasks that are no longer 'busy'.

        Such tasks were finished by their taskd after we inspected them, either
        successfully ('complete') or with a real failure ('error'). They must
        be left alone, so their actual result is not overwritten.
        """
        finished_tasks = M.MonQTask.query.find({
            'state': {'$ne': 'busy'},
            '_id': {'$in': [t._id for t in self.suspicious_tasks]},
        })
        return [t._id for t in finished_tasks]

    def _check_suspicious_tasks(self):
        """Mark suspicious tasks that are still 'busy' as 'error'."""
        if not self.suspicious_tasks:
            return
        finished_ids = set(self._complete_suspicious_tasks())
        for task in self.suspicious_tasks:
            base.log.info('Verifying task %s' % task)
            if task._id in finished_ids:
                base.log.info('...complete')
                continue
            base.log.info('...incomplete. Setting status to \'error\'')
            task.state = 'error'
            task.result = 'Forsaken task'
            self.error_tasks.append(task)
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/allura/tests/test_commands.py
----------------------------------------------------------------------
diff --git a/Allura/allura/tests/test_commands.py b/Allura/allura/tests/test_commands.py
index fadd57b..316ef01 100644
--- a/Allura/allura/tests/test_commands.py
+++ b/Allura/allura/tests/test_commands.py
@@ -5,7 +5,7 @@ from mock import Mock, call
from alluratest.controller import setup_basic_test, setup_global_objects
from allura.command import script, set_neighborhood_features, \
- create_neighborhood, show_models
+ create_neighborhood, show_models, taskd_cleanup
from allura import model as M
from forgeblog import model as BM
from allura.lib.exceptions import InvalidNBFeatureValueError
@@ -186,3 +186,83 @@ class TestEnsureIndexCommand(object):
call.ensure_index([('foo', 1), ('baz', 1)], unique=True),
call.ensure_index([('foo', 1), ('bar', 1)], background=True)
])
+
+
class TestTaskdCleanupCommand(object):
    """Tests for TaskdCleanupCommand with process and DB access stubbed out.

    The helper methods are replaced on the command class itself. tearDown
    restores the originals so the stubs do not leak into later tests that use
    TaskdCleanupCommand.
    """

    # helper methods that setUp and the tests replace on the class
    PATCHED = ('_check_taskd_status', '_check_task', '_busy_tasks',
               '_taskd_pids', '_kill_stuck_taskd',
               '_complete_suspicious_tasks')

    def setUp(self):
        self.cmd_class = taskd_cleanup.TaskdCleanupCommand
        # read from __dict__ to get the plain functions (not unbound
        # methods), so tearDown can put them back verbatim
        self._originals = dict((name, self.cmd_class.__dict__[name])
                               for name in self.PATCHED)
        self.cmd_class._check_taskd_status = lambda x, p: 'OK'
        self.cmd_class._check_task = lambda x, p, t: 'OK'
        self.cmd_class._busy_tasks = lambda x: []
        self.cmd_class._taskd_pids = lambda x: ['1111']
        self.cmd_class._kill_stuck_taskd = Mock()
        self.cmd_class._complete_suspicious_tasks = lambda x: []

    def tearDown(self):
        for name, original in self._originals.items():
            setattr(self.cmd_class, name, original)

    def test_forsaken_tasks(self):
        # forsaken task
        task = Mock(state='busy', process='host pid 1111', result='')
        self.cmd_class._busy_tasks = lambda x: [task]
        self.cmd_class._taskd_pids = lambda x: ['2222']

        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, 'fake.log'])
        assert task.state == 'error', task.state
        assert task.result == 'Can\'t find taskd with given pid', task.result
        assert cmd.error_tasks == [task]

        # task actually running taskd pid == task.process pid == 2222
        task = Mock(state='busy', process='host pid 2222', result='')
        self.cmd_class._busy_tasks = lambda x: [task]
        self.cmd_class._taskd_pids = lambda x: ['2222']

        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, 'fake.log'])
        # nothing should change
        assert task.state == 'busy', task.state
        assert task.result == '', task.result
        assert cmd.error_tasks == []

    def test_stuck_taskd(self):
        # not stuck
        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, 'fake.log'])
        assert cmd.stuck_pids == [], cmd.stuck_pids

        # stuck
        self.cmd_class._check_taskd_status = lambda x, p: 'STUCK'
        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, 'fake.log'])
        assert cmd.stuck_pids == ['1111'], cmd.stuck_pids

        # stuck with -k option
        self.cmd_class._check_taskd_status = lambda x, p: 'STUCK'
        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, '-k', 'fake.log'])
        cmd._kill_stuck_taskd.assert_called_with('1111')
        assert cmd.stuck_pids == ['1111'], cmd.stuck_pids

    def test_suspicious_tasks(self):
        # task1 is lost
        task1 = Mock(state='busy', process='host pid 1111', result='', _id=1)
        task2 = Mock(state='busy', process='host pid 1111', result='', _id=2)
        self.cmd_class._busy_tasks = lambda x: [task1, task2]
        self.cmd_class._check_task = lambda x, p, t: 'FAIL' if t._id == 1 else 'OK'
        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, 'fake.log'])
        assert cmd.suspicious_tasks == [task1], cmd.suspicious_tasks
        assert cmd.error_tasks == [task1], cmd.error_tasks
        assert task1.state == 'error'
        assert task1.result == 'Forsaken task'

        # task1 seems lost, but it just moved quickly
        task1 = Mock(state='complete', process='host pid 1111', result='', _id=1)
        task2 = Mock(state='busy', process='host pid 1111', result='', _id=2)
        self.cmd_class._complete_suspicious_tasks = lambda x: [1]
        self.cmd_class._busy_tasks = lambda x: [task1, task2]
        self.cmd_class._check_task = lambda x, p, t: 'FAIL' if t._id == 1 else 'OK'
        cmd = self.cmd_class('taskd_command')
        cmd.run([test_config, 'fake.log'])
        assert cmd.suspicious_tasks == [task1], cmd.suspicious_tasks
        assert cmd.error_tasks == [], cmd.error_tasks
        assert task1.state == 'complete'
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/development.ini
----------------------------------------------------------------------
diff --git a/Allura/development.ini b/Allura/development.ini
index a543b30..19000a6 100644
--- a/Allura/development.ini
+++ b/Allura/development.ini
@@ -273,10 +273,10 @@ override_root = task
# http://docs.python.org/lib/logging-config-fileformat.html
[loggers]
-keys = root, allura, sqlalchemy, paste, amqp, pylons
+keys = root, allura, sqlalchemy, paste, amqp, pylons, taskdstatus
[handlers]
-keys = console, stats
+keys = console, stats, taskdstatus
[formatters]
keys = generic, stats
@@ -314,6 +314,11 @@ level = INFO
qualname = pylons
handlers =
+[logger_taskdstatus]
+level = INFO
+qualname = taskdstatus
+handlers = taskdstatus
+
# If you create additional handlers, add them as a key to [handlers]
[handler_console]
class = StreamHandler
@@ -327,6 +332,12 @@ args = ('rtstats.log', 'allura', 1)
level = NOTSET
formatter = stats
+[handler_taskdstatus]
+class = handlers.WatchedFileHandler
+args = ('/var/log/allura/taskd_status.log', 'a')
+level = NOTSET
+formatter = generic
+
# If you create additional formatters, add them as a key to [formatters]
[formatter_generic]
format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/80ee1bc8/Allura/setup.py
----------------------------------------------------------------------
diff --git a/Allura/setup.py b/Allura/setup.py
index 3837535..916dc57 100644
--- a/Allura/setup.py
+++ b/Allura/setup.py
@@ -107,6 +107,7 @@ setup(
[paste.paster_command]
taskd = allura.command.taskd:TaskdCommand
+ taskd_cleanup = allura.command.taskd_cleanup:TaskdCleanupCommand
task = allura.command.taskd:TaskCommand
models = allura.command:ShowModelsCommand
reindex = allura.command:ReindexCommand