You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/02/17 18:53:41 UTC

svn commit: r745191 [2/2] - in /incubator/tashi/trunk/src/tashi: ./ client/ clustermanager/ messaging/ nodemanager/ nodemanager/vmcontrol/ thrift/

Modified: incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py (original)
+++ incubator/tashi/trunk/src/tashi/messaging/thriftmessaging.py Tue Feb 17 18:53:40 2009
@@ -41,238 +41,238 @@
 from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
 
 class MessageBrokerThrift(MessageBroker):
-    def __init__(self, port, daemon=True):
-        MessageBroker.__init__(self)
-        self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
-        self.transport = TSocket.TServerSocket(port)
-        self.tfactory = TTransport.TBufferedTransportFactory()
-        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-        self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
-        self.ready = threading.Event()
-#         self.server = TServer.TSimpleServer(self.processor,
-#                                             self.transport,
-#                                             self.tfactory,
-#                                             self.pfactory)
-#         self.server = TServer.TThreadPoolServer(self.processor,
-#                                                 self.transport,
-#                                                 self.tfactory,
-#                                                 self.pfactory)
-        self.server = TServer.TThreadedServer(self.processor,
-                                                self.transport,
-                                                self.tfactory,
-                                                self.pfactory)
-        self.publishCalls = 0
-
-        def ssvrthrd():
-            try:
-                # FIXME: Race condition, the ready event should be set after
-                # starting the server.  However, server.serve()
-                # doesn't return under normal circumstances.  This
-                # seems to work in practice, even though it's clearly
-                # wrong.
-                self.ready.set()
-                self.server.serve()
-            except Exception, e:
-                print e
-                sys.stdout.flush()
-                pass
-        svt = threading.Thread(target=ssvrthrd)
-        svt.setDaemon(daemon)
-        svt.start()
-        self.ready.wait()
-    def log(self, message):
-        MessageBroker.log(self, message)
-    @synchronizedmethod
-    def addSubscriber(self, host, port):
-        subscribers = self.getSubscribers()
-        for sub in subscribers:
-            if sub.host == host and sub.port == port:
-                return
-        subscriber = SubscriberThriftProxy(host, port, self.proxy)
-        MessageBroker.addSubscriber(self, subscriber)
-    def removeSubscriber(self, host, port):
-        subscriber = None
-        subscribers = self.getSubscribers()
-        for sub in subscribers:
-            if sub.host == host and sub.port == port:
-                subscriber = sub
-        if subscriber != None:
-            MessageBroker.removeSubscriber(self, subscriber)
-    @synchronizedmethod
-    def publish(self, message):
-        self.publishCalls  = self.publishCalls + 1
-        sys.stdout.flush()
-        MessageBroker.publish(self, message)
+	def __init__(self, port, daemon=True):
+		MessageBroker.__init__(self)
+		self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
+		self.transport = TSocket.TServerSocket(port)
+		self.tfactory = TTransport.TBufferedTransportFactory()
+		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+		self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
+		self.ready = threading.Event()
+#		 self.server = TServer.TSimpleServer(self.processor,
+#											 self.transport,
+#											 self.tfactory,
+#											 self.pfactory)
+#		 self.server = TServer.TThreadPoolServer(self.processor,
+#												 self.transport,
+#												 self.tfactory,
+#												 self.pfactory)
+		self.server = TServer.TThreadedServer(self.processor,
+												self.transport,
+												self.tfactory,
+												self.pfactory)
+		self.publishCalls = 0
+
+		def ssvrthrd():
+			try:
+				# FIXME: Race condition, the ready event should be set after
+				# starting the server.  However, server.serve()
+				# doesn't return under normal circumstances.  This
+				# seems to work in practice, even though it's clearly
+				# wrong.
+				self.ready.set()
+				self.server.serve()
+			except Exception, e:
+				print e
+				sys.stdout.flush()
+				pass
+		svt = threading.Thread(target=ssvrthrd)
+		svt.setDaemon(daemon)
+		svt.start()
+		self.ready.wait()
+	def log(self, message):
+		MessageBroker.log(self, message)
+	@synchronizedmethod
+	def addSubscriber(self, host, port):
+		subscribers = self.getSubscribers()
+		for sub in subscribers:
+			if sub.host == host and sub.port == port:
+				return
+		subscriber = SubscriberThriftProxy(host, port, self.proxy)
+		MessageBroker.addSubscriber(self, subscriber)
+	def removeSubscriber(self, host, port):
+		subscriber = None
+		subscribers = self.getSubscribers()
+		for sub in subscribers:
+			if sub.host == host and sub.port == port:
+				subscriber = sub
+		if subscriber != None:
+			MessageBroker.removeSubscriber(self, subscriber)
+	@synchronizedmethod
+	def publish(self, message):
+		self.publishCalls  = self.publishCalls + 1
+		sys.stdout.flush()
+		MessageBroker.publish(self, message)
 
 class MessageBrokerThriftProxy:
