You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") in the original web page and was lost in this plain-text conversion.
Posted to commits@subversion.apache.org by pb...@apache.org on 2012/03/15 22:40:17 UTC

svn commit: r1301216 [4/4] - in /subversion/branches/inheritable-props: ./ notes/ notes/directory-index/ subversion/bindings/javahl/ subversion/bindings/swig/ruby/test/ subversion/include/ subversion/include/private/ subversion/libsvn_client/ subversio...

Modified: subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py
URL: http://svn.apache.org/viewvc/subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py?rev=1301216&r1=1301215&r2=1301216&view=diff
==============================================================================
--- subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py (original)
+++ subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py Thu Mar 15 21:40:15 2012
@@ -39,23 +39,19 @@ import time
 import logging.handlers
 import Queue
 import optparse
+import functools
+import urlparse
 
-from twisted.internet import reactor, task, threads
-from twisted.internet.utils import getProcessOutput
-from twisted.application import internet
-from twisted.web.client import HTTPClientFactory, HTTPPageDownloader
-from urlparse import urlparse
-from xml.sax import handler, make_parser
-from twisted.internet import protocol
-
+import daemonize
+import svnpubsub.client
 
 # check_output() is only available in Python 2.7. Allow us to run with
 # earlier versions
 try:
     check_output = subprocess.check_output
 except AttributeError:
-    def check_output(args):  # note: we don't use anything beyond args
-        pipe = subprocess.Popen(args, stdout=subprocess.PIPE)
+    def check_output(args, env):  # note: we only use these two args
+        pipe = subprocess.Popen(args, stdout=subprocess.PIPE, env=env)
         output, _ = pipe.communicate()
         if pipe.returncode:
             raise subprocess.CalledProcessError(pipe.returncode, args)
@@ -65,10 +61,10 @@ except AttributeError:
 ### note: this runs synchronously. within the current Twisted environment,
 ### it is called from ._get_match() which is run on a thread so it won't
 ### block the Twisted main loop.
-def svn_info(svnbin, path):
+def svn_info(svnbin, env, path):
     "Run 'svn info' on the target path, returning a dict of info data."
     args = [svnbin, "info", "--non-interactive", "--", path]
-    output = check_output(args).strip()
+    output = check_output(args, env=env).strip()
     info = { }
     for line in output.split('\n'):
         idx = line.index(':')
@@ -78,20 +74,14 @@ def svn_info(svnbin, path):
 
 class WorkingCopy(object):
     def __init__(self, bdec, path, url):
-        self.bdec = bdec
         self.path = path
         self.url = url
-        self.repos = None
-        self.match = None
-        d = threads.deferToThread(self._get_match)
-        d.addCallback(self._set_match)
-
-    def _set_match(self, value):
-        self.match = str(value[0])
-        self.url = value[1]
-        self.repos = value[2]
-        self.uuid = value[3]
-        self.bdec.wc_ready(self)
+
+        try:
+            self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
+            bdec.wc_ready(self)
+        except:
+            logging.exception('problem with working copy: %s', path)
 
     def update_applies(self, uuid, path):
         if self.uuid != uuid:
@@ -114,181 +104,44 @@ class WorkingCopy(object):
                 return True
         return False
 
-    def _get_match(self):
+    def _get_match(self, svnbin, env):
         ### quick little hack to auto-checkout missing working copies
         if not os.path.isdir(self.path):
             logging.info("autopopulate %s from %s" % (self.path, self.url))
-            subprocess.check_call([self.bdec.svnbin, 'co', '-q',
+            subprocess.check_call([svnbin, 'co', '-q',
                                    '--non-interactive',
-                                   '--config-dir',
-                                   '/home/svnwc/.subversion',
-                                   '--', self.url, self.path])
+                                   '--', self.url, self.path],
+                                  env=env)
 
         # Fetch the info for matching dirs_changed against this WC
-        info = svn_info(self.bdec.svnbin, self.path)
+        info = svn_info(svnbin, env, self.path)
+        root = info['Repository Root']
         url = info['URL']
-        repos = info['Repository Root']
+        relpath = url[len(root):]  # also has leading '/'
         uuid = info['Repository UUID']
