You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2008/11/03 14:45:27 UTC
svn commit: r710072 [2/3] - in /incubator/tashi/import: ./ tashi-intel-r399/
tashi-intel-r399/doc/ tashi-intel-r399/etc/ tashi-intel-r399/guest/
tashi-intel-r399/scripts/ tashi-intel-r399/src/ tashi-intel-r399/src/tashi/
tashi-intel-r399/src/tashi/agen...
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/dfsinterface.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,38 @@
class DfsInterface:
    """Abstract base for distributed-filesystem (DFS) backends.

    Stores the configuration object and declares the operations a backend
    has to provide.  Every operation defined here raises
    NotImplementedError; concrete subclasses are expected to override them.
    """

    def __init__(self, config):
        """Remember *config*; the base class itself cannot be instantiated."""
        isConcrete = self.__class__ is not DfsInterface
        if not isConcrete:
            raise NotImplementedError
        self.config = config

    def copyTo(self, localSrc, dst):
        """Intended to copy the local file *localSrc* to DFS path *dst*."""
        raise NotImplementedError

    def copyFrom(self, src, localDst):
        """Intended to copy DFS path *src* to the local file *localDst*."""
        raise NotImplementedError

    def list(self, path):
        """Intended to list the entries found at DFS path *path*."""
        raise NotImplementedError

    def stat(self, path):
        """Intended to return status information for DFS path *path*."""
        raise NotImplementedError

    def move(self, src, dst):
        """Intended to move DFS path *src* to DFS path *dst*."""
        raise NotImplementedError

    def copy(self, src, dst):
        """Intended to copy DFS path *src* to DFS path *dst*."""
        raise NotImplementedError

    def mkdir(self, path):
        """Intended to create a directory at DFS path *path*."""
        raise NotImplementedError

    def unlink(self, path):
        """Intended to remove the file at DFS path *path*."""
        raise NotImplementedError

    def rmdir(self, path):
        """Intended to remove the directory at DFS path *path*."""
        raise NotImplementedError

    def open(self, path, perm):
        """Intended to open DFS path *path* with mode *perm*."""
        raise NotImplementedError

    def getLocalHandle(self, path):
        """Intended to return a locally usable handle for DFS path *path*."""
        raise NotImplementedError
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/dfs/vfs.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,59 @@
+import os
+import os.path
+from dfsinterface import DfsInterface
+
class Vfs(DfsInterface):
    """DfsInterface implementation backed by a local directory tree.

    Each DFS path is mapped onto the local filesystem with
    os.path.join(self.prefix, path); the prefix is read from the "prefix"
    option of the "Vfs" configuration section.
    """

    def __init__(self, config):
        """Initialise the base class and read the storage prefix."""
        DfsInterface.__init__(self, config)
        self.prefix = self.config.get("Vfs", "prefix")

    def _resolve(self, path):
        """Map DFS path *path* onto the local filesystem under the prefix."""
        return os.path.join(self.prefix, path)

    def _run(self, argv):
        """Run the command *argv* (a list) and wait for it; returns None.

        The command is executed without a shell, so file names containing
        spaces or shell metacharacters are passed through literally instead
        of being re-parsed (and possibly executed) by the shell.  Both
        output streams are drained and closed by communicate().  As before,
        the command's exit status and output are ignored (best effort).
        """
        import subprocess
        proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        proc.communicate()
        return None

    def copyTo(self, localSrc, dst):
        """Copy local file *localSrc* to DFS path *dst* using cp."""
        return self._run(["cp", localSrc, self._resolve(dst)])

    def copyFrom(self, src, localDst):
        """Copy DFS path *src* to local file *localDst* using cp."""
        return self._run(["cp", self._resolve(src), localDst])

    def list(self, path):
        """Return os.listdir() of *path*; for a non-directory return its name.

        Any OSError other than ENOTDIR is re-raised.
        """
        import errno
        try:
            return os.listdir(self._resolve(path))
        except OSError as e:
            if e.errno == errno.ENOTDIR:
                # path names a plain file: report just its last component
                return [path.split('/')[-1]]
            raise

    def stat(self, path):
        """Return os.stat() for DFS path *path*."""
        return os.stat(self._resolve(path))

    def move(self, src, dst):
        """Move DFS path *src* to DFS path *dst* using mv."""
        return self._run(["mv", self._resolve(src), self._resolve(dst)])

    def copy(self, src, dst):
        """Copy DFS path *src* to DFS path *dst* using cp."""
        return self._run(["cp", self._resolve(src), self._resolve(dst)])

    def mkdir(self, path):
        """Create the directory *path* under the prefix."""
        return os.mkdir(self._resolve(path))

    def unlink(self, path):
        """Remove the file *path* under the prefix."""
        return os.unlink(self._resolve(path))

    def rmdir(self, path):
        """Remove the directory *path* under the prefix."""
        return os.rmdir(self._resolve(path))

    def open(self, path, perm):
        """Open *path* under the prefix with mode *perm* (builtin open)."""
        return open(self._resolve(path), perm)

    def getLocalHandle(self, path):
        """Return the local filesystem path that backs DFS path *path*."""
        return self._resolve(path)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/__init__.py?rev=710072&view=auto
==============================================================================
(empty)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messageBroker.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,32 @@
+import ConfigParser
+import getopt
+
+import os
+import sys
+import time
+
+import thriftmessaging
+
# Command-line handling for the stand-alone message broker.
# Only one option is understood: --port=<integer>.
options = []
long_options = ['port=']

# FIXME: should initialize from config file
params = {"port": 1717}

try:
    optlist, args = getopt.getopt(sys.argv[1:], options, long_options)
except getopt.GetoptError as err:
    print(str(err))
    sys.exit(2)

for name, value in optlist:
    if name == "--port":
        try:
            params["port"] = int(value)
        except ValueError:
            # A malformed port is a usage error: report it and exit with the
            # same non-zero status used for unknown options above.
            print("--port expects an integer, got %s" % value)
            sys.exit(2)

print("Starting message broker on port %i" % params["port"])
broker = thriftmessaging.MessageBrokerThrift(params["port"], daemon=False)
+
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/messaging.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,321 @@
+#!/usr/bin/python
+
+
+import threading
+import thread
+import sys
+import os
+import socket
+import Queue
+import copy
+import random
+import traceback
+
+from threadpool import ThreadPoolClass, threadpool, ThreadPool
+from threadpool import threadpoolmethod, threaded, synchronized, synchronizedmethod
+
class RWLock():
    """RWLock: Simple reader/writer lock implementation
    FIXME: this implementation will starve writers!
    Methods:
        acquire()      : take lock for read access
        release()      : release lock from read access
        acquireWrite() : take lock for write access
        releaseWrite() : release lock from write access"""

    def __init__(self):
        # One condition variable guards the reader count; a writer keeps the
        # condition's underlying lock held for the whole write section.
        self.lock = threading.Condition()
        self.readers = 0

    def acquire(self):
        """Register one more active reader."""
        with self.lock:
            self.readers += 1

    def release(self):
        """Unregister a reader and wake one waiter on the condition."""
        with self.lock:
            self.readers -= 1
            self.lock.notify()

    def acquireWrite(self):
        """Block until no readers remain; returns with the lock still held."""
        self.lock.acquire()
        while self.readers > 0:
            self.lock.wait()

    def releaseWrite(self):
        """Wake one waiter and drop the lock taken by acquireWrite()."""
        self.lock.notify()
        self.lock.release()
+
+
+
class MessageBroker():
    """In-process publish/subscribe hub.

    Keeps a list of subscriber objects (anything with a publish(message)
    method) and forwards every published message to each of them.  The
    list is protected by an RWLock: publishing takes it for reading,
    adding and removing subscribers take it for writing.
    """

    def __init__(self):
        self.sublock = RWLock()
        self.subscribers = []
        self.random = random.Random()

    def log(self, msg):
        """Print *msg* and return it unchanged."""
        print("MessageBroker: Got log: '%s'" % str(msg))
        return msg

    def addSubscriber(self, subscriber):
        """Append *subscriber*; return the new number of subscribers."""
        self.sublock.acquireWrite()
        try:
            self.subscribers.append(subscriber)
            return len(self.subscribers)
        finally:
            self.sublock.releaseWrite()

    def publish(self, message):
        """Deliver *message* to every subscriber.

        A subscriber whose publish() raises is reported on stdout and
        removed from the subscriber list once delivery has finished.
        """
        failed = []

        self.sublock.acquire()
        try:
            for subscriber in self.subscribers:
                try:
                    # Explicit check instead of assert so the guard against
                    # endless self-recursion survives "python -O".
                    if subscriber is self:
                        raise ValueError("broker cannot be its own subscriber")
                    subscriber.publish(message)
                except Exception as e:
                    print(e)
                    failed.append(subscriber)
        finally:
            self.sublock.release()
        sys.stdout.flush()

        if failed:
            print("detected %i failed subscribers" % len(failed))
            sys.stdout.flush()
            self.sublock.acquireWrite()
            try:
                for subscriber in failed:
                    try:
                        self.subscribers.remove(subscriber)
                    except ValueError:
                        # already removed by someone else in the meantime
                        pass
            finally:
                self.sublock.releaseWrite()

    def getSubscribers(self):
        """Return a shallow copy of the current subscriber list."""
        self.sublock.acquire()
        try:
            return copy.copy(self.subscribers)
        finally:
            self.sublock.release()

    def removeSubscriber(self, subscriber):
        """Remove *subscriber* if present; unknown subscribers are ignored."""
        self.sublock.acquireWrite()
        try:
            self.subscribers.remove(subscriber)
        except ValueError:
            pass
        finally:
            self.sublock.releaseWrite()

    def publishList(self, messages):
        """Publish each message in *messages*, in order."""
        for message in messages:
            self.publish(message)
+
class Subscriber():
    """Receives messages from a broker, filters them and handles the rest.

    Messages are dictionaries.  pmatch ("positive match") lists keys a
    message must carry (with the given value unless that value is None);
    nmatch ("negative match") lists keys/values that disqualify a message.
    """

    def __init__(self, broker, pmatch=None, nmatch=None, synchronized=False):
        """Set up the filters and register with *broker*.

        broker       -- object providing addSubscriber(subscriber)
        pmatch       -- positive-match filter dict (default: empty)
        nmatch       -- negative-match filter dict (default: empty)
        synchronized -- when true, publish() serialises filter+handle
                        with self.lock
        """
        self.broker = broker
        self.lock = threading.Lock()
        self.synchronized = synchronized
        # Honour the filters passed by the caller; a fresh dict per instance
        # avoids sharing one mutable default between subscribers.
        self.pmatch = {} if pmatch is None else pmatch
        self.nmatch = {} if nmatch is None else nmatch
        broker.addSubscriber(self)

    def publish(self, message):
        """Filter *message* and pass it to handle() if it is accepted.

        Exceptions raised by filter() or handle() are printed together
        with a traceback and are not propagated to the caller.
        """
        sys.stdout.flush()
        if self.synchronized:
            self.lock.acquire()
        try:
            msg = self.filter(message)
            if msg is not None:
                self.handle(msg)
        except Exception as x:
            print('%s, %s, %s' % (type(x), x, traceback.format_exc()))
        finally:
            if self.synchronized:
                self.lock.release()

    def publishList(self, messages):
        """Call publish() for each message in *messages*, in order."""
        for message in messages:
            self.publish(message)

    def handle(self, message):
        """Default handler: print the message.  Subclasses override this."""
        print("Subscriber Default Handler: '%s'" % message)

    def setMatch(self, pmatch=None, nmatch=None):
        """Replace both filters; omitted filters are reset to empty."""
        with self.lock:
            self.pmatch = {} if pmatch is None else pmatch
            self.nmatch = {} if nmatch is None else nmatch

    def filter(self, message):
        """filter(self, message) : the filter function returns
        the message, modified to be passed to the handler.
        Returning (None) indicates that this is not a message
        we are interested in, and it will not be passed to the
        handler."""
        # Positive match: every pmatch key must be present, and must carry
        # the requested value unless that value is None ("any value").
        for key, wanted in self.pmatch.items():
            if key not in message:
                return None
            if wanted is not None and message[key] != wanted:
                return None
        # Negative match: an nmatch key rejects the message when its value
        # is None ("any value") or equals the message's value.
        for key in message.keys():
            if key in self.nmatch:
                unwanted = self.nmatch[key]
                if unwanted is None or unwanted == message[key]:
                    return None
        return message