-    def __init__(self, host, port):
-        self.host = host
-        self.port = port
-        self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
-    @synchronizedmethod
-    def log(self, message):
-        self.proxy[self.host, self.port].log(message)
-    @synchronizedmethod
-    def publish(self, message):
-        self.proxy[self.host, self.port].publish(message)
-    @synchronizedmethod
-    def publishList(self, messages):
-        self.proxy[self.host, self.port].publishList(messages)
-    @synchronizedmethod
-    def addSubscriber(self, subscriber):
-        self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
-    @synchronizedmethod
-    def removeSubscriber(self, subscriber):
-        self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
+	def __init__(self, host, port):
+		self.host = host
+		self.port = port
+		self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
+	@synchronizedmethod
+	def log(self, message):
+		self.proxy[self.host, self.port].log(message)
+	@synchronizedmethod
+	def publish(self, message):
+		self.proxy[self.host, self.port].publish(message)
+	@synchronizedmethod
+	def publishList(self, messages):
+		self.proxy[self.host, self.port].publishList(messages)
+	@synchronizedmethod
+	def addSubscriber(self, subscriber):
+		self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
+	@synchronizedmethod
+	def removeSubscriber(self, subscriber):
+		self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
 
 
 
 class SubscriberThrift(Subscriber, threading.Thread):
-    def __init__(self, broker, port, synchronized=False):
-        self.host = socket.gethostname()
-        self.port = port
-        self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
-        self.transport = TSocket.TServerSocket(port)
-        self.tfactory = TTransport.TBufferedTransportFactory()
-        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
-        self.server = TServer.TThreadedServer(self.processor,
-                                              self.transport,
-                                              self.tfactory,
-                                              self.pfactory)
-        def ssvrthrd():
-            try:
-                self.server.serve()
-            except Exception, e:
-                pass
-
-
-        self.thread = threading.Thread(target=ssvrthrd)
-        self.thread.setDaemon(True)
-        self.thread.start()
-
-        # We have to call this AFTER initializing our server, so that
-        # the broker can contact us
-        # Wrap this in a try/catch because the broker may not be online yet
-        try:
-            Subscriber.__init__(self, broker,  synchronized=synchronized)        
-        except:
-            pass
-        threading.Thread.__init__(self)
-        self.setDaemon(True)
-        self.start()
-
-    def stop(self):
-#         # FIXME: this is broken, there is no clear way to stop a
-#         # Thrift server
-        self.broker.removeSubscriber(self)
-        self.transport.close()
-    def run(self):
-        while(True):
-            # renew subscription every 5 min
-            try:
-                self.broker.addSubscriber(self)
-            except:
-                pass
-            time.sleep(5*60)
+	def __init__(self, broker, port, synchronized=False):
+		self.host = socket.gethostname()
+		self.port = port
+		self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
+		self.transport = TSocket.TServerSocket(port)
+		self.tfactory = TTransport.TBufferedTransportFactory()
+		self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+		self.server = TServer.TThreadedServer(self.processor,
+											  self.transport,
+											  self.tfactory,
+											  self.pfactory)
+		def ssvrthrd():
+			try:
+				self.server.serve()
+			except Exception, e:
+				pass
+
+
+		self.thread = threading.Thread(target=ssvrthrd)
+		self.thread.setDaemon(True)
+		self.thread.start()
+
+		# We have to call this AFTER initializing our server, so that
+		# the broker can contact us
+		# Wrap this in a try/catch because the broker may not be online yet
+		try:
+			Subscriber.__init__(self, broker,  synchronized=synchronized)		
+		except:
+			pass
+		threading.Thread.__init__(self)
+		self.setDaemon(True)
+		self.start()
+
+	def stop(self):
+#		 # FIXME: this is broken, there is no clear way to stop a
+#		 # Thrift server
+		self.broker.removeSubscriber(self)
+		self.transport.close()
+	def run(self):
+		while(True):
+			# renew subscription every 5 min
+			try:
+				self.broker.addSubscriber(self)
+			except:
+				pass
+			time.sleep(5*60)
 
 class SubscriberThriftProxy:
-    def __init__(self, host, port, proxy, aggregate = 100):
-        self.host = host
-        self.port = port
-        self.proxy = proxy
-        # for some reason, thrift clients are not thread-safe, lock during send
-        self.lock = threading.Lock()
-        self.pending = []
-        self.aggregateSize = aggregate
-    def publish(self, message):
-        self.lock.acquire()
-        sys.stdout.flush()
-        if message.has_key('aggregate') and message['aggregate'] == 'True':
-            self.pending.append(message)
-            if len(self.pending) >= self.aggregateSize:
-                try:
-                    self.proxy[self.host, self.port].publishList(self.pending)
-                except Exception, e:
-                    print e
-                    self.lock.release()
-                    raise e
-                self.pending = []
-        else:
-            try:
-                self.proxy[self.host, self.port].publish(message)
-            except Exception, e:
-                sys.stdout.flush()
-                print e
-                self.lock.release()
-                raise e
-        self.lock.release()
+	def __init__(self, host, port, proxy, aggregate = 100):
+		self.host = host
+		self.port = port
+		self.proxy = proxy
+		# for some reason, thrift clients are not thread-safe, lock during send
+		self.lock = threading.Lock()
+		self.pending = []
+		self.aggregateSize = aggregate
+	def publish(self, message):
+		self.lock.acquire()
+		sys.stdout.flush()
+		if message.has_key('aggregate') and message['aggregate'] == 'True':
+			self.pending.append(message)
+			if len(self.pending) >= self.aggregateSize:
+				try:
+					self.proxy[self.host, self.port].publishList(self.pending)
+				except Exception, e:
+					print e
+					self.lock.release()
+					raise e
+				self.pending = []
+		else:
+			try:
+				self.proxy[self.host, self.port].publish(message)
+			except Exception, e:
+				sys.stdout.flush()
+				print e
+				self.lock.release()
+				raise e
+		self.lock.release()
 
 class PublisherThrift(Publisher):