-        relpath = url[len(repos):]  # also has leading '/'
-        return [relpath, url, repos, uuid]
-
-
-class HTTPStream(HTTPClientFactory):
-    protocol = HTTPPageDownloader
-
-    def __init__(self, url):
-        self.url = url
-        HTTPClientFactory.__init__(self, url, method="GET", agent="SvnWcSub/0.1.0")
-
-    def pageStart(self, partial):
-        pass
-
-    def pagePart(self, data):
-        pass
-
-    def pageEnd(self):
-        pass
-
-class Revision:
-    def __init__(self, repos, rev):
-        self.repos = repos
-        self.rev = rev
-        self.dirs_changed = []
-
-class StreamHandler(handler.ContentHandler):
-    def __init__(self, stream, bdec):
-        handler.ContentHandler.__init__(self)
-        self.stream = stream
-        self.bdec =  bdec
-        self.rev = None
-        self.text_value = None
-
-    def startElement(self, name, attrs):
-        #print "start element: %s" % (name)
-        """
-        <commit revision="7">
-                        <dirs_changed><path>/</path></dirs_changed>
-                      </commit>
-        """
-        if name == "commit":
-            self.rev = Revision(attrs['repository'], int(attrs['revision']))
-        elif name == "stillalive":
-            self.bdec.stillalive(self.stream)
-    def characters(self, data):
-        if self.text_value is not None:
-            self.text_value = self.text_value + data
-        else:
-            self.text_value = data
+        return str(relpath), uuid
 
-    def endElement(self, name):
-        #print "end   element: %s" % (name)
-        if name == "commit":
-            self.bdec.commit(self.stream, self.rev)
-            self.rev = None
-        if name == "path" and self.text_value is not None and self.rev is not None:
-            self.rev.dirs_changed.append(self.text_value.strip())
-        self.text_value = None
-
-
-class XMLHTTPStream(HTTPStream):
-    def __init__(self, url, bdec):
-        HTTPStream.__init__(self, url)
-        self.alive = 0
-        self.bdec =  bdec
-        self.parser = make_parser(['xml.sax.expatreader'])
-        self.handler = StreamHandler(self, bdec)
-        self.parser.setContentHandler(self.handler)
-
-    def pageStart(self, parital):
-        self.bdec.pageStart(self)
-
-    def pagePart(self, data):
-        self.parser.feed(data)
-
-    def pageEnd(self):
-        self.bdec.pageEnd(self)
-
-def connectTo(url, bdec):
-    u = urlparse(url)
-    port = u.port
-    if not port:
-        port = 80
-    s = XMLHTTPStream(url, bdec)
-    if bdec.service:
-      conn = internet.TCPClient(u.hostname, u.port, s)
-      conn.setServiceParent(bdec.service)
-    else:
-      conn = reactor.connectTCP(u.hostname, u.port, s)
-    return [s, conn]
 
-
-CHECKBEAT_TIME = 60
 PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/")
 
 class BigDoEverythingClasss(object):
-    def __init__(self, config, service = None):
-        self.urls = [s.strip() for s in config.get_value('streams').split()]
+    def __init__(self, config):
         self.svnbin = config.get_value('svnbin')
         self.env = config.get_env()
+        self.tracking = config.get_track()
         self.worker = BackgroundWorker(self.svnbin, self.env)
-        self.service = service
-        self.failures = 0
-        self.alive = time.time()
-        self.checker = task.LoopingCall(self._checkalive)
-        self.transports = {}
-        self.streams = {}
-        for u in self.urls:
-          self._restartStream(u)
-        self.watch = []
-        for path, url in config.get_track().items():
+        self.watch = [ ]
+
+        self.hostports = [ ]
+        ### switch from URLs in the config to just host:port pairs
+        for url in config.get_value('streams').split():
+            parsed = urlparse.urlparse(url.strip())
+            self.hostports.append((parsed.hostname, parsed.port))
+
+    def start(self):
+        for path, url in self.tracking.items():
             # working copies auto-register with the BDEC when they are ready.
             WorkingCopy(self, path, url)
