You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gump.apache.org by le...@apache.org on 2005/04/15 14:27:44 UTC

svn commit: r161445 - in gump/branches/Gump3/pygump: main.py python/gump/test/testUtilExecutor.py python/gump/util/executor.py

Author: leosimons
Date: Fri Apr 15 05:27:43 2005
New Revision: 161445

URL: http://svn.apache.org/viewcvs?view=rev&rev=161445
Log:
Adding in posix process group management for better cleanup.

* pygump/python/gump/util/executor.py: new wrapper library around the 'subprocess' module as per GUMP-95 and GUMP-97.

* pygump/python/gump/test/testUtilExecutor.py: very basic test cases for the executor utility, as per GUMP-96. It is very hard to unit test that the cleanup code works as intended, especially across different platforms, so that kind of functionality should be tested through integration testing instead.

* pygump/main.py: hook the process cleanup into the gump run. In addition, fix a bug where the finally block would not be executed on exceptions because of a sys.exit call; the error path now records an exit code that is passed to sys.exit() after the finally block has run.

Added:
    gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py
      - copied, changed from r161424, gump/branches/Gump3/pygump/python/gump/test/testExample.py
    gump/branches/Gump3/pygump/python/gump/util/executor.py
      - copied, changed from r161424, gump/branches/Gump3/pygump/python/gump/util/mysql.py
Modified:
    gump/branches/Gump3/pygump/main.py

Modified: gump/branches/Gump3/pygump/main.py
URL: http://svn.apache.org/viewcvs/gump/branches/Gump3/pygump/main.py?view=diff&r1=161444&r2=161445
==============================================================================
--- gump/branches/Gump3/pygump/main.py (original)
+++ gump/branches/Gump3/pygump/main.py Fri Apr 15 05:27:43 2005
@@ -403,6 +403,7 @@
     
     # create logger
     log = _Logger(options.logdir)
+    exitcode = 0 # set to 1 by the error path below; used by sys.exit() at the end
     try:
         if options.debug:
             log.level = DEBUG
@@ -452,11 +453,27 @@
                 log.exception("Unable to send e-mail to administrator")
                 pass
             
-            sys.exit(1)
+            exitcode = 1
     finally:
+        # best-effort cleanup of child processes; errors are only logged
+        try:
+            timeout = 300
+            try:
+                log.debug("Cleaning up child processes. This may take up to a little over %s seconds." % (timeout+100))
+            except:
+                pass
+            from gump.util.executor import clean_up_processes
+            clean_up_processes(timeout)
+        except:
+            try:
+                log.exception("Error cleaning up child processes!")
+            except:
+                pass
+    
+        # close the logs (best-effort as well)
         try:
             log.close()
         except:
             pass
     
-    sys.exit(0)
+    sys.exit(exitcode)

Copied: gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py (from r161424, gump/branches/Gump3/pygump/python/gump/test/testExample.py)
URL: http://svn.apache.org/viewcvs/gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py?view=diff&rev=161445&p1=gump/branches/Gump3/pygump/python/gump/test/testExample.py&r1=161424&p2=gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py&r2=161445
==============================================================================
--- gump/branches/Gump3/pygump/python/gump/test/testExample.py (original)
+++ gump/branches/Gump3/pygump/python/gump/test/testUtilExecutor.py Fri Apr 15 05:27:43 2005
@@ -17,39 +17,44 @@
 __copyright__ = "Copyright (c) 2005 The Apache Software Foundation"
 __license__   = "http://www.apache.org/licenses/LICENSE-2.0"
 
-"""
-    This is an example of a testcase. Simply copy and rename this file
-    (the filename has to start with "test"), then rename the class below,
-    update the reference to the classname in the test_suite() method,
-    and write your test methods.
-    
-    See the documentation for the unittest package for more help with
-    tests. You can run all tests from the commandline using "./gump test".
-"""
-
 import unittest
 from unittest import TestCase
 
-class ExampleTestCase(TestCase):
-    def setUp(self):
-        # initialize tests here
-        pass
-    
-    def tearDown(self):
-        # clean up after tests here
-        pass
+import os
+import sys
+
+from gump.util.executor import Popen
+from subprocess import PIPE
+from gump.util.executor import clean_up_processes
+
+class ZZZExecutorUtilTestCase(TestCase):
+    def test_zzz_run_simple_command_then_clean_up_processes(self):
+        if sys.platform == "win32":
+            return
         
-    def test_something(self):
-        # you can do anything inside a test
-        # use the assertXXX methods on TestCase
-        # to check conditions
-        self.assert_( True )
-        self.assertEquals( type({}), type({}) )
+        result = Popen(["pwd"], stdout=PIPE).communicate()[0]
+        self.assertNotEqual("", result)
 