-    def __init__(self, host, port):
-        self.host = host
-        self.port = port
-        self.broker = MessageBrokerThriftProxy(host, port)
-        Publisher.__init__(self, self.broker)
-        
+	def __init__(self, host, port):
+		self.host = host
+		self.port = port
+		self.broker = MessageBrokerThriftProxy(host, port)
+		Publisher.__init__(self, self.broker)
+		
 ####################
 # Testing Code 
 ####################
 
 class TestSubscriberThrift(SubscriberThrift):
-    def __init__(self, *args, **kwargs):
-        self.queue = Queue.Queue()
-        SubscriberThrift.__init__(self, *args, **kwargs)
-    def handle(self, message):
-        self.queue.put(message)
+	def __init__(self, *args, **kwargs):
+		self.queue = Queue.Queue()
+		SubscriberThrift.__init__(self, *args, **kwargs)
+	def handle(self, message):
+		self.queue.put(message)
 
 portnum = 1718
 class TestThriftMessaging(unittest.TestCase):
-    def setUp(self):
-        global portnum
-        self.broker = MessageBrokerThrift(portnum)
-        self.brokerPort = portnum
-        portnum = portnum + 1 
-        self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
-        self.publisher = PublisherThrift('localhost', self.brokerPort)
-        self.subscriber = TestSubscriberThrift(self.proxy, portnum)
-        portnum = portnum + 1
-    def tearDown(self):
-        pass
-    def testSetUp(self):
-        pass
-    def testPublish(self):
-        self.publisher.publish( {'message':'hello world'} )
-        self.subscriber.queue.get(True, timeout=5)
-        self.assertEqual(self.subscriber.queue.qsize(), 0)
-    def testPublishList(self):
-        nrmsgs = 10
-        msgs = []
-        for i in range(nrmsgs):
-            msgs.append( {'msgnum':str(i)} )
-        self.publisher.publishList( msgs )
-        for i in range(nrmsgs):
-            self.subscriber.queue.get(True, timeout=5)
-        self.assertEqual(self.subscriber.queue.qsize(), 0)
-    def testAggregate(self):
-        nrmsgs = self.publisher.aggregateSize
-        for i in range(nrmsgs):
-            self.assertEqual(self.subscriber.queue.qsize(), 0)
-            self.publisher.aggregate( {'msgnum':str(i)} )
-        for i in range(nrmsgs):
-            self.subscriber.queue.get(True, timeout=5)
-        self.assertEqual(self.subscriber.queue.qsize(), 0)
-    def testAggregateKeyword(self):
-        nrmsgs = self.publisher.aggregateSize
-        for i in range(nrmsgs):
-            self.assertEqual(self.subscriber.queue.qsize(), 0)
-            self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
-        for i in range(nrmsgs):
-            self.subscriber.queue.get(True, timeout=5)
-        self.assertEqual(self.subscriber.queue.qsize(), 0)
+	def setUp(self):
+		global portnum
+		self.broker = MessageBrokerThrift(portnum)
+		self.brokerPort = portnum
+		portnum = portnum + 1 
+		self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
+		self.publisher = PublisherThrift('localhost', self.brokerPort)
+		self.subscriber = TestSubscriberThrift(self.proxy, portnum)
+		portnum = portnum + 1
+	def tearDown(self):
+		pass
+	def testSetUp(self):
+		pass
+	def testPublish(self):
+		self.publisher.publish( {'message':'hello world'} )
+		self.subscriber.queue.get(True, timeout=5)
+		self.assertEqual(self.subscriber.queue.qsize(), 0)
+	def testPublishList(self):
+		nrmsgs = 10
+		msgs = []
+		for i in range(nrmsgs):
+			msgs.append( {'msgnum':str(i)} )
+		self.publisher.publishList( msgs )
+		for i in range(nrmsgs):
+			self.subscriber.queue.get(True, timeout=5)
+		self.assertEqual(self.subscriber.queue.qsize(), 0)
+	def testAggregate(self):
+		nrmsgs = self.publisher.aggregateSize
+		for i in range(nrmsgs):
+			self.assertEqual(self.subscriber.queue.qsize(), 0)
+			self.publisher.aggregate( {'msgnum':str(i)} )
+		for i in range(nrmsgs):
+			self.subscriber.queue.get(True, timeout=5)
+		self.assertEqual(self.subscriber.queue.qsize(), 0)
+	def testAggregateKeyword(self):
+		nrmsgs = self.publisher.aggregateSize
+		for i in range(nrmsgs):
+			self.assertEqual(self.subscriber.queue.qsize(), 0)
+			self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
+		for i in range(nrmsgs):
+			self.subscriber.queue.get(True, timeout=5)
+		self.assertEqual(self.subscriber.queue.qsize(), 0)
 
 
 if __name__=='__main__':