-        self.checker.start(CHECKBEAT_TIME)
-
-    def pageStart(self, stream):
-        logging.info("Stream %s Connection Established" % (stream.url))
-        self.failures = 0
-
-    def pageEnd(self, stream):
-        logging.info("Stream %s Connection Dead" % (stream.url))
-        self.streamDead(stream.url)
-
-    def _restartStream(self, url):
-        (self.streams[url], self.transports[url]) = connectTo(url, self)
-        self.streams[url].deferred.addBoth(self.streamDead, url)
-        self.streams[url].alive = time.time()
-
-    def _checkalive(self):
-        n = time.time()
-        for k in self.streams.keys():
-          s = self.streams[k]
-          if n - s.alive > CHECKBEAT_TIME:
-            logging.info("Stream %s is dead, reconnecting" % (s.url))
-            #self.transports[s.url].disconnect()
-            self.streamDead(self, s.url)
-
-#        d=filter(lambda x:x not in self.streams.keys(), self.urls)
-#        for u in d:
-#          self._restartStream(u)
-
-    def stillalive(self, stream):
-        stream.alive = time.time()
-
-    def streamDead(self, url, result=None):
-        s = self.streams.get(url)
-        if not s:
-          logging.info("Stream %s is messed up" % (url))
-          return
-        BACKOFF_SECS = 5
-        BACKOFF_MAX = 60
-        #self.checker.stop()
-
-        self.streams[url] = None
-        self.transports[url] = None
-        self.failures += 1
-        backoff = min(self.failures * BACKOFF_SECS, BACKOFF_MAX)
-        logging.info("Stream disconnected, trying again in %d seconds.... %s" % (backoff, s.url))
-        reactor.callLater(backoff, self._restartStream, url)
 
     def wc_ready(self, wc):
         # called when a working copy object has its basic info/url,
@@ -302,8 +155,10 @@ class BigDoEverythingClasss(object):
             return "/" + path
         return os.path.abspath(path)
 
-    def commit(self, stream, rev):
-        logging.info("COMMIT r%d (%d paths) via %s" % (rev.rev, len(rev.dirs_changed), stream.url))
+    def commit(self, host, port, rev):
+        logging.info("COMMIT r%d (%d paths) from %s:%d"
+                     % (rev.rev, len(rev.dirs_changed), host, port))
+
         paths = map(self._normalize_path, rev.dirs_changed)
         if len(paths):
             pre = os.path.commonprefix(paths)
@@ -317,7 +172,7 @@ class BigDoEverythingClasss(object):
                         break
 
             #print "Common Prefix: %s" % (pre)
-            wcs = [wc for wc in self.watch if wc.update_applies(rev.repos, pre)]
+            wcs = [wc for wc in self.watch if wc.update_applies(rev.uuid, pre)]
             logging.info("Updating %d WC for r%d" % (len(wcs), rev.rev))
             for wc in wcs:
                 self.worker.add_work(OP_UPDATE, wc)
