Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2008/11/03 14:45:27 UTC

svn commit: r710072 [2/3] - in /incubator/tashi/import: ./ tashi-intel-r399/ tashi-intel-r399/doc/ tashi-intel-r399/etc/ tashi-intel-r399/guest/ tashi-intel-r399/scripts/ tashi-intel-r399/src/ tashi-intel-r399/src/tashi/ tashi-intel-r399/src/tashi/agen...

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,38 @@
+class DfsInterface:
+	def __init__(self, config):
+		if self.__class__ is DfsInterface:
+			raise NotImplementedError
+		self.config = config
+	
+	def copyTo(self, localSrc, dst):
+		raise NotImplementedError
+	
+	def copyFrom(self, src, localDst):
+		raise NotImplementedError
+	
+	def list(self, path):
+		raise NotImplementedError
+	
+	def stat(self, path):
+		raise NotImplementedError
+	
+	def move(self, src, dst):
+		raise NotImplementedError
+	
+	def copy(self, src, dst):
+		raise NotImplementedError
+	
+	def mkdir(self, path):
+		raise NotImplementedError
+	
+	def unlink(self, path):
+		raise NotImplementedError
+	
+	def rmdir(self, path):
+		raise NotImplementedError
+	
+	def open(self, path, perm):
+		raise NotImplementedError
+	
+	def getLocalHandle(self, path):
+		raise NotImplementedError

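DfsInterface is a pure abstract base class: instantiating it directly raises
NotImplementedError, and a concrete DFS backend subclasses it and overrides the
operations it supports (as vfs.py does below). For illustration only, a minimal
hypothetical backend might look like this (NullDfs and its behavior are not part
of this commit):

    from dfsinterface import DfsInterface

    class NullDfs(DfsInterface):
        """Illustrative backend that treats every path as already local."""
        def __init__(self, config):
            DfsInterface.__init__(self, config)

        def getLocalHandle(self, path):
            return path

Any operation a backend leaves unimplemented still raises NotImplementedError
from the base class.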
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,59 @@
+import os
+import os.path
+from dfsinterface import DfsInterface
+
+class Vfs(DfsInterface):
+	def __init__(self, config):
+		DfsInterface.__init__(self, config)
+		self.prefix = self.config.get("Vfs", "prefix")
+	
+	def copyTo(self, localSrc, dst):
+		(si, so, se) = os.popen3("cp %s %s" % (localSrc, 
+						       os.path.join(self.prefix, dst)))
+		so.readlines() # drain output; blocks until the cp completes
+		return None
+	
+	def copyFrom(self, src, localDst):
+		(si, so, se) = os.popen3("cp %s %s" % (os.path.join(self.prefix, src),
+						       localDst))
+		so.readlines()
+		return None
+
+	def list(self, path):
+		try:
+			return os.listdir(os.path.join(self.prefix, path))
+		except OSError, e:
+			if (e.errno == 20): # errno 20 is ENOTDIR: path names a file, so list just it
+				return [path.split('/')[-1]]
+			else:
+				raise
+	
+	def stat(self, path):
+		return os.stat(os.path.join(self.prefix, path))
+	
+	def move(self, src, dst):
+		(si, so, se) = os.popen3("mv %s %s" % (os.path.join(self.prefix, src), 
+						       os.path.join(self.prefix, dst)))
+		so.readlines()
+		return None
+	
+	def copy(self, src, dst):
+		(si, so, se) = os.popen3("cp %s %s" % (os.path.join(self.prefix, src), 
+						       os.path.join(self.prefix, dst)))
+		so.readlines()
+		return None
+	
+	def mkdir(self, path):
+		return os.mkdir(os.path.join(self.prefix, path))
+	
+	def unlink(self, path):
+		return os.unlink(os.path.join(self.prefix, path))
+	
+	def rmdir(self, path):
+		return os.rmdir(os.path.join(self.prefix, path))
+	
+	def open(self, path, perm):
+		return open(os.path.join(self.prefix, path), perm)
+	
+	def getLocalHandle(self, path):
+		return os.path.join(self.prefix, path)

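Vfs resolves every path it is handed against the configured prefix. A minimal
usage sketch (the [Vfs] section name comes from the code above; the prefix value
is illustrative and the directory must already exist):

    from ConfigParser import ConfigParser
    from vfs import Vfs

    config = ConfigParser()
    config.add_section("Vfs")
    config.set("Vfs", "prefix", "/tmp/dfs-root")   # illustrative prefix

    dfs = Vfs(config)
    dfs.mkdir("images")                        # creates /tmp/dfs-root/images
    print dfs.getLocalHandle("images")         # -> /tmp/dfs-root/images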
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/__init__.py?rev=710072&view=auto
==============================================================================
    (empty)

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,32 @@
+import ConfigParser
+import getopt
+
+import os
+import sys
+import time
+
+import thriftmessaging
+
+options = ''   # getopt short-option spec is a string; no short options
+long_options = ['port=']
+
+# FIXME: should initialize from config file
+params = {"port":1717}
+
+try:
+    optlist, args = getopt.getopt(sys.argv[1:], options, long_options)
+except getopt.GetoptError, err:
+    print str(err)
+    sys.exit(2)
+
+for opt in optlist:
+    if opt[0] == "--port":
+        try:
+            params["port"] = int(opt[1])
+        except ValueError:
+            print "--port expects an integer, got %s" % opt[1]
+            sys.exit(2)
+
+print "Starting message broker on port %i" % params["port"]
+broker = thriftmessaging.MessageBrokerThrift(params["port"], daemon=False)
+

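The broker script accepts a single optional --port flag (defaulting to 1717 as
above), for example:

    python messageBroker.py --port 1717

daemon=False keeps the Thrift serving thread non-daemonic, so the process stays
alive to handle requests.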
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,321 @@
+#!/usr/bin/python
+
+
+import threading
+import thread
+import sys
+import os
+import socket
+import Queue
+import copy
+import random
+import traceback
+
+from threadpool import ThreadPoolClass, threadpool, ThreadPool
+from threadpool import threadpoolmethod, threaded, synchronized, synchronizedmethod
+
+class RWLock():
+    """RWLock: Simple reader/writer lock implementation
+    FIXME: this implementation will starve writers!
+    Methods:
+        acquire() : take lock for read access
+        release() : release lock from read access
+        acquireWrite() : take lock for write access
+        releaseWrite() : release lock from write access"""
+    def __init__(self):
+        self.lock = threading.Condition()
+        self.readers = 0
+    def acquire(self):
+        self.lock.acquire()
+        self.readers = self.readers + 1
+        self.lock.release()
+    def release(self):
+        self.lock.acquire()
+        self.readers = self.readers - 1
+        self.lock.notify()
+        self.lock.release()
+    def acquireWrite(self):
+        self.lock.acquire()
+        while self.readers > 0:
+            self.lock.wait()
+    def releaseWrite(self):
+        self.lock.notify()
+        self.lock.release()
+
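+# Illustrative usage of RWLock (sketch): any number of readers may hold the
+# lock concurrently, while acquireWrite() blocks until all readers release.
+#
+#     rw = RWLock()
+#     rw.acquire()        # reader side
+#     try:
+#         pass            # read shared state
+#     finally:
+#         rw.release()
+#
+#     rw.acquireWrite()   # writer side: waits for readers to drain
+#     try:
+#         pass            # mutate shared state
+#     finally:
+#         rw.releaseWrite()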
+
+
+class MessageBroker():
+    def __init__(self):
+        self.sublock = RWLock()
+        self.subscribers = []
+        self.random = random.Random()
+    def log(self, msg):
+        print "MessageBroker: Got log: '%s'" % str(msg)
+        return msg
+    def addSubscriber(self, subscriber):
+        self.sublock.acquireWrite()
+        self.subscribers.append(subscriber)
+        l = len(self.subscribers)
+        self.sublock.releaseWrite()
+        return l
+    def publish(self, message):
+        removesubs = []
+        i = self.random.randint(0,100)
+
+#         subscribers = self.getSubscribers()
+#         random.shuffle(subscribers)
+
+        self.sublock.acquire()
+
+        sys.stdout.flush()
+
+        for subscriber in self.subscribers:
+            try:
+                sys.stdout.flush()
+                assert(subscriber != self)
+                subscriber.publish(message)
+                sys.stdout.flush()
+            except Exception, e:
+                print e
+                removesubs.append(subscriber)
+
+        self.sublock.release()
+
+        if len(removesubs) > 0:
+            print "detected %i failed subscribers" % len(removesubs)
+            sys.stdout.flush()
+            self.sublock.acquireWrite()
+            for subscriber in removesubs:
+                try:
+                    self.subscribers.remove(subscriber)
+                except:
+                    pass
+            self.sublock.releaseWrite()
+    def getSubscribers(self):
+        self.sublock.acquire()
+        subs = copy.copy(self.subscribers)
+        self.sublock.release()
+        return subs
+    def removeSubscriber(self, subscriber):
+        self.sublock.acquireWrite()
+        try:
+            self.subscribers.remove(subscriber)
+        except:
+            pass
+        self.sublock.releaseWrite()
+    def publishList(self, messages):
+        for message in messages:
+            self.publish(message)
+
+class Subscriber():
+    def __init__(self, broker, pmatch={}, nmatch={}, synchronized=False):
+        self.broker = broker
+        self.lock = threading.Lock()
+        self.synchronized = synchronized
+        self.pmatch=pmatch
+        self.nmatch=nmatch
+        broker.addSubscriber(self)
+    def publish(self, message):
+        sys.stdout.flush()
+        msg = message
+        try:
+            if self.synchronized:
+                self.lock.acquire()
+            msg = self.filter(msg)
+            if (msg != None):
+                self.handle(msg)
+            if self.synchronized:
+                self.lock.release()
+        except Exception, x:
+            if self.synchronized:
+                self.lock.release()
+            print '%s, %s, %s' % (type(x), x, traceback.format_exc())
+    def publishList(self, messages):
+        for message in messages:
+            self.publish(message)
+    def handle(self, message):
+        print "Subscriber Default Handler: '%s'" % message
+    def setMatch(self, pmatch={}, nmatch={}):
+        self.lock.acquire()
+        self.pmatch=pmatch
+        self.nmatch=nmatch
+        self.lock.release()
+    def filter(self, message):
+        """filter(self, message) : the filter function returns
+        the message, modified to be passed to the handler.
+        Returning (None) indicates that this is not a message
+        we are interested in, and it will not be passed to the
+        handler."""
+        send = True
+        for key in self.pmatch.keys():
+            if (not message.has_key(key)):
+                send = False
+                break
+            if self.pmatch[key] != None:
+                if message[key] != self.pmatch[key]:
+                    send = False
+                    break
+        if send == False:
+            return None
+        for key in message.keys():
+            if self.nmatch.has_key(key):
+                if self.nmatch[key] == None:
+                    send = False
+                    break
+                if self.nmatch[key] == message[key]:
+                    send = False
+                    break
+        if send == False:
+            return None
+        return message
+
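+# Example of the filter semantics above (illustrative values): with
+# pmatch={'type': 'log'} and nmatch={'hostname': 'node1'}, the message
+# {'type': 'log', 'hostname': 'node2'} is delivered; {'type': 'log',
+# 'hostname': 'node1'} is dropped by nmatch; and {'hostname': 'node2'} is
+# dropped because the pmatch key is absent.  A pmatch value of None requires
+# only that the key be present, while an nmatch value of None drops any
+# message containing that key.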
+
+    
+class Publisher():
+    '''Superclass for pub/sub publishers
+
+    FIXME: use finer-grained locking'''
+    def __init__(self, broker, aggregate=100):
+        self.pending = []
+        self.pendingLock = threading.Lock()
+        self.aggregateSize = aggregate
+        self.broker = broker
+    @synchronizedmethod
+    def publish(self, message):
+        if message.has_key('aggregate') and message['aggregate'] == 'True':
+            self.aggregate(message)
+            return
+        else:
+            self.broker.publish(message)
+    @synchronizedmethod
+    def publishList(self, messages):
+        self.broker.publishList(messages)
+    @synchronizedmethod
+    def aggregate(self, message):
+        # we can make this lock-less by using a queue for pending
+        # messages
+        self.pendingLock.acquire()
+        self.pending.append(message)
+        if len(self.pending) >= self.aggregateSize:
+            self.broker.publishList(self.pending)
+            self.pending = []
+        self.pendingLock.release()
+    @synchronizedmethod
+    def setBroker(self, broker):
+        self.broker = broker
+
+##############################
+# Testing Code
+##############################
+import time
+import unittest
+import sys
+import logging
+
+        
+class TestSubscriber(Subscriber):
+    def __init__(self, *args, **kwargs):
+        self.queue = Queue.Queue()
+        Subscriber.__init__(self, *args, **kwargs)
+    def handle(self, message):
+        self.queue.put(message)
+
+class TestMessaging(unittest.TestCase):
+    def setUp(self):
+        self.broker = MessageBroker()
+        self.publisher = Publisher(self.broker)
+        self.subscriber = TestSubscriber(self.broker)
+    def testPublish(self):
+        self.publisher.publish( {'message':'hello world'} )
+        self.assertEqual(self.subscriber.queue.qsize(), 1)
+    def testPublishList(self):
+        nrmsgs = 10
+        msgs = []
+        for i in range(nrmsgs):
+            msgs.append( {'msgnum':str(i)} )
+        self.publisher.publishList( msgs )
+        self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
+    def testAggregate(self):
+        nrmsgs = self.publisher.aggregateSize
+        for i in range(nrmsgs):
+            self.assertEqual(self.subscriber.queue.qsize(), 0)
+            self.publisher.aggregate( {'msgnum':str(i)} )
+        self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
+    def testAggregateKeyword(self):
+        nrmsgs = self.publisher.aggregateSize
+        for i in range(nrmsgs):
+            self.assertEqual(self.subscriber.queue.qsize(), 0)
+            self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
+        self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
+
+if __name__ == '__main__':
+
+    logging.basicConfig(level=logging.INFO,
+                        format="%(asctime)s %(levelname)s:\t %(message)s",
+                        stream=sys.stdout)
+
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestMessaging)
+    unittest.TextTestRunner(verbosity=2).run(suite) 
+
+    sys.exit(0)
+
+
+##############################
+# Old/Unused testing code
+##############################
+
+
+
+    print 'testing removeSubscriber'
+    broker.removeSubscriber(subscriber)
+    publisher.publish( {'message':"you shouldn't see this"} )
+
+    nsub = NullSubscriber(broker)
+    print 'timing publish'
+    nrmsg = 100000
+    tt = time.time()
+    for i in range(nrmsg):
+#        publisher.publish( {"message":"hello world!"} )
+        publisher.publish( {} )
+    tt = time.time() - tt
+    print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
+                                                           tt,
+                                                           nrmsg/tt)
+    broker.removeSubscriber(nsub)
+
+    class SlowSubscriber(Subscriber):
+        def handle(self, message):
+            print 'called slow subscriber with message', message
+            time.sleep(1)
+            print 'returning from slow subscriber with message', message
+    class ThreadedSubscriber(Subscriber):
+        @threaded
+        def handle(self, message):
+            print 'called threaded subscriber with message', message
+            time.sleep(1)
+            print 'returning from threaded subscriber with message', message
+    class ThreadPoolSubscriber(Subscriber, ThreadPoolClass):
+        @threadpoolmethod
+        def handle(self, message):
+            print 'called threadpool subscriber with message', message
+            time.sleep(1)
+            print 'returning from threadpool subscriber with message', message
+
+
+
+    tsub = ThreadedSubscriber(broker)
+    for i in range(8):
+        publisher.publish( {"msg":str(i)} )
+    broker.removeSubscriber(tsub)
+    time.sleep(3)
+
+    tpsub = ThreadPoolSubscriber(broker)
+    for i in range(8):
+        publisher.publish( {"msg":str(i)} )
+    broker.removeSubscriber(tpsub)
+    time.sleep(3)
+
+    ssub = SlowSubscriber(broker)
+    for i in range(4):
+        publisher.publish( {"msg":str(i)} )
+    broker.removeSubscriber(ssub)

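The aggregation path above batches messages on the publisher side: a message
tagged 'aggregate': 'True' is buffered, and the whole pending list is flushed to
the broker once aggregateSize messages accumulate. A short sketch against the
classes in this module (the aggregate size of 3 is illustrative):

    from messaging import MessageBroker, Publisher, TestSubscriber

    broker = MessageBroker()
    publisher = Publisher(broker, aggregate=3)
    subscriber = TestSubscriber(broker)

    publisher.publish({'n': '1', 'aggregate': 'True'})  # buffered
    publisher.publish({'n': '2', 'aggregate': 'True'})  # buffered
    publisher.publish({'n': '3', 'aggregate': 'True'})  # buffer full: flushed
    print subscriber.queue.qsize()                      # -> 3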
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,213 @@
+#! /usr/bin/python
+
+
+from messaging import *
+
+import cPickle
+import soaplib.wsgi_soap
+import cherrypy.wsgiserver
+from soaplib.service import soapmethod
+from soaplib.serializers.primitive import *
+import SOAPpy.WSDL
+import time
+
+class MessageBrokerSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, MessageBroker):
+    def __init__(self, port):
+        soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
+        MessageBroker.__init__(self)
+        self.port = port
+        def trdfn():
+            service = self
+            server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
+            server.start()
+        threading.Thread(target=trdfn).start()
+
+
+    @soapmethod(Array(String), Array(String), _returns=Null)
+    def log(self, keys, values):
+        message = {}
+        if len(keys) != len(values):
+            raise Exception, "Different lengths for keys and values"
+        for i in range(len(keys)):
+            message[keys[i]] = values[i]
+        MessageBroker.log(self, message)
+
+    @soapmethod(String, Integer, _returns=Null)
+    def addSubscriber(self, host, port):
+        subscriber = SubscriberSoapProxy(host, port)
+        MessageBroker.addSubscriber(self, subscriber)
+    
+    @soapmethod(String, Integer, _returns=Null)
+    def removeSubscriber(self, host, port):
+        # should this method really be able to peek into subscriber.host/port?
+        match = None
+        subscribers = self.getSubscribers()
+        for subscriber in subscribers:
+            if subscriber.host == host and subscriber.port == port:
+                match = subscriber
+        if match != None:
+            MessageBroker.removeSubscriber(self, match)
+        
+
+    @soapmethod(Array(String), Array(String), _returns=Null)
+    def publish(self, keys, values):
+        message = {}
+        if len(keys) != len(values):
+            raise Exception, "Different lengths for keys and values"
+        for i in range(len(keys)):
+            message[keys[i]] = values[i]
+        MessageBroker.publish(self, message)
+
+
+
+class MessageBrokerSoapProxy():
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
+    def log(self, message):
+        keys = []
+        values = []
+        for k,v in message.items():
+            keys.append(k)
+            values.append(v)
+        self.connection.log(keys=keys, values=values)
+    def addSubscriber(self, subscriber):
+        self.connection.addSubscriber(host=subscriber.host, port=subscriber.port)
+    def publish(self, message):
+        keys = []
+        values = []
+        for k,v in message.items():
+            keys.append(k)
+            values.append(v)
+        self.connection.publish(keys=keys, values=values)
+    def removeSubscriber(self, subscriber):
+        self.connection.removeSubscriber(host=subscriber.host, port=subscriber.port)
+
+
+
+
+class SubscriberSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, Subscriber):
+    def __init__(self, broker, port, synchronized=False):
+        soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
+        self.host = socket.gethostname()
+        self.port = port
+        self.broker = broker
+        self.server = None
+        def trdfn():
+            service = self
+            self.server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0",port), service)
+            self.server.start()
+        threading.Thread(target=trdfn).start()
+#        broker.log("Subscriber started")
+        # register only after our SOAP server is up: the broker fetches this
+        # subscriber's WSDL when it builds its proxy
+        Subscriber.__init__(self, broker, synchronized=synchronized)
+    @soapmethod(Array(String), Array(String), _returns=Integer)
+    def publish(self, keys, values):
+        message = {}
+        if len(keys) != len(values):
+            raise Exception, "Different lengths for keys and values"
+        for i in range(len(keys)):
+            message[keys[i]] = values[i]
+        Subscriber.publish(self, message)
+        return 0
+    def stop(self):
+        self.server.stop()
+
+class SubscriberSoapProxy():
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.connection = SOAPpy.WSDL.Proxy("http://%s:%i/.wsdl"%(host,port))
+    def publish(self, message):
+        keys = []
+        values = []
+        for k,v in message.items():
+            keys.append(k)
+            values.append(v)
+        self.connection.publish(keys=keys, values=values)
+
+
+####################
+# Testing Code 
+####################
+
+class CustomSubscriber(SubscriberSoap):
+    def handle(self, message):
+        print "Custom Subscriber: '%s'" % str(message)
+
+class NullSubscriber(SubscriberSoap):
+    def handle(self, message):
+        pass
+
+
+if __name__ == '__main__':
+    try:
+        portnum = 1717
+
+        print "\ntesting message broker"
+        broker = MessageBrokerSoap(portnum)
+        proxy = MessageBrokerSoapProxy("localhost", portnum)
+        portnum = portnum + 1 
+
+        print "\ntesting log function"
+        proxy.log( {"message":"Hello World!"} )
+#        proxy.log("It looks like log works")
+
+        print "\ntesting subscriber proxy"
+        subscriber = SubscriberSoap(proxy, portnum)
+        portnum = portnum + 1
+
+        print "\ntesting custom subscriber"
+        csub = CustomSubscriber(proxy, portnum)
+        portnum = portnum + 1
+
+        print "\ntesting publish"
+        proxy.publish( {"message":"Hello World!"} )
+
+        print "\ntesting stop"
+        subscriber.stop()
+        proxy.publish( {"message":"Everybody here?"} )
+
+        print "\ntesting removeSubscriber"
+        proxy.removeSubscriber(csub)
+        proxy.publish( {"message":"Nobody home"} )
+        proxy.addSubscriber(csub)
+        proxy.publish( {"message":"You're back!"} )
+
+        print "\ntesting filter"
+        csub.setMatch( {"print":"yes"} )
+        proxy.publish( {"print":"yes", "message":"this should be printed"} )
+        proxy.publish( {"print":"no", "message":"this should NOT be printed"} )
+        csub.setMatch()
+
+        print "\ntesting publish performance"
+        proxy.removeSubscriber(csub)
+        nrmsg = 10000
+        tt = time.time()
+        for i in range(nrmsg):
+            proxy.publish( {"message":"msg %i"%i} )
+        tt = time.time() - tt
+        print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
+                                                               tt,
+                                                               nrmsg/tt)
+
+        print "\ntesting publish/subscribe performance"
+        nsub = NullSubscriber(proxy, portnum)
+        portnum = portnum + 1
+        nrmsg = 10000
+        tt = time.time()
+        for i in range(nrmsg):
+            proxy.publish( {"message":"msg %i"%i} )
+        tt = time.time() - tt
+        print "Published %i messages in %f seconds, %f msg/s"%(nrmsg,
+                                                               tt,
+                                                               nrmsg/tt)
+
+
+    except Exception, e:
+#        raise e
+        print e
+        sys.exit(0)
+    sys.exit(0)

Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py
------------------------------------------------------------------------------
    svn:executable = *

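Because the soapmethods above only accept Array(String) arguments, a message
dict is flattened into parallel key and value lists on the sending side and
zipped back into a dict on the receiving side. The round trip in isolation
(illustrative values):

    message = {'message-type': 'log', 'hostname': 'node1'}

    # sending side, as in MessageBrokerSoapProxy.publish
    keys, values = [], []
    for k, v in message.items():
        keys.append(k)
        values.append(v)

    # receiving side, as in MessageBrokerSoap.publish
    rebuilt = {}
    for i in range(len(keys)):
        rebuilt[keys[i]] = values[i]
    assert rebuilt == message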
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,131 @@
+from thriftmessaging import *
+import logging
+import Queue
+from ConfigParser import ConfigParser
+import time
+import socket
+import signal
+
+class TashiLogHandler(logging.Handler, PublisherThrift):
+    def __init__(self, config, *args, **kwargs):
+        self.messages = Queue.Queue()
+        self.config = config
+        logging.Handler.__init__(self, *args, **kwargs)
+        PublisherThrift.__init__(self, 
+                                 config.get('MessageBroker', 'host'),
+                                 int(config.get('MessageBroker', 'port')))
+    def emit(self, record):
+        # 'args', 'created', 'exc_info', 'exc_text', 'filename',
+        # 'funcName', 'getMessage', 'levelname', 'levelno', 'lineno',
+        # 'module', 'msecs', 'msg', 'name', 'pathname', 'process',
+        # 'relativeCreated', 'thread', 'threadName']
+        msg = {}
+        # args
+        # created
+        # exc_info
+        # exc_text
+        msg['log-filename'] = str(record.filename)
+        msg['log-funcname'] = str(record.funcName)
+        msg['log-levelname'] = str(record.levelname)
+        msg['log-level'] = str(record.levelno)
+        msg['log-lineno'] = str(record.lineno)
+        msg['log-module'] = str(record.module)
+        msg['log-msecs'] = str(record.msecs)
+        msg['log-message'] = str(record.msg)
+        msg['log-name'] = str(record.name)
+        msg['log-pathname'] = str(record.pathname)
+        msg['log-process'] = str(record.process)
+        # relativeCreated
+        msg['log-thread'] = str(record.thread)
+        msg['log-threadname'] = str(record.threadName)
+
+        # standard message fields
+        msg['timestamp'] = str(time.time())
+        msg['hostname'] = socket.gethostname()
+        msg['message-type'] = 'log'
+
+        self.messages.put(msg)
+        self.publish(msg)
+
+class TashiSubscriber(SubscriberThrift):
+    def __init__(self, config, port, **kwargs):
+        sys.stdout.flush()
+        brokerPort = int(config.get('MessageBroker', 'port'))
+        self.broker = MessageBrokerThriftProxy(config.get('MessageBroker', 'host'), brokerPort)
+        SubscriberThrift.__init__(self, self.broker, port, **kwargs)
+
+        
+
+##############################
+# Test Code
+##############################
+import unittest
+import sys
+
+class TestTashiSubscriber(TashiSubscriber):
+    def __init__(self, *args, **kwargs):
+        self.messageQueue = Queue.Queue()
+        TashiSubscriber.__init__(self, *args, **kwargs)
+    def handle(self, message):
+        self.messageQueue.put(message)
+
+
+def incrementor(start = 0):
+    while True:
+        a = start
+        start = start + 1
+        yield a
+increment = incrementor()
+
+class TestTashiMessaging(unittest.TestCase):
+    def setUp(self):
+        self.configFiles = [ '../../../etc/TestConfig.cfg']
+        self.config = ConfigParser()
+        self.configFiles = self.config.read(self.configFiles)
+        self.port = int(self.config.get('MessageBroker', 'port'))
+
+        try:
+            self.brokerPid = os.spawnlpe(os.P_NOWAIT, 'python', 'python', 
+                                         './messageBroker.py', 
+                                         '--port', str(self.port),
+                                         os.environ)
+            self.port = self.port + 1
+            # FIXME: what's the best way to wait for the broker to be ready?
+            time.sleep(1)
+        except Exception, e:
+            print e
+            sys.exit(-1)
+        self.initialized = True
+        self.log = logging.getLogger('TestTashiMessaging')
+        self.handler = TashiLogHandler(self.config)
+        self.log.addHandler(self.handler)
+        self.sub = TestTashiSubscriber(self.config, int(self.port) + increment.next())
+    def tearDown(self):
+        os.kill(self.brokerPid, signal.SIGKILL)
+        # FIXME: wait for the port to be ready again
+        time.sleep(2)
+        self.log.removeHandler(self.handler)
+#         self.sub.broker.removeSubscriber(self.sub)
+        pass
+    def testLog(self):
+        self.log.log(50, "Hello World!")
+        self.handler.messages.get(timeout=5)
+        self.sub.messageQueue.get(timeout=5)
+        self.assertEqual(self.handler.messages.qsize(), 0)
+        self.assertEqual(self.sub.messageQueue.qsize(), 0)
+    def testPublish(self):
+        sys.stdout.flush()
+        self.port = self.port + 1
+        self.handler.publish({'message':'hello world'})
+        self.sub.messageQueue.get(timeout=5)
+        self.assertEqual(self.sub.messageQueue.qsize(), 0)
+        
+
+if __name__=='__main__':
+
+
+#     logging.basicConfig(level=logging.INFO,
+#                         format="%(asctime)s %(levelname)s:\t %(message)s",
+#                         stream=sys.stdout)
+
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestTashiMessaging)
+    unittest.TextTestRunner(verbosity=2).run(suite)

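TashiLogHandler is both a logging.Handler and a PublisherThrift: every record
emitted on an attached logger is flattened into a string-valued message, stamped
with timestamp, hostname and message-type fields, and published to the
configured broker. A usage sketch (host and port are illustrative, and a broker
is assumed to be listening):

    from ConfigParser import ConfigParser
    from tashimessaging import TashiLogHandler
    import logging

    config = ConfigParser()
    config.add_section('MessageBroker')
    config.set('MessageBroker', 'host', 'localhost')  # illustrative
    config.set('MessageBroker', 'port', '1717')       # illustrative

    log = logging.getLogger('example')
    log.addHandler(TashiLogHandler(config))
    log.warning('disk nearly full')  # published to subscribers as a 'log' message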
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,288 @@
+import threading
+import time
+import Queue
+import logging
+
+_log = logging.getLogger('tashi.messaging.threadpool')
+
+def threaded(func):
+    def fn(*args, **kwargs):
+        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
+        thread.start()
+        return thread
+    return fn
+
+
+class ThreadPool(Queue.Queue):
+    def __init__(self, size=8, maxsize=0):
+        Queue.Queue.__init__(self, maxsize)
+        for i in range(size):
+            thread = threading.Thread(target=self._worker)
+            thread.setDaemon(True)
+            thread.start()
+    def _worker(self):
+        while True:
+            try:
+                func, args, kwargs = self.get()
+                func(*args, **kwargs)
+            except Exception, e:
+                _log.error(e)
+                # FIXME: do something smarter here, backtrace, log,
+                # allow user-defined error handling...
+                
+    def submit(self, func, *args, **kwargs):
+        self.put((func, args, kwargs))
+    def submitlist(self, func, args, kwargs):
+        self.put((func, args, kwargs))
+
+class ThreadPoolClass:
+    def __init__(self, size=8, maxsize=0):
+        self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
+
+
+def threadpool(pool):
+    def dec(func):
+        def fn(*args, **kwargs):
+            pool.submit(func, *args, **kwargs)
+        return fn
+    return dec
+
+def threadpoolmethod(meth):
+    def fn(*args, **kwargs):
+        try:
+            pool = args[0]._threadpool_pool
+        except AttributeError:
+            pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
+        # FIXME: how do we check parent class?
+#        assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
+        pool.submit(meth, *args, **kwargs)
+    return fn
+
+def synchronized(lock=None):
+    _log.debug('synchronized decorator factory called')
+    if lock==None:
+        lock = threading.RLock()
+    def dec(func):
+        _log.debug('synchronized decorator called')
+        def fn(*args, **kwargs):
+            _log.debug('getting sync lock')
+            lock.acquire()
+            _log.debug('got sync lock')
+            ex = None
+            try:
+                r = func(*args, **kwargs)
+            except Exception, e:
+                ex = e
+            _log.debug('releasing sync lock')
+            lock.release()
+            _log.debug('released sync lock')
+            if ex != None:
+                raise ex
+            return r
+        return fn
+    return dec
+            
+def synchronizedmethod(func):
+    def fn(*args, **kwargs):
+        try:
+            lock = args[0]._synchronized_lock
+        except AttributeError:
+            lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
+        lock.acquire()
+        ex = None
+        ret = None
+        try:
+            ret = func(*args, **kwargs)
+        except Exception, e:
+            ex = e
+        lock.release()
+        if ex != None:
+            raise ex
+        return ret
+    return fn
+        
+
+##############################
+# Test Code
+##############################
+import unittest
+import sys
+import time
+
+class TestThreadPool(unittest.TestCase):
+    def setUp(self):
+        self.errmargin = 0.5
+
+    def testUnthreaded(self):
+        queue = Queue.Queue()
+        def slowfunc(sleep=1):
+            time.sleep(sleep)
+            queue.put(None)
+        tt = time.time()
+        for i in range(4):
+            slowfunc()
+        for i in range(4):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 4, 1) 
+
+    def testThreaded(self):
+        queue = Queue.Queue()
+        @threaded
+        def slowthreadfunc(sleep=1):
+            time.sleep(sleep)
+            queue.put(None)
+        tt = time.time()
+        for i in range(8):
+            slowthreadfunc()
+        for i in range(8):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 1, 1) 
+
+    def testThreadPool(self):
+        pool = ThreadPool(size=4)
+        queue = Queue.Queue()
+        @threadpool(pool)
+        def slowpoolfunc(sleep=1):
+            time.sleep(sleep)
+            queue.put(None)
+        tt = time.time()
+        for i in range(8):
+            slowpoolfunc()
+        for i in range(8):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 2, 1) 
+
+    def testUnthreadedMethod(self):
+        queue = Queue.Queue()
+        class slowclass:
+            def __init__(self, sleep=1):
+                self.sleep=sleep
+            def beslow(self):
+                time.sleep(self.sleep)
+                queue.put(None)
+        sc = slowclass()
+        tt = time.time()
+        for i in range(4):
+            sc.beslow()
+        for i in range(4):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 4, 1)
+    
+    def testThreadedMethod(self):
+        queue = Queue.Queue()
+        class slowclass:
+            def __init__(self, sleep=1):
+                self.sleep=sleep
+            @threaded
+            def beslow(self):
+                time.sleep(self.sleep)
+                queue.put(None)
+        sc = slowclass()
+        tt = time.time()
+        for i in range(4):
+            sc.beslow()
+        for i in range(4):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 1, 1)
+    
+    def testThreadPoolMethod(self):
+        queue = Queue.Queue()
+        class slowclass:
+            def __init__(self, sleep=1):
+                self.sleep=sleep
+            @threadpoolmethod
+            def beslow(self):
+                time.sleep(self.sleep)
+                queue.put(None)
+        sc = slowclass()
+        tt = time.time()
+        for i in range(16):
+            sc.beslow()
+        for i in range(16):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 2, 1)
+    
+    def testSynchronized(self):
+        queue = Queue.Queue()
+        @synchronized()
+        def addtoqueue():
+            time.sleep(1)
+            queue.put(None)
+        @threaded
+        def slowthreadfunc():
+            addtoqueue()
+        tt = time.time()
+        for i in range(4):
+            slowthreadfunc()
+        for i in range(4):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 4, 1) 
+
+    def testSynchronizedMethod(self):
+        queue = Queue.Queue()
+        class addtoqueue:
+            @synchronizedmethod
+            def addtoqueue1(self):
+                time.sleep(1)
+                queue.put(None)
+            @synchronizedmethod
+            def addtoqueue2(self):
+                time.sleep(1)
+                queue.put(None)
+        atc = addtoqueue()
+        @threaded
+        def slowthreadfunc1():
+            atc.addtoqueue1()
+        @threaded
+        def slowthreadfunc2():
+            atc.addtoqueue2()
+        tt = time.time()
+        for i in range(4):
+            slowthreadfunc1()
+            slowthreadfunc2()
+        for i in range(8):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 8, 1) 
+
+    def testUnsynchronizedMethod(self):
+        queue = Queue.Queue()
+        class addtoqueue:
+            def addtoqueue1(self):
+                time.sleep(1)
+                queue.put(None)
+            def addtoqueue2(self):
+                time.sleep(1)
+                queue.put(None)
+        atc = addtoqueue()
+        @threaded
+        def slowthreadfunc1():
+            atc.addtoqueue1()
+        @threaded
+        def slowthreadfunc2():
+            atc.addtoqueue2()
+        tt = time.time()
+        for i in range(4):
+            slowthreadfunc1()
+            slowthreadfunc2()
+        for i in range(8):
+            queue.get()
+        tt = time.time() - tt
+        self.assertAlmostEqual(tt, 1, 1) 
+
+
+
+if __name__=='__main__':
+    import sys
+
+    logging.basicConfig(level=logging.INFO,
+                        format="%(asctime)s %(levelname)s:\t %(message)s",
+                        stream=sys.stdout)
+
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
+    unittest.TextTestRunner(verbosity=2).run(suite)

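The unit tests above double as usage documentation; the short version (sketch):

    from threadpool import threaded, threadpool, ThreadPool

    # @threaded runs each call on its own thread and returns the Thread,
    # so callers can join on completion.
    @threaded
    def fetch(n):
        print 'fetching', n

    t = fetch(1)
    t.join()

    # @threadpool(pool) routes calls through a fixed set of worker threads.
    pool = ThreadPool(size=2)

    @threadpool(pool)
    def work(n):
        print 'working', n

    work(1)  # queued; executed asynchronously by one of the two workers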
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,261 @@
+#!/usr/bin/env python
+
+import sys
+import time
+import socket
+import traceback
+import threading
+
+sys.path.append('./gen-py')
+import tashi.messaging.messagingthrift
+import tashi.messaging.messagingthrift.MessageBrokerThrift
+import tashi.messaging.messagingthrift.SubscriberThrift
+from tashi.messaging.messagingthrift.ttypes import *
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from tashi import ConnectionManager
+
+from tashi.messaging.messaging import *
+from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
+
+class MessageBrokerThrift(MessageBroker):
+    def __init__(self, port, daemon=True):
+        MessageBroker.__init__(self)
+        self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
+        self.transport = TSocket.TServerSocket(port)
+        self.tfactory = TTransport.TBufferedTransportFactory()
+        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+        self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
+        self.ready = threading.Event()
+#         self.server = TServer.TSimpleServer(self.processor,
+#                                             self.transport,
+#                                             self.tfactory,
+#                                             self.pfactory)
+#         self.server = TServer.TThreadPoolServer(self.processor,
+#                                                 self.transport,
+#                                                 self.tfactory,
+#                                                 self.pfactory)
+        self.server = TServer.TThreadedServer(self.processor,
+                                                self.transport,
+                                                self.tfactory,
+                                                self.pfactory)
+        self.publishCalls = 0
+
+        def ssvrthrd():
+            try:
+                # FIXME: Race condition, the ready event should be set after
+                # starting the server.  However, server.serve()
+                # doesn't return under normal circumstances.  This
+                # seems to work in practice, even though it's clearly
+                # wrong.
+                self.ready.set()
+                self.server.serve()
+            except Exception, e:
+                print e
+                sys.stdout.flush()
+                pass
+        svt = threading.Thread(target=ssvrthrd)
+        svt.setDaemon(daemon)
+        svt.start()
+        self.ready.wait()
+    def log(self, message):
+        MessageBroker.log(self, message)
+    @synchronizedmethod
+    def addSubscriber(self, host, port):
+        subscribers = self.getSubscribers()
+        for sub in subscribers:
+            if sub.host == host and sub.port == port:
+                return
+        subscriber = SubscriberThriftProxy(host, port, self.proxy)
+        MessageBroker.addSubscriber(self, subscriber)
+    def removeSubscriber(self, host, port):
+        subscriber = None
+        subscribers = self.getSubscribers()
+        for sub in subscribers:
+            if sub.host == host and sub.port == port:
+                subscriber = sub
+        if subscriber != None:
+            MessageBroker.removeSubscriber(self, subscriber)
+    @synchronizedmethod
+    def publish(self, message):
+        self.publishCalls = self.publishCalls + 1
+        sys.stdout.flush()
+        MessageBroker.publish(self, message)
+
+class MessageBrokerThriftProxy:
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
+    @synchronizedmethod
+    def log(self, message):
+        self.proxy[self.host, self.port].log(message)
+    @synchronizedmethod
+    def publish(self, message):
+        self.proxy[self.host, self.port].publish(message)
+    @synchronizedmethod
+    def publishList(self, messages):
+        self.proxy[self.host, self.port].publishList(messages)
+    @synchronizedmethod
+    def addSubscriber(self, subscriber):
+        self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
+    @synchronizedmethod
+    def removeSubscriber(self, subscriber):
+        self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
+
+
+
+class SubscriberThrift(Subscriber, threading.Thread):
+    def __init__(self, broker, port, synchronized=False):
+        self.host = socket.gethostname()
+        self.port = port
+        self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
+        self.transport = TSocket.TServerSocket(port)
+        self.tfactory = TTransport.TBufferedTransportFactory()
+        self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+        self.server = TServer.TThreadedServer(self.processor,
+                                              self.transport,
+                                              self.tfactory,
+                                              self.pfactory)
+        def ssvrthrd():
+            try:
+                self.server.serve()
+            except Exception, e:
+                pass
+
+
+        self.thread = threading.Thread(target=ssvrthrd)
+        self.thread.setDaemon(True)
+        self.thread.start()
+
+        # We have to call this AFTER initializing our server, so that
+        # the broker can contact us
+        # Wrap this in a try/catch because the broker may not be online yet
+        try:
+            Subscriber.__init__(self, broker, synchronized=synchronized)
+        except:
+            pass
+        threading.Thread.__init__(self)
+        self.setDaemon(True)
+        self.start()
+
+    def stop(self):
+        # FIXME: this is broken, there is no clear way to stop a
+        # Thrift server
+        self.broker.removeSubscriber(self)
+        self.transport.close()
+    def run(self):
+        while(True):
+            # renew subscription every 5 min
+            try:
+                self.broker.addSubscriber(self)
+            except:
+                pass
+            time.sleep(5*60)
+
+class SubscriberThriftProxy:
+    def __init__(self, host, port, proxy, aggregate = 100):
+        self.host = host
+        self.port = port
+        self.proxy = proxy
+        # for some reason, thrift clients are not thread-safe, lock during send
+        self.lock = threading.Lock()
+        self.pending = []
+        self.aggregateSize = aggregate
+    def publish(self, message):
+        self.lock.acquire()
+        sys.stdout.flush()
+        if message.has_key('aggregate') and message['aggregate'] == 'True':
+            self.pending.append(message)
+            if len(self.pending) >= self.aggregateSize:
+                try:
+                    self.proxy[self.host, self.port].publishList(self.pending)
+                except Exception, e:
+                    print e
+                    self.lock.release()
+                    raise e
+                self.pending = []
+        else:
+            try:
+                self.proxy[self.host, self.port].publish(message)
+            except Exception, e:
+                sys.stdout.flush()
+                print e
+                self.lock.release()
+                raise e
+        self.lock.release()
+
+class PublisherThrift(Publisher):
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.broker = MessageBrokerThriftProxy(host, port)
+        Publisher.__init__(self, self.broker)
+        
+####################
+# Testing Code 
+####################
+
+class TestSubscriberThrift(SubscriberThrift):
+    def __init__(self, *args, **kwargs):
+        self.queue = Queue.Queue()
+        SubscriberThrift.__init__(self, *args, **kwargs)
+    def handle(self, message):
+        self.queue.put(message)
+
+portnum = 1718
+class TestThriftMessaging(unittest.TestCase):
+    def setUp(self):
+        global portnum
+        self.broker = MessageBrokerThrift(portnum)
+        self.brokerPort = portnum
+        portnum = portnum + 1 
+        self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
+        self.publisher = PublisherThrift('localhost', self.brokerPort)
+        self.subscriber = TestSubscriberThrift(self.proxy, portnum)
+        portnum = portnum + 1
+    def tearDown(self):
+        pass
+    def testSetUp(self):
+        pass
+    def testPublish(self):
+        self.publisher.publish( {'message':'hello world'} )
+        self.subscriber.queue.get(True, timeout=5)
+        self.assertEqual(self.subscriber.queue.qsize(), 0)
+    def testPublishList(self):
+        nrmsgs = 10
+        msgs = []
+        for i in range(nrmsgs):
+            msgs.append( {'msgnum':str(i)} )
+        self.publisher.publishList( msgs )
+        for i in range(nrmsgs):
+            self.subscriber.queue.get(True, timeout=5)
+        self.assertEqual(self.subscriber.queue.qsize(), 0)
+    def testAggregate(self):
+        nrmsgs = self.publisher.aggregateSize
+        for i in range(nrmsgs):
+            self.assertEqual(self.subscriber.queue.qsize(), 0)
+            self.publisher.aggregate( {'msgnum':str(i)} )
+        for i in range(nrmsgs):
+            self.subscriber.queue.get(True, timeout=5)
+        self.assertEqual(self.subscriber.queue.qsize(), 0)
+    def testAggregateKeyword(self):
+        nrmsgs = self.publisher.aggregateSize
+        for i in range(nrmsgs):
+            self.assertEqual(self.subscriber.queue.qsize(), 0)
+            self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
+        for i in range(nrmsgs):
+            self.subscriber.queue.get(True, timeout=5)
+        self.assertEqual(self.subscriber.queue.qsize(), 0)
+
+
+if __name__=='__main__':
+    suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
+    unittest.TextTestRunner(verbosity=2).run(suite)
+
+

Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py
------------------------------------------------------------------------------
    svn:executable = *

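End-to-end wiring over Thrift mirrors the test setUp above (port numbers are
illustrative):

    from tashi.messaging.thriftmessaging import MessageBrokerThrift, \
        MessageBrokerThriftProxy, PublisherThrift, TestSubscriberThrift

    broker = MessageBrokerThrift(1718, daemon=True)
    proxy = MessageBrokerThriftProxy('localhost', 1718)
    publisher = PublisherThrift('localhost', 1718)
    subscriber = TestSubscriberThrift(proxy, 1719)

    publisher.publish({'message': 'hello world'})
    print subscriber.queue.get(True, timeout=5)  # blocks until the message arrives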
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,7 @@
+from tashi import convertExceptions
+
+def RPC(oldFunc):
+	return convertExceptions(oldFunc)
+
+from nodemanagerservice import NodeManagerService
+from notification import Notifier

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,47 @@
+#! /usr/bin/env python
+
+import logging.config
+import signal
+import sys
+from thrift.transport.TSocket import TServerSocket
+from thrift.server.TServer import TThreadedServer
+
+from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
+from tashi.services import nodemanagerservice, clustermanagerservice
+from tashi import ConnectionManager
+
+import notification
+
+@signalHandler(signal.SIGTERM)
+def handleSIGTERM(signalNumber, stackFrame):
+	sys.exit(0)
+
+def main():
+	global config, dfs, vmm, service, server, log, notifier
+	
+	(config, configFiles) = getConfig(["NodeManager"])
+	logging.config.fileConfig(configFiles)
+	log = logging.getLogger(__name__)
+	log.info('Using configuration file(s) %s' % configFiles)
+	dfs = instantiateImplementation(config.get("NodeManager", "Dfs"), config)
+	vmm = instantiateImplementation(config.get("NodeManager", "VmControl"), config, dfs, None)
+	service = instantiateImplementation(config.get("NodeManager", "Service"), config, vmm)
+	vmm.nm = service
+	processor = nodemanagerservice.Processor(service)
+	transport = TServerSocket(int(config.get('NodeManagerService', 'port')))
+	server = TThreadedServer(processor, transport)
+	debugConsole(globals())
+	
+	notifier = notification.Notifier(config)
+	log.addHandler(notifier)
+
+	try:
+		server.serve()
+	except KeyboardInterrupt:
+		handleSIGTERM(signal.SIGTERM, None)
+	except Exception, e:
+		sys.stderr.write(str(e) + "\n")
+		sys.exit(-1)
+
+if __name__ == "__main__":
+	main()

Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,265 @@
+import cPickle
+import logging
+import os
+import socket
+import sys
+import threading
+import time
+from thrift.transport.TSocket import TSocket
+from thrift.protocol.TBinaryProtocol import TBinaryProtocol
+from thrift.transport.TTransport import TBufferedTransport
+
+from tashi.services.ttypes import ResumeVmRes, Host, HostState, InstanceState, TashiException, Errors, Instance
+from tashi.services import clustermanagerservice
+from tashi.nodemanager import RPC
+from tashi import boolean, vmStates, logged, ConnectionManager, timed
+
+class NodeManagerService():
+	"""RPC handler for the NodeManager
+	   
+	   Perhaps in the future I can hide the dfs from the 
+	   VmControlInterface and do all dfs operations here?"""
+	
+	def __init__(self, config, vmm):
+		self.config = config
+		self.vmm = vmm
+		self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
+		self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
+		self.log = logging.getLogger(__file__)
+		self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
+		self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
+		self.infoFile = self.config.get('NodeManagerService', 'infoFile')
+		self.id = None
+		self.notifyCM = []
+		self.loadVmInfo()
+		vmList = self.vmm.listVms()
+		for vmId in vmList:
+			if (vmId not in self.instances):
+				self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
+				self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
+		for vmId in self.instances.keys():
+			if (vmId not in vmList):
+				self.log.warning('vmcontrol backend does not report %d' % (vmId))
+				self.vmStateChange(vmId, None, InstanceState.Exited)
+		threading.Thread(target=self.junk).start()
+		threading.Thread(target=self.registerWithClusterManager).start()
+	
+	def loadVmInfo(self):
+		try:
+			f = open(self.infoFile, "r")
+			data = f.read()
+			f.close()
+			self.instances = cPickle.loads(data)
+		except Exception, e:
+			self.log.exception('Failed to load VM info from %s' % (self.infoFile))
+			self.instances = {}
+	
+	def saveVmInfo(self):
+		try:
+			data = cPickle.dumps(self.instances)
+			f = open(self.infoFile, "w")
+			f.write(data)
+			f.close()
+		except Exception, e:
+			self.log.exception('Failed to save VM info to %s' % (self.infoFile))
+	
+	#@logged
+	def vmStateChange(self, vmId, old, cur):
+		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		instance = self.getInstance(vmId)
+		if (old and instance.state != old):
+			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+		if (cur == InstanceState.Exited):
+			del self.instances[vmId]
+		instance.state = cur
+		newInst = Instance(d={'state':cur})
+		success = lambda: None
+		try:
+			cm.vmUpdate(instance.id, newInst, old)
+		except Exception, e:
+			self.log.exception('RPC failed for vmUpdate on CM')
+			self.notifyCM.append((instance.id, newInst, old, success))
+		else:
+			success()
+		return True
+	
+	#@timed	
+	def getHostInfo(self):
+		host = Host()
+		host.id = self.id
+		host.name = socket.gethostname()
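+		# the first line of /proc/meminfo looks like "MemTotal:  16384256 kB";
+		# awk extracts the value and unit fields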
+		memoryStr = os.popen2("head -n 1 /proc/meminfo | awk '{print $2 \" \" $3}'")[1].read().strip()
+		if (memoryStr[-2:] == "kB"):
+			host.memory = int(memoryStr[:-2])/1024
+		elif (memoryStr[-2:] == "mB"):
+			host.memory = int(memoryStr[:-2])
+		elif (memoryStr[-2:] == "gB"):
+			host.memory = int(memoryStr[:-2])*1024
+		elif (memoryStr[-2:] == " B"):
+			host.memory = int(memoryStr[:-2])/(1024*1024)
+		else:
+			self.log.warning('Unable to determine amount of physical memory - reporting 0')
+			host.memory = 0
+		host.cores = os.sysconf("SC_NPROCESSORS_ONLN")
+		host.up = True
+		host.decayed = False
+		return host
+	
+	def junk(self):
+		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		while True:
+			start = time.time()
+			try:
+				self.saveVmInfo()
+			except Exception, e:
+				self.log.exception('Failed to save VM info')
+			try:
+				notifyCM = []
+				try:
+					while (len(self.notifyCM) > 0):
+						(instanceId, newInst, old, success) = self.notifyCM.pop(0)
+						try:
+							cm.vmUpdate(instanceId, newInst, old)
+						except TashiException, e:
+							notifyCM.append((instanceId, newInst, old, success))
+							if (e.errno != Errors.IncorrectVmState):
+								raise
+						except:
+							notifyCM.append((instanceId, newInst, old, success))
+							raise
+						else:
+							success()
+				finally:
+					self.notifyCM = self.notifyCM + notifyCM
+			except Exception, e:
+				self.log.exception('Failed to register with the CM')
+			toSleep = start - time.time() + self.registerFrequency
+			if (toSleep > 0):
+				time.sleep(toSleep)
+	
+	def registerWithClusterManager(self):
+		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		#@timed
+		def body():
+			try:
+				#self.log.info('registering with CM at %f' % (time.time()))
+				host = self.getHostInfo()
+				instances = self.instances.values()
+				#@timed
+				def doRPC():
+					self.id = cm.registerNodeManager(host, instances)
+				doRPC()
+			except Exception, e:
+				self.log.exception('Failed to register with the CM')
+		while True:
+			start = time.time()
+			body()
+			toSleep = start - time.time() + self.registerFrequency
+			if (toSleep > 0):
+				time.sleep(toSleep)
+	
+	def getInstance(self, vmId):
+		instance = self.instances.get(vmId, None)
+		if (instance is None):
+			raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
+		return instance
+	
+	@RPC
+	def instantiateVm(self, instance):
+		vmId = self.vmm.instantiateVm(instance)
+		instance.vmId = vmId
+		instance.state = InstanceState.Running
+		self.instances[vmId] = instance
+		return vmId
+	
+	@RPC
+	def suspendVm(self, vmId, name, suspendCookie):
+		instance = self.getInstance(vmId)
+		instance.state = InstanceState.Suspending
+		threading.Thread(target=lambda: self.vmm.suspendVm(vmId, name, suspendCookie)).start()
+	
+	@RPC
+	def resumeVm(self, instance, name):
+		(vmId, suspendCookie) = self.vmm.resumeVm(name)
+		instance.vmId = vmId
+		instance.state = InstanceState.Running
+		self.instances[vmId] = instance
+		return ResumeVmRes(d={'vmId':vmId, 'suspendCookie':suspendCookie})
+	
+	@RPC
+	def prepReceiveVm(self, instance, source):
+		instance.state = InstanceState.MigratePrep
+		instance.vmId = -1
+		transportCookie = self.vmm.prepReceiveVm(instance, source.name)
+		return transportCookie
+	
+	def migrateVmHelper(self, instance, target, transportCookie):
+		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
+		del self.instances[instance.vmId]
+		
+	@RPC
+	def migrateVm(self, vmId, target, transportCookie):
+		instance = self.getInstance(vmId)
+		instance.state = InstanceState.MigrateTrans
+		threading.Thread(target=lambda: self.migrateVmHelper(instance, target, transportCookie)).start()
+		return
+	
+	def receiveVmHelper(self, instance, transportCookie):
+		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		vmId = self.vmm.receiveVm(transportCookie)
+		instance.state = InstanceState.Running
+		instance.hostId = self.id
+		instance.vmId = vmId
+		self.instances[vmId] = instance
+		newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
+		success = lambda: None
+		try:
+			cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
+		except Exception, e:
+			self.log.exception('vmUpdate failed in receiveVmHelper')
+			self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
+		else:
+			success()
+	
+	@RPC
+	def receiveVm(self, instance, transportCookie):
+		instance.state = InstanceState.MigrateTrans
+		threading.Thread(target=lambda: self.receiveVmHelper(instance, transportCookie)).start()
+		return
+	
+	@RPC
+	def pauseVm(self, vmId):
+		instance = self.getInstance(vmId)
+		instance.state = InstanceState.Pausing
+		self.vmm.pauseVm(vmId)
+		instance.state = InstanceState.Paused
+	
+	@RPC
+	def unpauseVm(self, vmId):
+		instance = self.getInstance(vmId)
+		instance.state = InstanceState.Unpausing
+		self.vmm.unpauseVm(vmId)
+		instance.state = InstanceState.Running
+	
+	@RPC
+	def shutdownVm(self, vmId):
+		instance = self.getInstance(vmId)
+		instance.state = InstanceState.ShuttingDown
+		self.vmm.shutdownVm(vmId)
+	
+	@RPC
+	def destroyVm(self, vmId):
+		instance = self.getInstance(vmId)
+		instance.state = InstanceState.Destroying
+		self.vmm.destroyVm(vmId)
+	
+	@RPC
+	def getVmInfo(self, vmId):
+		instance = self.getInstance(vmId)
+		return instance
+	
+	@RPC
+	def listVms(self):
+		return self.instances.keys()
+	
+

Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py
------------------------------------------------------------------------------
    svn:executable = *
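
The vmUpdate path above queues failed ClusterManager notifications on self.notifyCM and retries them from the background loop. The same pattern in isolation looks roughly like this (a standalone sketch with invented names, not part of this commit):

	class RetryQueue:
		"""Queue up notifications that fail to send; drain() retries them later"""
		def __init__(self, send):
			self.send = send	# a callable that raises on failure
			self.pending = []
		def notify(self, *args):
			try:
				self.send(*args)
			except Exception:
				self.pending.append(args)
		def drain(self):
			failed = []
			while (len(self.pending) > 0):
				args = self.pending.pop(0)
				try:
					self.send(*args)
				except Exception:
					failed.append(args)
			self.pending = self.pending + failed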

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,26 @@
+import time
+
+import tashi.messaging.tashimessaging
+from tashi.util import isolatedRPC
+
+class Notifier(tashi.messaging.tashimessaging.TashiLogHandler):
+    def vmExited(self, instance):
+        try:
+            isolatedRPC(self.cm, 'vmExited', self.hostId, instance.vmId)
+        except Exception, e:
+            print "RPC failed for vmExited on CM"
+            print e
+            # FIXME: send this to the cm later
+            # self.exitedVms[vmId] = child
+
+        msg = {}
+
+        msg['timestamp'] = str(time.time())
+        msg['hostname'] = ''    # FIXME: fill this in
+        msg['message-type'] = 'vm-event'
+        msg['vm-event'] = 'vm-exited'
+        
+        msg['instance-id'] = str(instance.id)
+        msg['host-id'] = str(instance.hostId)
+        print 'Notifier publishing ', msg
+        self.publish(msg)
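
For reference, the handler above publishes a flat dict of strings; a concrete message would look like this (values illustrative):

	msg = {
		'timestamp':    '1225723527.0',
		'hostname':     'node42',
		'message-type': 'vm-event',
		'vm-event':     'vm-exited',
		'instance-id':  '17',
		'host-id':      '3',
	}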

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,4 @@
+from vmcontrolinterface import VmControlInterface
+from qemu import Qemu
+from xenpv import XenPV
+from newxen import NewXen
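
All three backends implement VmControlInterface, so a node manager can select one by name. A config-driven factory might look like the following (the section and option names here are assumptions, not part of this commit):

	from tashi.nodemanager.vmcontrol import Qemu, XenPV, NewXen

	def makeVmm(config, dfs, nm):
		backends = {'qemu': Qemu, 'xenpv': XenPV, 'newxen': NewXen}
		# hypothetical option name -- adjust to the real config schema
		name = config.get('NodeManager', 'vmControlBackend').lower()
		return backends[name](config, dfs, nm)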

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,113 @@
+import cPickle
+import logging
+import os
+import threading
+import random
+import select
+import signal
+import socket
+import subprocess
+import sys
+import time
+
+import inspect			# used to get current function
+def currentFunction(n=1):
+	# get the name of our caller, e.g. the requesting function
+	return inspect.stack()[n][3]
+
+from tashi.services.ttypes import *
+from tashi.util import broken, isolatedRPC
+from vmcontrolinterface import VmControlInterface
+
+log = logging.getLogger(__file__)
+
+
+import xenpv
+
+class NewXen(VmControlInterface):
+	"""VM Control for Paravirtualized Xen"""
+
+	def __init__(self, config, dfs, cm):
+		"""Base init function -- it handles inserting config and dfs 
+		   into the object as well as checking that the class type is 
+		   not VmControlInterface"""
+		print 'NewXen::init called'
+		if self.__class__ is VmControlInterface:
+			raise NotImplementedError
+		self.config = config
+		self.dfs = dfs
+		self.cm = cm
+		self.xenpv = xenpv.XenPV(self.config, self.dfs, self.cm)
+	
+	def instantiateVm(self, instance):
+		"""Takes an InstanceConfiguration, creates a VM based on it, 
+		   and returns the vmId"""
+		print 'NewXen::%s called' % currentFunction()
+		# FIXME: this is NOT the right way to get our hostId
+		self.hostId = instance.hostId
+		return self.xenpv.instantiateVm(instance)
+
+	
+	def suspendVm(self, vmId, target, suspendCookie=None):
+		"""Suspends a vm to the target on the dfs, including the 
+		   suspendCookie"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.suspendVm(vmId, target, suspendCookie)
+
+
+	def resumeVm(self, source):
+		"""Resumes a vm from the dfs and returns the newly created 
+		   vmId as well as the suspendCookie in a tuple"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.resumeVm(source)
+	
+	def prepReceiveVm(self, instance, source):
+		"""First call made as part of vm migration -- it is made to 
+		   the target machine and it returns a transportCookie"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.prepReceiveVm(instance, source)
+	
+	def migrateVm(self, vmId, target, transportCookie):
+		"""Second call made as part of a vm migration -- it is made 
+		   to the source machine and it does not return until the 
+		   migration is complete"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.migrateVm(vmId, target, transportCookie)
+	
+	def receiveVm(self, transportCookie):
+		"""Third call made as part of a vm migration -- it is made to 
+		   the target machine and it does not return until the 
+		   migration is complete, it returns the new vmId"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.receiveVm(transportCookie)
+	
+	def pauseVm(self, vmId):
+		"""Pauses a vm and returns nothing"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.pauseVm(vmId)
+	
+	def unpauseVm(self, vmId):
+		"""Unpauses a vm and returns nothing"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.unpauseVm(vmId)
+	
+	def shutdownVm(self, vmId):
+		"""Performs a clean shutdown on a vm and returns nothing"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.shutdownVm(vmId)
+		
+	
+	def destroyVm(self, vmId):
+		"""Forces the exit of a vm and returns nothing"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.destroyVm(vmId)
+	
+	def getVmInfo(self, vmId):
+		"""Returns the InstanceConfiguration for the given vmId"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.getVmInfo(vmId)
+	
+	def listVms(self):
+		"""Returns a list of vmIds to the caller"""
+		print 'NewXen::%s called' % currentFunction()
+		return self.xenpv.listVms()
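
NewXen is a tracing wrapper that forwards every call to an inner XenPV instance. The same behavior can be expressed once with __getattr__ delegation rather than a method per call -- a sketch of the pattern, not what this commit does:

	class Traced:
		"""Print each method call, then forward it to the wrapped backend"""
		def __init__(self, backend):
			self.backend = backend
		def __getattr__(self, name):
			attr = getattr(self.backend, name)
			if (not callable(attr)):
				return attr
			def wrapper(*args, **kwargs):
				print '%s::%s called' % (self.backend.__class__.__name__, name)
				return attr(*args, **kwargs)
			return wrapper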

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,472 @@
+import cPickle
+import logging
+import os
+import threading
+import random
+import select
+import signal
+import socket
+import subprocess
+import sys
+import time
+
+from tashi.services.ttypes import *
+from tashi.util import broken, logged
+from vmcontrolinterface import VmControlInterface
+
+log = logging.getLogger(__file__)
+
+def controlConsole(child, port):
+	"""This exposes a TCP port that connects to a particular child's monitor -- used for debugging"""
+	#print "controlConsole"
+	listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+	listenSocket.bind(("0.0.0.0", port))
+	#print "bound"
+	try:
+		listenSocket.listen(5)
+		ls = listenSocket.fileno()
+		input = child.monitorFd
+		output = child.monitorFd
+		#print "listen"
+		select.select([ls], [], [])
+		(s, clientAddr) = listenSocket.accept()
+		while s:
+			if (output != -1):
+				(rl, wl, el) = select.select([s, output], [], [])
+			else:
+				(rl, wl, el) = select.select([s], [], [])
+			if (len(rl) > 0):
+				if (rl[0] == s):
+					#print "from s"
+					buf = s.recv(4096)
+					if (buf == ""):
+						s.close()
+						listenSocket.close()
+						s = None
+						continue
+					if (output != -1):
+						os.write(child.monitorFd, buf)
+				elif (rl[0] == output):
+					#print "from output"
+					buf = os.read(output, 4096)
+					#print "read complete"
+					if (buf == ""):
+						output = -1
+					else:
+						s.send(buf)
+	except:
+		s.close()
+		listenSocket.close()
+	finally:
+		#print "Thread exiting"
+		pass
+
+class Qemu(VmControlInterface):
+	"""This class implements the VmControlInterface for Qemu/KVM"""
+	
+	def __init__(self, config, dfs, nm):
+		VmControlInterface.__init__(self, config, dfs, nm)
+		self.QEMU_BIN = self.config.get("Qemu", "qemuBin")
+		self.INFO_DIR = self.config.get("Qemu", "infoDir")
+		self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay"))
+		self.migrationRetries = int(self.config.get("Qemu", "migrationRetries"))
+		self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout"))
+		self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
+		self.controlledVMs = {}
+		self.usedPorts = []
+		self.usedPortsLock = threading.Lock()
+		self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
+		try:
+			os.mkdir(self.INFO_DIR)
+		except:
+			pass
+		self.scanInfoDir()
+		threading.Thread(target=self.pollVMsLoop).start()
+	
+	class anonClass:
+		def __init__(self, **attrs):
+			self.__dict__.update(attrs)
+	
+	def getSystemPids(self):
+		"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
+		pids = []
+		for f in os.listdir("/proc"):
+			try:
+				bin = os.readlink("/proc/%s/exe" % (f))
+				if (bin.find(self.QEMU_BIN) != -1):
+					pids.append(int(f))
+			except Exception:
+				pass
+		return pids
+	
+	def matchSystemPids(self, controlledVMs):
+		"""This is run in a separate polling thread and it must do things that are thread safe"""
+		vmIds = controlledVMs.keys()
+		pids = self.getSystemPids()
+		for vmId in vmIds:
+			if vmId not in pids:
+				os.unlink(self.INFO_DIR + "/%d"%(vmId))
+				child = controlledVMs[vmId]
+				del controlledVMs[vmId]
+				log.info("Removing vmId %d" % (vmId))
+				if (child.OSchild):
+					os.waitpid(vmId, 0)
+				if (child.errorBit):
+					if (child.OSchild):
+						f = open("/tmp/%d.err" % (vmId), "w")
+						f.write(child.stderr.read())
+						f.close()
+					f = open("/tmp/%d.pty" % (vmId), "w")
+					for i in child.monitorHistory:
+						f.write(i)
+					f.close()
+				try:
+					if (not child.migratingOut):
+						self.nm.vmStateChange(vmId, None, InstanceState.Exited)
+				except Exception, e:
+					log.exception("vmStateChange failed")
+						
+	
+	def scanInfoDir(self):
+		"""This is not thread-safe and must only be used during class initialization"""
+		controlledVMs = {}
+		controlledVMs.update(map(lambda x: (int(x), self.anonClass(OSchild=False, errorBit=False, migratingOut=False)), os.listdir(self.INFO_DIR + "/")))
+		if (len(controlledVMs) == 0):
+			log.info("No vm information found in %s", self.INFO_DIR)
+		for vmId in controlledVMs:
+			try:
+				child = self.loadChildInfo(vmId)
+				child.OSchild = False
+				child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
+				child.monitor = os.fdopen(child.monitorFd)
+				self.controlledVMs[child.pid] = child
+				log.info("Adding vmId %d" % (child.pid))
+			except Exception, e:
+				log.exception("Failed to load VM info for %d", vmId)
+			else:
+				log.info("Loaded VM info for %d", vmId)
+		self.matchSystemPids(controlledVMs)
+	
+	def pollVMsLoop(self):
+		"""Infinite loop that checks for dead VMs"""
+		while True:
+			self.matchSystemPids(self.controlledVMs)
+			time.sleep(self.POLL_DELAY)
+	
+	def waitForExit(self, vmId):
+		"""This waits until an element is removed from the dictionary -- the polling thread must detect an exit"""
+		while vmId in self.controlledVMs:
+			time.sleep(self.POLL_DELAY)
+	
+	def getChildFromPid(self, pid):
+		"""Do a simple dictionary lookup, but raise a unique exception if the key doesn't exist"""
+		child = self.controlledVMs.get(pid, None)
+		if (not child):
+			raise Exception, "Uncontrolled vmId %d" % (pid)
+		return child
+	
+	def getControlConsole(self, vmId, port):
+		"""Spawn a thread that attaches the control console of a particular Qemu to a TCP port -- used for debugging only"""
+		child = self.getChildFromPid(vmId)
+		threading.Thread(target=(lambda: controlConsole(child, port))).start()
+		return port
+	
+	def consumeAvailable(self, child):
+		"""Consume characters one-by-one until they stop coming"""
+		monitorFd = child.monitorFd
+		buf = ""
+		try:
+			(rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
+			while (len(rlist) > 0):
+				c = os.read(monitorFd, 1)
+				if (c == ""):
+					log.error("Early termination on monitor for vmId %d" % (child.pid))
+					child.errorBit = True
+					raise RuntimeError
+				buf = buf + c
+				(rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
+		finally:
+			child.monitorHistory.append(buf)
+		return buf
+	
+	def consumeUntil(self, child, needle, timeout = -1):
+		"""Consume characters one-by-one until something specific comes up"""
+		if (timeout == -1):
+			timeout = self.monitorTimeout
+		monitorFd = child.monitorFd
+		buf = " " * len(needle)
+		try:
+			while (buf[-(len(needle)):] != needle):
+				#print "[BUF]: %s" % (buf)
+				#print "[NEE]: %s" % (needle)
+				(rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
+				if (len(rlist) == 0):
+					log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
+					child.errorBit = True
+					raise RuntimeError
+				c = os.read(monitorFd, 1)
+				if (c == ""):
+					log.error("Early termination on monitor for vmId %d" % (child.pid))
+					child.errorBit = True
+					raise RuntimeError
+				buf = buf + c
+		finally:
+			child.monitorHistory.append(buf[len(needle):])
+		return buf[len(needle):]
+		
+	def enterCommand(self, child, command, expectPrompt = True, timeout = -1):
+		"""Enter a command on the qemu monitor"""
+		res = self.consumeAvailable(child)
+		os.write(child.monitorFd, command + "\n")
+		if (expectPrompt):
+			self.consumeUntil(child, command)
+			res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
+		return res
+
+	def genTmpFilename(self):
+		"""Create a temporary file name for a fifo used to uncompress a suspended VM"""
+		return "/tmp/Qemu_%d" % (os.getpid())
+	
+	def loadChildInfo(self, vmId):
+		child = self.anonClass(pid=vmId)
+		info = open(self.INFO_DIR + "/%d"%(child.pid), "r")
+		(image, macAddr, memory, cores, opts, pid, ptyFile) = cPickle.load(info)
+		info.close()
+		if (pid != child.pid):
+			raise Exception, "PID mismatch"
+		child.image = image
+		child.macAddr = macAddr
+		child.memory = memory
+		child.cores = cores
+		child.opts = opts
+		child.pid = pid
+		child.ptyFile = ptyFile
+		child.monitorHistory = []
+		child.errorBit = False
+		return child
+	
+	def saveChildInfo(self, child):
+		info = open(self.INFO_DIR + "/%d"%(child.pid), "w")
+		cPickle.dump((child.image, child.macAddr, child.memory, child.cores, child.opts, child.pid, child.ptyFile), info)
+		info.close()
+
+	def startVm(self, instance, source):
+		"""Universal function to start a VM -- used by instantiateVM, resumeVM, and prepReceiveVM"""
+		global lastCmd
+		(image, macAddr, memory, cores, diskModel, instanceId, opts) = self.instanceToOld(instance)
+		graphicString = "" if opts.get("enableDisplay", False) else "-nographic"
+		sourceString = "" if not source else "-incoming %s" % (source)
+		snapshotString = "" if diskModel == "persistent" else "-snapshot"
+		imageLocal = self.dfs.getLocalHandle("images/" + image)
+		cmd = "%s %s %s -hda %s -net nic,macaddr=%s -net tap -m %d -smp %d -serial none -monitor pty %s" % (self.QEMU_BIN, graphicString, snapshotString, imageLocal, macAddr, memory, cores, sourceString)
+		lastCmd = cmd
+		cmd = cmd.split()
+		(pipe_r, pipe_w) = os.pipe()
+		pid = os.fork()
+		if (pid == 0):
+			pid = os.getpid()
+			os.setpgid(pid, pid)
+			os.close(pipe_r)
+			os.dup2(pipe_w, sys.stderr.fileno())
+			for i in [sys.stdin.fileno(), sys.stdout.fileno()]:
+				try:
+					os.close(i)
+				except:
+					pass
+			for i in xrange(3, os.sysconf("SC_OPEN_MAX")):
+				try:
+					os.close(i)
+				except:
+					pass
+			os.execl(self.QEMU_BIN, *cmd)
+			sys.exit(-1)
+		os.close(pipe_w)
+		child = self.anonClass(pid=pid, image=image, macAddr=macAddr, memory=memory, cores=cores, opts=opts, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = False, OSchild = True)
+		child.ptyFile = None
+		self.saveChildInfo(child)
+		self.controlledVMs[child.pid] = child
+		log.info("Adding vmId %d" % (child.pid))
+		return (child.pid, cmd)
+
+	def getPtyInfo(self, child, issueContinue):
+		ptyFile = None
+		while not ptyFile:
+			l = child.stderr.readline()
+			if (l == ""):
+				os.waitpid(child.pid, 0)
+				raise Exception, "Failed to start VM -- ptyFile not found"
+			if (l.find("char device redirected to ") != -1):
+				ptyFile = l[26:].strip()
+				break
+		child.ptyFile = ptyFile
+		child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
+		child.monitor = os.fdopen(child.monitorFd)
+		self.saveChildInfo(child)
+		if (issueContinue):
+			self.enterCommand(child, "c")
+	
+	def stopVm(self, vmId, target, stopFirst):
+		"""Universal function to stop a VM -- used by suspendVM, migrateVM """
+		child = self.getChildFromPid(vmId)
+		if (stopFirst):
+			self.enterCommand(child, "stop")
+		if (target):
+			retry = self.migrationRetries
+			while (retry > 0):
+				res = self.enterCommand(child, "migrate %s" % (target), timeout=self.migrateTimeout)
+				retry = retry - 1
+				if (res.find("migration failed") == -1):
+					retry = -1
+				else:
+					log.error("Migration (transiently) failed: %s\n", res)
+			if (retry == 0):
+				log.error("Migration failed: %s\n", res)
+				child.errorBit = True
+				raise RuntimeError
+		self.enterCommand(child, "quit", expectPrompt=False)
+		return vmId
+	
+	def instanceToOld(self, instance):
+		if (len(instance.disks) != 1):
+			raise NotImplementedError
+		if (len(instance.nics) != 1):
+			raise NotImplementedError
+		image = instance.disks[0].uri
+		macAddr = instance.nics[0].mac
+		memory = instance.typeObj.memory
+		cores = instance.typeObj.cores
+		diskModel = "persistent" if instance.disks[0].persistent else "transient"
+		instanceId = instance.id
+		opts = instance.hints
+		if (diskModel != "transient"):
+			raise NotImplementedError
+		return (image, macAddr, memory, cores, diskModel, instanceId, opts)
+	
+	def oldToInstance(self, image, macAddr, memory, cores, diskModel, opts):
+		instance = self.anonClass()
+		instance.disks = [self.anonClass()]
+		instance.nics = [self.anonClass()]
+		instance.typeObj = self.anonClass()
+		instance.disks[0].uri = image
+		instance.nics[0].mac = macAddr
+		instance.typeObj.memory = memory
+		instance.typeObj.cores = cores
+		instance.disks[0].persistent = (diskModel == "persistent")
+		instance.id = -1
+		instance.hints = opts
+		return instance
+	
+	def instantiateVm(self, instance):
+		(vmId, cmd) = self.startVm(instance, None)
+		child = self.getChildFromPid(vmId)
+		self.getPtyInfo(child, False)
+		child.cmd = cmd
+		self.saveChildInfo(child)
+		return vmId
+	
+	def suspendVm(self, vmId, target, suspendCookie):
+		child = self.getChildFromPid(vmId)
+		info = self.dfs.open("%s.info" % (target), "w")
+		cPickle.dump((child.image, child.macAddr, child.memory, child.cores, child.opts, suspendCookie), info)
+		info.close()
+		# XXX: Use fifo to improve performance
+		vmId = self.stopVm(vmId, "\"exec:gzip -c > /tmp/%s.dat\"" % (target), True)
+		self.dfs.copyTo("/tmp/%s.dat" % (target), "%s.dat" % (target))
+		return vmId
+	
+	def resumeVm(self, source):
+		# XXX: Read in and unzip directly (or use fifo)
+		self.dfs.copyFrom("%s.dat" % (source), "/tmp/%s.dat" % (source))
+		info = self.dfs.open("%s.info" % (source), "r")
+		(image, macAddr, memory, cores, opts, suspendCookie) = cPickle.load(info)
+		info.close()
+		tmpFile = self.genTmpFilename()
+		os.mkfifo(tmpFile)
+		zcat = subprocess.Popen(args=["/bin/bash", "-c", "zcat /tmp/%s.dat > %s" % (source, tmpFile)], executable="/bin/bash", close_fds=True)
+		instance = self.oldToInstance(image, macAddr, memory, cores, "transient", opts)
+		(vmId, cmd) = self.startVm(instance, "file://%s" % tmpFile)
+		zcat.wait()
+		os.unlink(tmpFile)
+		child = self.getChildFromPid(vmId)
+		self.getPtyInfo(child, True)
+		child.suspendCookie = suspendCookie
+		child.cmd = cmd
+		return (vmId, suspendCookie)
+	
+	def prepReceiveVm(self, instance, source):
+		self.usedPortsLock.acquire()
+		port = int(random.random()*1000+19000)
+		while port in self.usedPorts:
+			port = int(random.random()*1000+19000)
+		self.usedPorts.append(port)
+		self.usedPortsLock.release()
+		(vmId, cmd) = self.startVm(instance, "tcp://0.0.0.0:%d" % (port))
+		transportCookie = cPickle.dumps((port, vmId, socket.gethostname()))
+		child = self.getChildFromPid(vmId)
+		child.cmd = cmd
+		child.transportCookie = transportCookie
+		self.saveChildInfo(child)
+		# XXX: Cleanly wait until the port is open
+		lc = 0
+		while (lc < 1):
+			(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
+			stdin.close()
+			r = stdout.read()
+			lc = int(r.strip())
+			time.sleep(1.0 if lc < 1 else 0.0)
+		return transportCookie
+	
+	def migrateVm(self, vmId, target, transportCookie):
+		self.migrationSemaphore.acquire()
+		try:
+			(port, _vmId, _hostname) = cPickle.loads(transportCookie)
+			child = self.getChildFromPid(vmId)
+			child.migratingOut = True
+			res = self.stopVm(vmId, "tcp://%s:%d" % (target, port), False)
+			# XXX: Some sort of feedback would be nice
+			# XXX: Should we block?
+			self.waitForExit(vmId)
+		finally:
+			self.migrationSemaphore.release()
+		return res
+	
+	def receiveVm(self, transportCookie):
+		(port, vmId, _hostname) = cPickle.loads(transportCookie)
+		try:
+			child = self.getChildFromPid(vmId)
+		except:
+			log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.hostname()))
+			raise
+		try:
+			self.getPtyInfo(child, True)
+		except RuntimeError, e:
+			log.error("Failed to get pty info -- VM likely died")
+			child.errorBit = True
+			raise
+		self.usedPortsLock.acquire()
+		self.usedPorts = filter(lambda _port: _port != port, self.usedPorts)
+		self.usedPortsLock.release()
+		return vmId
+	
+	def pauseVm(self, vmId):
+		child = self.getChildFromPid(vmId)
+		self.enterCommand(child, "stop")
+	
+	def unpauseVm(self, vmId):
+		child = self.getChildFromPid(vmId)
+		self.enterCommand(child, "c")
+	
+	@broken
+	def shutdownVm(self, vmId):
+		"""'system_powerdown' doesn't seem to actually shutdown the VM"""
+		child = self.getChildFromPid(vmId)
+		self.enterCommand(child, "system_powerdown")
+	
+	def destroyVm(self, vmId):
+		child = self.getChildFromPid(vmId)
+		child.migratingOut = False
+		# XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process
+		os.kill(child.pid, signal.SIGKILL)
+	
+	def listVms(self):
+		return self.controlledVMs.keys()
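
The transportCookie that links prepReceiveVm, migrateVm, and receiveVm is just a pickled (port, vmId, hostname) tuple; its round trip looks like this (values illustrative):

	import cPickle
	import socket

	# prepReceiveVm, on the target, packs its listen port, the new vmId, and its hostname
	cookie = cPickle.dumps((19042, 12345, socket.gethostname()))
	# migrateVm and receiveVm unpack the same tuple on their respective sides
	(port, vmId, hostname) = cPickle.loads(cookie)
	print "tcp://%s:%d" % (hostname, port)	# the URI form handed to qemu's "migrate" monitor command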

Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py Mon Nov  3 06:45:25 2008
@@ -0,0 +1,64 @@
+class VmControlInterface():
+	"""Interface description for VM controllers -- like Qemu, Xen, etc"""
+
+	def __init__(self, config, dfs, nm):
+		"""Base init function -- it handles inserting config and dfs 
+		   into the object as well as checking that the class type is 
+		   not VmControlInterface"""
+		if self.__class__ is VmControlInterface:
+			raise NotImplementedError
+		self.config = config
+		self.dfs = dfs
+		self.nm = nm
+	
+	def instantiateVm(self, instance):
+		"""Takes an InstanceConfiguration, creates a VM based on it, 
+		   and returns the vmId"""
+		raise NotImplementedError
+	
+	def suspendVm(self, vmId, target, suspendCookie=None):
+		"""Suspends a vm to the target on the dfs, including the 
+		   suspendCookie"""
+		raise NotImplementedError
+	
+	def resumeVm(self, source):
+		"""Resumes a vm from the dfs and returns the newly created 
+		   vmId as well as the suspendCookie in a tuple"""
+		raise NotImplementedError
+	
+	def prepReceiveVm(self, instance, source):
+		"""First call made as part of vm migration -- it is made to 
+		   the target machine and it returns a transportCookie"""
+		raise NotImplementedError
+	
+	def migrateVm(self, vmId, target, transportCookie):
+		"""Second call made as part of a vm migration -- it is made 
+		   to the source machine and it does not return until the 
+		   migration is complete"""
+		raise NotImplementedError
+	
+	def receiveVm(self, transportCookie):
+		"""Third call made as part of a vm migration -- it is made to 
+		   the target machine and it does not return until the 
+		   migration is complete, it returns the new vmId"""
+		raise NotImplementedError
+	
+	def pauseVm(self, vmId):
+		"""Pauses a vm and returns nothing"""
+		raise NotImplementedError
+	
+	def unpauseVm(self, vmId):
+		"""Unpauses a vm and returns nothing"""
+		raise NotImplementedError
+	
+	def shutdownVm(self, vmId):
+		"""Performs a clean shutdown on a vm and returns nothing"""
+		raise NotImplementedError
+	
+	def destroyVm(self, vmId):
+		"""Forces the exit of a vm and returns nothing"""
+		raise NotImplementedError
+	
+	def listVms(self):
+		"""Returns a list of vmIds to the caller"""
+		raise NotImplementedError
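
Read together, the three migration docstrings imply this call sequence (a sketch; srcVmm and dstVmm stand for the two hosts' VmControlInterface implementations, which in practice are driven over RPC by the node managers):

	def migrate(srcVmm, dstVmm, instance, vmId, srcName, dstName):
		cookie = dstVmm.prepReceiveVm(instance, srcName)	# 1. target prepares and returns a transportCookie
		srcVmm.migrateVm(vmId, dstName, cookie)			# 2. source pushes the VM; blocks until complete
		return dstVmm.receiveVm(cookie)				# 3. target finishes and returns the new vmId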