-    suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
-    unittest.TextTestRunner(verbosity=2).run(suite)
+	suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
+	unittest.TextTestRunner(verbosity=2).run(suite)
 
 

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Tue Feb 17 18:53:40 2009
@@ -49,8 +49,8 @@
 	server = TThreadedServer(processor, transport)
 	debugConsole(globals())
 	
-        notifier = notification.Notifier(config)
-        log.addHandler(notifier)
+	notifier = notification.Notifier(config)
+	log.addHandler(notifier)
 
 	try:
 		server.serve()

Modified: incubator/tashi/trunk/src/tashi/nodemanager/notification.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/notification.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/notification.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/notification.py Tue Feb 17 18:53:40 2009
@@ -18,23 +18,23 @@
 import tashi.messaging.tashimessaging
 
 class Notifier(tashi.messaging.tashimessaging.TashiLogHandler):
-    def vmExited(self, instance):
-        try:
-            isolatedRPC(self.cm, 'vmExited', self.hostId, vmId)
-        except Exception, e:
-            print "RPC failed for vmExited on CM"
-            print e
-            # FIXME: send this to the cm later
-            # self.exitedVms[vmId] = child
+	def vmExited(self, instance):
+		try:
+			isolatedRPC(self.cm, 'vmExited', self.hostId, vmId)
+		except Exception, e:
+			print "RPC failed for vmExited on CM"
+			print e
+			# FIXME: send this to the cm later
+			# self.exitedVms[vmId] = child
 
-        msg = {}
+		msg = {}
 
-        msg['timestamp'] = str(time.time())
-        msg['hostname'] = ''    # FIXME: fill this in
-        msg['message-type'] = 'vm-event'
-        msg['vm-event'] = 'vm-exited'
-        
-        msg['instance-id'] = str(instance.id)
-        msg['host-id'] = str(instance.hostId)
-        print 'Notifier publishing ', msg
-        self.publish(message)
+		msg['timestamp'] = str(time.time())
+		msg['hostname'] = ''	# FIXME: fill this in
+		msg['message-type'] = 'vm-event'
+		msg['vm-event'] = 'vm-exited'
+		
+		msg['instance-id'] = str(instance.id)
+		msg['host-id'] = str(instance.hostId)
+		print 'Notifier publishing ', msg
+		self.publish(message)

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py Tue Feb 17 18:53:40 2009
@@ -108,7 +108,7 @@
 		self.dfs = dfs
 		self.cm = cm
 
-                self.vmNamePrefix = self.config.get("XenPV", "vmNamePrefix")
+		self.vmNamePrefix = self.config.get("XenPV", "vmNamePrefix")
 		self.transientDir = self.config.get('XenPV', 'transientDir')
 
 		self.newvms = listVms(self.vmNamePrefix)
@@ -130,15 +130,15 @@
 		for vmId in self.newvms.keys():
 			if not vmlist.has_key(vmId):
 				a = self.newvms.pop(vmId)
-                                # If the vm had transient disks, delete them
-                                for i in range(len(a.disks)):
-                                        if a.disks[i].persistent == False:
-                                                diskname = self.transientDisk(a.id, i)
-                                                try:
-                                                        os.unlink(diskname)
-                                                except:
-                                                        print 'WARNING could not delete transient disk %s' % diskname
-                                self.nm.vmStateChange(a.vmId, a.state, InstanceState.Exited)
+				# If the vm had transient disks, delete them
+				for i in range(len(a.disks)):
+					if a.disks[i].persistent == False:
+						diskname = self.transientDisk(a.id, i)
+						try:
+							os.unlink(diskname)
+						except:
+							print 'WARNING could not delete transient disk %s' % diskname
+				self.nm.vmStateChange(a.vmId, a.state, InstanceState.Exited)
 		for vmId in vmlist.keys():
 			if not self.newvms.has_key(vmId):
 				print 'WARNING: found vm that should be managed, but is not'
@@ -175,17 +175,17 @@
 		f.close()
 		return fn
 	def deleteXenConfig(self, vmName):
-                pass
+		pass
 #		os.unlink(os.path.join("/tmp", vmName))
 ########################################
 
-        def vmName(self, instanceId):
-                return "%s-%i"%(self.vmNamePrefix, int(instanceId))
-        def transientDisk(self, instanceId, disknum):
-                newdisk = os.path.join(self.transientDir,
-                                       'tashi-%i-%i.qcow' %(instanceId, disknum))
-                return newdisk
-                
+	def vmName(self, instanceId):
+		return "%s-%i"%(self.vmNamePrefix, int(instanceId))
+	def transientDisk(self, instanceId, disknum):
+		newdisk = os.path.join(self.transientDir,
+				       'tashi-%i-%i.qcow' %(instanceId, disknum))
+		return newdisk
+		
 
 	@synchronizedmethod
 	def instantiateVm(self, instance):