+        # This test can only be run once, since after the call to
+        # "clean_up_processes" all future invocations of subprocesses
+        # will fail. Therefore this class and its methods are awkwardly
+        # named (the ZZZ prefix) so we "ensure" they run last. It's
+        # ugly, I know.
+        processes = []
+        for i in range(0,10):
+            processes.append(Popen(["cat"], stdin=PIPE, stdout=PIPE))
+            
+        clean_up_processes(5)
+        for p in processes:
+            try:
+                pid, sts = os.waitpid(p.pid, os.WNOHANG)
+                self.assert_(os.WIFSIGNALED(sts), "Process should've been signalled...")
+            except OSError:
+                pass # presumably already reaped by clean_up_processes (ECHILD)
+        
 # this is used by testrunner.py to determine what tests to run
 def test_suite():
-    # be sure to change the referenceto the TestCase class you create above!
-    return unittest.makeSuite(ExampleTestCase,'test')
+    return unittest.makeSuite(ZZZExecutorUtilTestCase,'test')
 
 # this allows us to run this test by itself from the commandline
 if __name__ == '__main__':

Copied: gump/branches/Gump3/pygump/python/gump/util/executor.py (from r161424, gump/branches/Gump3/pygump/python/gump/util/mysql.py)
URL: http://svn.apache.org/viewcvs/gump/branches/Gump3/pygump/python/gump/util/executor.py?view=diff&rev=161445&p1=gump/branches/Gump3/pygump/python/gump/util/mysql.py&r1=161424&p2=gump/branches/Gump3/pygump/python/gump/util/executor.py&r2=161445
==============================================================================
--- gump/branches/Gump3/pygump/python/gump/util/mysql.py (original)
+++ gump/branches/Gump3/pygump/python/gump/util/executor.py Fri Apr 15 05:27:43 2005
@@ -14,106 +14,156 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""This module provides a thin wrapper around the MySQLdb library."""
+"""This module provides a thin wrapper around the subprocess library.
+
+On posix platforms, it does process group management to allow us to clean up
+misbehaved processes."""
 
 __copyright__ = "Copyright (c) 2004-2005 The Apache Software Foundation"
 __license__   = "http://www.apache.org/licenses/LICENSE-2.0"
 
-import types
+import sys
 
-class Database:
-    """
-    Very simple database abstraction layer, basically adding some utilities
-    and logging around MySQLdb.
-    
-    See http://www.python.org/peps/pep-0249.html for more on python and databases.
-    This class adheres to the PEP 249 Connection interface.
-    """
-    def __init__(self, log, host=None, user=None, password=None, db=None):
-        self.log = log
-        self.host = host
-        self.user = user
-        self.password = password
-        self.db = db
-        self._conn = None
-        
-    def __del__(self):
-        self.close()
-    
-    def commit(self):
-        """
-        See PEP 249.
-        """
-        pass
+if sys.platform == "win32":
+    from subprocess import PIPE
+    from subprocess import STDOUT
+    from subprocess import Popen
     
-    def rollback(self):
-        """
-        See PEP 249.
-        """
+    def clean_up_processes(timeout):
+        """Windows stand-in for the posix clean_up_processes(timeout). No
+        process group management is done on windows, so this does nothing
+        and the timeout argument is ignored."""
         pass
+else:
+    # POSIX: children are tracked through a dedicated process group
+    import os
+    import time
+    import subprocess
+    import signal
+    import errno
+    from subprocess import PIPE
+    from subprocess import STDOUT
+
+    def _get_new_process_group():
+        """Fork a sleeper child that leads a brand new process group and
+        return that group's id (the sleeper's pid)."""
+        pid = os.fork()
+        gid = pid # a group leader's pgid equals its pid
+        if pid == 0:
+            # Child. The finally clause guarantees we never fall back into
+            # the importing code (a second copy of the caller), whatever happens.
+            try:
+                # become the leader of a brand new process group
+                os.setpgrp()
+                # sleep for ten days to keep the process group around
+                # for "a while"
+                time.sleep(10*24*60*60)
+            finally:
+                os._exit(0)
+        else:
+            # Parent
     
-    def cursor(self):
-        """
-        See PEP 249.
-        """
-        return self._connection().cursor()
-
-    def close(self):
-        """
-        See PEP 249.
-        """
-        if self._conn:
-            self._conn.close()
-            self._conn=None
-
-    def execute(self, statement):
-        """
-        Simple helper method to execute SQL statements that isolates its users
-        from cursor handling.
+            # set the group from this side too: no race with the child, no sleep
+            try:
+                os.setpgid(pid, pid)
+            except OSError:
+                pass # e.g. the child already exited; getpgid() below reports it
+            gid = os.getpgid(pid)
         