+
+
+
class Publisher():
    '''Superclass for pub/sub publishers

    Forwards messages to a broker, optionally batching ("aggregating")
    them until aggregateSize messages are pending.

    FIXME: use finer-grained locking'''

    def __init__(self, broker, aggregate=100):
        """broker    -- object providing publish() and publishList()
        aggregate -- number of pending messages that triggers a flush"""
        self.pending = []
        self.pendingLock = threading.Lock()
        self.aggregateSize = aggregate
        self.broker = broker

    @synchronizedmethod
    def publish(self, message):
        """Publish *message*, or queue it when message['aggregate'] == 'True'."""
        if message.get('aggregate') == 'True':
            self.aggregate(message)
        else:
            self.broker.publish(message)

    @synchronizedmethod
    def publishList(self, messages):
        """Hand the whole list of messages to the broker."""
        self.broker.publishList(messages)

    @synchronizedmethod
    def aggregate(self, message):
        """Queue *message*; flush the batch once aggregateSize is reached."""
        # we can make this lock-less by using a queue for pending
        # messages
        # "with" guarantees pendingLock is released even if the broker
        # raises; otherwise the next aggregate() call would block forever.
        with self.pendingLock:
            self.pending.append(message)
            if len(self.pending) >= self.aggregateSize:
                self.broker.publishList(self.pending)
                self.pending = []

    @synchronizedmethod
    def setBroker(self, broker):
        """Switch to a different broker."""
        self.broker = broker
+
+##############################
+# Testing Code
+##############################
+import time
+import unittest
+import sys
+import logging
+
+
class TestSubscriber(Subscriber):
    """Subscriber that records every handled message in self.queue."""

    def __init__(self, *args, **kwargs):
        # Create the queue first: Subscriber.__init__ registers with the
        # broker, after which handle() may be called.
        received = Queue.Queue()
        self.queue = received
        Subscriber.__init__(self, *args, **kwargs)

    def handle(self, message):
        """Store *message* for later inspection by the test."""
        self.queue.put(message)
+
+class TestMessaging(unittest.TestCase):
+ def setUp(self):
+ self.broker = MessageBroker()
+ self.publisher = Publisher(self.broker)
+ self.subscriber = TestSubscriber(self.broker)
+ def testPublish(self):
+ self.publisher.publish( {'message':'hello world'} )
+ self.assertEqual(self.subscriber.queue.qsize(), 1)
+ def testPublishList(self):
+ nrmsgs = 10
+ msgs = []
+ for i in range(nrmsgs):
+ msgs.append( {'msgnum':str(i)} )
+ self.publisher.publishList( msgs )
+ self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
+ def testAggregate(self):
+ nrmsgs = self.publisher.aggregateSize
+ for i in range(nrmsgs):
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ self.publisher.aggregate( {'msgnum':str(i)} )
+ self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
+ def testAggregateKeyword(self):
+ nrmsgs = self.publisher.aggregateSize
+ for i in range(nrmsgs):
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
+ self.assertEqual(self.subscriber.queue.qsize(), nrmsgs)
+
if __name__ == '__main__':
    # Run the TestMessaging suite when this module is executed as a script.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s:\t %(message)s",
                        stream=sys.stdout)

    suite = unittest.TestLoader().loadTestsFromTestCase(TestMessaging)
    result = unittest.TextTestRunner(verbosity=2).run(suite)

    # Report the outcome through the exit status so callers (shell scripts,
    # CI) can tell a failing run from a passing one.
    sys.exit(0 if result.wasSuccessful() else 1)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,213 @@
+#! /usr/bin/python
+
+
+from messaging import *
+
+import cPickle
+import soaplib.wsgi_soap
+import cherrypy.wsgiserver
+from soaplib.service import soapmethod
+from soaplib.serializers.primitive import *
+import SOAPpy.WSDL
+import time
+
class MessageBrokerSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, MessageBroker):
    """MessageBroker exposed as a SOAP service.

    Messages travel over SOAP as two parallel string arrays (keys and
    values) and are turned back into dictionaries here.
    """

    def __init__(self, port):
        """Initialise both base classes and serve on *port* in a new thread."""
        soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
        MessageBroker.__init__(self)
        self.port = port

        def trdfn():
            server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0", port), self)
            server.start()
        threading.Thread(target=trdfn).start()

    @soapmethod(Array(String), Array(String), _returns=Null)
    def log(self, keys, values):
        """Rebuild the message from *keys*/*values* and log it."""
        if len(keys) != len(values):
            raise Exception("Different lengths for keys and values")
        message = dict(zip(keys, values))
        MessageBroker.log(self, message)

    @soapmethod(String, Integer, _returns=Null)
    def addSubscriber(self, host, port):
        """Register a SOAP proxy for the subscriber at *host*:*port*."""
        subscriber = SubscriberSoapProxy(host, port)
        MessageBroker.addSubscriber(self, subscriber)

    @soapmethod(String, Integer, _returns=Null)
    def removeSubscriber(self, host, port):
        """Remove every subscriber registered for *host*:*port*.

        Subscribers without host/port attributes never match.
        """
        # should this method really be able to peek into subscriber.host/port
        for candidate in self.getSubscribers():
            sameHost = getattr(candidate, 'host', None) == host
            samePort = getattr(candidate, 'port', None) == port
            if sameHost and samePort:
                MessageBroker.removeSubscriber(self, candidate)

    @soapmethod(Array(String), Array(String), _returns=Null)
    def publish(self, keys, values):
        """Rebuild the message from *keys*/*values* and publish it."""
        if len(keys) != len(values):
            raise Exception("Different lengths for keys and values")
        message = dict(zip(keys, values))
        MessageBroker.publish(self, message)
+
+
+
class MessageBrokerSoapProxy():
    """Client-side stand-in for a remote MessageBrokerSoap.

    Dictionary messages are sent as two parallel lists, keys and values.
    """

    def __init__(self, host, port):
        self.host = host
        self.port = port
        wsdlUrl = "http://%s:%i/.wsdl" % (host, port)
        self.connection = SOAPpy.WSDL.Proxy(wsdlUrl)

    def log(self, message):
        """Send *message* to the remote broker's log()."""
        pairs = list(message.items())
        names = [k for k, _ in pairs]
        contents = [v for _, v in pairs]
        self.connection.log(keys=names, values=contents)

    def addSubscriber(self, subscriber):
        """Register *subscriber* remotely by its host and port."""
        self.connection.addSubscriber(host=subscriber.host,
                                      port=subscriber.port)

    def publish(self, message):
        """Send *message* to the remote broker's publish()."""
        pairs = list(message.items())
        names = [k for k, _ in pairs]
        contents = [v for _, v in pairs]
        self.connection.publish(keys=names, values=contents)

    def removeSubscriber(self, subscriber):
        """Unregister *subscriber* remotely by its host and port."""
        self.connection.removeSubscriber(host=subscriber.host,
                                         port=subscriber.port)
+
+
+
+
class SubscriberSoap(soaplib.wsgi_soap.SimpleWSGISoapApp, Subscriber):
    """Subscriber reachable over SOAP.

    Starts its own WSGI server thread on *port* and then registers with
    the broker, which contacts it through self.host / self.port.
    """

    def __init__(self, broker, port, synchronized=False):
        """broker       -- broker (or broker proxy) to register with
        port         -- TCP port the SOAP endpoint listens on
        synchronized -- passed through to Subscriber"""
        soaplib.wsgi_soap.SimpleWSGISoapApp.__init__(self)
        # host/port must be set before registration: the broker proxy reads
        # them from this object in addSubscriber().
        self.host = socket.gethostname()
        self.port = port
        self.server = None

        def trdfn():
            self.server = cherrypy.wsgiserver.CherryPyWSGIServer(("0.0.0.0", port), self)
            self.server.start()
        threading.Thread(target=trdfn).start()
#        broker.log("Subscriber started")
        # Subscriber.__init__ needs the broker argument and registers this
        # object with it as its final step, so it is called last (after the
        # server thread has been launched) and no separate addSubscriber()
        # call is required.
        Subscriber.__init__(self, broker, synchronized=synchronized)

    @soapmethod(Array(String), Array(String), _returns=Integer)
    def publish(self, keys, values):
        """Rebuild the message from *keys*/*values* and process it; returns 0."""
        if len(keys) != len(values):
            raise Exception("Different lengths for keys and values")
        message = dict(zip(keys, values))
        Subscriber.publish(self, message)
        return 0

    def stop(self):
        """Stop the WSGI server started by the constructor."""
        self.server.stop()