@@ -203,9 +203,9 @@
 			imageLocal = self.dfs.getLocalHandle(instance.disks[i].uri)
 			instance.disks[i].local = imageLocal
 			if instance.disks[i].persistent == False:
-                                newdisk = self.transientDisk(instance.id, i)
-                                cmd = 'qcow-create 0 %s %s' % (newdisk, imageLocal)
-                                print 'creating new disk with "%s"' % cmd
+				newdisk = self.transientDisk(instance.id, i)
+				cmd = 'qcow-create 0 %s %s' % (newdisk, imageLocal)
+				print 'creating new disk with "%s"' % cmd
 				os.system(cmd)
 				instance.disks[i].local = newdisk
 
@@ -217,7 +217,7 @@
 					  instance.typeObj.cores)
 		cmd = "xm create %s"%fn
 		r = os.system(cmd)
-#                self.deleteXenConfig(name)
+#		self.deleteXenConfig(name)
 		if r != 0:
 			print 'WARNING: "%s" returned %i' % ( cmd, r)
 			raise Exception, 'WARNING: "%s" returned %i' % ( cmd, r)
@@ -248,7 +248,7 @@
 		name = domIdToName(vmId)
 		cPickle.dump(instance, infof)
 		infof.close()
-                
+		
 
 		# FIXME: handle errors
 		cmd = "xm save %i %s"%(vmId, tmpfile)
@@ -258,7 +258,7 @@
 			raise Exception,  "replace this with a real exception!"
 		r = self.dfs.copyTo(tmpfile, target)
 		self.newvms.pop(vmId)
- 		os.unlink(tmpfile)
+		os.unlink(tmpfile)
 		return vmId
 	
 	@synchronizedmethod
@@ -288,13 +288,13 @@
 	@synchronizedmethod
 	def migrateVm(self, vmId, target, transportCookie):
 		cmd = "xm migrate -l %i %s"%(vmId, target)
-                r = os.system(cmd)
-                if r != 0:
-                        # FIXME: throw exception
-                        print "migrate failed for VM %i"%vmId
-                        raise Exception,  "migrate failed for VM %i"%vmId
+		r = os.system(cmd)
+		if r != 0:
+			# FIXME: throw exception
+			print "migrate failed for VM %i"%vmId
+			raise Exception,  "migrate failed for VM %i"%vmId
 		self.newvms.pop(vmId)
-                return vmId
+		return vmId
 	@synchronizedmethod
 	def receiveVm(self, transportCookie):
 		instance = cPickle.loads(transportCookie)
@@ -349,19 +349,19 @@
 		return self.newvms.keys()
 
 
-        @synchronizedmethod
-        def getHostInfo(self):
-                host = Host()
-                memp = subprocess.Popen("xm info | awk '/^total_memory/ { print $3 }' ",
-                                        shell = True,
-                                        stdout = subprocess.PIPE)
-                mems = memp.stdout.readline()
-                host.memory = int(mems)
-                corep = subprocess.Popen("xm info | awk '/^nr_cpus/ { print $3 }' ",
-                                        shell = True,
-                                        stdout = subprocess.PIPE)
-                cores = corep.stdout.readline()
-                host.cores = int(cores)
-                return host
+	@synchronizedmethod
+	def getHostInfo(self):
+		host = Host()
+		memp = subprocess.Popen("xm info | awk '/^total_memory/ { print $3 }' ",
+					shell = True,
+					stdout = subprocess.PIPE)
+		mems = memp.stdout.readline()
+		host.memory = int(mems)
+		corep = subprocess.Popen("xm info | awk '/^nr_cpus/ { print $3 }' ",
+					shell = True,
+					stdout = subprocess.PIPE)
+		cores = corep.stdout.readline()
+		host.cores = int(cores)
+		return host
 
 

Modified: incubator/tashi/trunk/src/tashi/parallel.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/parallel.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/parallel.py (original)
+++ incubator/tashi/trunk/src/tashi/parallel.py Tue Feb 17 18:53:40 2009
@@ -23,94 +23,94 @@
 _log = logging.getLogger('tashi.parallel')
 
 def threaded(func):
-    def fn(*args, **kwargs):
-        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
-        thread.start()
-        return thread
-    return fn
+	def fn(*args, **kwargs):
+		thread = threading.Thread(target=func, args=args, kwargs=kwargs)
+		thread.start()
+		return thread
+	return fn
 
 
 class ThreadPool(Queue.Queue):
