You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/02/17 18:53:41 UTC
svn commit: r745191 [2/2] - in /incubator/tashi/trunk/src/tashi: ./ client/
clustermanager/ messaging/ nodemanager/ nodemanager/vmcontrol/ thrift/
Modified: incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py (original)
+++ incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py Tue Feb 17 18:53:40 2009
@@ -41,238 +41,238 @@
from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
class MessageBrokerThrift(MessageBroker):
- def __init__(self, port, daemon=True):
- MessageBroker.__init__(self)
- self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
- self.transport = TSocket.TServerSocket(port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
- self.ready = threading.Event()
-# self.server = TServer.TSimpleServer(self.processor,
-# self.transport,
-# self.tfactory,
-# self.pfactory)
-# self.server = TServer.TThreadPoolServer(self.processor,
-# self.transport,
-# self.tfactory,
-# self.pfactory)
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
- self.publishCalls = 0
-
- def ssvrthrd():
- try:
- # FIXME: Race condition, the ready event should be set after
- # starting the server. However, server.serve()
- # doesn't return under normal circumstances. This
- # seems to work in practice, even though it's clearly
- # wrong.
- self.ready.set()
- self.server.serve()
- except Exception, e:
- print e
- sys.stdout.flush()
- pass
- svt = threading.Thread(target=ssvrthrd)
- svt.setDaemon(daemon)
- svt.start()
- self.ready.wait()
- def log(self, message):
- MessageBroker.log(self, message)
- @synchronizedmethod
- def addSubscriber(self, host, port):
- subscribers = self.getSubscribers()
- for sub in subscribers:
- if sub.host == host and sub.port == port:
- return
- subscriber = SubscriberThriftProxy(host, port, self.proxy)
- MessageBroker.addSubscriber(self, subscriber)
- def removeSubscriber(self, host, port):
- subscriber = None
- subscribers = self.getSubscribers()
- for sub in subscribers:
- if sub.host == host and sub.port == port:
- subscriber = sub
- if subscriber != None:
- MessageBroker.removeSubscriber(self, subscriber)
- @synchronizedmethod
- def publish(self, message):
- self.publishCalls = self.publishCalls + 1
- sys.stdout.flush()
- MessageBroker.publish(self, message)
+ def __init__(self, port, daemon=True):
+ MessageBroker.__init__(self)
+ self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
+ self.transport = TSocket.TServerSocket(port)
+ self.tfactory = TTransport.TBufferedTransportFactory()
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
+ self.ready = threading.Event()
+# self.server = TServer.TSimpleServer(self.processor,
+# self.transport,
+# self.tfactory,
+# self.pfactory)
+# self.server = TServer.TThreadPoolServer(self.processor,
+# self.transport,
+# self.tfactory,
+# self.pfactory)
+ self.server = TServer.TThreadedServer(self.processor,
+ self.transport,
+ self.tfactory,
+ self.pfactory)
+ self.publishCalls = 0
+
+ def ssvrthrd():
+ try:
+ # FIXME: Race condition, the ready event should be set after
+ # starting the server. However, server.serve()
+ # doesn't return under normal circumstances. This
+ # seems to work in practice, even though it's clearly
+ # wrong.
+ self.ready.set()
+ self.server.serve()
+ except Exception, e:
+ print e
+ sys.stdout.flush()
+ pass
+ svt = threading.Thread(target=ssvrthrd)
+ svt.setDaemon(daemon)
+ svt.start()
+ self.ready.wait()
+ def log(self, message):
+ MessageBroker.log(self, message)
+ @synchronizedmethod
+ def addSubscriber(self, host, port):
+ subscribers = self.getSubscribers()
+ for sub in subscribers:
+ if sub.host == host and sub.port == port:
+ return
+ subscriber = SubscriberThriftProxy(host, port, self.proxy)
+ MessageBroker.addSubscriber(self, subscriber)
+ def removeSubscriber(self, host, port):
+ subscriber = None
+ subscribers = self.getSubscribers()
+ for sub in subscribers:
+ if sub.host == host and sub.port == port:
+ subscriber = sub
+ if subscriber != None:
+ MessageBroker.removeSubscriber(self, subscriber)
+ @synchronizedmethod
+ def publish(self, message):
+ self.publishCalls = self.publishCalls + 1
+ sys.stdout.flush()
+ MessageBroker.publish(self, message)
class MessageBrokerThriftProxy:
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
- @synchronizedmethod
- def log(self, message):
- self.proxy[self.host, self.port].log(message)
- @synchronizedmethod
- def publish(self, message):
- self.proxy[self.host, self.port].publish(message)
- @synchronizedmethod
- def publishList(self, messages):
- self.proxy[self.host, self.port].publishList(messages)
- @synchronizedmethod
- def addSubscriber(self, subscriber):
- self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
- @synchronizedmethod
- def removeSubscriber(self, subscriber):
- self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
+ @synchronizedmethod
+ def log(self, message):
+ self.proxy[self.host, self.port].log(message)
+ @synchronizedmethod
+ def publish(self, message):
+ self.proxy[self.host, self.port].publish(message)
+ @synchronizedmethod
+ def publishList(self, messages):
+ self.proxy[self.host, self.port].publishList(messages)
+ @synchronizedmethod
+ def addSubscriber(self, subscriber):
+ self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
+ @synchronizedmethod
+ def removeSubscriber(self, subscriber):
+ self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
class SubscriberThrift(Subscriber, threading.Thread):
- def __init__(self, broker, port, synchronized=False):
- self.host = socket.gethostname()
- self.port = port
- self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
- self.transport = TSocket.TServerSocket(port)
- self.tfactory = TTransport.TBufferedTransportFactory()
- self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
- self.server = TServer.TThreadedServer(self.processor,
- self.transport,
- self.tfactory,
- self.pfactory)
- def ssvrthrd():
- try:
- self.server.serve()
- except Exception, e:
- pass
-
-
- self.thread = threading.Thread(target=ssvrthrd)
- self.thread.setDaemon(True)
- self.thread.start()
-
- # We have to call this AFTER initializing our server, so that
- # the broker can contact us
- # Wrap this in a try/catch because the broker may not be online yet
- try:
- Subscriber.__init__(self, broker, synchronized=synchronized)
- except:
- pass
- threading.Thread.__init__(self)
- self.setDaemon(True)
- self.start()
-
- def stop(self):
-# # FIXME: this is broken, there is no clear way to stop a
-# # Thrift server
- self.broker.removeSubscriber(self)
- self.transport.close()
- def run(self):
- while(True):
- # renew subscription every 5 min
- try:
- self.broker.addSubscriber(self)
- except:
- pass
- time.sleep(5*60)
+ def __init__(self, broker, port, synchronized=False):
+ self.host = socket.gethostname()
+ self.port = port
+ self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
+ self.transport = TSocket.TServerSocket(port)
+ self.tfactory = TTransport.TBufferedTransportFactory()
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ self.server = TServer.TThreadedServer(self.processor,
+ self.transport,
+ self.tfactory,
+ self.pfactory)
+ def ssvrthrd():
+ try:
+ self.server.serve()
+ except Exception, e:
+ pass
+
+
+ self.thread = threading.Thread(target=ssvrthrd)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ # We have to call this AFTER initializing our server, so that
+ # the broker can contact us
+ # Wrap this in a try/catch because the broker may not be online yet
+ try:
+ Subscriber.__init__(self, broker, synchronized=synchronized)
+ except:
+ pass
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+ self.start()
+
+ def stop(self):
+# # FIXME: this is broken, there is no clear way to stop a
+# # Thrift server
+ self.broker.removeSubscriber(self)
+ self.transport.close()
+ def run(self):
+ while(True):
+ # renew subscription every 5 min
+ try:
+ self.broker.addSubscriber(self)
+ except:
+ pass
+ time.sleep(5*60)
class SubscriberThriftProxy:
- def __init__(self, host, port, proxy, aggregate = 100):
- self.host = host
- self.port = port
- self.proxy = proxy
- # for some reason, thrift clients are not thread-safe, lock during send
- self.lock = threading.Lock()
- self.pending = []
- self.aggregateSize = aggregate
- def publish(self, message):
- self.lock.acquire()
- sys.stdout.flush()
- if message.has_key('aggregate') and message['aggregate'] == 'True':
- self.pending.append(message)
- if len(self.pending) >= self.aggregateSize:
- try:
- self.proxy[self.host, self.port].publishList(self.pending)
- except Exception, e:
- print e
- self.lock.release()
- raise e
- self.pending = []
- else:
- try:
- self.proxy[self.host, self.port].publish(message)
- except Exception, e:
- sys.stdout.flush()
- print e
- self.lock.release()
- raise e
- self.lock.release()
+ def __init__(self, host, port, proxy, aggregate = 100):
+ self.host = host
+ self.port = port
+ self.proxy = proxy
+ # for some reason, thrift clients are not thread-safe, lock during send
+ self.lock = threading.Lock()
+ self.pending = []
+ self.aggregateSize = aggregate
+ def publish(self, message):
+ self.lock.acquire()
+ sys.stdout.flush()
+ if message.has_key('aggregate') and message['aggregate'] == 'True':
+ self.pending.append(message)
+ if len(self.pending) >= self.aggregateSize:
+ try:
+ self.proxy[self.host, self.port].publishList(self.pending)
+ except Exception, e:
+ print e
+ self.lock.release()
+ raise e
+ self.pending = []
+ else:
+ try:
+ self.proxy[self.host, self.port].publish(message)
+ except Exception, e:
+ sys.stdout.flush()
+ print e
+ self.lock.release()
+ raise e
+ self.lock.release()
class PublisherThrift(Publisher):
- def __init__(self, host, port):
- self.host = host
- self.port = port
- self.broker = MessageBrokerThriftProxy(host, port)
- Publisher.__init__(self, self.broker)
-
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.broker = MessageBrokerThriftProxy(host, port)
+ Publisher.__init__(self, self.broker)
+
####################
# Testing Code
####################
class TestSubscriberThrift(SubscriberThrift):
- def __init__(self, *args, **kwargs):
- self.queue = Queue.Queue()
- SubscriberThrift.__init__(self, *args, **kwargs)
- def handle(self, message):
- self.queue.put(message)
+ def __init__(self, *args, **kwargs):
+ self.queue = Queue.Queue()
+ SubscriberThrift.__init__(self, *args, **kwargs)
+ def handle(self, message):
+ self.queue.put(message)
portnum = 1718
class TestThriftMessaging(unittest.TestCase):
- def setUp(self):
- global portnum
- self.broker = MessageBrokerThrift(portnum)
- self.brokerPort = portnum
- portnum = portnum + 1
- self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
- self.publisher = PublisherThrift('localhost', self.brokerPort)
- self.subscriber = TestSubscriberThrift(self.proxy, portnum)
- portnum = portnum + 1
- def tearDown(self):
- pass
- def testSetUp(self):
- pass
- def testPublish(self):
- self.publisher.publish( {'message':'hello world'} )
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testPublishList(self):
- nrmsgs = 10
- msgs = []
- for i in range(nrmsgs):
- msgs.append( {'msgnum':str(i)} )
- self.publisher.publishList( msgs )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testAggregate(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.aggregate( {'msgnum':str(i)} )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- def testAggregateKeyword(self):
- nrmsgs = self.publisher.aggregateSize
- for i in range(nrmsgs):
- self.assertEqual(self.subscriber.queue.qsize(), 0)
- self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
- for i in range(nrmsgs):
- self.subscriber.queue.get(True, timeout=5)
- self.assertEqual(self.subscriber.queue.qsize(), 0)
+ def setUp(self):
+ global portnum
+ self.broker = MessageBrokerThrift(portnum)
+ self.brokerPort = portnum
+ portnum = portnum + 1
+ self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
+ self.publisher = PublisherThrift('localhost', self.brokerPort)
+ self.subscriber = TestSubscriberThrift(self.proxy, portnum)
+ portnum = portnum + 1
+ def tearDown(self):
+ pass
+ def testSetUp(self):
+ pass
+ def testPublish(self):
+ self.publisher.publish( {'message':'hello world'} )
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ def testPublishList(self):
+ nrmsgs = 10
+ msgs = []
+ for i in range(nrmsgs):
+ msgs.append( {'msgnum':str(i)} )
+ self.publisher.publishList( msgs )
+ for i in range(nrmsgs):
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ def testAggregate(self):
+ nrmsgs = self.publisher.aggregateSize
+ for i in range(nrmsgs):
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ self.publisher.aggregate( {'msgnum':str(i)} )
+ for i in range(nrmsgs):
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ def testAggregateKeyword(self):
+ nrmsgs = self.publisher.aggregateSize
+ for i in range(nrmsgs):
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
+ for i in range(nrmsgs):
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
if __name__=='__main__':
- suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
- unittest.TextTestRunner(verbosity=2).run(suite)
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
+ unittest.TextTestRunner(verbosity=2).run(suite)
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Tue Feb 17 18:53:40 2009
@@ -49,8 +49,8 @@
server = TThreadedServer(processor, transport)
debugConsole(globals())
- notifier = notification.Notifier(config)
- log.addHandler(notifier)
+ notifier = notification.Notifier(config)
+ log.addHandler(notifier)
try:
server.serve()
Modified: incubator/tashi/trunk/src/tashi/nodemanager/notification.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/notification.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/notification.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/notification.py Tue Feb 17 18:53:40 2009
@@ -18,23 +18,23 @@
import tashi.messaging.tashimessaging
class Notifier(tashi.messaging.tashimessaging.TashiLogHandler):
- def vmExited(self, instance):
- try:
- isolatedRPC(self.cm, 'vmExited', self.hostId, vmId)
- except Exception, e:
- print "RPC failed for vmExited on CM"
- print e
- # FIXME: send this to the cm later
- # self.exitedVms[vmId] = child
+ def vmExited(self, instance):
+ try:
+ isolatedRPC(self.cm, 'vmExited', self.hostId, vmId)
+ except Exception, e:
+ print "RPC failed for vmExited on CM"
+ print e
+ # FIXME: send this to the cm later
+ # self.exitedVms[vmId] = child
- msg = {}
+ msg = {}
- msg['timestamp'] = str(time.time())
- msg['hostname'] = '' # FIXME: fill this in
- msg['message-type'] = 'vm-event'
- msg['vm-event'] = 'vm-exited'
-
- msg['instance-id'] = str(instance.id)
- msg['host-id'] = str(instance.hostId)
- print 'Notifier publishing ', msg
- self.publish(message)
+ msg['timestamp'] = str(time.time())
+ msg['hostname'] = '' # FIXME: fill this in
+ msg['message-type'] = 'vm-event'
+ msg['vm-event'] = 'vm-exited'
+
+ msg['instance-id'] = str(instance.id)
+ msg['host-id'] = str(instance.hostId)
+ print 'Notifier publishing ', msg
+ self.publish(message)
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py Tue Feb 17 18:53:40 2009
@@ -108,7 +108,7 @@
self.dfs = dfs
self.cm = cm
- self.vmNamePrefix = self.config.get("XenPV", "vmNamePrefix")
+ self.vmNamePrefix = self.config.get("XenPV", "vmNamePrefix")
self.transientDir = self.config.get('XenPV', 'transientDir')
self.newvms = listVms(self.vmNamePrefix)
@@ -130,15 +130,15 @@
for vmId in self.newvms.keys():
if not vmlist.has_key(vmId):
a = self.newvms.pop(vmId)
- # If the vm had transient disks, delete them
- for i in range(len(a.disks)):
- if a.disks[i].persistent == False:
- diskname = self.transientDisk(a.id, i)
- try:
- os.unlink(diskname)
- except:
- print 'WARNING could not delete transient disk %s' % diskname
- self.nm.vmStateChange(a.vmId, a.state, InstanceState.Exited)
+ # If the vm had transient disks, delete them
+ for i in range(len(a.disks)):
+ if a.disks[i].persistent == False:
+ diskname = self.transientDisk(a.id, i)
+ try:
+ os.unlink(diskname)
+ except:
+ print 'WARNING could not delete transient disk %s' % diskname
+ self.nm.vmStateChange(a.vmId, a.state, InstanceState.Exited)
for vmId in vmlist.keys():
if not self.newvms.has_key(vmId):
print 'WARNING: found vm that should be managed, but is not'
@@ -175,17 +175,17 @@
f.close()
return fn
def deleteXenConfig(self, vmName):
- pass
+ pass
# os.unlink(os.path.join("/tmp", vmName))
########################################
- def vmName(self, instanceId):
- return "%s-%i"%(self.vmNamePrefix, int(instanceId))
- def transientDisk(self, instanceId, disknum):
- newdisk = os.path.join(self.transientDir,
- 'tashi-%i-%i.qcow' %(instanceId, disknum))
- return newdisk
-
+ def vmName(self, instanceId):
+ return "%s-%i"%(self.vmNamePrefix, int(instanceId))
+ def transientDisk(self, instanceId, disknum):
+ newdisk = os.path.join(self.transientDir,
+ 'tashi-%i-%i.qcow' %(instanceId, disknum))
+ return newdisk
+
@synchronizedmethod
def instantiateVm(self, instance):
@@ -203,9 +203,9 @@
imageLocal = self.dfs.getLocalHandle(instance.disks[i].uri)
instance.disks[i].local = imageLocal
if instance.disks[i].persistent == False:
- newdisk = self.transientDisk(instance.id, i)
- cmd = 'qcow-create 0 %s %s' % (newdisk, imageLocal)
- print 'creating new disk with "%s"' % cmd
+ newdisk = self.transientDisk(instance.id, i)
+ cmd = 'qcow-create 0 %s %s' % (newdisk, imageLocal)
+ print 'creating new disk with "%s"' % cmd
os.system(cmd)
instance.disks[i].local = newdisk
@@ -217,7 +217,7 @@
instance.typeObj.cores)
cmd = "xm create %s"%fn
r = os.system(cmd)
-# self.deleteXenConfig(name)
+# self.deleteXenConfig(name)
if r != 0:
print 'WARNING: "%s" returned %i' % ( cmd, r)
raise Exception, 'WARNING: "%s" returned %i' % ( cmd, r)
@@ -248,7 +248,7 @@
name = domIdToName(vmId)
cPickle.dump(instance, infof)
infof.close()
-
+
# FIXME: handle errors
cmd = "xm save %i %s"%(vmId, tmpfile)
@@ -258,7 +258,7 @@
raise Exception, "replace this with a real exception!"
r = self.dfs.copyTo(tmpfile, target)
self.newvms.pop(vmId)
- os.unlink(tmpfile)
+ os.unlink(tmpfile)
return vmId
@synchronizedmethod
@@ -288,13 +288,13 @@
@synchronizedmethod
def migrateVm(self, vmId, target, transportCookie):
cmd = "xm migrate -l %i %s"%(vmId, target)
- r = os.system(cmd)
- if r != 0:
- # FIXME: throw exception
- print "migrate failed for VM %i"%vmId
- raise Exception, "migrate failed for VM %i"%vmId
+ r = os.system(cmd)
+ if r != 0:
+ # FIXME: throw exception
+ print "migrate failed for VM %i"%vmId
+ raise Exception, "migrate failed for VM %i"%vmId
self.newvms.pop(vmId)
- return vmId
+ return vmId
@synchronizedmethod
def receiveVm(self, transportCookie):
instance = cPickle.loads(transportCookie)
@@ -349,19 +349,19 @@
return self.newvms.keys()
- @synchronizedmethod
- def getHostInfo(self):
- host = Host()
- memp = subprocess.Popen("xm info | awk '/^total_memory/ { print $3 }' ",
- shell = True,
- stdout = subprocess.PIPE)
- mems = memp.stdout.readline()
- host.memory = int(mems)
- corep = subprocess.Popen("xm info | awk '/^nr_cpus/ { print $3 }' ",
- shell = True,
- stdout = subprocess.PIPE)
- cores = corep.stdout.readline()
- host.cores = int(cores)
- return host
+ @synchronizedmethod
+ def getHostInfo(self):
+ host = Host()
+ memp = subprocess.Popen("xm info | awk '/^total_memory/ { print $3 }' ",
+ shell = True,
+ stdout = subprocess.PIPE)
+ mems = memp.stdout.readline()
+ host.memory = int(mems)
+ corep = subprocess.Popen("xm info | awk '/^nr_cpus/ { print $3 }' ",
+ shell = True,
+ stdout = subprocess.PIPE)
+ cores = corep.stdout.readline()
+ host.cores = int(cores)
+ return host
Modified: incubator/tashi/trunk/src/tashi/parallel.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/parallel.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/parallel.py (original)
+++ incubator/tashi/trunk/src/tashi/parallel.py Tue Feb 17 18:53:40 2009
@@ -23,94 +23,94 @@
_log = logging.getLogger('tashi.parallel')
def threaded(func):
- def fn(*args, **kwargs):
- thread = threading.Thread(target=func, args=args, kwargs=kwargs)
- thread.start()
- return thread
- return fn
+ def fn(*args, **kwargs):
+ thread = threading.Thread(target=func, args=args, kwargs=kwargs)
+ thread.start()
+ return thread
+ return fn
class ThreadPool(Queue.Queue):
- def __init__(self, size=8, maxsize=0):
- Queue.Queue.__init__(self, maxsize)
- for i in range(size):
- thread = threading.Thread(target=self._worker)
- thread.setDaemon(True)
- thread.start()
- def _worker(self):
- while True:
- try:
- func, args, kwargs = self.get()
- func(*args, **kwargs)
- except Exception, e:
- _log.error(e)
- # FIXME: do something smarter here, backtrace, log,
- # allow user-defined error handling...
-
- def submit(self, func, *args, **kwargs):
- self.put((func, args, kwargs))
- def submitlist(self, func, args, kwargs):
- self.put((func, args, kwargs))
+ def __init__(self, size=8, maxsize=0):
+ Queue.Queue.__init__(self, maxsize)
+ for i in range(size):
+ thread = threading.Thread(target=self._worker)
+ thread.setDaemon(True)
+ thread.start()
+ def _worker(self):
+ while True:
+ try:
+ func, args, kwargs = self.get()
+ func(*args, **kwargs)
+ except Exception, e:
+ _log.error(e)
+ # FIXME: do something smarter here, backtrace, log,
+ # allow user-defined error handling...
+
+ def submit(self, func, *args, **kwargs):
+ self.put((func, args, kwargs))
+ def submitlist(self, func, args, kwargs):
+ self.put((func, args, kwargs))
class ThreadPoolClass:
- def __init__(self, size=8, maxsize=0):
- self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
+ def __init__(self, size=8, maxsize=0):
+ self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
def threadpool(pool):
- def dec(func):
- def fn(*args, **kwargs):
- pool.submit(func, *args, **kwargs)
- return fn
- return dec
+ def dec(func):
+ def fn(*args, **kwargs):
+ pool.submit(func, *args, **kwargs)
+ return fn
+ return dec
def threadpoolmethod(meth):
- def fn(*args, **kwargs):
- try:
- pool = args[0]._threadpool_pool
- except AttributeError:
- pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
- # FIXME: how do we check parent class?
-# assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
- pool.submit(meth, *args, **kwargs)
- return fn
+ def fn(*args, **kwargs):
+ try:
+ pool = args[0]._threadpool_pool
+ except AttributeError:
+ pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
+ # FIXME: how do we check parent class?
+# assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
+ pool.submit(meth, *args, **kwargs)
+ return fn
def synchronized(lock=None):
- if lock==None:
- lock = threading.RLock()
- def dec(func):
- def fn(*args, **kwargs):
- lock.acquire()
- ex = None
- try:
- r = func(*args, **kwargs)
- except Exception, e:
- ex = e
- lock.release()
- if ex != None:
- raise e
- return r
- return fn
- return dec
-
+ if lock==None:
+ lock = threading.RLock()
+ def dec(func):
+ def fn(*args, **kwargs):
+ lock.acquire()
+ ex = None
+ try:
+ r = func(*args, **kwargs)
+ except Exception, e:
+ ex = e
+ lock.release()
+ if ex != None:
+ raise e
+ return r
+ return fn
+ return dec
+
def synchronizedmethod(func):
- def fn(*args, **kwargs):
- try:
- lock = args[0]._synchronized_lock
- except AttributeError:
- lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
- lock.acquire()
- ex = None
- try:
- res = func(*args, **kwargs)
- except Exception, e:
- ex = e
- lock.release()
- if ex != None:
- raise e
- return res
- return fn
-
+ def fn(*args, **kwargs):
+ try:
+ lock = args[0]._synchronized_lock
+ except AttributeError:
+ lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
+ lock.acquire()
+ ex = None
+ try:
+ res = func(*args, **kwargs)
+ except Exception, e:
+ ex = e
+ lock.release()
+ if ex != None:
+ raise e
+ return res
+ return fn
+
##############################
# Test Code
@@ -120,181 +120,181 @@
import time
class TestThreadPool(unittest.TestCase):
- def setUp(self):
- self.errmargin = 0.5
+ def setUp(self):
+ self.errmargin = 0.5
- def testUnthreaded(self):
- queue = Queue.Queue()
- def slowfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(4):
- slowfunc()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testThreaded(self):
- queue = Queue.Queue()
- @threaded
- def slowthreadfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(8):
- slowthreadfunc()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
- def testThreadPool(self):
- pool = ThreadPool(size=4)
- queue = Queue.Queue()
- @threadpool(pool)
- def slowpoolfunc(sleep=1):
- time.sleep(sleep)
- queue.put(None)
- tt = time.time()
- for i in range(8):
- slowpoolfunc()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 2, 1)
-
- def testUnthreadedMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(4):
- sc.beslow()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testThreadedMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- @threaded
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(4):
- sc.beslow()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
-
- def testThreadPoolMethod(self):
- queue = Queue.Queue()
- class slowclass:
- def __init__(self, sleep=1):
- self.sleep=sleep
- @threadpoolmethod
- def beslow(self):
- time.sleep(self.sleep)
- queue.put(None)
- sc = slowclass()
- tt = time.time()
- for i in range(16):
- sc.beslow()
- for i in range(16):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 2, 1)
-
- def testSynchronized(self):
- queue = Queue.Queue()
- @synchronized()
- def addtoqueue():
- time.sleep(1)
- queue.put(None)
- @threaded
- def slowthreadfunc():
- addtoqueue()
- tt = time.time()
- for i in range(4):
- slowthreadfunc()
- for i in range(4):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 4, 1)
-
- def testSynchronizedMethod(self):
- queue = Queue.Queue()
- class addtoqueue:
- @synchronizedmethod
- def addtoqueue1(self):
- time.sleep(1)
- queue.put(None)
- @synchronizedmethod
- def addtoqueue2(self):
- time.sleep(1)
- queue.put(None)
- atc = addtoqueue()
- @threaded
- def slowthreadfunc1():
- atc.addtoqueue1()
- @threaded
- def slowthreadfunc2():
- atc.addtoqueue2()
- tt = time.time()
- for i in range(4):
- slowthreadfunc1()
- slowthreadfunc2()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 8, 1)
-
- def testUnsynchronizedMethod(self):
- queue = Queue.Queue()
- class addtoqueue:
- def addtoqueue1(self):
- time.sleep(1)
- queue.put(None)
- def addtoqueue2(self):
- time.sleep(1)
- queue.put(None)
- atc = addtoqueue()
- @threaded
- def slowthreadfunc1():
- atc.addtoqueue1()
- @threaded
- def slowthreadfunc2():
- atc.addtoqueue2()
- tt = time.time()
- for i in range(4):
- slowthreadfunc1()
- slowthreadfunc2()
- for i in range(8):
- queue.get()
- tt = time.time() - tt
- self.assertAlmostEqual(tt, 1, 1)
+ def testUnthreaded(self):
+ queue = Queue.Queue()
+ def slowfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(4):
+ slowfunc()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testThreaded(self):
+ queue = Queue.Queue()
+ @threaded
+ def slowthreadfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(8):
+ slowthreadfunc()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+ def testThreadPool(self):
+ pool = ThreadPool(size=4)
+ queue = Queue.Queue()
+ @threadpool(pool)
+ def slowpoolfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(8):
+ slowpoolfunc()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 2, 1)
+
+ def testUnthreadedMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(4):
+ sc.beslow()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testThreadedMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ @threaded
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(4):
+ sc.beslow()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+ def testThreadPoolMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ @threadpoolmethod
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(16):
+ sc.beslow()
+ for i in range(16):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 2, 1)
+
+ def testSynchronized(self):
+ queue = Queue.Queue()
+ @synchronized()
+ def addtoqueue():
+ time.sleep(1)
+ queue.put(None)
+ @threaded
+ def slowthreadfunc():
+ addtoqueue()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testSynchronizedMethod(self):
+ queue = Queue.Queue()
+ class addtoqueue:
+ @synchronizedmethod
+ def addtoqueue1(self):
+ time.sleep(1)
+ queue.put(None)
+ @synchronizedmethod
+ def addtoqueue2(self):
+ time.sleep(1)
+ queue.put(None)
+ atc = addtoqueue()
+ @threaded
+ def slowthreadfunc1():
+ atc.addtoqueue1()
+ @threaded
+ def slowthreadfunc2():
+ atc.addtoqueue2()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc1()
+ slowthreadfunc2()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 8, 1)
+
+ def testUnsynchronizedMethod(self):
+ queue = Queue.Queue()
+ class addtoqueue:
+ def addtoqueue1(self):
+ time.sleep(1)
+ queue.put(None)
+ def addtoqueue2(self):
+ time.sleep(1)
+ queue.put(None)
+ atc = addtoqueue()
+ @threaded
+ def slowthreadfunc1():
+ atc.addtoqueue1()
+ @threaded
+ def slowthreadfunc2():
+ atc.addtoqueue2()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc1()
+ slowthreadfunc2()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
if __name__=='__main__':
- import sys
+ import sys
- logging.basicConfig(level=logging.INFO,
- format="%(asctime)s %(levelname)s:\t %(message)s",
- stream=sys.stdout)
+ logging.basicConfig(level=logging.INFO,
+ format="%(asctime)s %(levelname)s:\t %(message)s",
+ stream=sys.stdout)
- suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
- unittest.TextTestRunner(verbosity=2).run(suite)
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
+ unittest.TextTestRunner(verbosity=2).run(suite)
Modified: incubator/tashi/trunk/src/tashi/thrift/build.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/thrift/build.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/thrift/build.py (original)
+++ incubator/tashi/trunk/src/tashi/thrift/build.py Tue Feb 17 18:53:40 2009
@@ -41,10 +41,10 @@
print 'Copying generated code to \'tashi.services\' package...'
shutil.copytree('gen-py/services', '../services')
- print 'Generatign Python code for \'messagingthrift\'...'
- os.system('rm -rf gen-py')
- os.system('thrift --gen py messagingthrift.thrift')
-
- print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
- shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
- os.path.join('..', 'messaging', 'messagingthrift'))
+ print 'Generatign Python code for \'messagingthrift\'...'
+ os.system('rm -rf gen-py')
+ os.system('thrift --gen py messagingthrift.thrift')
+
+ print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
+ shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
+ os.path.join('..', 'messaging', 'messagingthrift'))
Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Tue Feb 17 18:53:40 2009
@@ -155,26 +155,26 @@
return res
def signalHandler(signalNumber):
- """Used to denote a particular function as the signal handler for a
- specific signal"""
- def __decorator(function):
- signal.signal(signalNumber, function)
- return function
- return __decorator
+ """Used to denote a particular function as the signal handler for a
+ specific signal"""
+ def __decorator(function):
+ signal.signal(signalNumber, function)
+ return function
+ return __decorator
def boolean(value):
- """Convert a variable to a boolean"""
- if (type(value) == types.BooleanType):
- return value
- if (type(value) == types.IntType):
- return (value != 0)
- lowercaseValue = value.lower()
- if lowercaseValue in ['yes', 'true', '1']:
- return True
- elif lowercaseValue in ['no', 'false', '0']:
- return False
- else:
- raise ValueError
+ """Convert a variable to a boolean"""
+ if (type(value) == types.BooleanType):
+ return value
+ if (type(value) == types.IntType):
+ return (value != 0)
+ lowercaseValue = value.lower()
+ if lowercaseValue in ['yes', 'true', '1']:
+ return True
+ elif lowercaseValue in ['no', 'false', '0']:
+ return False
+ else:
+ raise ValueError
def instantiateImplementation(className, *args):
"""Create an instance of an object with the given class name and list