You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gump.apache.org by le...@apache.org on 2005/04/15 14:27:44 UTC
svn commit: r161445 - in gump/branches/Gump3/pygump: main.py
python/gump/test/testUtilExecutor.py python/gump/util/executor.py
Author: leosimons
Date: Fri Apr 15 05:27:43 2005
New Revision: 161445
URL: http://svn.apache.org/viewcvs?view=rev&rev=161445
Log:
Add POSIX process-group management so that child processes can be cleaned up more reliably.
* pygump/python/gump/util/executor.py: new wrapper library around the 'subprocess' module, per GUMP-95 and GUMP-97.
* pygump/python/gump/test/testUtilExecutor.py: very basic test cases for the executor utility, per GUMP-96. It is very hard to unit-test that the cleanup code works as intended, especially across different platforms, so that kind of functionality should instead be covered by integration tests.
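* pygump/main.py: hook the process cleanup into the gump run. In addition, fix an exit-status bug: the sys.exit(1) call in the error handler was overridden by the sys.exit(0) call in the finally block. The exit code is now recorded in a variable and passed to a single sys.exit() call at the end of the finally block, after the child-process cleanup and log closing have run.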
Added:
gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py
- copied, changed from r161424, gump/branches/Gump3/pygump/python/gump/test/testExample.py
gump/branches/Gump3/pygump/python/gump/util/executor.py
- copied, changed from r161424, gump/branches/Gump3/pygump/python/gump/util/mysql.py
Modified:
gump/branches/Gump3/pygump/main.py
Modified: gump/branches/Gump3/pygump/main.py
URL: http://svn.apache.org/viewcvs/gump/branches/Gump3/pygump/main.py?view=diff&r1=161444&r2=161445
==============================================================================
--- gump/branches/Gump3/pygump/main.py (original)
+++ gump/branches/Gump3/pygump/main.py Fri Apr 15 05:27:43 2005
@@ -403,6 +403,7 @@
# create logger
log = _Logger(options.logdir)
+ exitcode = 0
try:
if options.debug:
log.level = DEBUG
@@ -452,11 +453,27 @@
log.exception("Unable to send e-mail to administrator")
pass
- sys.exit(1)
+ exitcode = 1
finally:
+ # rigorously clean up our child processes
+ try:
+ timeout = 300
+ try:
+ log.debug("Cleaning up child processes. This may take up to a little over %s seconds." % (timeout+100))
+ except:
+ pass
+ from gump.util.executor import clean_up_processes
+ clean_up_processes(timeout)
+ except:
+ try:
+ log.exception("Error cleaning up child processes!")
+ except:
+ pass
+
+ # close the logs
try:
log.close()
except:
pass
- sys.exit(0)
+ sys.exit(exitcode)
Copied: gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py (from r161424, gump/branches/Gump3/pygump/python/gump/test/testExample.py)
URL: http://svn.apache.org/viewcvs/gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py?view=diff&rev=161445&p1=gump/branches/Gump3/pygump/python/gump/test/testExample.py&r1=161424&p2=gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py&r2=161445
==============================================================================
--- gump/branches/Gump3/pygump/python/gump/test/testExample.py (original)
+++ gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py Fri Apr 15 05:27:43 2005
@@ -17,39 +17,44 @@
__copyright__ = "Copyright (c) 2005 The Apache Software Foundation"
__license__ = "http://www.apache.org/licenses/LICENSE-2.0"
-"""
- This is an example of a testcase. Simply copy and rename this file
- (the filename has to start with "test"), then rename the class below,
- update the reference to the classname in the test_suite() method,
- and write your test methods.
-
- See the documentation for the unittest package for more help with
- tests. You can run all tests from the commandline using "./gump test".
-"""
-
import unittest
from unittest import TestCase
-class ExampleTestCase(TestCase):
- def setUp(self):
- # initialize tests here
- pass
-
- def tearDown(self):
- # clean up after tests here
- pass
+import os
+import sys
+
+from gump.util.executor import Popen
+from subprocess import PIPE
+from gump.util.executor import clean_up_processes
+
+class ZZZExecutorUtilTestCase(TestCase):
+ def test_zzz_run_simple_command_then_clean_up_processes(self):
+ if sys.platform == "win32":
+ return
- def test_something(self):
- # you can do anything inside a test
- # use the assertXXX methods on TestCase
- # to check conditions
- self.assert_( True )
- self.assertEquals( type({}), type({}) )
+ result = Popen(["pwd"], stdout=PIPE).communicate()[0]
+ self.assertNotEqual("", result)
+ # This test can only be run once, since after the call to
+ # "clean_up_processes" all future invocations of subprocesses
+ # will fail. Therefore this class and its methods are awkwardly
+ # named (the ZZZ prefix) so we "ensure" they run last. It's
+ # ugly, I know.
+ processes = []
+ for i in range(0,10):
+ processes.append(Popen(["cat"], stdin=PIPE, stdout=PIPE))
+
+ clean_up_processes(5)
+ for p in processes:
+ try:
+ pid, sts = os.waitpid(p.pid, os.WNOHANG)
+ self.assert_(os.WIFSIGNALED(exitcode), "Process should've been signalled...")
+ except:
+ pass
+
# this is used by testrunner.py to determine what tests to run
def test_suite():
- # be sure to change the referenceto the TestCase class you create above!
- return unittest.makeSuite(ExampleTestCase,'test')
+ return unittest.makeSuite(ZZZExecutorUtilTestCase,'test')
# this allows us to run this test by itself from the commandline
if __name__ == '__main__':
Copied: gump/branches/Gump3/pygump/python/gump/util/executor.py (from r161424, gump/branches/Gump3/pygump/python/gump/util/mysql.py)
URL: http://svn.apache.org/viewcvs/gump/branches/Gump3/pygump/python/gump/util/executor.py?view=diff&rev=161445&p1=gump/branches/Gump3/pygump/python/gump/util/mysql.py&r1=161424&p2=gump/branches/Gump3/pygump/python/gump/util/executor.py&r2=161445
==============================================================================
--- gump/branches/Gump3/pygump/python/gump/util/mysql.py (original)
+++ gump/branches/Gump3/pygump/python/gump/util/executor.py Fri Apr 15 05:27:43 2005
@@ -14,106 +14,156 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""This module provides a thin wrapper around the MySQLdb library."""
+"""This module provides a thin wrapper around the subprocess library.
+
+On posix platforms, it does process group management to allow us to clean up
+misbehaved processes."""
__copyright__ = "Copyright (c) 2004-2005 The Apache Software Foundation"
__license__ = "http://www.apache.org/licenses/LICENSE-2.0"
-import types
+import sys
-class Database:
- """
- Very simple database abstraction layer, basically adding some utilities
- and logging around MySQLdb.
-
- See http://www.python.org/peps/pep-0249.html for more on python and databases.
- This class adheres to the PEP 249 Connection interface.
- """
- def __init__(self, log, host=None, user=None, password=None, db=None):
- self.log = log
- self.host = host
- self.user = user
- self.password = password
- self.db = db
- self._conn = None
-
- def __del__(self):
- self.close()
-
- def commit(self):
- """
- See PEP 249.
- """
- pass
+if sys.platform == "win32":
+ from subprocess import PIPE
+ from subprocess import STDOUT
+ from subprocess import Popen
- def rollback(self):
- """
- See PEP 249.
- """
+ def clean_up_processes(timeout):
+ """This function can be called prior to program exit to attempt to
+ kill all our running children that were created using this module.
+ It does not work on windows!"""
pass
+else:
+ # POSIX
+ import os
+ import time
+ import subprocess
+ import signal
+ import errno
+ from subprocess import PIPE
+ from subprocess import STDOUT
+
+ def _get_new_process_group():
+ """Get us an unused (or so we hope) process group."""
+ pid = os.fork()
+ gid = pid # that *should* be correct. However, let's actually
+ # create something in that group.
+ if pid == 0:
+ # Child
+
+ # ensure a process group is created
+ os.setpgrp()
+
+ # sleep for ten days to keep the process group around
+ # for "a while"
+ import time
+ time.sleep(10*24*60*60)
+ os._exit(0)
+ else:
+ # Parent
- def cursor(self):
- """
- See PEP 249.
- """
- return self._connection().cursor()
-
- def close(self):
- """
- See PEP 249.
- """
- if self._conn:
- self._conn.close()
- self._conn=None
-
- def execute(self, statement):
- """
- Simple helper method to execute SQL statements that isolates its users
- from cursor handling.
+ # wait for child a little so it can set its group
+ import time
+ time.sleep(1)
+
+ # get the gid for the child
+ gid = os.getpgid(pid)
- Pass in any SQL command. Retrieve back the results of having a cursor
- execute that command. The result is a tuple containing the number of
- rows affected and the result set as a tuple of tuples, if there was a
- result set, or None otherwise.
- """
- self.log.debug("Executing SQL statement: %s" % statement)
- cursor = None
- try:
- cursor = self._connection().cursor()
- cursor.execute(statement)
+ return gid
+
+ # This is the group we chuck our children in. We don't just want to
+ # use our own group since we don't want to kill ourselves prematurely!
+ _our_process_group = _get_new_process_group()
+
+ class Popen(subprocess.Popen):
+ """This is a thin wrapper around subprocess.Popen which handles
+ process group management. The gump.util.executor.clean_up_processes()
+ method can be used to clean up the cruft left around by these Popen'ed
+ processes."""
+ def __init__(self, args, bufsize=0, executable=None,
+ stdin=None, stdout=None, stderr=None,
+ preexec_fn=None, close_fds=False, shell=False,
+ cwd=None, env=None, universal_newlines=False,
+ startupinfo=None, creationflags=0):
+ """Create a new Popen instance that delegates to the
+ subprocess Popen."""
+ if not preexec_fn:
+ # setpgid to the gump process group inside the child
+ pre_exec_function = lambda: os.setpgid(0, _our_process_group)
+ else:
+ # The below has a "stupid lambda trick" that makes the lambda
+ # evaluate a tuple of functions. This sticks our own function
+ # call in there while still supporting the originally provided
+ # function
+ pre_exec_function = lambda: (preexec_fn(),os.setpgid(0, _our_process_group))
- affected = cursor.rowcount
- self.log.debug(" ...%s rows affected." % affected)
- if statement.lower().startswith("select"):
- if affected > 0:
- result = cursor.fetchall()
- return (affected, result)
-
- return (affected, None)
- finally:
- if cursor: cursor.close()
+ subprocess.Popen.__init__(self, args, bufsize=bufsize, executable=executable,
+ stdin=stdin, stdout=stdout, stderr=stderr,
+ # note our custom function in there...
+ preexec_fn=pre_exec_function, close_fds=close_fds, shell=shell,
+ cwd=cwd, env=env, universal_newlines=universal_newlines,
+ startupinfo=startupinfo, creationflags=creationflags)
+
+ def clean_up_processes(timeout):
+ """This function can be called prior to program exit to attempt to
+ kill all our running children that were created using this module."""
- def _connection(self):
- """
- Get a connection to the actual database, setting one up if neccessary.
- """
- if not self._conn:
- import MySQLdb
- import MySQLdb.cursors
- if self.password:
- self._conn = MySQLdb.Connect(
- host=self.host,
- user=self.user,
- passwd=self.password,
- db=self.db,
- compress=1,
- cursorclass=MySQLdb.cursors.DictCursor)
- else:
- self._conn = MySQLdb.Connect(
- host=self.host,
- user=self.user,
- db=self.db,
- compress=1,
- cursorclass=MySQLdb.cursors.DictCursor)
+ pgrp_list = [_our_process_group]
+ # send SIGTERM to everything, and update pgrp_list to just those
+ # process groups which have processes in them.
+ _kill_groups(pgrp_list, signal.SIGTERM)
+
+ # pass a copy of the process groups. we want to remember every
+ # group that we SIGTERM'd so that we can SIGKILL them later. it
+ # is possible that a process in the pgrp was reparented to the
+ # init process. those will be invisible to wait(), so we don't
+ # want to mistakenly think we've killed all processes in the
+ # group. thus, we preserve the list and SIGKILL it later.
+ _reap_children(pgrp_list[:], timeout)
+
+ # SIGKILL everything, editing pgrp_list again.
+ _kill_groups(pgrp_list, signal.SIGKILL)
+
+ # reap everything left, but don't really bother waiting on them.
+ # if we exit, then init will reap them.
+ _reap_children(pgrp_list, 60)
+
+ def _kill_groups(pgrp_list, sig):
+ # NOTE: this function edits pgrp_list
+
+ for pgrp in pgrp_list[:]:
+ try:
+ os.killpg(pgrp, sig)
+ except OSError, e:
+ if e.errno == errno.ESRCH:
+ pgrp_list.remove(pgrp)
+
+ def _reap_children(pgrp_list, timeout=300):
+ # NOTE: this function edits pgrp_list
+
+ # keep reaping until the timeout expires, or we finish
+ end_time = time.time() + timeout
+
+ # keep reaping until all pgrps are done, or we run out of time
+ while pgrp_list and time.time() < end_time:
+ # pause for a bit while processes work on exiting. this pause is
+ # at the top, so we can also pause right after the killpg()
+ time.sleep(1)
- return self._conn
+ # go through all pgrps to reap them
+ for pgrp in pgrp_list[:]:
+ # loop quickly to clean everything in this pgrp
+ while 1:
+ try:
+ pid, status = os.waitpid(-pgrp, os.WNOHANG)
+ except OSError, e:
+ if e.errno == errno.ECHILD:
+ # no more children in this pgrp.
+ pgrp_list.remove(pgrp)
+ break
+ raise
+ if pid == 0:
+ # some stuff has not exited yet, and WNOHANG avoided
+ # blocking. go ahead and move to the next pgrp.
+ break