-    def __init__(self, size=8, maxsize=0):
-        Queue.Queue.__init__(self, maxsize)
-        for i in range(size):
-            thread = threading.Thread(target=self._worker)
-            thread.setDaemon(True)
-            thread.start()
-    def _worker(self):
-        while True:
-            try:
-                func, args, kwargs = self.get()
-                func(*args, **kwargs)
-            except Exception, e:
-                _log.error(e)
-                # FIXME: do something smarter here, backtrace, log,
-                # allow user-defined error handling...
-                
-    def submit(self, func, *args, **kwargs):
-        self.put((func, args, kwargs))
-    def submitlist(self, func, args, kwargs):
-        self.put((func, args, kwargs))
+	def __init__(self, size=8, maxsize=0):
+		Queue.Queue.__init__(self, maxsize)
+		for i in range(size):
+			thread = threading.Thread(target=self._worker)
+			thread.setDaemon(True)
+			thread.start()
+	def _worker(self):
+		while True:
+			try:
+				func, args, kwargs = self.get()
+				func(*args, **kwargs)
+			except Exception, e:
+				_log.error(e)
+				# FIXME: do something smarter here, backtrace, log,
+				# allow user-defined error handling...
+				
+	def submit(self, func, *args, **kwargs):
+		self.put((func, args, kwargs))
+	def submitlist(self, func, args, kwargs):
+		self.put((func, args, kwargs))
 
 class ThreadPoolClass:
-    def __init__(self, size=8, maxsize=0):
-        self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
+	def __init__(self, size=8, maxsize=0):
+		self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
 
 
 def threadpool(pool):
-    def dec(func):
-        def fn(*args, **kwargs):
-            pool.submit(func, *args, **kwargs)
-        return fn
-    return dec
+	def dec(func):
+		def fn(*args, **kwargs):
+			pool.submit(func, *args, **kwargs)
+		return fn
+	return dec
 
 def threadpoolmethod(meth):
-    def fn(*args, **kwargs):
-        try:
-            pool = args[0]._threadpool_pool
-        except AttributeError:
-            pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
-        # FIXME: how do we check parent class?
-#        assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
-        pool.submit(meth, *args, **kwargs)
-    return fn
+	def fn(*args, **kwargs):
+		try:
+			pool = args[0]._threadpool_pool
+		except AttributeError:
+			pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
+		# FIXME: how do we check parent class?
+#		assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
+		pool.submit(meth, *args, **kwargs)
+	return fn
 
 def synchronized(lock=None):
-    if lock==None:
-        lock = threading.RLock()
-    def dec(func):
-        def fn(*args, **kwargs):
-            lock.acquire()
-            ex = None
-            try:
-                r = func(*args, **kwargs)
-            except Exception, e:
-                ex = e
-            lock.release()
-            if ex != None:
-                raise e
-            return r
-        return fn
-    return dec
-            
+	if lock==None:
+		lock = threading.RLock()
+	def dec(func):
+		def fn(*args, **kwargs):
+			lock.acquire()
+			ex = None
+			try:
+				r = func(*args, **kwargs)
+			except Exception, e:
+				ex = e
+			lock.release()
+			if ex != None:
+				raise e
+			return r
+		return fn
+	return dec
+			
 def synchronizedmethod(func):
-    def fn(*args, **kwargs):
-        try:
-            lock = args[0]._synchronized_lock
-        except AttributeError:
-            lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
-        lock.acquire()
-        ex = None
-        try:
-            res = func(*args, **kwargs)
-        except Exception, e:
-            ex = e
-        lock.release()
-        if ex != None:
-            raise e
-        return res
-    return fn
-        
+	def fn(*args, **kwargs):
+		try:
+			lock = args[0]._synchronized_lock
+		except AttributeError:
+			lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
+		lock.acquire()
+		ex = None
+		try:
+			res = func(*args, **kwargs)
+		except Exception, e:
+			ex = e
+		lock.release()
+		if ex != None:
+			raise e
+		return res
+	return fn
+		
 
 ##############################
 # Test Code
@@ -120,181 +120,181 @@
 import time
 
 class TestThreadPool(unittest.TestCase):
-    def setUp(self):
-        self.errmargin = 0.5
+	def setUp(self):
+		self.errmargin = 0.5
 