@@ -384,7 +239,6 @@ class BackgroundWorker(threading.Thread)
         ### still specific to the ASF setup.
         args = [self.svnbin, 'update',
                 '--quiet',
-                '--config-dir', '/home/svnwc/.subversion',
                 '--non-interactive',
                 '--trust-server-cert',
                 '--ignore-externals',
@@ -392,7 +246,7 @@ class BackgroundWorker(threading.Thread)
         subprocess.check_call(args, env=self.env)
 
         ### check the loglevel before running 'svn info'?
-        info = svn_info(self.svnbin, wc.path)
+        info = svn_info(self.svnbin, self.env, wc.path)
         logging.info("updated: %s now at r%s", wc.path, info['Revision'])
 
     def _cleanup(self, wc):
@@ -401,7 +255,6 @@ class BackgroundWorker(threading.Thread)
         ### we need to move some of these args into the config. these are
         ### still specific to the ASF setup.
         args = [self.svnbin, 'cleanup',
-                '--config-dir', '/home/svnwc/.subversion',
                 '--non-interactive',
                 '--trust-server-cert',
                 wc.path]
@@ -452,6 +305,45 @@ class ReloadableConfig(ConfigParser.Safe
         return str(option)
 
 
+class Daemon(daemonize.Daemon):
+    def __init__(self, logfile, pidfile, umask, bdec):
+        daemonize.Daemon.__init__(self, logfile, pidfile)
+
+        self.umask = umask
+        self.bdec = bdec
+
+    def setup(self):
+        # There is no setup which the parent needs to wait for.
+        pass
+
+    def run(self):
+        logging.info('svnwcsub started, pid=%d', os.getpid())
+
+        # Set the umask in the daemon process. Defaults to 000 for
+        # daemonized processes. Foreground processes simply inherit
+        # the value from the parent process.
+        if self.umask is not None:
+            umask = int(self.umask, 8)
+            os.umask(umask)
+            logging.info('umask set to %03o', umask)
+
+        # Start the BDEC (on the main thread), then start the client
+        self.bdec.start()
+
+        mc = svnpubsub.client.MultiClient(self.bdec.hostports,
+                                          self.bdec.commit,
+                                          self._event)
+        mc.run_forever()
+
+    def _event(self, host, port, event_name):
+        if event_name == 'error':
+            logging.exception('from %s:%s', host, port)
+        elif event_name == 'ping':
+            logging.debug('ping from %s:%s', host, port)
+        else:
+            logging.info('"%s" from %s:%s', event_name, host, port)
+
+
 def prepare_logging(logfile):
     "Log to the specified file, or to stdout if None."
 
@@ -480,20 +372,13 @@ def handle_options(options):
     # Set up the logging, then process the rest of the options.
     prepare_logging(options.logfile)
 
-    if options.pidfile:
+    # In daemon mode, we let the daemonize module handle the pidfile.
+    # Otherwise, we should write this (foreground) PID into the file.
+    if options.pidfile and not options.daemon:
         pid = os.getpid()
         open(options.pidfile, 'w').write('%s\n' % pid)
         logging.info('pid %d written to %s', pid, options.pidfile)
 
-    if options.uid:
-        try:
-            uid = int(options.uid)
-        except ValueError:
-            import pwd
-            uid = pwd.getpwnam(options.uid)[2]
-        logging.info('setting uid %d', uid)
-        os.setuid(uid)
-
     if options.gid:
         try:
             gid = int(options.gid)
@@ -503,10 +388,14 @@ def handle_options(options):
         logging.info('setting gid %d', gid)
         os.setgid(gid)
 
-    if options.umask:
-        umask = int(options.umask, 8)
-        os.umask(umask)
-        logging.info('umask set to %03o', umask)
+    if options.uid:
+        try:
+            uid = int(options.uid)
+        except ValueError:
+            import pwd
+            uid = pwd.getpwnam(options.uid)[2]
+        logging.info('setting uid %d', uid)
+        os.setuid(uid)
 
 
 def main(args):
@@ -525,6 +414,8 @@ def main(args):
                       help='switch to this GID before running')
     parser.add_option('--umask',
                       help='set this (octal) umask before running')
+    parser.add_option('--daemon', action='store_true',
+                      help='run as a background daemon')
 
     options, extra = parser.parse_args(args)
 
@@ -532,12 +423,26 @@ def main(args):
         parser.error('CONFIG_FILE is required')
     config_file = extra[0]
 
+    if options.daemon and not options.logfile:
+        parser.error('LOGFILE is required when running as a daemon')
+    if options.daemon and not options.pidfile:
+        parser.error('PIDFILE is required when running as a daemon')
+
     # Process any provided options.
     handle_options(options)
 
     c = ReloadableConfig(config_file)
-    big = BigDoEverythingClasss(c)
-    reactor.run()
+    bdec = BigDoEverythingClasss(c)
+
+    # We manage the logfile ourselves (along with possible rotation). The
+    # daemon process can just drop stdout/stderr into /dev/null.
+    d = Daemon('/dev/null', options.pidfile, options.umask, bdec)
+    if options.daemon:
+        # Daemonize the process and call sys.exit() with appropriate code
+        d.daemonize_exit()
+    else:
+        # Just run in the foreground (the default)
+        d.foreground()
 
 
 if __name__ == "__main__":