+
class SubscriberSoapProxy():
    """Broker-side stand-in for a remote SubscriberSoap endpoint."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        wsdlUrl = "http://%s:%i/.wsdl" % (host, port)
        self.connection = SOAPpy.WSDL.Proxy(wsdlUrl)

    def publish(self, message):
        """Send *message* to the remote subscriber as key/value lists."""
        pairs = list(message.items())
        names = [k for k, _ in pairs]
        contents = [v for _, v in pairs]
        self.connection.publish(keys=names, values=contents)
+
+
+####################
+# Testing Code
+####################
+
class CustomSubscriber(SubscriberSoap):
    """SOAP subscriber that prints every message it handles."""

    def handle(self, message):
        text = str(message)
        print("Custom Subscriber: '%s'" % text)
+
class NullSubscriber(SubscriberSoap):
    """SOAP subscriber that discards every message (used for timing)."""

    def handle(self, message):
        return None
+
+
if __name__ == '__main__':
    # Manual end-to-end exercise of the SOAP broker, proxy and subscribers.
    # Each component gets its own port, counting up from 1717.

    def _timePublish(target, count):
        """Publish *count* numbered messages via *target*; print the rate."""
        started = time.time()
        for i in range(count):
            target.publish({"message": "msg %i" % i})
        elapsed = time.time() - started
        print("Published %i messages in %f seconds, %f msg/s" % (count,
                                                                 elapsed,
                                                                 count / elapsed))

    try:
        portnum = 1717

        print("\ntesting message broker")
        broker = MessageBrokerSoap(portnum)
        proxy = MessageBrokerSoapProxy("localhost", portnum)
        portnum = portnum + 1

        print("\ntesting log function")
        proxy.log({"message": "Hello World!"})
#        proxy.log("It looks like log works")

        print("\ntesting subscriber proxy")
        subscriber = SubscriberSoap(proxy, portnum)
        portnum = portnum + 1

        print("\ntesting custom subscriber")
        csub = CustomSubscriber(proxy, portnum)
        portnum = portnum + 1

        print("\ntesting publish")
        proxy.publish({"message": "Hello World!"})

        print("\ntesting stop")
        subscriber.stop()
        proxy.publish({"message": "Everybody here?"})

        print("\ntesting removeSubscriber")
        proxy.removeSubscriber(csub)
        proxy.publish({"message": "Nobody home"})
        proxy.addSubscriber(csub)
        proxy.publish({"message": "You're back!"})

        print("\ntesting filter")
        csub.setMatch({"print": "yes"})
        proxy.publish({"print": "yes", "message": "this should be printed"})
        proxy.publish({"print": "no", "message": "this should NOT be printed"})
        csub.setMatch()

        print("\ntesting publish performance")
        proxy.removeSubscriber(csub)
        _timePublish(proxy, 10000)

        print("\ntesting publish/subscribe performance")
        nsub = NullSubscriber(proxy, portnum)
        portnum = portnum + 1
        _timePublish(proxy, 10000)

    except Exception as e:
        # Report the failure and signal it through a non-zero exit status.
        print(e)
        sys.exit(1)
    sys.exit(0)
Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/soapmessaging.py
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/tashimessaging.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,131 @@
+from thriftmessaging import *
+import logging
+import Queue
+from ConfigParser import ConfigParser
+import time
+import socket
+import signal
+
class TashiLogHandler(logging.Handler, PublisherThrift):
    """logging.Handler that publishes each log record to the message broker.

    The broker address is read from the 'host' and 'port' options of the
    'MessageBroker' configuration section.  Every emitted message is also
    put on self.messages.
    NOTE(review): nothing in this class drains self.messages, so it grows
    with every record unless a consumer reads it -- confirm intent.
    """

    # (message key, LogRecord attribute) pairs copied into the message
    # via str().
    # LogRecord attributes deliberately not forwarded: args, created,
    # exc_info, exc_text, relativeCreated.
    _RECORD_FIELDS = (
        ('log-filename', 'filename'),
        ('log-funcname', 'funcName'),
        ('log-levelname', 'levelname'),
        ('log-level', 'levelno'),
        ('log-lineno', 'lineno'),
        ('log-module', 'module'),
        ('log-msecs', 'msecs'),
        ('log-name', 'name'),
        ('log-pathname', 'pathname'),
        ('log-process', 'process'),
        ('log-thread', 'thread'),
        ('log-threadname', 'threadName'),
    )

    def __init__(self, config, *args, **kwargs):
        """config -- ConfigParser-like object; extra args go to logging.Handler."""
        self.messages = Queue.Queue()
        self.config = config
        logging.Handler.__init__(self, *args, **kwargs)
        PublisherThrift.__init__(self,
                                 config.get('MessageBroker', 'host'),
                                 int(config.get('MessageBroker', 'port')))

    def emit(self, record):
        """Convert *record* into a string-valued dict and publish it."""
        try:
            msg = {}
            for key, attr in self._RECORD_FIELDS:
                msg[key] = str(getattr(record, attr))
            # getMessage() merges record.args into record.msg, so calls such
            # as log.info("x=%s", x) are published fully formatted.
            msg['log-message'] = str(record.getMessage())

            # standard message fields
            msg['timestamp'] = str(time.time())
            msg['hostname'] = socket.gethostname()
            msg['message-type'] = 'log'

            self.messages.put(msg)
            self.publish(msg)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            # Standard logging convention: a failing handler must not break
            # the application code that issued the log call.
            self.handleError(record)
+
class TashiSubscriber(SubscriberThrift):
    """SubscriberThrift wired to the broker named in the configuration."""

    def __init__(self, config, port, **kwargs):
        """Connect to the configured broker and subscribe on *port*.

        The broker address comes from the 'host' and 'port' options of
        the 'MessageBroker' section; **kwargs go to SubscriberThrift.
        """
        sys.stdout.flush()
        section = 'MessageBroker'
        brokerPort = int(config.get(section, 'port'))
        brokerHost = config.get(section, 'host')
        self.broker = MessageBrokerThriftProxy(brokerHost, brokerPort)
        SubscriberThrift.__init__(self, self.broker, port, **kwargs)
+
+
+
+##############################
+# Test Code
+##############################
+import unittest
+import sys
+
class TestTashiSubscriber(TashiSubscriber):
    """TashiSubscriber that records handled messages in self.messageQueue."""

    def __init__(self, *args, **kwargs):
        # The queue is created before the base initialiser runs so that it
        # already exists when the first message is handled.
        received = Queue.Queue()
        self.messageQueue = received
        TashiSubscriber.__init__(self, *args, **kwargs)

    def handle(self, message):
        """Store *message* for later inspection by the test."""
        self.messageQueue.put(message)
+
+
def incrementor(start = 0):
    """Endless generator yielding start, start + 1, start + 2, ..."""
    current = start
    while True:
        yield current
        current = current + 1
# Module-wide counter shared by the tests below.
increment = incrementor()
+
+class TestTashiMessaging(unittest.TestCase):
+ def setUp(self):
+ self.configFiles = [ '../../../etc/TestConfig.cfg']
+ self.config = ConfigParser()
+ self.configFiles = self.config.read(self.configFiles)
+ self.port = int(self.config.get('MessageBroker', 'port'))
+
+ try:
+ self.brokerPid = os.spawnlpe(os.P_NOWAIT, 'python', 'python',
+ './messageBroker.py',
+ '--port', str(self.port),
+ os.environ)
+ self.port = self.port + 1
+ # FIXME: what's the best way to wait for the broker to be ready?
+ time.sleep(1)
+ except Exception, e:
+ sys.exit(0)
+ self.initialized = True
+ self.log = logging.getLogger('TestTashiMessaging')
+ self.handler = TashiLogHandler(self.config)
+ self.log.addHandler(self.handler)
+ self.sub = TestTashiSubscriber(self.config, int(self.port) + increment.next())
+ def tearDown(self):
+ os.kill(self.brokerPid, signal.SIGKILL)
+ # FIXME: wait for the port to be ready again
+ time.sleep(2)
+ self.log.removeHandler(self.handler)
+# self.sub.broker.removeSubscriber(self.sub)
+ pass
+ def testLog(self):
+ self.log.log(50, "Hello World!")
+ self.handler.messages.get(timeout=5)
+ self.sub.messageQueue.get(timeout=5)
+ self.assertEqual(self.handler.messages.qsize(), 0)
+ self.assertEqual(self.sub.messageQueue.qsize(), 0)
+ def testPublish(self):
+ sys.stdout.flush()
+ self.port = self.port + 1
+ self.handler.publish({'message':'hello world'})
+ self.sub.messageQueue.get(timeout=5)
+ self.assertEqual(self.sub.messageQueue.qsize(), 0)
+
+
if __name__=='__main__':
    # Script entry point: run the TestTashiMessaging suite verbosely.

#    logging.basicConfig(level=logging.INFO,
#                        format="%(asctime)s %(levelname)s:\t %(message)s",
#                        stream=sys.stdout)

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(TestTashiMessaging)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/threadpool.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,288 @@
+import threading
+import time
+import Queue
+import logging
+
+_log = logging.getLogger('tashi.messaging.threadpool')
+
def threaded(func):
    """Decorator: run each call of *func* in a newly started thread.

    The decorated function returns the started threading.Thread instead
    of func's own result; join() it to wait for completion.
    """
    import functools

    # wraps() keeps func's __name__/__doc__ on the wrapper
    @functools.wraps(func)
    def fn(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return fn
+
+
class ThreadPool(Queue.Queue):
    """Fixed-size pool of daemon worker threads fed through a queue.

    The pool is itself the work queue: each item is a
    (func, args, kwargs) tuple that a worker executes as
    func(*args, **kwargs).
    """

    def __init__(self, size=8, maxsize=0):
        """size    -- number of worker threads to start
        maxsize -- capacity passed to Queue.Queue (0 means unbounded)"""
        Queue.Queue.__init__(self, maxsize)
        for i in range(size):
            thread = threading.Thread(target=self._worker)
            thread.setDaemon(True)
            thread.start()

    def _worker(self):
        """Worker loop: take the next task and run it, forever."""
        while True:
            try:
                func, args, kwargs = self.get()
                func(*args, **kwargs)
            except Exception:
                # Log the failure together with its traceback and keep the
                # worker alive for the next task.
                _log.exception('thread pool task failed')
                # FIXME: allow user-defined error handling...

    def submit(self, func, *args, **kwargs):
        """Queue func(*args, **kwargs) for execution by a worker."""
        self.put((func, args, kwargs))

    def submitlist(self, func, args, kwargs):
        """Like submit(), with args and kwargs given as a sequence and a dict."""
        self.put((func, args, kwargs))
+
class ThreadPoolClass:
    """Mixin that gives each instance its own ThreadPool.

    The pool is stored as _threadpool_pool, the attribute that
    threadpoolmethod looks up.
    """

    def __init__(self, size=8, maxsize=0):
        pool = ThreadPool(size=size, maxsize=maxsize)
        self._threadpool_pool = pool
+
+
def threadpool(pool):
    """Decorator factory: run calls of the decorated function on *pool*.

    pool -- object providing submit(func, *args, **kwargs)

    The decorated function returns None immediately; the call itself is
    executed later by the pool.
    """
    import functools

    def dec(func):
        # wraps() keeps func's __name__/__doc__ on the wrapper
        @functools.wraps(func)
        def fn(*args, **kwargs):
            pool.submit(func, *args, **kwargs)
        return fn
    return dec
+
def threadpoolmethod(meth):
    """Decorator: run calls of the method *meth* on the instance's pool.

    The pool is taken from the instance's _threadpool_pool attribute; if
    the instance has none, a default ThreadPool() is created and stored
    on it.  The decorated method returns None immediately.
    """
    import functools

    # wraps() keeps meth's __name__/__doc__ on the wrapper
    @functools.wraps(meth)
    def fn(*args, **kwargs):
        owner = args[0]
        try:
            pool = owner._threadpool_pool
        except AttributeError:
            pool = owner.__dict__.setdefault('_threadpool_pool', ThreadPool())
        # FIXME: how do we check parent class?
#        assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
        pool.submit(meth, *args, **kwargs)
    return fn
+
def synchronized(lock=None):
    """Decorator factory: serialise calls of the decorated function.

    lock -- lock object with acquire()/release(); when omitted, a new
            threading.RLock is created for this decorator.

    The wrapper returns whatever the function returns and lets its
    exceptions propagate unchanged.
    """
    import functools

    _log.debug('synchronized decorator factory called')
    if lock is None:
        lock = threading.RLock()

    def dec(func):
        _log.debug('synchronized decorator called')

        @functools.wraps(func)
        def fn(*args, **kwargs):
            _log.debug('getting sync lock')
            lock.acquire()
            _log.debug('got sync lock')
            try:
                return func(*args, **kwargs)
            finally:
                # finally releases the lock on every exit path, including
                # exceptions that do not derive from Exception, and lets
                # the original traceback through untouched.
                _log.debug('releasing sync lock')
                lock.release()
                _log.debug('released sync lock')
        return fn
    return dec
+
def synchronizedmethod(func):
    """Decorator: serialise calls of synchronized methods per instance.

    All methods decorated this way share one re-entrant lock per
    instance, stored as _synchronized_lock (created on first use), so one
    synchronized method may call another on the same object.

    The wrapper returns the method's result and lets its exceptions
    propagate unchanged.
    """
    import functools

    @functools.wraps(func)
    def fn(*args, **kwargs):
        owner = args[0]
        try:
            lock = owner._synchronized_lock
        except AttributeError:
            lock = owner.__dict__.setdefault('_synchronized_lock',
                                             threading.RLock())
        lock.acquire()
        try:
            return func(*args, **kwargs)
        finally:
            # released on every exit path, whatever was raised
            lock.release()
    return fn
+
+
+##############################
+# Test Code
+##############################
+import unittest
+import sys
+import time
+
+class TestThreadPool(unittest.TestCase):
+ def setUp(self):
+ self.errmargin = 0.5
+
+ def testUnthreaded(self):
+ queue = Queue.Queue()
+ def slowfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(4):
+ slowfunc()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testThreaded(self):
+ queue = Queue.Queue()
+ @threaded
+ def slowthreadfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(8):
+ slowthreadfunc()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+ def testThreadPool(self):
+ pool = ThreadPool(size=4)
+ queue = Queue.Queue()
+ @threadpool(pool)
+ def slowpoolfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(8):
+ slowpoolfunc()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 2, 1)
+
+ def testUnthreadedMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(4):
+ sc.beslow()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testThreadedMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ @threaded
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(4):
+ sc.beslow()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+ def testThreadPoolMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ @threadpoolmethod
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(16):
+ sc.beslow()
+ for i in range(16):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 2, 1)
+
+ def testSynchronized(self):
+ queue = Queue.Queue()
+ @synchronized()
+ def addtoqueue():
+ time.sleep(1)
+ queue.put(None)
+ @threaded
+ def slowthreadfunc():
+ addtoqueue()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testSynchronizedMethod(self):
+ queue = Queue.Queue()
+ class addtoqueue:
+ @synchronizedmethod
+ def addtoqueue1(self):
+ time.sleep(1)
+ queue.put(None)
+ @synchronizedmethod
+ def addtoqueue2(self):
+ time.sleep(1)
+ queue.put(None)
+ atc = addtoqueue()
+ @threaded
+ def slowthreadfunc1():
+ atc.addtoqueue1()
+ @threaded
+ def slowthreadfunc2():
+ atc.addtoqueue2()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc1()
+ slowthreadfunc2()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 8, 1)
+
+ def testUnsynchronizedMethod(self):
+ queue = Queue.Queue()
+ class addtoqueue:
+ def addtoqueue1(self):
+ time.sleep(1)
+ queue.put(None)
+ def addtoqueue2(self):
+ time.sleep(1)
+ queue.put(None)
+ atc = addtoqueue()
+ @threaded
+ def slowthreadfunc1():
+ atc.addtoqueue1()
+ @threaded
+ def slowthreadfunc2():
+ atc.addtoqueue2()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc1()
+ slowthreadfunc2()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+
+
if __name__=='__main__':
    # Script entry point: log to stdout and run the TestThreadPool suite.
    import sys

    logFormat = "%(asctime)s %(levelname)s:\t %(message)s"
    logging.basicConfig(level=logging.INFO,
                        format=logFormat,
                        stream=sys.stdout)

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(TestThreadPool)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,261 @@
+#!/usr/bin/env python
+
+import sys
+import time
+import socket
+import traceback
+import threading
+
+sys.path.append('./gen-py')
+import tashi.messaging.messagingthrift
+import tashi.messaging.messagingthrift.MessageBrokerThrift
+import tashi.messaging.messagingthrift.SubscriberThrift
+from tashi.messaging.messagingthrift.ttypes import *
+
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from thrift.server import TServer
+
+from tashi import ConnectionManager
+
+from tashi.messaging.messaging import *
+from tashi.messaging.threadpool import ThreadPoolClass, threadpool, ThreadPool, threadpoolmethod, threaded
+
+# MessageBroker exposed over Thrift. The constructor builds a TThreadedServer
+# listening on `port` and starts it in a background thread; subscribers are
+# tracked by (host, port) and wrapped in SubscriberThriftProxy objects.
+class MessageBrokerThrift(MessageBroker):
+ # port: TCP port for the server socket.
+ # daemon: passed to setDaemon() on the thread that runs the server.
+ def __init__(self, port, daemon=True):
+ MessageBroker.__init__(self)
+ self.processor = tashi.messaging.messagingthrift.MessageBrokerThrift.Processor(self)
+ self.transport = TSocket.TServerSocket(port)
+ self.tfactory = TTransport.TBufferedTransportFactory()
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ # Connection manager for subscriber clients; handed to every
+ # SubscriberThriftProxy created in addSubscriber().
+ self.proxy = ConnectionManager(tashi.messaging.messagingthrift.SubscriberThrift.Client, 0)
+ self.ready = threading.Event()
+# self.server = TServer.TSimpleServer(self.processor,
+# self.transport,
+# self.tfactory,
+# self.pfactory)
+# self.server = TServer.TThreadPoolServer(self.processor,
+# self.transport,
+# self.tfactory,
+# self.pfactory)
+ self.server = TServer.TThreadedServer(self.processor,
+ self.transport,
+ self.tfactory,
+ self.pfactory)
+ # Counter incremented on every publish() call.
+ self.publishCalls = 0
+
+ # Thread body: signal readiness, then serve until the server stops or fails.
+ def ssvrthrd():
+ try:
+ # FIXME: Race condition, the ready event should be set after
+ # starting the server. However, server.serve()
+ # doesn't return under normal circumstances. This
+ # seems to work in practice, even though it's clearly
+ # wrong.
+ self.ready.set()
+ self.server.serve()
+ except Exception, e:
+ print e
+ sys.stdout.flush()
+ pass
+ svt = threading.Thread(target=ssvrthrd)
+ svt.setDaemon(daemon)
+ svt.start()
+ # The constructor returns only after the server thread has set `ready`.
+ self.ready.wait()
+ # Delegates directly to MessageBroker.log.
+ def log(self, message):
+ MessageBroker.log(self, message)
+ # Registers a subscriber by address; does nothing when a subscriber with the
+ # same host and port is already registered.
+ @synchronizedmethod
+ def addSubscriber(self, host, port):
+ subscribers = self.getSubscribers()
+ for sub in subscribers:
+ if sub.host == host and sub.port == port:
+ return
+ subscriber = SubscriberThriftProxy(host, port, self.proxy)
+ MessageBroker.addSubscriber(self, subscriber)
+ # Removes the subscriber matching host and port, if one is registered.
+ # NOTE(review): unlike addSubscriber/publish this method is not decorated
+ # with @synchronizedmethod -- confirm whether that is intentional.
+ def removeSubscriber(self, host, port):
+ subscriber = None
+ subscribers = self.getSubscribers()
+ for sub in subscribers:
+ if sub.host == host and sub.port == port:
+ subscriber = sub
+ if subscriber != None:
+ MessageBroker.removeSubscriber(self, subscriber)
+ # Counts the call, then delegates to MessageBroker.publish.
+ @synchronizedmethod
+ def publish(self, message):
+ self.publishCalls = self.publishCalls + 1
+ sys.stdout.flush()
+ MessageBroker.publish(self, message)
+
+# Client-side stand-in for a remote MessageBrokerThrift. Every method forwards
+# the call to the Thrift client obtained from self.proxy[host, port].
+class MessageBrokerThriftProxy:
+ # host, port: address of the remote broker.
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.proxy = ConnectionManager(tashi.messaging.messagingthrift.MessageBrokerThrift.Client,port)
+ @synchronizedmethod
+ def log(self, message):
+ self.proxy[self.host, self.port].log(message)
+ @synchronizedmethod
+ def publish(self, message):
+ self.proxy[self.host, self.port].publish(message)
+ @synchronizedmethod
+ def publishList(self, messages):
+ self.proxy[self.host, self.port].publishList(messages)
+ # Only the subscriber's host and port are sent to the broker.
+ @synchronizedmethod
+ def addSubscriber(self, subscriber):
+ self.proxy[self.host, self.port].addSubscriber(host=subscriber.host, port=subscriber.port)
+ # Only the subscriber's host and port are sent to the broker.
+ @synchronizedmethod
+ def removeSubscriber(self, subscriber):
+ self.proxy[self.host, self.port].removeSubscriber(host=subscriber.host, port=subscriber.port)
+
+
+
+# Subscriber reachable over Thrift. Two threads are involved: self.thread runs
+# the Thrift server that receives messages, and the object itself is a
+# threading.Thread whose run() periodically re-registers with the broker.
+class SubscriberThrift(Subscriber, threading.Thread):
+ # broker: object offering addSubscriber/removeSubscriber (see run/stop).
+ # port: TCP port this subscriber's Thrift server listens on.
+ # synchronized: forwarded unchanged to Subscriber.__init__.
+ def __init__(self, broker, port, synchronized=False):
+ self.host = socket.gethostname()
+ self.port = port
+ self.processor = tashi.messaging.messagingthrift.SubscriberThrift.Processor(self)
+ self.transport = TSocket.TServerSocket(port)
+ self.tfactory = TTransport.TBufferedTransportFactory()
+ self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+ self.server = TServer.TThreadedServer(self.processor,
+ self.transport,
+ self.tfactory,
+ self.pfactory)
+ # Server thread body; any exception from serve() is silently dropped.
+ def ssvrthrd():
+ try:
+ self.server.serve()
+ except Exception, e:
+ pass
+
+
+ self.thread = threading.Thread(target=ssvrthrd)
+ self.thread.setDaemon(True)
+ self.thread.start()
+
+ # We have to call this AFTER initializing our server, so that
+ # the broker can contact us
+ # Wrap this in a try/catch because the broker may not be online yet
+ # NOTE(review): stop() and run() read self.broker, which is not assigned in
+ # this class -- presumably set by Subscriber.__init__; confirm it is set
+ # even when that call fails.
+ try:
+ Subscriber.__init__(self, broker, synchronized=synchronized)
+ except:
+ pass
+ threading.Thread.__init__(self)
+ self.setDaemon(True)
+ self.start()
+
+ # Unregisters from the broker and closes the server transport.
+ def stop(self):
+# # FIXME: this is broken, there is no clear way to stop a
+# # Thrift server
+ self.broker.removeSubscriber(self)
+ self.transport.close()
+ # Renewal loop: re-adds this subscriber to the broker, ignoring any failure,
+ # then sleeps five minutes; never returns.
+ def run(self):
+ while(True):
+ # renew subscription every 5 min
+ try:
+ self.broker.addSubscriber(self)
+ except:
+ pass
+ time.sleep(5*60)
+
+# Broker-side stand-in for a remote subscriber. Messages are sent through the
+# Thrift client obtained from proxy[host, port]; messages flagged for
+# aggregation are buffered and sent in batches.
+class SubscriberThriftProxy:
+ # host, port: address of the remote subscriber.
+ # proxy: mapping indexed by (host, port) that yields a client object.
+ # aggregate: number of buffered messages that triggers a batch send.
+ def __init__(self, host, port, proxy, aggregate = 100):
+ self.host = host
+ self.port = port
+ self.proxy = proxy
+ # for some reason, thrift clients are not thread-safe, lock during send
+ self.lock = threading.Lock()
+ self.pending = []
+ self.aggregateSize = aggregate
+ # Sends one message, or buffers it when message['aggregate'] == 'True'.
+ # A buffered batch is sent with publishList once it holds aggregateSize
+ # messages. Send errors are printed and re-raised to the caller.
+ # NOTE(review): the lock is released separately on each exit path rather
+ # than in a try/finally.
+ def publish(self, message):
+ self.lock.acquire()
+ sys.stdout.flush()
+ if message.has_key('aggregate') and message['aggregate'] == 'True':
+ self.pending.append(message)
+ if len(self.pending) >= self.aggregateSize:
+ try:
+ self.proxy[self.host, self.port].publishList(self.pending)
+ except Exception, e:
+ print e
+ self.lock.release()
+ raise e
+ # Batch was sent; start a new buffer.
+ self.pending = []
+ else:
+ try:
+ self.proxy[self.host, self.port].publish(message)
+ except Exception, e:
+ sys.stdout.flush()
+ print e
+ self.lock.release()
+ raise e
+ self.lock.release()
+
+# Publisher that talks to a remote broker: it builds a
+# MessageBrokerThriftProxy for host:port and hands it to Publisher.__init__.
+class PublisherThrift(Publisher):
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.broker = MessageBrokerThriftProxy(host, port)
+ Publisher.__init__(self, self.broker)
+
+####################
+# Testing Code
+####################
+
+# Test helper: a SubscriberThrift that stores every handled message in
+# self.queue so the tests can wait for delivery.
+class TestSubscriberThrift(SubscriberThrift):
+ def __init__(self, *args, **kwargs):
+ # The queue is created before the base constructor runs.
+ self.queue = Queue.Queue()
+ SubscriberThrift.__init__(self, *args, **kwargs)
+ def handle(self, message):
+ self.queue.put(message)
+
+# Next free TCP port for the tests; setUp() takes two ports per test (one for
+# the broker, one for the subscriber) and advances this counter.
+portnum = 1718
+# End-to-end tests: a broker, a publisher and a subscriber connected over
+# Thrift on localhost.
+class TestThriftMessaging(unittest.TestCase):
+ # Starts a fresh broker and subscriber on new ports for every test.
+ def setUp(self):
+ global portnum
+ self.broker = MessageBrokerThrift(portnum)
+ self.brokerPort = portnum
+ portnum = portnum + 1
+ self.proxy = MessageBrokerThriftProxy('localhost', self.brokerPort)
+ self.publisher = PublisherThrift('localhost', self.brokerPort)
+ self.subscriber = TestSubscriberThrift(self.proxy, portnum)
+ portnum = portnum + 1
+ # Nothing is shut down here; servers from earlier tests keep their ports.
+ def tearDown(self):
+ pass
+ # Passes when setUp() alone completes without raising.
+ def testSetUp(self):
+ pass
+ # One published message must arrive within 5 seconds, and nothing else.
+ def testPublish(self):
+ self.publisher.publish( {'message':'hello world'} )
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ # Ten messages sent with publishList must all arrive, and nothing else.
+ def testPublishList(self):
+ nrmsgs = 10
+ msgs = []
+ for i in range(nrmsgs):
+ msgs.append( {'msgnum':str(i)} )
+ self.publisher.publishList( msgs )
+ for i in range(nrmsgs):
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ # The queue is checked to be empty before each aggregate() call; after
+ # aggregateSize calls every message must arrive.
+ def testAggregate(self):
+ nrmsgs = self.publisher.aggregateSize
+ for i in range(nrmsgs):
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ self.publisher.aggregate( {'msgnum':str(i)} )
+ for i in range(nrmsgs):
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ # Same as testAggregate, but aggregation is requested through the
+ # 'aggregate':'True' key of messages passed to publish().
+ def testAggregateKeyword(self):
+ nrmsgs = self.publisher.aggregateSize
+ for i in range(nrmsgs):
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+ self.publisher.publish( {'msgnum':str(i), 'aggregate':'True'} )
+ for i in range(nrmsgs):
+ self.subscriber.queue.get(True, timeout=5)
+ self.assertEqual(self.subscriber.queue.qsize(), 0)
+
+
+# When run as a script: run the TestThriftMessaging test case with verbose output.
+if __name__=='__main__':
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestThriftMessaging)
+ unittest.TextTestRunner(verbosity=2).run(suite)
+
+
Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/messaging/thriftmessaging.py
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/__init__.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,7 @@
+from tashi import convertExceptions
+
+# Decorator for node manager RPC handlers: returns oldFunc wrapped by
+# tashi.convertExceptions.
+def RPC(oldFunc):
+ return convertExceptions(oldFunc)
+
+from nodemanagerservice import NodeManagerService
+from notification import Notifier
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,47 @@
+#! /usr/bin/env python
+
+import logging.config
+import signal
+import sys
+from thrift.transport.TSocket import TServerSocket
+from thrift.server.TServer import TThreadedServer
+
+from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
+from tashi.services import nodemanagerservice, clustermanagerservice
+from tashi import ConnectionManager
+
+import notification
+
+# SIGTERM handler: exits the process with status 0. main() also calls it
+# directly when a KeyboardInterrupt is caught.
+@signalHandler(signal.SIGTERM)
+def handleSIGTERM(signalNumber, stackFrame):
+ sys.exit(0)
+
+# Node manager entry point: loads the configuration, instantiates the DFS,
+# VM-control and service implementations named in the [NodeManager] section,
+# and serves the service over a threaded Thrift server.
+def main():
+ # These objects are published as module globals; globals() is then passed to
+ # debugConsole().
+ global config, dfs, vmm, service, server, log, notifier
+
+ (config, configFiles) = getConfig(["NodeManager"])
+ logging.config.fileConfig(configFiles)
+ log = logging.getLogger(__name__)
+ log.info('Using configuration file(s) %s' % configFiles)
+ dfs = instantiateImplementation(config.get("NodeManager", "Dfs"), config)
+ # The VM controller is created with None in place of the service and gets
+ # its back-reference (vmm.nm) once the service exists.
+ vmm = instantiateImplementation(config.get("NodeManager", "VmControl"), config, dfs, None)
+ service = instantiateImplementation(config.get("NodeManager", "Service"), config, vmm)
+ vmm.nm = service
+ processor = nodemanagerservice.Processor(service)
+ transport = TServerSocket(int(config.get('NodeManagerService', 'port')))
+ server = TThreadedServer(processor, transport)
+ debugConsole(globals())
+
+ # The notifier is attached to this module's logger as an extra handler.
+ notifier = notification.Notifier(config)
+ log.addHandler(notifier)
+
+ # Ctrl-C exits like SIGTERM (status 0); any other exception is written to
+ # stderr and the process exits with -1.
+ try:
+ server.serve()
+ except KeyboardInterrupt:
+ handleSIGTERM(signal.SIGTERM, None)
+ except Exception, e:
+ sys.stderr.write(str(e) + "\n")
+ sys.exit(-1)
+
+# Run the node manager when this file is executed as a script.
+if __name__ == "__main__":
+ main()
Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanager.py
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,265 @@
+import cPickle
+import logging
+import os
+import socket
+import sys
+import threading
+import time
+from thrift.transport.TSocket import TSocket
+from thrift.protocol.TBinaryProtocol import TBinaryProtocol
+from thrift.transport.TTransport import TBufferedTransport
+
+from tashi.services.ttypes import ResumeVmRes, Host, HostState, InstanceState, TashiException, Errors, Instance
+from tashi.services import clustermanagerservice
+from tashi.nodemanager import RPC
+from tashi import boolean, vmStates, logged, ConnectionManager, timed
+
+class NodeManagerService():
+ """RPC handler for the NodeManager
+
+ Perhaps in the future I can hide the dfs from the
+ VmControlInterface and do all dfs operations here?"""
+
+ # Reads the [NodeManagerService] settings, restores the saved instance table,
+ # reconciles it with the VMs reported by the VM-control backend, and starts
+ # the two background threads (junk and registerWithClusterManager).
+ def __init__(self, config, vmm):
+ self.config = config
+ self.vmm = vmm
+ self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
+ self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
+ self.log = logging.getLogger(__file__)
+ self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
+ self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
+ self.infoFile = self.config.get('NodeManagerService', 'infoFile')
+ # Host id; assigned by registerWithClusterManager().
+ self.id = None
+ # Cluster manager updates that failed and are retried by junk().
+ self.notifyCM = []
+ self.loadVmInfo()
+ vmList = self.vmm.listVms()
+ # VMs the backend knows but the saved table does not get a placeholder
+ # Instance with id -1.
+ for vmId in vmList:
+ if (vmId not in self.instances):
+ self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
+ self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
+ # Saved VMs the backend no longer reports are marked as exited.
+ for vmId in self.instances.keys():
+ if (vmId not in vmList):
+ self.log.warning('vmcontrol backend does not report %d' % (vmId))
+ self.vmStateChange(vmId, None, InstanceState.Exited)
+ # Neither thread is marked as a daemon thread here.
+ threading.Thread(target=self.junk).start()
+ threading.Thread(target=self.registerWithClusterManager).start()
+
+ # Loads self.instances from the pickle stored in self.infoFile. On any
+ # failure the error is logged and the table starts out empty.
+ # NOTE(review): unpickling is only safe if infoFile is trusted.
+ def loadVmInfo(self):
+ try:
+ f = open(self.infoFile, "r")
+ data = f.read()
+ f.close()
+ self.instances = cPickle.loads(data)
+ except Exception, e:
+ self.log.exception('Failed to load VM info from %s' % (self.infoFile))
+ self.instances = {}
+
+ # Pickles self.instances into self.infoFile; failures are logged and not
+ # propagated.
+ def saveVmInfo(self):
+ try:
+ data = cPickle.dumps(self.instances)
+ f = open(self.infoFile, "w")
+ f.write(data)
+ f.close()
+ except Exception, e:
+ self.log.exception('Failed to save VM info to %s' % (self.infoFile))
+
+ #@logged
+ # Records a state transition of the VM `vmId` from `old` to `cur` and reports
+ # it to the cluster manager with vmUpdate. Returns True.
+ # old may be None/false to skip the consistency check against the stored state.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def vmStateChange(self, vmId, old, cur):
+ cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ instance = self.getInstance(vmId)
+ if (old and instance.state != old):
+ self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+ # Exited VMs are dropped from the local table.
+ if (cur == InstanceState.Exited):
+ del self.instances[vmId]
+ instance.state = cur
+ # Only the new state is sent to the cluster manager.
+ newInst = Instance(d={'state':cur})
+ success = lambda: None
+ try:
+ cm.vmUpdate(instance.id, newInst, old)
+ except Exception, e:
+ self.log.exception('RPC failed for vmUpdate on CM')
+ # Queue the update; junk() retries entries of self.notifyCM.
+ self.notifyCM.append((instance.id, newInst, old, success))
+ else:
+ success()
+ return True
+
+ #@timed
+ # Builds a Host record for this machine: id, hostname, memory and number of
+ # online processors. Memory is taken from the first line of /proc/meminfo
+ # and converted to megabytes according to its unit suffix.
+ def getHostInfo(self):
+ host = Host()
+ host.id = self.id
+ host.name = socket.gethostname()
+ # memoryStr is "<number> <unit>", i.e. fields 2 and 3 of the first line.
+ memoryStr = os.popen2("head -n 1 /proc/meminfo | awk '{print $2 \" \" $3}'")[1].read().strip()
+ if (memoryStr[-2:] == "kB"):
+ host.memory = int(memoryStr[:-2])/1024
+ elif (memoryStr[-2:] == "mB"):
+ host.memory = int(memoryStr[:-2])
+ elif (memoryStr[-2:] == "gB"):
+ host.memory = int(memoryStr[:-2])*1024
+ elif (memoryStr[-2:] == " B"):
+ host.memory = int(memoryStr[:-2])/(1024*1024)
+ else:
+ self.log.warning('Unable to determine amount of physical memory - reporting 0')
+ host.memory = 0
+ host.cores = os.sysconf("SC_NPROCESSORS_ONLN")
+ host.up = True
+ host.decayed = False
+ return host
+
+ # Background loop (started from __init__): on every pass it saves the
+ # instance table and retries the cluster manager updates queued in
+ # self.notifyCM, then sleeps so that passes start registerFrequency seconds
+ # apart. Never returns.
+ def junk(self):
+ cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ while True:
+ start = time.time()
+ try:
+ self.saveVmInfo()
+ except Exception, e:
+ self.log.exception('Failed to save VM info')
+ try:
+ # Updates that failed again during this pass.
+ notifyCM = []
+ try:
+ while (len(self.notifyCM) > 0):
+ (instanceId, newInst, old, success) = self.notifyCM.pop(0)
+ try:
+ cm.vmUpdate(instanceId, newInst, old)
+ except TashiException, e:
+ # Re-queued in every case; only IncorrectVmState lets the pass continue.
+ notifyCM.append((instanceId, newInst, old, success))
+ if (e.errno != Errors.IncorrectVmState):
+ raise
+ except:
+ notifyCM.append((instanceId, newInst, old, success))
+ raise
+ else:
+ success()
+ finally:
+ # Put the failed updates back for the next pass.
+ self.notifyCM = self.notifyCM + notifyCM
+ except Exception, e:
+ # NOTE(review): this message is logged for failed vmUpdate retries, not
+ # for registration.
+ self.log.exception('Failed to register with the CM')
+ toSleep = start - time.time() + self.registerFrequency
+ if (toSleep > 0):
+ time.sleep(toSleep)
+
+ # Background loop (started from __init__): every registerFrequency seconds it
+ # sends the current host info and instance list to the cluster manager and
+ # stores the returned id in self.id. Failures are logged. Never returns.
+ def registerWithClusterManager(self):
+ cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ #@timed
+ def body():
+ try:
+ #self.log.info('registering with CM at %f' % (time.time()))
+ host = self.getHostInfo()
+ instances = self.instances.values()
+ #@timed
+ # This nested function shadows the module-level RPC decorator inside body().
+ def RPC(self):
+ self.id = cm.registerNodeManager(host, instances)
+ RPC(self)
+ except Exception, e:
+ self.log.exception('Failed to register with the CM')
+ while True:
+ start = time.time()
+ body()
+ toSleep = start - time.time() + self.registerFrequency
+ if (toSleep > 0):
+ time.sleep(toSleep)
+
+ # Returns the Instance stored for vmId.
+ # Raises TashiException with errno Errors.NoSuchVmId when vmId is unknown.
+ def getInstance(self, vmId):
+ instance = self.instances.get(vmId, None)
+ if (instance is None):
+ raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
+ return instance
+
+ @RPC
+ # RPC: creates the VM through the backend, marks the instance as Running,
+ # stores it under the new vmId and returns that vmId.
+ def instantiateVm(self, instance):
+ vmId = self.vmm.instantiateVm(instance)
+ instance.vmId = vmId
+ instance.state = InstanceState.Running
+ self.instances[vmId] = instance
+ return vmId
+
+ @RPC
+ # RPC: marks the VM as Suspending and runs the backend suspend in a new
+ # thread, so the call returns before the suspend has finished.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def suspendVm(self, vmId, name, suspendCookie):
+ instance = self.getInstance(vmId)
+ instance.state = InstanceState.Suspending
+ threading.Thread(target=lambda: self.vmm.suspendVm(vmId, name, suspendCookie)).start()
+
+ @RPC
+ # RPC: resumes the VM called `name` through the backend, stores the instance
+ # as Running under the new vmId and returns a ResumeVmRes holding the vmId
+ # and the suspendCookie reported by the backend.
+ def resumeVm(self, instance, name):
+ (vmId, suspendCookie) = self.vmm.resumeVm(name)
+ instance.vmId = vmId
+ instance.state = InstanceState.Running
+ self.instances[vmId] = instance
+ return ResumeVmRes(d={'vmId':vmId, 'suspendCookie':suspendCookie})
+
+ @RPC
+ # RPC: marks the incoming instance as MigratePrep with vmId -1 and returns
+ # the transportCookie the backend produces for source.name. The instance is
+ # not added to self.instances here.
+ def prepReceiveVm(self, instance, source):
+ instance.state = InstanceState.MigratePrep
+ instance.vmId = -1
+ transportCookie = self.vmm.prepReceiveVm(instance, source.name)
+ return transportCookie
+
+ # Thread body for migrateVm: runs the backend migration to target.name, then
+ # removes the instance from the local table.
+ def migrateVmHelper(self, instance, target, transportCookie):
+ self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
+ del self.instances[instance.vmId]
+
+ @RPC
+ # RPC: marks the VM as MigrateTrans and starts migrateVmHelper in a new
+ # thread; returns without waiting for the migration.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def migrateVm(self, vmId, target, transportCookie):
+ instance = self.getInstance(vmId)
+ instance.state = InstanceState.MigrateTrans
+ threading.Thread(target=lambda: self.migrateVmHelper(instance, target, transportCookie)).start()
+ return
+
+ # Thread body for receiveVm: waits for the backend to receive the VM, stores
+ # the instance as Running on this host and reports the change to the cluster
+ # manager. A failed report is queued in self.notifyCM for junk() to retry.
+ def receiveVmHelper(self, instance, transportCookie):
+ cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ vmId = self.vmm.receiveVm(transportCookie)
+ instance.state = InstanceState.Running
+ instance.hostId = self.id
+ instance.vmId = vmId
+ self.instances[vmId] = instance
+ # Only id, state, vmId and hostId are sent to the cluster manager.
+ newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
+ success = lambda: None
+ try:
+ cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
+ except Exception, e:
+ self.log.exception('vmUpdate failed in receiveVmHelper')
+ self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
+ else:
+ success()
+
+ @RPC
+ # RPC: marks the instance as MigrateTrans and starts receiveVmHelper in a new
+ # thread; returns without waiting for the transfer.
+ def receiveVm(self, instance, transportCookie):
+ instance.state = InstanceState.MigrateTrans
+ threading.Thread(target=lambda: self.receiveVmHelper(instance, transportCookie)).start()
+ return
+
+ @RPC
+ # RPC: pauses the VM through the backend; the state is Pausing during the
+ # call and Paused afterwards.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def pauseVm(self, vmId):
+ instance = self.getInstance(vmId)
+ instance.state = InstanceState.Pausing
+ self.vmm.pauseVm(vmId)
+ instance.state = InstanceState.Paused
+
+ @RPC
+ # RPC: unpauses the VM through the backend; the state is Unpausing during the
+ # call and Running afterwards.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def unpauseVm(self, vmId):
+ instance = self.getInstance(vmId)
+ instance.state = InstanceState.Unpausing
+ self.vmm.unpauseVm(vmId)
+ instance.state = InstanceState.Running
+
+ @RPC
+ # RPC: marks the VM as ShuttingDown and asks the backend to shut it down.
+ # The instance is not removed from self.instances here.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def shutdownVm(self, vmId):
+ instance = self.getInstance(vmId)
+ instance.state = InstanceState.ShuttingDown
+ self.vmm.shutdownVm(vmId)
+
+ @RPC
+ # RPC: marks the VM as Destroying and asks the backend to destroy it.
+ # The instance is not removed from self.instances here.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def destroyVm(self, vmId):
+ instance = self.getInstance(vmId)
+ instance.state = InstanceState.Destroying
+ self.vmm.destroyVm(vmId)
+
+ @RPC
+ # RPC: returns the stored Instance for vmId.
+ # Raises TashiException (via getInstance) when vmId is unknown.
+ def getVmInfo(self, vmId):
+ instance = self.getInstance(vmId)
+ return instance
+
+ @RPC
+ # RPC: returns the vmIds currently present in the local instance table.
+ def listVms(self):
+ return self.instances.keys()
+
+
Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/nodemanagerservice.py
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/notification.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,24 @@
+
+import tashi.messaging.tashimessaging
+
+class Notifier(tashi.messaging.tashimessaging.TashiLogHandler):
+ def vmExited(self, instance):
+ try:
+ isolatedRPC(self.cm, 'vmExited', self.hostId, vmId)
+ except Exception, e:
+ print "RPC failed for vmExited on CM"
+ print e
+ # FIXME: send this to the cm later
+ # self.exitedVms[vmId] = child
+
+ msg = {}
+
+ msg['timestamp'] = str(time.time())
+ msg['hostname'] = '' # FIXME: fill this in
+ msg['message-type'] = 'vm-event'
+ msg['vm-event'] = 'vm-exited'
+
+ msg['instance-id'] = str(instance.id)
+ msg['host-id'] = str(instance.hostId)
+ print 'Notifier publishing ', msg
+ self.publish(message)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/__init__.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,4 @@
+from vmcontrolinterface import VmControlInterface
+from qemu import Qemu
+from xenpv import XenPV
+from newxen import NewXen
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/newxen.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,113 @@
+import cPickle
+import logging
+import os
+import threading
+import random
+import select
+import signal
+import socket
+import subprocess
+import sys
+import time
+
+import inspect # used to get current function
+# Returns the function name recorded in entry n of inspect.stack(). With the
+# default n=1 that is the function that called currentFunction().
+def currentFunction(n=1):
+ # get the name of our caller, e.g. the requesting function
+ return inspect.stack()[n][3]
+
+from tashi.services.ttypes import *
+from tashi.util import broken, isolatedRPC
+from vmcontrolinterface import VmControlInterface
+
+log = logging.getLogger(__file__)
+
+
+import xenpv
+
+# Thin wrapper around xenpv.XenPV: every method prints a trace line and then
+# delegates to the wrapped self.xenpv object.
+# NOTE(review): the trace lines say 'XenPV::' although this class is NewXen.
+class NewXen(VmControlInterface):
+ """VM Control for Paravirtualized Xen"""
+
+ def __init__(self, config, dfs, cm):
+ """Base init function -- it handles inserting config and dfs
+ into the object as well as checking that the class type is
+ not VmControlInterface"""
+ print 'NewXen::init called'
+ # NOTE(review): VmControlInterface.__init__ is not called here; the check
+ # below looks copied from the base class.
+ if self.__class__ is VmControlInterface:
+ raise NotImplementedError
+ self.config = config
+ self.dfs = dfs
+ self.cm = cm
+ self.xenpv = xenpv.XenPV(self.config, self.dfs, self.cm)
+
+ def instantiateVm(self, instance):
+ """Takes an InstanceConfiguration, creates a VM based on it,
+ and returns the vmId"""
+ print 'XenPV::%s called' % currentFunction()
+ # FIXME: this is NOT the right way to get our hostId
+ self.hostId = instance.hostId
+ return self.xenpv.instantiateVm(instance)
+
+
+ # NOTE(review): this and several methods below call delegate names spelled
+ # with 'VM' (suspendVM, resumeVM, pauseVM, ...), while the other delegate
+ # calls use 'Vm'; xenpv.py is outside this excerpt -- confirm those names
+ # exist on XenPV.
+ def suspendVm(self, vmId, target, suspendCookie=None):
+ """Suspends a vm to the target on the dfs, including the
+ suspendCookie"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.suspendVM(vmId, target, suspendCookie)
+
+
+ def resumeVm(self, source):
+ """Resumes a vm from the dfs and returns the newly created
+ vmId as well as the suspendCookie in a tuple"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.resumeVM(source)
+
+ def prepReceiveVm(self, instance, source):
+ """First call made as part of vm migration -- it is made to
+ the target machine and it returns a transportCookie"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.prepReceiveVm(instance, source)
+
+ def migrateVm(self, vmId, target, transportCookie):
+ """Second call made as part of a vm migration -- it is made
+ to the source machine and it does not return until the
+ migration is complete"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.migrateVm(vmId, target,transportCookie)
+
+ def receiveVm(self, transportCookie):
+ """Third call made as part of a vm migration -- it is made to
+ the target machine and it does not return until the
+ migration is complete, it returns the new vmId"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.receiveVm(transportCookie)
+
+ def pauseVm(self, vmId):
+ """Pauses a vm and returns nothing"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.pauseVM(vmId)
+
+ def unpauseVm(self, vmId):
+ """Unpauses a vm and returns nothing"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.unpauseVM(vmId)
+
+ def shutdownVm(self, vmId):
+ """Performs a clean shutdown on a vm and returns nothing"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.shutdownVM(vmId)
+
+
+ def destroyVm(self, vmId):
+ """Forces the exit of a vm and returns nothing"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.destroyVM(vmId)
+
+ def getVmInfo(self, vmId):
+ """Returns the InstanceConfiguration for the given vmId"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.getVMInfo(vmId)
+
+ def listVms(self):
+ """Returns a list of vmIds to the caller"""
+ print 'XenPV::%s called' % currentFunction()
+ return self.xenpv.listVMs()
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/qemu.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,472 @@
+import cPickle
+import logging
+import os
+import threading
+import random
+import select
+import signal
+import socket
+import subprocess
+import sys
+import time
+
+from tashi.services.ttypes import *
+from tashi.util import broken, logged
+from vmcontrolinterface import VmControlInterface
+
+log = logging.getLogger(__file__)
+
def controlConsole(child, port):
    """This exposes a TCP port that connects to a particular child's monitor -- used for debugging

    Listens on 0.0.0.0:port, accepts exactly one client and then relays
    bytes between that client and child.monitorFd until the client
    disconnects.  Failures while relaying are logged instead of being
    silently dropped; a failure to bind still propagates to the caller.
    Both sockets are always closed before returning.
    """
    listenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # s stays None until a client has been accepted so the cleanup code
    # below never touches an unbound name
    s = None
    try:
        listenSocket.bind(("0.0.0.0", port))
        try:
            listenSocket.listen(5)
            select.select([listenSocket.fileno()], [], [])
            (s, clientAddr) = listenSocket.accept()
            # output becomes -1 once the monitor side reports end-of-file
            output = child.monitorFd
            while True:
                if (output != -1):
                    watched = [s, output]
                else:
                    watched = [s]
                (rl, wl, el) = select.select(watched, [], [])
                if (len(rl) == 0):
                    continue
                # Only the first ready descriptor is serviced per pass;
                # the other one is picked up by the next select() call
                if (rl[0] == s):
                    buf = s.recv(4096)
                    if (not buf):
                        # Client hung up -- we are done
                        break
                    if (output != -1):
                        os.write(child.monitorFd, buf)
                elif (rl[0] == output):
                    buf = os.read(output, 4096)
                    if (not buf):
                        output = -1
                    else:
                        # sendall() retries short writes, send() does not
                        s.sendall(buf)
        except Exception:
            log.exception("Control console on port %d failed" % (port))
    finally:
        if (s is not None):
            s.close()
        listenSocket.close()
+
class Qemu(VmControlInterface):
    """This class implements the VmControlInterface for Qemu/KVM

    Every VM is a forked qemu process whose pid doubles as the vmId.
    The state needed to re-attach to a VM is pickled into INFO_DIR/<pid>,
    and a background thread polls /proc to detect VMs that have exited.
    """

    def __init__(self, config, dfs, nm):
        """Read the [Qemu] config section, re-attach to VMs recorded in
        INFO_DIR and start the polling thread"""
        VmControlInterface.__init__(self, config, dfs, nm)
        self.QEMU_BIN = self.config.get("Qemu", "qemuBin")
        self.INFO_DIR = self.config.get("Qemu", "infoDir")
        self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay"))
        self.migrationRetries = int(self.config.get("Qemu", "migrationRetries"))
        self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout"))
        self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
        self.controlledVMs = {}
        self.usedPorts = []
        self.usedPortsLock = threading.Lock()
        self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
        try:
            os.mkdir(self.INFO_DIR)
        except OSError:
            # Best effort -- the directory normally exists already
            pass
        self.scanInfoDir()
        threading.Thread(target=self.pollVMsLoop).start()

    class anonClass:
        """Plain attribute bag built from keyword arguments"""
        def __init__(self, **attrs):
            self.__dict__.update(attrs)

    def getSystemPids(self):
        """Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
        pids = []
        for f in os.listdir("/proc"):
            try:
                exe = os.readlink("/proc/%s/exe" % (f))
                if (exe.find(self.QEMU_BIN) != -1):
                    pids.append(int(f))
            except (OSError, ValueError):
                # Not a process directory, not readable, or not numeric
                pass
        return pids

    def _dumpDebugInfo(self, vmId, child):
        """Write the stderr output and monitor history of a failed VM to /tmp"""
        try:
            if (child.OSchild):
                f = open("/tmp/%d.err" % (vmId), "w")
                try:
                    f.write(child.stderr.read())
                finally:
                    f.close()
            f = open("/tmp/%d.pty" % (vmId), "w")
            try:
                for i in child.monitorHistory:
                    f.write(i)
            finally:
                f.close()
        except (IOError, OSError):
            log.exception("Failed to write debug information for vmId %d" % (vmId))

    def matchSystemPids(self, controlledVMs):
        """This is run in a separate polling thread and it must do things that are thread safe

        Every vmId in controlledVMs that no longer has a matching qemu
        process is removed from the dictionary, its info file is deleted
        and the node manager is told that the VM exited (unless it is
        being migrated away).
        """
        vmIds = list(controlledVMs.keys())
        pids = set(self.getSystemPids())
        for vmId in vmIds:
            if vmId in pids:
                continue
            try:
                os.unlink(self.INFO_DIR + "/%d"%(vmId))
            except OSError:
                # A missing info file must not kill the polling thread
                log.exception("Failed to remove info file for vmId %d" % (vmId))
            child = controlledVMs.pop(vmId, None)
            if (child is None):
                continue
            log.info("Removing vmId %d" % (vmId))
            if (child.OSchild):
                try:
                    os.waitpid(vmId, 0)
                except OSError:
                    # Somebody else already reaped the process
                    log.exception("waitpid failed for vmId %d" % (vmId))
            if (child.errorBit):
                self._dumpDebugInfo(vmId, child)
            try:
                if (not child.migratingOut):
                    self.nm.vmStateChange(vmId, None, InstanceState.Exited)
            except Exception:
                log.exception("vmStateChange failed")

    def scanInfoDir(self):
        """This is not thread-safe and must only be used during class initialization"""
        controlledVMs = {}
        for name in os.listdir(self.INFO_DIR + "/"):
            try:
                vmId = int(name)
            except ValueError:
                log.warning("Ignoring unexpected file %s in %s", name, self.INFO_DIR)
                continue
            controlledVMs[vmId] = self.anonClass(OSchild=False, errorBit=False, migratingOut=False)
        if (len(controlledVMs) == 0):
            log.info("No vm information found in %s", self.INFO_DIR)
        for vmId in controlledVMs:
            try:
                child = self.loadChildInfo(vmId)
                child.OSchild = False
                child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
                child.monitor = os.fdopen(child.monitorFd)
                self.controlledVMs[child.pid] = child
                log.info("Adding vmId %d" % (child.pid))
            except Exception:
                log.exception("Failed to load VM info for %d", vmId)
            else:
                log.info("Loaded VM info for %d", vmId)
        # Clean up (and report) every recorded VM whose process is gone
        self.matchSystemPids(controlledVMs)
        # Forget the VMs that were just cleaned up; otherwise the polling
        # thread would process them a second time
        for vmId in list(self.controlledVMs.keys()):
            if vmId not in controlledVMs:
                child = self.controlledVMs.pop(vmId)
                try:
                    child.monitor.close()
                except (IOError, OSError):
                    pass

    def pollVMsLoop(self):
        """Infinite loop that checks for dead VMs"""
        while True:
            try:
                self.matchSystemPids(self.controlledVMs)
            except Exception:
                # Keep polling -- a dead polling thread means exits are never noticed
                log.exception("Polling for dead VMs failed")
            time.sleep(self.POLL_DELAY)

    def waitForExit(self, vmId):
        """This waits until an element is removed from the dictionary -- the polling thread must detect an exit"""
        while vmId in self.controlledVMs:
            time.sleep(self.POLL_DELAY)

    def getChildFromPid(self, pid):
        """Do a simple dictionary lookup, but raise a unique exception if the key doesn't exist"""
        child = self.controlledVMs.get(pid, None)
        if (not child):
            raise Exception("Uncontrolled vmId %d" % (pid))
        return child

    def getControlConsole(self, vmId, port):
        """Spawn a thread that attaches the control console of a particular Qemu to a TCP port -- used for debugging only"""
        child = self.getChildFromPid(vmId)
        threading.Thread(target=controlConsole, args=(child, port)).start()
        return port

    def consumeAvailable(self, child):
        """Consume characters one-by-one until they stop coming

        Returns what was read; the text is also appended to
        child.monitorHistory.  Raises RuntimeError (and sets
        child.errorBit) if the monitor reports end-of-file.
        """
        monitorFd = child.monitorFd
        buf = ""
        try:
            (rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
            while (len(rlist) > 0):
                c = os.read(monitorFd, 1)
                if (c == ""):
                    log.error("Early termination on monitor for vmId %d" % (child.pid))
                    child.errorBit = True
                    raise RuntimeError
                buf = buf + c
                (rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
        finally:
            child.monitorHistory.append(buf)
        return buf

    def consumeUntil(self, child, needle, timeout = -1):
        """Consume characters one-by-one until something specific comes up

        timeout is the number of seconds to wait for each character; -1
        selects self.monitorTimeout.  Returns everything read, including
        the needle.  Raises RuntimeError (and sets child.errorBit) on a
        timeout or on end-of-file.
        """
        if (timeout == -1):
            timeout = self.monitorTimeout
        monitorFd = child.monitorFd
        # Padding keeps the tail comparison valid before enough data arrived
        buf = " " * len(needle)
        try:
            while (buf[-(len(needle)):] != needle):
                (rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
                if (len(rlist) == 0):
                    log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
                    child.errorBit = True
                    raise RuntimeError
                c = os.read(monitorFd, 1)
                if (c == ""):
                    log.error("Early termination on monitor for vmId %d" % (child.pid))
                    child.errorBit = True
                    raise RuntimeError
                buf = buf + c
        finally:
            child.monitorHistory.append(buf[len(needle):])
        return buf[len(needle):]

    def enterCommand(self, child, command, expectPrompt = True, timeout = -1):
        """Enter a command on the qemu monitor

        With expectPrompt the echoed command is skipped and the output up
        to the next "(qemu) " prompt is returned; otherwise whatever was
        pending before the command was sent is returned.
        """
        res = self.consumeAvailable(child)
        os.write(child.monitorFd, command + "\n")
        if (expectPrompt):
            self.consumeUntil(child, command)
            res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
        return res

    def genTmpFilename(self):
        """Create a temporary file name for a fifo used to uncompress a suspended VM"""
        # The random suffix keeps concurrent resumes in one process apart
        return "/tmp/Qemu_%d_%08x" % (os.getpid(), random.getrandbits(32))

    def loadChildInfo(self, vmId):
        """Rebuild the child record for vmId from its pickled info file

        Raises an Exception if the pid stored in the file does not match.
        """
        child = self.anonClass(pid=vmId)
        info = open(self.INFO_DIR + "/%d"%(child.pid), "r")
        try:
            (image, macAddr, memory, cores, opts, pid, ptyFile) = cPickle.load(info)
        finally:
            info.close()
        if (pid != child.pid):
            raise Exception("PID mismatch")
        child.image = image
        child.macAddr = macAddr
        child.memory = memory
        child.cores = cores
        child.opts = opts
        child.pid = pid
        child.ptyFile = ptyFile
        child.monitorHistory = []
        child.errorBit = False
        # A re-attached VM is not our OS child and is not migrating; without
        # these attributes matchSystemPids could never report its exit
        child.OSchild = False
        child.migratingOut = False
        return child

    def saveChildInfo(self, child):
        """Pickle the fields needed to re-attach to child into INFO_DIR/<pid>"""
        info = open(self.INFO_DIR + "/%d"%(child.pid), "w")
        try:
            cPickle.dump((child.image, child.macAddr, child.memory, child.cores, child.opts, child.pid, child.ptyFile), info)
        finally:
            info.close()

    def startVm(self, instance, source):
        """Universal function to start a VM -- used by instantiateVM, resumeVM, and prepReceiveVM

        source is passed to qemu as "-incoming" when it is not empty.
        Returns (vmId, argv) where vmId is the pid of the qemu process.
        """
        global lastCmd
        (image, macAddr, memory, cores, diskModel, instanceId, opts) = self.instanceToOld(instance)
        imageLocal = self.dfs.getLocalHandle("images/" + image)
        # Build argv directly so that paths containing whitespace survive
        cmd = [self.QEMU_BIN]
        if (not opts.get("enableDisplay", False)):
            cmd.append("-nographic")
        if (diskModel != "persistent"):
            cmd.append("-snapshot")
        cmd += ["-hda", "%s" % (imageLocal), "-net", "nic,macaddr=%s" % (macAddr), "-net", "tap", "-m", "%d" % (memory), "-smp", "%d" % (cores), "-serial", "none", "-monitor", "pty"]
        if (source):
            cmd += ["-incoming", "%s" % (source)]
        lastCmd = " ".join(cmd)
        (pipe_r, pipe_w) = os.pipe()
        pid = os.fork()
        if (pid == 0):
            # Child: whatever happens, never return into the node manager's code
            try:
                try:
                    pid = os.getpid()
                    os.setpgid(pid, pid)
                    os.close(pipe_r)
                    # qemu's stderr goes to the parent through the pipe
                    os.dup2(pipe_w, sys.stderr.fileno())
                    for i in [sys.stdin.fileno(), sys.stdout.fileno()]:
                        try:
                            os.close(i)
                        except OSError:
                            pass
                    for i in xrange(3, os.sysconf("SC_OPEN_MAX")):
                        try:
                            os.close(i)
                        except OSError:
                            pass
                    os.execl(self.QEMU_BIN, *cmd)
                except Exception:
                    try:
                        os.write(2, "Failed to exec %s: %s\n" % (self.QEMU_BIN, sys.exc_info()[1]))
                    except Exception:
                        pass
            finally:
                os._exit(255)
        os.close(pipe_w)
        child = self.anonClass(pid=pid, image=image, macAddr=macAddr, memory=memory, cores=cores, opts=opts, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = False, OSchild = True)
        child.ptyFile = None
        self.saveChildInfo(child)
        self.controlledVMs[child.pid] = child
        log.info("Adding vmId %d" % (child.pid))
        return (child.pid, cmd)

    def getPtyInfo(self, child, issueContinue):
        """Read qemu's stderr until it announces the monitor pty, open
        that pty and optionally issue "c" (continue) on the monitor"""
        marker = "char device redirected to "
        ptyFile = None
        while not ptyFile:
            l = child.stderr.readline()
            if (l == ""):
                try:
                    os.waitpid(child.pid, 0)
                except OSError:
                    # Already reaped by the polling thread
                    pass
                raise Exception("Failed to start VM -- ptyFile not found")
            pos = l.find(marker)
            if (pos != -1):
                ptyFile = l[pos + len(marker):].strip()
                break
        child.ptyFile = ptyFile
        child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
        child.monitor = os.fdopen(child.monitorFd)
        self.saveChildInfo(child)
        if (issueContinue):
            self.enterCommand(child, "c")

    def stopVm(self, vmId, target, stopFirst):
        """Universal function to stop a VM -- used by suspendVM, migrateVM

        When target is given the VM is migrated to it first, retrying up
        to self.migrationRetries times; RuntimeError is raised if no
        attempt succeeds.  The VM is then told to quit.  Returns vmId.
        """
        child = self.getChildFromPid(vmId)
        if (stopFirst):
            self.enterCommand(child, "stop")
        if (target):
            migrated = False
            res = ""
            for attempt in xrange(self.migrationRetries):
                res = self.enterCommand(child, "migrate %s" % (target), timeout=self.migrateTimeout)
                if (res.find("migration failed") == -1):
                    migrated = True
                    break
                log.error("Migration (transiently) failed: %s\n", res)
            if (not migrated):
                # Never quit a VM whose state has not been transferred
                log.error("Migration failed: %s\n", res)
                child.errorBit = True
                raise RuntimeError
        self.enterCommand(child, "quit", expectPrompt=False)
        return vmId

    def instanceToOld(self, instance):
        """Flatten an instance into the tuple (image, macAddr, memory,
        cores, diskModel, instanceId, opts)

        Only a single transient disk and a single nic are supported;
        anything else raises NotImplementedError.
        """
        if (len(instance.disks) != 1):
            raise NotImplementedError
        if (len(instance.nics) != 1):
            raise NotImplementedError
        image = instance.disks[0].uri
        macAddr = instance.nics[0].mac
        memory = instance.typeObj.memory
        cores = instance.typeObj.cores
        diskModel = "persistent" if instance.disks[0].persistent else "transient"
        instanceId = instance.id
        opts = instance.hints
        if (diskModel != "transient"):
            raise NotImplementedError
        return (image, macAddr, memory, cores, diskModel, instanceId, opts)

    def oldToInstance(self, image, macAddr, memory, cores, diskModel, opts):
        """Inverse of instanceToOld -- build an instance-like object
        (with id -1) out of the flat values"""
        instance = self.anonClass()
        instance.disks = [self.anonClass()]
        instance.nics = [self.anonClass()]
        instance.typeObj = self.anonClass()
        instance.disks[0].uri = image
        instance.nics[0].mac = macAddr
        instance.typeObj.memory = memory
        instance.typeObj.cores = cores
        instance.disks[0].persistent = (diskModel == "persistent")
        instance.id = -1
        instance.hints = opts
        return instance

    def instantiateVm(self, instance):
        """Start a new VM for instance and return its vmId"""
        (vmId, cmd) = self.startVm(instance, None)
        child = self.getChildFromPid(vmId)
        self.getPtyInfo(child, False)
        child.cmd = cmd
        self.saveChildInfo(child)
        return vmId

    def suspendVm(self, vmId, target, suspendCookie):
        """Write the VM's description to <target>.info on the dfs, dump
        its gzip-compressed state to <target>.dat and stop it"""
        child = self.getChildFromPid(vmId)
        info = self.dfs.open("%s.info" % (target), "w")
        try:
            cPickle.dump((child.image, child.macAddr, child.memory, child.cores, child.opts, suspendCookie), info)
        finally:
            info.close()
        # XXX: Use fifo to improve performance
        # NOTE(review): the /tmp staging file is never removed -- confirm nothing relies on it before cleaning up
        vmId = self.stopVm(vmId, "\"exec:gzip -c > /tmp/%s.dat\"" % (target), True)
        self.dfs.copyTo("/tmp/%s.dat" % (target), "%s.dat" % (target))
        return vmId

    def resumeVm(self, source):
        """Resume the VM stored as <source>.info / <source>.dat on the
        dfs and return (vmId, suspendCookie)"""
        # XXX: Read in and unzip directly (or use fifo)
        self.dfs.copyFrom("%s.dat" % (source), "/tmp/%s.dat" % (source))
        info = self.dfs.open("%s.info" % (source), "r")
        try:
            (image, macAddr, memory, cores, opts, suspendCookie) = cPickle.load(info)
        finally:
            info.close()
        tmpFile = self.genTmpFilename()
        os.mkfifo(tmpFile)
        try:
            # The shell opens the fifo so that this process does not block on it.
            # NOTE(review): source ends up in a shell command line -- it must be trusted
            zcat = subprocess.Popen(args=["/bin/bash", "-c", "zcat /tmp/%s.dat > %s" % (source, tmpFile)], executable="/bin/bash", close_fds=True)
            instance = self.oldToInstance(image, macAddr, memory, cores, "transient", opts)
            (vmId, cmd) = self.startVm(instance, "file://%s" % tmpFile)
            zcat.wait()
        finally:
            # Never leave the fifo behind, even if the VM failed to start
            os.unlink(tmpFile)
        child = self.getChildFromPid(vmId)
        self.getPtyInfo(child, True)
        child.suspendCookie = suspendCookie
        child.cmd = cmd
        return (vmId, suspendCookie)

    def _releasePort(self, port):
        """Return a migration port to the pool of usable ports"""
        self.usedPortsLock.acquire()
        try:
            self.usedPorts = [_port for _port in self.usedPorts if _port != port]
        finally:
            self.usedPortsLock.release()

    def prepReceiveVm(self, instance, source):
        """Start a VM that waits for an incoming migration on a free port
        in 19000-19999 and return the pickled transportCookie
        (port, vmId, hostname)"""
        self.usedPortsLock.acquire()
        try:
            port = int(random.random()*1000+19000)
            while port in self.usedPorts:
                port = int(random.random()*1000+19000)
            self.usedPorts.append(port)
        finally:
            self.usedPortsLock.release()
        try:
            (vmId, cmd) = self.startVm(instance, "tcp://0.0.0.0:%d" % (port))
        except:
            # No VM was started, so nothing can be holding the port
            self._releasePort(port)
            raise
        transportCookie = cPickle.dumps((port, vmId, socket.gethostname()))
        child = self.getChildFromPid(vmId)
        child.cmd = cmd
        child.transportCookie = transportCookie
        self.saveChildInfo(child)
        # XXX: Cleanly wait until the port is open
        lc = 0
        while (lc < 1):
            if (vmId not in self.controlledVMs):
                # The polling thread saw the VM die; the port will never open
                self._releasePort(port)
                raise Exception("vmId %d exited before listening on port %d" % (vmId, port))
            # communicate() also reaps the helper processes
            netstat = subprocess.Popen("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port), shell=True, stdout=subprocess.PIPE, close_fds=True)
            r = netstat.communicate()[0]
            lc = int(r.strip())
            if (lc < 1):
                time.sleep(1.0)
        return transportCookie

    def migrateVm(self, vmId, target, transportCookie):
        """Migrate vmId to the host target using the port stored in
        transportCookie; blocks until the local VM has exited and
        returns vmId"""
        self.migrationSemaphore.acquire()
        try:
            # NOTE(review): the cookie is unpickled as-is; it must come from a trusted peer
            (port, _vmId, _hostname) = cPickle.loads(transportCookie)
            child = self.getChildFromPid(vmId)
            child.migratingOut = True
            try:
                res = self.stopVm(vmId, "tcp://%s:%d" % (target, port), False)
            except:
                # The VM is still ours -- a later exit must be reported again
                child.migratingOut = False
                raise
            # XXX: Some sort of feedback would be nice
            # XXX: Should we block?
            self.waitForExit(vmId)
        finally:
            self.migrationSemaphore.release()
        return res

    def receiveVm(self, transportCookie):
        """Finish an incoming migration: attach to the monitor of the VM
        named in transportCookie, let it continue and free its port.
        Returns the vmId"""
        # NOTE(review): the cookie is unpickled as-is; it must come from a trusted peer
        (port, vmId, _hostname) = cPickle.loads(transportCookie)
        try:
            child = self.getChildFromPid(vmId)
        except Exception:
            log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.gethostname()))
            raise
        try:
            self.getPtyInfo(child, True)
        except RuntimeError:
            log.error("Failed to get pty info -- VM likely died")
            child.errorBit = True
            raise
        self._releasePort(port)
        return vmId

    def pauseVm(self, vmId):
        """Pause a vm by issuing "stop" on its monitor"""
        child = self.getChildFromPid(vmId)
        self.enterCommand(child, "stop")

    def unpauseVm(self, vmId):
        """Unpause a vm by issuing "c" on its monitor"""
        child = self.getChildFromPid(vmId)
        self.enterCommand(child, "c")

    @broken
    def shutdownVm(self, vmId):
        """'system_powerdown' doesn't seem to actually shutdown the VM"""
        child = self.getChildFromPid(vmId)
        self.enterCommand(child, "system_powerdown")

    def destroyVm(self, vmId):
        """Kill the qemu process of a vm with SIGKILL"""
        child = self.getChildFromPid(vmId)
        # Make sure the exit is reported even after an aborted migration
        child.migratingOut = False
        # XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process
        os.kill(child.pid, signal.SIGKILL)

    def listVms(self):
        """Return the vmIds of all VMs this object controls"""
        return self.controlledVMs.keys()
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,64 @@
class VmControlInterface():
    """Contract that every VM controller backend (Qemu, Xen, etc)
    has to fulfil"""

    def __init__(self, config, dfs, nm):
        """Common constructor -- refuses to instantiate
        VmControlInterface itself and otherwise stores config, dfs
        and nm on the object"""
        isAbstract = self.__class__ is VmControlInterface
        if isAbstract:
            raise NotImplementedError
        self.config, self.dfs, self.nm = config, dfs, nm

    def instantiateVm(self, instance):
        """Create a VM from the given InstanceConfiguration and
        return its vmId"""
        raise NotImplementedError

    def suspendVm(self, vmId, target, suspendCookie=None):
        """Suspend a vm to target on the dfs, storing the
        suspendCookie along with it"""
        raise NotImplementedError

    def resumeVm(self, source):
        """Resume a vm from the dfs; returns a tuple of the newly
        created vmId and the suspendCookie"""
        raise NotImplementedError

    def prepReceiveVm(self, instance, source):
        """Step one of a vm migration -- called on the target
        machine, returns a transportCookie"""
        raise NotImplementedError

    def migrateVm(self, vmId, target, transportCookie):
        """Step two of a vm migration -- called on the source
        machine, only returns once the migration is complete"""
        raise NotImplementedError

    def receiveVm(self, transportCookie):
        """Step three of a vm migration -- called on the target
        machine, only returns once the migration is complete and
        then yields the new vmId"""
        raise NotImplementedError

    def pauseVm(self, vmId):
        """Pause a vm; nothing is returned"""
        raise NotImplementedError

    def unpauseVm(self, vmId):
        """Unpause a vm; nothing is returned"""
        raise NotImplementedError

    def shutdownVm(self, vmId):
        """Cleanly shut down a vm; nothing is returned"""
        raise NotImplementedError

    def destroyVm(self, vmId):
        """Force a vm to exit; nothing is returned"""
        raise NotImplementedError

    def listVms(self):
        """Give the caller a list of vmIds"""
        raise NotImplementedError