-    def testUnthreaded(self):
-        queue = Queue.Queue()
-        def slowfunc(sleep=1):
-            time.sleep(sleep)
-            queue.put(None)
-        tt = time.time()
-        for i in range(4):
-            slowfunc()
-        for i in range(4):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 4, 1) 
-
-    def testThreaded(self):
-        queue = Queue.Queue()
-        @threaded
-        def slowthreadfunc(sleep=1):
-            time.sleep(sleep)
-            queue.put(None)
-        tt = time.time()
-        for i in range(8):
-            slowthreadfunc()
-        for i in range(8):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 1, 1) 
-
-    def testThreadPool(self):
-        pool = ThreadPool(size=4)
-        queue = Queue.Queue()
-        @threadpool(pool)
-        def slowpoolfunc(sleep=1):
-            time.sleep(sleep)
-            queue.put(None)
-        tt = time.time()
-        for i in range(8):
-            slowpoolfunc()
-        for i in range(8):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 2, 1) 
-
-    def testUnthreadedMethod(self):
-        queue = Queue.Queue()
-        class slowclass:
-            def __init__(self, sleep=1):
-                self.sleep=sleep
-            def beslow(self):
-                time.sleep(self.sleep)
-                queue.put(None)
-        sc = slowclass()
-        tt = time.time()
-        for i in range(4):
-            sc.beslow()
-        for i in range(4):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 4, 1)
-    
-    def testThreadedMethod(self):
-        queue = Queue.Queue()
-        class slowclass:
-            def __init__(self, sleep=1):
-                self.sleep=sleep
-            @threaded
-            def beslow(self):
-                time.sleep(self.sleep)
-                queue.put(None)
-        sc = slowclass()
-        tt = time.time()
-        for i in range(4):
-            sc.beslow()
-        for i in range(4):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 1, 1)
-    
-    def testThreadPoolMethod(self):
-        queue = Queue.Queue()
-        class slowclass:
-            def __init__(self, sleep=1):
-                self.sleep=sleep
-            @threadpoolmethod
-            def beslow(self):
-                time.sleep(self.sleep)
-                queue.put(None)
-        sc = slowclass()
-        tt = time.time()
-        for i in range(16):
-            sc.beslow()
-        for i in range(16):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 2, 1)
-    
-    def testSynchronized(self):
-        queue = Queue.Queue()
-        @synchronized()
-        def addtoqueue():
-            time.sleep(1)
-            queue.put(None)
-        @threaded
-        def slowthreadfunc():
-            addtoqueue()
-        tt = time.time()
-        for i in range(4):
-            slowthreadfunc()
-        for i in range(4):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 4, 1) 
-
-    def testSynchronizedMethod(self):
-        queue = Queue.Queue()
-        class addtoqueue:
-            @synchronizedmethod
-            def addtoqueue1(self):
-                time.sleep(1)
-                queue.put(None)
-            @synchronizedmethod
-            def addtoqueue2(self):
-                time.sleep(1)
-                queue.put(None)
-        atc = addtoqueue()
-        @threaded
-        def slowthreadfunc1():
-            atc.addtoqueue1()
-        @threaded
-        def slowthreadfunc2():
-            atc.addtoqueue2()
-        tt = time.time()
-        for i in range(4):
-            slowthreadfunc1()
-            slowthreadfunc2()
-        for i in range(8):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 8, 1) 
-
-    def testUnsynchronizedMethod(self):
-        queue = Queue.Queue()
-        class addtoqueue:
-            def addtoqueue1(self):
-                time.sleep(1)
-                queue.put(None)
-            def addtoqueue2(self):
-                time.sleep(1)
-                queue.put(None)
-        atc = addtoqueue()
-        @threaded
-        def slowthreadfunc1():
-            atc.addtoqueue1()
-        @threaded
-        def slowthreadfunc2():
-            atc.addtoqueue2()
-        tt = time.time()
-        for i in range(4):
-            slowthreadfunc1()
-            slowthreadfunc2()
-        for i in range(8):
-            queue.get()
-        tt = time.time() - tt
-        self.assertAlmostEqual(tt, 1, 1) 
+	def testUnthreaded(self):
+		queue = Queue.Queue()
+		def slowfunc(sleep=1):
+			time.sleep(sleep)
+			queue.put(None)
+		tt = time.time()
+		for i in range(4):
+			slowfunc()
+		for i in range(4):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 4, 1) 
+
+	def testThreaded(self):
+		queue = Queue.Queue()
+		@threaded
+		def slowthreadfunc(sleep=1):
+			time.sleep(sleep)
+			queue.put(None)
+		tt = time.time()
+		for i in range(8):
+			slowthreadfunc()
+		for i in range(8):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 1, 1) 
+
+	def testThreadPool(self):
+		pool = ThreadPool(size=4)
+		queue = Queue.Queue()
+		@threadpool(pool)
+		def slowpoolfunc(sleep=1):
+			time.sleep(sleep)
+			queue.put(None)
+		tt = time.time()
+		for i in range(8):
+			slowpoolfunc()
+		for i in range(8):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 2, 1) 
+
+	def testUnthreadedMethod(self):
+		queue = Queue.Queue()
+		class slowclass:
+			def __init__(self, sleep=1):
+				self.sleep=sleep
+			def beslow(self):
+				time.sleep(self.sleep)
+				queue.put(None)
+		sc = slowclass()
+		tt = time.time()
+		for i in range(4):
+			sc.beslow()
+		for i in range(4):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 4, 1)
+	
+	def testThreadedMethod(self):
+		queue = Queue.Queue()
+		class slowclass:
+			def __init__(self, sleep=1):
+				self.sleep=sleep
+			@threaded
+			def beslow(self):
+				time.sleep(self.sleep)
+				queue.put(None)
+		sc = slowclass()
+		tt = time.time()
+		for i in range(4):
+			sc.beslow()
+		for i in range(4):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 1, 1)
+	
+	def testThreadPoolMethod(self):
+		queue = Queue.Queue()
+		class slowclass:
+			def __init__(self, sleep=1):
+				self.sleep=sleep
+			@threadpoolmethod
+			def beslow(self):
+				time.sleep(self.sleep)
+				queue.put(None)
+		sc = slowclass()
+		tt = time.time()
+		for i in range(16):
+			sc.beslow()
+		for i in range(16):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 2, 1)
+	
+	def testSynchronized(self):
+		queue = Queue.Queue()
+		@synchronized()
+		def addtoqueue():
+			time.sleep(1)
+			queue.put(None)
+		@threaded
+		def slowthreadfunc():
+			addtoqueue()
+		tt = time.time()
+		for i in range(4):
+			slowthreadfunc()
+		for i in range(4):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 4, 1) 
+
+	def testSynchronizedMethod(self):
+		queue = Queue.Queue()
+		class addtoqueue:
+			@synchronizedmethod
+			def addtoqueue1(self):
+				time.sleep(1)
+				queue.put(None)
+			@synchronizedmethod
+			def addtoqueue2(self):
+				time.sleep(1)
+				queue.put(None)
+		atc = addtoqueue()
+		@threaded
+		def slowthreadfunc1():
+			atc.addtoqueue1()
+		@threaded
+		def slowthreadfunc2():
+			atc.addtoqueue2()
+		tt = time.time()
+		for i in range(4):
+			slowthreadfunc1()
+			slowthreadfunc2()
+		for i in range(8):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 8, 1) 
+
+	def testUnsynchronizedMethod(self):
+		queue = Queue.Queue()
+		class addtoqueue:
+			def addtoqueue1(self):
+				time.sleep(1)
+				queue.put(None)
+			def addtoqueue2(self):
+				time.sleep(1)
+				queue.put(None)
+		atc = addtoqueue()
+		@threaded
+		def slowthreadfunc1():
+			atc.addtoqueue1()
+		@threaded
+		def slowthreadfunc2():
+			atc.addtoqueue2()
+		tt = time.time()
+		for i in range(4):
+			slowthreadfunc1()
+			slowthreadfunc2()
+		for i in range(8):
+			queue.get()
+		tt = time.time() - tt
+		self.assertAlmostEqual(tt, 1, 1) 
 
 
 
 if __name__=='__main__':
