You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by pb...@apache.org on 2012/03/15 22:40:17 UTC
svn commit: r1301216 [4/4] - in /subversion/branches/inheritable-props: ./
notes/ notes/directory-index/ subversion/bindings/javahl/
subversion/bindings/swig/ruby/test/ subversion/include/
subversion/include/private/ subversion/libsvn_client/ subversio...
Modified: subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py
URL: http://svn.apache.org/viewvc/subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py?rev=1301216&r1=1301215&r2=1301216&view=diff
==============================================================================
--- subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py (original)
+++ subversion/branches/inheritable-props/tools/server-side/svnpubsub/svnwcsub.py Thu Mar 15 21:40:15 2012
@@ -39,23 +39,19 @@ import time
import logging.handlers
import Queue
import optparse
+import functools
+import urlparse
-from twisted.internet import reactor, task, threads
-from twisted.internet.utils import getProcessOutput
-from twisted.application import internet
-from twisted.web.client import HTTPClientFactory, HTTPPageDownloader
-from urlparse import urlparse
-from xml.sax import handler, make_parser
-from twisted.internet import protocol
-
+import daemonize
+import svnpubsub.client
# check_output() is only available in Python 2.7. Allow us to run with
# earlier versions
try:
check_output = subprocess.check_output
except AttributeError:
- def check_output(args): # note: we don't use anything beyond args
- pipe = subprocess.Popen(args, stdout=subprocess.PIPE)
+ def check_output(args, env): # note: we only use these two args
+ pipe = subprocess.Popen(args, stdout=subprocess.PIPE, env=env)
output, _ = pipe.communicate()
if pipe.returncode:
raise subprocess.CalledProcessError(pipe.returncode, args)
@@ -65,10 +61,10 @@ except AttributeError:
### note: this runs synchronously. within the current Twisted environment,
### it is called from ._get_match() which is run on a thread so it won't
### block the Twisted main loop.
-def svn_info(svnbin, path):
+def svn_info(svnbin, env, path):
"Run 'svn info' on the target path, returning a dict of info data."
args = [svnbin, "info", "--non-interactive", "--", path]
- output = check_output(args).strip()
+ output = check_output(args, env=env).strip()
info = { }
for line in output.split('\n'):
idx = line.index(':')
@@ -78,20 +74,14 @@ def svn_info(svnbin, path):
class WorkingCopy(object):
def __init__(self, bdec, path, url):
- self.bdec = bdec
self.path = path
self.url = url
- self.repos = None
- self.match = None
- d = threads.deferToThread(self._get_match)
- d.addCallback(self._set_match)
-
- def _set_match(self, value):
- self.match = str(value[0])
- self.url = value[1]
- self.repos = value[2]
- self.uuid = value[3]
- self.bdec.wc_ready(self)
+
+ try:
+ self.match, self.uuid = self._get_match(bdec.svnbin, bdec.env)
+ bdec.wc_ready(self)
+ except:
+ logging.exception('problem with working copy: %s', path)
def update_applies(self, uuid, path):
if self.uuid != uuid:
@@ -114,181 +104,44 @@ class WorkingCopy(object):
return True
return False
- def _get_match(self):
+ def _get_match(self, svnbin, env):
### quick little hack to auto-checkout missing working copies
if not os.path.isdir(self.path):
logging.info("autopopulate %s from %s" % (self.path, self.url))
- subprocess.check_call([self.bdec.svnbin, 'co', '-q',
+ subprocess.check_call([svnbin, 'co', '-q',
'--non-interactive',
- '--config-dir',
- '/home/svnwc/.subversion',
- '--', self.url, self.path])
+ '--', self.url, self.path],
+ env=env)
# Fetch the info for matching dirs_changed against this WC
- info = svn_info(self.bdec.svnbin, self.path)
+ info = svn_info(svnbin, env, self.path)
+ root = info['Repository Root']
url = info['URL']
- repos = info['Repository Root']
+ relpath = url[len(root):] # also has leading '/'
uuid = info['Repository UUID']
- relpath = url[len(repos):] # also has leading '/'
- return [relpath, url, repos, uuid]
-
-
-class HTTPStream(HTTPClientFactory):
- protocol = HTTPPageDownloader
-
- def __init__(self, url):
- self.url = url
- HTTPClientFactory.__init__(self, url, method="GET", agent="SvnWcSub/0.1.0")
-
- def pageStart(self, partial):
- pass
-
- def pagePart(self, data):
- pass
-
- def pageEnd(self):
- pass
-
-class Revision:
- def __init__(self, repos, rev):
- self.repos = repos
- self.rev = rev
- self.dirs_changed = []
-
-class StreamHandler(handler.ContentHandler):
- def __init__(self, stream, bdec):
- handler.ContentHandler.__init__(self)
- self.stream = stream
- self.bdec = bdec
- self.rev = None
- self.text_value = None
-
- def startElement(self, name, attrs):
- #print "start element: %s" % (name)
- """
- <commit revision="7">
- <dirs_changed><path>/</path></dirs_changed>
- </commit>
- """
- if name == "commit":
- self.rev = Revision(attrs['repository'], int(attrs['revision']))
- elif name == "stillalive":
- self.bdec.stillalive(self.stream)
- def characters(self, data):
- if self.text_value is not None:
- self.text_value = self.text_value + data
- else:
- self.text_value = data
+ return str(relpath), uuid
- def endElement(self, name):
- #print "end element: %s" % (name)
- if name == "commit":
- self.bdec.commit(self.stream, self.rev)
- self.rev = None
- if name == "path" and self.text_value is not None and self.rev is not None:
- self.rev.dirs_changed.append(self.text_value.strip())
- self.text_value = None
-
-
-class XMLHTTPStream(HTTPStream):
- def __init__(self, url, bdec):
- HTTPStream.__init__(self, url)
- self.alive = 0
- self.bdec = bdec
- self.parser = make_parser(['xml.sax.expatreader'])
- self.handler = StreamHandler(self, bdec)
- self.parser.setContentHandler(self.handler)
-
- def pageStart(self, parital):
- self.bdec.pageStart(self)
-
- def pagePart(self, data):
- self.parser.feed(data)
-
- def pageEnd(self):
- self.bdec.pageEnd(self)
-
-def connectTo(url, bdec):
- u = urlparse(url)
- port = u.port
- if not port:
- port = 80
- s = XMLHTTPStream(url, bdec)
- if bdec.service:
- conn = internet.TCPClient(u.hostname, u.port, s)
- conn.setServiceParent(bdec.service)
- else:
- conn = reactor.connectTCP(u.hostname, u.port, s)
- return [s, conn]
-
-CHECKBEAT_TIME = 60
PRODUCTION_RE_FILTER = re.compile("/websites/production/[^/]+/")
class BigDoEverythingClasss(object):
- def __init__(self, config, service = None):
- self.urls = [s.strip() for s in config.get_value('streams').split()]
+ def __init__(self, config):
self.svnbin = config.get_value('svnbin')
self.env = config.get_env()
+ self.tracking = config.get_track()
self.worker = BackgroundWorker(self.svnbin, self.env)
- self.service = service
- self.failures = 0
- self.alive = time.time()
- self.checker = task.LoopingCall(self._checkalive)
- self.transports = {}
- self.streams = {}
- for u in self.urls:
- self._restartStream(u)
- self.watch = []
- for path, url in config.get_track().items():
+ self.watch = [ ]
+
+ self.hostports = [ ]
+ ### switch from URLs in the config to just host:port pairs
+ for url in config.get_value('streams').split():
+ parsed = urlparse.urlparse(url.strip())
+ self.hostports.append((parsed.hostname, parsed.port))
+
+ def start(self):
+ for path, url in self.tracking.items():
# working copies auto-register with the BDEC when they are ready.
WorkingCopy(self, path, url)
- self.checker.start(CHECKBEAT_TIME)
-
- def pageStart(self, stream):
- logging.info("Stream %s Connection Established" % (stream.url))
- self.failures = 0
-
- def pageEnd(self, stream):
- logging.info("Stream %s Connection Dead" % (stream.url))
- self.streamDead(stream.url)
-
- def _restartStream(self, url):
- (self.streams[url], self.transports[url]) = connectTo(url, self)
- self.streams[url].deferred.addBoth(self.streamDead, url)
- self.streams[url].alive = time.time()
-
- def _checkalive(self):
- n = time.time()
- for k in self.streams.keys():
- s = self.streams[k]
- if n - s.alive > CHECKBEAT_TIME:
- logging.info("Stream %s is dead, reconnecting" % (s.url))
- #self.transports[s.url].disconnect()
- self.streamDead(self, s.url)
-
-# d=filter(lambda x:x not in self.streams.keys(), self.urls)
-# for u in d:
-# self._restartStream(u)
-
- def stillalive(self, stream):
- stream.alive = time.time()
-
- def streamDead(self, url, result=None):
- s = self.streams.get(url)
- if not s:
- logging.info("Stream %s is messed up" % (url))
- return
- BACKOFF_SECS = 5
- BACKOFF_MAX = 60
- #self.checker.stop()
-
- self.streams[url] = None
- self.transports[url] = None
- self.failures += 1
- backoff = min(self.failures * BACKOFF_SECS, BACKOFF_MAX)
- logging.info("Stream disconnected, trying again in %d seconds.... %s" % (backoff, s.url))
- reactor.callLater(backoff, self._restartStream, url)
def wc_ready(self, wc):
# called when a working copy object has its basic info/url,
@@ -302,8 +155,10 @@ class BigDoEverythingClasss(object):
return "/" + path
return os.path.abspath(path)
- def commit(self, stream, rev):
- logging.info("COMMIT r%d (%d paths) via %s" % (rev.rev, len(rev.dirs_changed), stream.url))
+ def commit(self, host, port, rev):
+ logging.info("COMMIT r%d (%d paths) from %s:%d"
+ % (rev.rev, len(rev.dirs_changed), host, port))
+
paths = map(self._normalize_path, rev.dirs_changed)
if len(paths):
pre = os.path.commonprefix(paths)
@@ -317,7 +172,7 @@ class BigDoEverythingClasss(object):
break
#print "Common Prefix: %s" % (pre)
- wcs = [wc for wc in self.watch if wc.update_applies(rev.repos, pre)]
+ wcs = [wc for wc in self.watch if wc.update_applies(rev.uuid, pre)]
logging.info("Updating %d WC for r%d" % (len(wcs), rev.rev))
for wc in wcs:
self.worker.add_work(OP_UPDATE, wc)
@@ -384,7 +239,6 @@ class BackgroundWorker(threading.Thread)
### still specific to the ASF setup.
args = [self.svnbin, 'update',
'--quiet',
- '--config-dir', '/home/svnwc/.subversion',
'--non-interactive',
'--trust-server-cert',
'--ignore-externals',
@@ -392,7 +246,7 @@ class BackgroundWorker(threading.Thread)
subprocess.check_call(args, env=self.env)
### check the loglevel before running 'svn info'?
- info = svn_info(self.svnbin, wc.path)
+ info = svn_info(self.svnbin, self.env, wc.path)
logging.info("updated: %s now at r%s", wc.path, info['Revision'])
def _cleanup(self, wc):
@@ -401,7 +255,6 @@ class BackgroundWorker(threading.Thread)
### we need to move some of these args into the config. these are
### still specific to the ASF setup.
args = [self.svnbin, 'cleanup',
- '--config-dir', '/home/svnwc/.subversion',
'--non-interactive',
'--trust-server-cert',
wc.path]
@@ -452,6 +305,45 @@ class ReloadableConfig(ConfigParser.Safe
return str(option)
+class Daemon(daemonize.Daemon):
+ def __init__(self, logfile, pidfile, umask, bdec):
+ daemonize.Daemon.__init__(self, logfile, pidfile)
+
+ self.umask = umask
+ self.bdec = bdec
+
+ def setup(self):
+ # There is no setup which the parent needs to wait for.
+ pass
+
+ def run(self):
+ logging.info('svnwcsub started, pid=%d', os.getpid())
+
+ # Set the umask in the daemon process. Defaults to 000 for
+ # daemonized processes. Foreground processes simply inherit
+ # the value from the parent process.
+ if self.umask is not None:
+ umask = int(self.umask, 8)
+ os.umask(umask)
+ logging.info('umask set to %03o', umask)
+
+ # Start the BDEC (on the main thread), then start the client
+ self.bdec.start()
+
+ mc = svnpubsub.client.MultiClient(self.bdec.hostports,
+ self.bdec.commit,
+ self._event)
+ mc.run_forever()
+
+ def _event(self, host, port, event_name):
+ if event_name == 'error':
+ logging.exception('from %s:%s', host, port)
+ elif event_name == 'ping':
+ logging.debug('ping from %s:%s', host, port)
+ else:
+ logging.info('"%s" from %s:%s', event_name, host, port)
+
+
def prepare_logging(logfile):
"Log to the specified file, or to stdout if None."
@@ -480,20 +372,13 @@ def handle_options(options):
# Set up the logging, then process the rest of the options.
prepare_logging(options.logfile)
- if options.pidfile:
+ # In daemon mode, we let the daemonize module handle the pidfile.
+ # Otherwise, we should write this (foreground) PID into the file.
+ if options.pidfile and not options.daemon:
pid = os.getpid()
open(options.pidfile, 'w').write('%s\n' % pid)
logging.info('pid %d written to %s', pid, options.pidfile)
- if options.uid:
- try:
- uid = int(options.uid)
- except ValueError:
- import pwd
- uid = pwd.getpwnam(options.uid)[2]
- logging.info('setting uid %d', uid)
- os.setuid(uid)
-
if options.gid:
try:
gid = int(options.gid)
@@ -503,10 +388,14 @@ def handle_options(options):
logging.info('setting gid %d', gid)
os.setgid(gid)
- if options.umask:
- umask = int(options.umask, 8)
- os.umask(umask)
- logging.info('umask set to %03o', umask)
+ if options.uid:
+ try:
+ uid = int(options.uid)
+ except ValueError:
+ import pwd
+ uid = pwd.getpwnam(options.uid)[2]
+ logging.info('setting uid %d', uid)
+ os.setuid(uid)
def main(args):
@@ -525,6 +414,8 @@ def main(args):
help='switch to this GID before running')
parser.add_option('--umask',
help='set this (octal) umask before running')
+ parser.add_option('--daemon', action='store_true',
+ help='run as a background daemon')
options, extra = parser.parse_args(args)
@@ -532,12 +423,26 @@ def main(args):
parser.error('CONFIG_FILE is required')
config_file = extra[0]
+ if options.daemon and not options.logfile:
+ parser.error('LOGFILE is required when running as a daemon')
+ if options.daemon and not options.pidfile:
+ parser.error('PIDFILE is required when running as a daemon')
+
# Process any provided options.
handle_options(options)
c = ReloadableConfig(config_file)
- big = BigDoEverythingClasss(c)
- reactor.run()
+ bdec = BigDoEverythingClasss(c)
+
+ # We manage the logfile ourselves (along with possible rotation). The
+ # daemon process can just drop stdout/stderr into /dev/null.
+ d = Daemon('/dev/null', options.pidfile, options.umask, bdec)
+ if options.daemon:
+ # Daemonize the process and call sys.exit() with appropriate code
+ d.daemonize_exit()
+ else:
+ # Just run in the foreground (the default)
+ d.foreground()
if __name__ == "__main__":