-        Pass in any SQL command. Retrieve back the results of having a cursor
-        execute that command. The result is a tuple containing the number of
-        rows affected and the result set as a tuple of tuples, if there was a
-        result set, or None otherwise.
-        """
-        self.log.debug("Executing SQL statement: %s" % statement)
-        cursor = None
-        try:
-            cursor = self._connection().cursor()
-            cursor.execute(statement)
+        return gid
+
+    # This is the group we chuck our children in. We don't just want to
+    # use our own group since we don't want to kill ourselves prematurely!
+    _our_process_group = _get_new_process_group()
+
+    class Popen(subprocess.Popen):
+        """This is a thin wrapper around subprocess.Popen which handles
+        process group management. The gump.util.executor.clean_up_processes()
+        method can be used to clean up the cruft left around by these Popen'ed
+        processes."""
+        def __init__(self, args, bufsize=0, executable=None,
+                     stdin=None, stdout=None, stderr=None,
+                     preexec_fn=None, close_fds=False, shell=False,
+                     cwd=None, env=None, universal_newlines=False,
+                     startupinfo=None, creationflags=0):
+            """Create a new Popen instance that delegates to the
+            subprocess Popen."""
+            if not preexec_fn:
+                # setpgid to the gump process group inside the child
+                pre_exec_function = lambda: os.setpgid(0, _our_process_group)
+            else:
+                # A lambda may hold only one expression, so build a tuple:
+                # its elements are evaluated left to right, which runs the
+                # caller's preexec_fn first and then moves the child into
+                # the gump process group.
+                pre_exec_function = lambda: (preexec_fn(),os.setpgid(0, _our_process_group))
             
-            affected = cursor.rowcount
-            self.log.debug("   ...%s rows affected." % affected)
-            if statement.lower().startswith("select"):
-                if affected > 0:
-                    result = cursor.fetchall()
-                    return (affected, result)
-                
-            return (affected, None)
-        finally:
-            if cursor: cursor.close()
+            subprocess.Popen.__init__(self, args, bufsize=bufsize, executable=executable,
+                     stdin=stdin, stdout=stdout, stderr=stderr,
+                     # our wrapper, which always joins the gump process group
+                     preexec_fn=pre_exec_function, close_fds=close_fds, shell=shell,
+                     cwd=cwd, env=env, universal_newlines=universal_newlines,
+                     startupinfo=startupinfo, creationflags=creationflags)
+
+    def clean_up_processes(timeout):
+        """Terminate every process in the gump process group: SIGTERM, reap
+        for up to about `timeout` seconds, then SIGKILL and reap for 60 more."""
     
-    def _connection(self):
-        """
-        Get a connection to the actual database, setting one up if neccessary.
-        """
-        if not self._conn:
-            import MySQLdb
-            import MySQLdb.cursors
-            if self.password:
-                self._conn = MySQLdb.Connect(
-                    host=self.host, 
-                    user=self.user,
-                    passwd=self.password, 
-                    db=self.db,
-                    compress=1,
-                    cursorclass=MySQLdb.cursors.DictCursor)
-            else:
-                self._conn = MySQLdb.Connect(
-                    host=self.host, 
-                    user=self.user,
-                    db=self.db,
-                    compress=1,
-                    cursorclass=MySQLdb.cursors.DictCursor)
+        pgrp_list = [_our_process_group]
+        # send SIGTERM to everything, and update pgrp_list to just those
+        # process groups which have processes in them.
+        _kill_groups(pgrp_list, signal.SIGTERM)
+       
+        # pass a copy of the process groups. we want to remember every
+        # group that we SIGTERM'd so that we can SIGKILL them later. it
+        # is possible that a process in the pgrp was reparented to the
+        # init process. those will be invisible to wait(), so we don't
+        # want to mistakenly think we've killed all processes in the
+        # group. thus, we preserve the list and SIGKILL it later.
+        _reap_children(pgrp_list[:], timeout)
+      
+        # SIGKILL everything, editing pgrp_list again.
+        _kill_groups(pgrp_list, signal.SIGKILL)
+       
+        # reap everything left, but don't really bother waiting on them.
+        # if we exit, then init will reap them.
+        _reap_children(pgrp_list, 60)
+
+    def _kill_groups(pgrp_list, sig):
+        # Send sig to each process group; groups that no longer exist
+        # (ESRCH) are removed from pgrp_list in place.
+        for pgrp in pgrp_list[:]:
+            try:
+                os.killpg(pgrp, sig)
+            except OSError, e:
+                if e.errno == errno.ESRCH:
+                    pgrp_list.remove(pgrp)
+
+    def _reap_children(pgrp_list, timeout=300):
+        # NOTE: this function edits pgrp_list
+      
+        # keep reaping until the timeout expires, or we finish
+        end_time = time.time() + timeout
+      
+        # keep reaping until all pgrps are done, or we run out of time
+        while pgrp_list and time.time() < end_time:
+            # pause for a bit while processes work on exiting. this pause is
+            # at the top, so we can also pause right after the killpg()
+            time.sleep(1)
         
-        return self._conn
+            # go through all pgrps to reap them
+            for pgrp in pgrp_list[:]:
+                # loop quickly to clean everything in this pgrp
+                while 1:
+                    try:
+                        pid, status = os.waitpid(-pgrp, os.WNOHANG)
+                    except OSError, e:
+                        if e.errno == errno.ECHILD:
+                            # no more children in this pgrp.
+                            pgrp_list.remove(pgrp)
+                            break
+                        raise
+                    if pid == 0:
+                        # some stuff has not exited yet, and WNOHANG avoided
+                        # blocking. go ahead and move to the next pgrp.
+                        break