-    import sys
+	import sys
 
-    logging.basicConfig(level=logging.INFO,
-                        format="%(asctime)s %(levelname)s:\t %(message)s",
-                        stream=sys.stdout)
+	logging.basicConfig(level=logging.INFO,
+	                    format="%(asctime)s %(levelname)s:\t %(message)s",
+	                    stream=sys.stdout)
 
-    suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
-    unittest.TextTestRunner(verbosity=2).run(suite)
+	suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
+	unittest.TextTestRunner(verbosity=2).run(suite)

Modified: incubator/tashi/trunk/src/tashi/thrift/build.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/thrift/build.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/thrift/build.py (original)
+++ incubator/tashi/trunk/src/tashi/thrift/build.py Tue Feb 17 18:53:40 2009
@@ -41,10 +41,10 @@
 	print 'Copying generated code to \'tashi.services\' package...'
 	shutil.copytree('gen-py/services', '../services')
 	
-        print 'Generatign Python code for \'messagingthrift\'...'
-        os.system('rm -rf gen-py')
-        os.system('thrift --gen py messagingthrift.thrift')
-        
-        print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
-        shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
-                        os.path.join('..', 'messaging', 'messagingthrift'))
+	print 'Generatign Python code for \'messagingthrift\'...'
+	os.system('rm -rf gen-py')
+	os.system('thrift --gen py messagingthrift.thrift')
+	
+	print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
+	shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
+			os.path.join('..', 'messaging', 'messagingthrift'))

Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=745191&r1=745190&r2=745191&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Tue Feb 17 18:53:40 2009
@@ -155,26 +155,26 @@
 	return res
 
 def signalHandler(signalNumber):
-    """Used to denote a particular function as the signal handler for a 
-       specific signal"""
-    def __decorator(function):
-        signal.signal(signalNumber, function)
-        return function
-    return __decorator
+	"""Used to denote a particular function as the signal handler for a 
+	   specific signal"""
+	def __decorator(function):
+		signal.signal(signalNumber, function)
+		return function
+	return __decorator
 
 def boolean(value):
-    """Convert a variable to a boolean"""
-    if (type(value) == types.BooleanType):
-        return value
-    if (type(value) == types.IntType):
-        return (value != 0)
-    lowercaseValue = value.lower()
-    if lowercaseValue in ['yes', 'true', '1']:
-        return True
-    elif lowercaseValue in ['no', 'false', '0']:
-        return False
-    else:
-        raise ValueError
+	"""Convert a variable to a boolean"""
+	if (type(value) == types.BooleanType):
+		return value
+	if (type(value) == types.IntType):
+		return (value != 0)
+	lowercaseValue = value.lower()
+	if lowercaseValue in ['yes', 'true', '1']:
+		return True
+	elif lowercaseValue in ['no', 'false', '0']:
+		return False
+	else:
+		raise ValueError
 
 def instantiateImplementation(className, *args):
 	"""Create an instance of an object with the given class name and list