You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2008/11/03 14:45:27 UTC
svn commit: r710072 [3/3] - in /incubator/tashi/import: ./ tashi-intel-r399/
tashi-intel-r399/doc/ tashi-intel-r399/etc/ tashi-intel-r399/guest/
tashi-intel-r399/scripts/ tashi-intel-r399/src/ tashi-intel-r399/src/tashi/
tashi-intel-r399/src/tashi/agen...
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/xenpv.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/nodemanager/vmcontrol/xenpv.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,337 @@
+import os
+import os.path
+import cPickle
+import subprocess # FIXME: should switch os.system to this
+import time
+import threading
+import logging
+
+from vmcontrolinterface import VmControlInterface
+from tashi.services.ttypes import Errors, InstanceState, TashiException
+from tashi.services.ttypes import Instance, MachineType
+from tashi import boolean, convertExceptions, ConnectionManager
+from tashi.util import isolatedRPC
+
+import tashi.parallel
+from tashi.parallel import synchronized, synchronizedmethod
+
+log = logging.getLogger(__file__)
+
+# FIXME: these should throw errors on failure
+def domIdToName(domid):
+ f = os.popen("xm domname %i"%domid)
+ name = f.readline().strip()
+ f.close()
+ return name
+
+def domNameToId(domname):
+ f = os.popen("xm domid %s"%domname)
+ name = f.readline().strip()
+ f.close()
+ return int(name)
+
+def nameToId(domname, prefix='tashi'):
+ prefix = prefix + '-'
+ if domname[0:(len(prefix))] != prefix:
+ return None
+ try:
+ id = int(domname[len(prefix):])
+ except:
+ return None
+ return id
+
+
+# Try to do a listVms call using info from xend
+def listVms(prefix='tashi'):
+ fields =['name', 'vmId', 'memory', 'cores', 'state', 'time']
+ xmList = subprocess.Popen('xm list', shell=True, stdout=subprocess.PIPE)
+ # skip the first line, it's just a header
+ xmList.stdout.readline()
+ r = {}
+ for line in xmList.stdout.readlines():
+ line = line.split()
+ vminfo = {}
+ instance = Instance()
+ if len(line) != len(fields):
+ # FIXME: log message
+ print 'WARNING: cannot parse line'
+ continue
+ for i in range(len(line)):
+ vminfo[fields[i]] = line[i]
+ # if the name begins with our prefix, get the id,
+ # otherwise skip this record
+ id = nameToId(vminfo['name'], prefix)
+ if id == None:
+ continue
+
+ # fill in the instance object
+ instance.id = int(id)
+ instance.vmId = int(vminfo['vmId'])
+ instance.state = InstanceState.Running
+ if(vminfo['state'][2] !='-'):
+ instance.state = InstanceState.Paused
+ instance.typeObj = MachineType()
+ instance.typeObj.memory = int(vminfo['memory'])
+ instance.typeObj.cores = int(vminfo['cores'])
+
+
+ r[instance.vmId] = instance
+ return r
+
+
+
+
+
+class XenPV(threading.Thread):
+ def __init__(self, config, dfs, cm):
+ threading.Thread.__init__(self)
+ if self.__class__ is VmControlInterface:
+ raise NotImplementedError
+ self.config = config
+ self.dfs = dfs
+ self.cm = cm
+
+ self.vmNamePrefix = self.config.get("XenPV", "vmNamePrefix")
+ self.transientDir = self.config.get('XenPV', 'transientDir')
+
+ self.newvms = listVms(self.vmNamePrefix)
+ self.hostId = -1
+ self.sleeptime = 5
+ self.setDaemon(True)
+ self.start()
+
+ # invoked every (self.sleeptime) seconds
+ @synchronizedmethod
+ def cron(self):
+ print 'xenpv cron woke up'
+ vmlist = listVms(self.vmNamePrefix)
+ # If we are supposed to be managing a VM that is not
+ # in the list, tell the CM
+
+ # FIXME: a single set operation should do this. How
+ # do you use sets in python?
+ for vmId in self.newvms.keys():
+ if not vmlist.has_key(vmId):
+ a = self.newvms.pop(vmId)
+ # If the vm had transient disks, delete them
+ for i in range(len(a.disks)):
+ if a.disks[i].persistent == False:
+ diskname = self.transientDisk(a.id, i)
+ try:
+ os.unlink(diskname)
+ except:
+ print 'WARNING could not delete transient disk %s' % diskname
+ try:
+ isolatedRPC(self.cm, 'vmExited', self.hostId, vmId)
+ except Exception, e:
+ print "RPC failed for vmExited on CM"
+ print e
+ raise e
+ # FIXME: send this to the cm later
+ # self.exitedVms[vmId] = child
+ for vmId in vmlist.keys():
+ if not self.newvms.has_key(vmId):
+ print 'WARNING: found vm that should be managed, but is not'
+ # FIXME: log that
+
+
+ def run(self):
+ while(True):
+ time.sleep(self.sleeptime)
+ self.cron()
+########################################
+# This is an ugly function, but the muti-line string literal makes it
+# a lot easier
+########################################
+ def createXenConfig(self, vmName,
+ image, macAddr, memory, cores):
+ fn = os.path.join("/tmp", vmName)
+ cfgstr = """
+# kernel="/boot/vmlinuz-2.6.24-19-xen"
+# ramdisk="/boot/initrd.img-2.6.24-19-xen"
+bootloader="/usr/bin/pygrub"
+disk=['tap:qcow:%s,xvda1,w']
+# vif = [ 'mac=%s' ]
+vif = ['ip=172.19.158.1']
+memory=%i
+#cpus is a list of cpus for pinning, this is not what we want
+#cpus=%i
+root="/dev/xvda1"
+extra='xencons=tty'
+"""%(image, macAddr, memory, cores)
+ f = open(fn, "w")
+ f.write(cfgstr)
+ f.close()
+ return fn
+ def deleteXenConfig(self, vmName):
+ os.unlink(os.path.join("/tmp", vmName))
+########################################
+
+ def vmName(self, instanceId):
+ return "%s-%i"%(self.vmNamePrefix, int(instanceId))
+ def transientDisk(self, instanceId, disknum):
+ newdisk = os.path.join(self.transientDir,
+ 'tashi-%i-%i.qcow' %(instanceId, disknum))
+ return newdisk
+
+
+ @synchronizedmethod
+ def instantiateVm(self, instance):
+ # FIXME: this is NOT the right way to get out hostId
+ self.hostId = instance.hostId
+
+ if (len(instance.disks) != 1):
+ raise NotImplementedError
+ if (len(instance.nics) != 1):
+ raise NotImplementedError
+
+ name = self.vmName(instance.id)
+
+ for i in range(len(instance.disks)):
+ imageLocal = self.dfs.getLocalHandle(instance.disks[i].uri)
+ instance.disks[i].local = imageLocal
+ if instance.disks[i].persistent == False:
+ newdisk = self.transientDisk(instance.id, i)
+ cmd = 'qcow-create 0 %s %s' % (newdisk, imageLocal)
+ print 'creating new disk with "%s"' % cmd
+ os.system(cmd)
+ instance.disks[i].local = newdisk
+
+
+ fn = self.createXenConfig(name,
+ instance.disks[0].local,
+ instance.nics[0].mac,
+ instance.typeObj.memory,
+ instance.typeObj.cores)
+ cmd = "xm create %s"%fn
+ r = os.system(cmd)
+ # self.deleteXenConfig(name)
+ if r != 0:
+ print 'WARNING: "%s" returned %i' % ( cmd, r)
+ raise Exception, 'WARNING: "%s" returned %i' % ( cmd, r)
+ # FIXME: log/handle error
+ vmId = domNameToId(name)
+ self.newvms[vmId] = instance
+ instance.vmId = vmId
+ instance.state = InstanceState.Running
+ return vmId
+
+
+ # for susp/res we want the xen save/restore commands, not
+ # suspend/resume. save/restore allow you to specify the state
+ # file, suspend/resume do not.
+ @synchronizedmethod
+ def suspendVM(self, vmId, target, suspendCookie=None):
+ # FIXME: don't use hardcoded /tmp for temporary data.
+ # Get tmp location from config
+ infofile = target + ".info"
+ target = target + ".dat"
+ tmpfile = os.path.join("/tmp", target)
+
+ # FIXME: these files shouldn't go in the root of the
+ # dfs
+ instance = self.newvms[vmId]
+ instance.suspendCookie = suspendCookie
+ infof = self.dfs.open(infofile, "w")
+ name = domIdToName(vmId)
+ cPickle.dump(instance, infof)
+ infof.close()
+
+
+ # FIXME: handle errors
+ cmd = "xm save %i %s"%(vmId, tmpfile)
+ r = os.system(cmd)
+ if r !=0 :
+ print "xm save failed!"
+ raise Exception, "replace this with a real exception!"
+ r = self.dfs.copyTo(tmpfile, target)
+ self.newvms.pop(vmId)
+ os.unlink(tmpfile)
+ return vmId
+
+ @synchronizedmethod
+ def resumeVM(self, source):
+ infofile = source + ".info"
+ source = source + ".dat"
+ tmpfile = os.path.join("/tmp", source)
+ # FIXME: errors
+ infof = self.dfs.open(infofile, "r")
+ instance = cPickle.load(infof)
+ infof.close
+ self.dfs.unlink(infofile)
+
+ self.dfs.copyFrom(source, tmpfile)
+ r = os.system("xm restore %s"%(tmpfile))
+ os.unlink(tmpfile)
+
+ # FIXME: if the vmName function changes, suspended vms will become invalid
+ vmId = domNameToId(self.vmName(instance.id))
+ instance.vmId = vmId
+ self.newvms[vmId] = instance
+ return vmId, instance.suspendCookie
+
+ @synchronizedmethod
+ def prepReceiveVm(self, instance, source):
+ return cPickle.dumps(instance)
+ @synchronizedmethod
+ def migrateVm(self, vmId, target, transportCookie):
+ cmd = "xm migrate -l %i %s"%(vmId, target)
+ r = os.system(cmd)
+ if r != 0:
+ # FIXME: throw exception
+ print "migrate failed for VM %i"%vmId
+ raise Exception, "migrate failed for VM %i"%vmId
+ self.newvms.pop(vmId)
+ return vmId
+ @synchronizedmethod
+ def receiveVm(self, transportCookie):
+ instance = cPickle.loads(transportCookie)
+ vmId = domNameToId(self.vmName(instance.id))
+ print 'received VM, vmId=%i\n'%vmId
+ self.newvms[vmId] = instance
+ return vmId
+
+
+ @synchronizedmethod
+ def pauseVM(self, vmId):
+ r = os.system("xm pause %i"%vmId)
+ if r != 0:
+ print "xm pause failed for VM %i"%vmId
+ raise Exception, "xm pause failed for VM %i"%vmId
+ self.newvms[vmId].state = InstanceState.Paused
+ return vmId
+
+ @synchronizedmethod
+ def unpauseVM(self, VMId):
+ r = os.system("xm unpause %i"%VMId)
+ if r != 0:
+ print "xm unpause failed for VM %i"%VMId
+ raise Exception, "xm unpause failed for VM %i"%VMId
+ self.newvms[vmId].state = InstanceState.Running
+ return VMId
+
+ @synchronizedmethod
+ def shutdownVM(self, vmId):
+ r = os.system("xm shutdown %i"%vmId)
+ if r != 0:
+ print "xm shutdown failed for VM %i"%vmId
+ raise Exception, "xm shutdown failed for VM %i"%vmId
+ return vmId
+
+ @synchronizedmethod
+ def destroyVM(self, vmId):
+ r = os.system("xm destroy %i"%vmId)
+ if r != 0:
+ print "xm destroy failed for VM %i"%vmId
+ raise Exception, "xm destroy failed for VM %i"%vmId
+ return vmId
+
+
+ @synchronizedmethod
+ def getVMInfo(self, vmId):
+ return self.newvms[vmId]
+
+ @synchronizedmethod
+ def listVMs(self):
+ # On init, this should get a list from listVMs
+ return self.newvms.keys()
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/parallel.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/parallel.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/parallel.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/parallel.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,283 @@
+import threading
+import time
+import Queue
+import logging
+
+_log = logging.getLogger('tashi.parallel')
+
+def threaded(func):
+ def fn(*args, **kwargs):
+ thread = threading.Thread(target=func, args=args, kwargs=kwargs)
+ thread.start()
+ return thread
+ return fn
+
+
+class ThreadPool(Queue.Queue):
+ def __init__(self, size=8, maxsize=0):
+ Queue.Queue.__init__(self, maxsize)
+ for i in range(size):
+ thread = threading.Thread(target=self._worker)
+ thread.setDaemon(True)
+ thread.start()
+ def _worker(self):
+ while True:
+ try:
+ func, args, kwargs = self.get()
+ func(*args, **kwargs)
+ except Exception, e:
+ _log.error(e)
+ # FIXME: do something smarter here, backtrace, log,
+ # allow user-defined error handling...
+
+ def submit(self, func, *args, **kwargs):
+ self.put((func, args, kwargs))
+ def submitlist(self, func, args, kwargs):
+ self.put((func, args, kwargs))
+
+class ThreadPoolClass:
+ def __init__(self, size=8, maxsize=0):
+ self._threadpool_pool = ThreadPool(size=size, maxsize=maxsize)
+
+
+def threadpool(pool):
+ def dec(func):
+ def fn(*args, **kwargs):
+ pool.submit(func, *args, **kwargs)
+ return fn
+ return dec
+
+def threadpoolmethod(meth):
+ def fn(*args, **kwargs):
+ try:
+ pool = args[0]._threadpool_pool
+ except AttributeError:
+ pool = args[0].__dict__.setdefault('_threadpool_pool', ThreadPool())
+ # FIXME: how do we check parent class?
+# assert args[0].__class__ == ThreadPoolClass, "Thread pool method must be in a ThreadPoolClass"
+ pool.submit(meth, *args, **kwargs)
+ return fn
+
+def synchronized(lock=None):
+ if lock==None:
+ lock = threading.RLock()
+ def dec(func):
+ def fn(*args, **kwargs):
+ lock.acquire()
+ ex = None
+ try:
+ r = func(*args, **kwargs)
+ except Exception, e:
+ ex = e
+ lock.release()
+ if ex != None:
+ raise e
+ return r
+ return fn
+ return dec
+
+def synchronizedmethod(func):
+ def fn(*args, **kwargs):
+ try:
+ lock = args[0]._synchronized_lock
+ except AttributeError:
+ lock = args[0].__dict__.setdefault('_synchronized_lock', threading.RLock())
+ lock.acquire()
+ ex = None
+ try:
+ res = func(*args, **kwargs)
+ except Exception, e:
+ ex = e
+ lock.release()
+ if ex != None:
+ raise e
+ return res
+ return fn
+
+
+##############################
+# Test Code
+##############################
+import unittest
+import sys
+import time
+
+class TestThreadPool(unittest.TestCase):
+ def setUp(self):
+ self.errmargin = 0.5
+
+ def testUnthreaded(self):
+ queue = Queue.Queue()
+ def slowfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(4):
+ slowfunc()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testThreaded(self):
+ queue = Queue.Queue()
+ @threaded
+ def slowthreadfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(8):
+ slowthreadfunc()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+ def testThreadPool(self):
+ pool = ThreadPool(size=4)
+ queue = Queue.Queue()
+ @threadpool(pool)
+ def slowpoolfunc(sleep=1):
+ time.sleep(sleep)
+ queue.put(None)
+ tt = time.time()
+ for i in range(8):
+ slowpoolfunc()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 2, 1)
+
+ def testUnthreadedMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(4):
+ sc.beslow()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testThreadedMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ @threaded
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(4):
+ sc.beslow()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+ def testThreadPoolMethod(self):
+ queue = Queue.Queue()
+ class slowclass:
+ def __init__(self, sleep=1):
+ self.sleep=sleep
+ @threadpoolmethod
+ def beslow(self):
+ time.sleep(self.sleep)
+ queue.put(None)
+ sc = slowclass()
+ tt = time.time()
+ for i in range(16):
+ sc.beslow()
+ for i in range(16):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 2, 1)
+
+ def testSynchronized(self):
+ queue = Queue.Queue()
+ @synchronized()
+ def addtoqueue():
+ time.sleep(1)
+ queue.put(None)
+ @threaded
+ def slowthreadfunc():
+ addtoqueue()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc()
+ for i in range(4):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 4, 1)
+
+ def testSynchronizedMethod(self):
+ queue = Queue.Queue()
+ class addtoqueue:
+ @synchronizedmethod
+ def addtoqueue1(self):
+ time.sleep(1)
+ queue.put(None)
+ @synchronizedmethod
+ def addtoqueue2(self):
+ time.sleep(1)
+ queue.put(None)
+ atc = addtoqueue()
+ @threaded
+ def slowthreadfunc1():
+ atc.addtoqueue1()
+ @threaded
+ def slowthreadfunc2():
+ atc.addtoqueue2()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc1()
+ slowthreadfunc2()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 8, 1)
+
+ def testUnsynchronizedMethod(self):
+ queue = Queue.Queue()
+ class addtoqueue:
+ def addtoqueue1(self):
+ time.sleep(1)
+ queue.put(None)
+ def addtoqueue2(self):
+ time.sleep(1)
+ queue.put(None)
+ atc = addtoqueue()
+ @threaded
+ def slowthreadfunc1():
+ atc.addtoqueue1()
+ @threaded
+ def slowthreadfunc2():
+ atc.addtoqueue2()
+ tt = time.time()
+ for i in range(4):
+ slowthreadfunc1()
+ slowthreadfunc2()
+ for i in range(8):
+ queue.get()
+ tt = time.time() - tt
+ self.assertAlmostEqual(tt, 1, 1)
+
+
+
+if __name__=='__main__':
+ import sys
+
+ logging.basicConfig(level=logging.INFO,
+ format="%(asctime)s %(levelname)s:\t %(message)s",
+ stream=sys.stdout)
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestThreadPool)
+ unittest.TextTestRunner(verbosity=2).run(suite)
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/build.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/build.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/build.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/build.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,34 @@
+#!/usr/bin/python
+
+from __future__ import with_statement
+import shutil
+import os
+from os import path
+import re
+
+if __name__ == '__main__':
+ if (path.exists('gen-py')):
+ print 'Removing \'gen-py\' directory...'
+ shutil.rmtree('gen-py')
+
+ if (path.exists('../services')):
+ print 'Removing \'../services\' directory...'
+ shutil.rmtree('../services')
+
+ if (path.exists('../messaging/messagingthrift')):
+ print 'Removing \'../messaging/messagingthrift\' directory...'
+ shutil.rmtree('../messaging/messagingthrift')
+
+ print 'Generating Python code for \'services.thrift\'...'
+ os.system('thrift --gen py:new_style services.thrift')
+
+ print 'Copying generated code to \'tashi.services\' package...'
+ shutil.copytree('gen-py/services', '../services')
+
+ print 'Generatign Python code for \'messagingthrift\'...'
+ os.system('rm -rf gen-py')
+ os.system('thrift --gen py messagingthrift.thrift')
+
+ print 'Copying generated code to \'tashi.messaging.messagingthrift\' package...'
+ shutil.copytree(os.path.join('gen-py', 'messagingthrift'),
+ os.path.join('..', 'messaging', 'messagingthrift'))
Propchange: incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/build.py
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/messagingthrift.thrift
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/messagingthrift.thrift?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/messagingthrift.thrift (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/messagingthrift.thrift Mon Nov 3 06:45:25 2008
@@ -0,0 +1,20 @@
+
+typedef map<string, string> strstrmap
+
+service SubscriberThrift{
+ # the async keyword seems to slow things down in the simple
+ # tests. However, with non-trivial subscribers it will be
+ # necessary to use async here.
+ async void publish(strstrmap message)
+ async void publishList(list<strstrmap> messages)
+}
+
+service MessageBrokerThrift{
+ void log(strstrmap message),
+ void addSubscriber(string host, i16 port)
+ void removeSubscriber(string host, i16 port)
+ async void publish(strstrmap message)
+ async void publishList(list<strstrmap> messages)
+
+}
+
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/services.thrift
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/services.thrift?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/services.thrift (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/thrift/services.thrift Mon Nov 3 06:45:25 2008
@@ -0,0 +1,154 @@
+enum Errors {
+ ConvertedException = 1,
+ NoSuchInstanceId = 2,
+ NoSuchVmId = 3,
+ IncorrectVmState = 4,
+ NoSuchHost = 5,
+ NoSuchHostId = 6,
+ InstanceIdAlreadyExists = 7,
+ HostNameMismatch = 8,
+ HostNotUp = 9,
+ HostStateError = 10
+}
+
+enum InstanceState {
+ Pending = 1, // Job submitted
+ Activating = 2, // activateVm has been called, but instantiateVm hasn't finished yet
+ Running = 3, // Normal state
+ Pausing = 4, // Beginning pause sequence
+ Paused = 5 // Paused
+ Unpausing = 6, // Beginning unpause sequence
+ Suspending = 7, // Beginning suspend sequence
+ Resuming = 8, // Beginning resume sequence
+ MigratePrep = 9, // Migrate state #1
+ MigrateTrans = 10, // Migrate state #2
+ ShuttingDown = 11, // Beginning exit sequence
+ Destroying = 12, // Beginning exit sequence
+ Orphaned = 13, // Host is missing
+ Held = 14, // Activation failed
+ Exited = 15 // VM has exited
+}
+
+enum HostState {
+ Normal = 1,
+ Drained = 2
+}
+
+exception TashiException {
+ 1: Errors errno
+ 2: string msg
+}
+
+struct Host {
+ 1:i32 id,
+ 2:string name,
+ 3:bool up,
+ 4:bool decayed,
+ 5:HostState state,
+ 6:i32 memory,
+ 7:i32 cores
+ // Other properties (disk?)
+}
+
+struct Network {
+ 1:i32 id
+ 2:string name
+}
+
+struct User {
+ 1:i32 id,
+ 2:string name
+}
+
+struct MachineType {
+ 1:i32 id,
+ 2:string name,
+ 3:i32 memory,
+ 4:i32 cores
+}
+
+struct DiskConfiguration {
+ 1:string uri,
+ 2:bool persistent
+}
+
+struct NetworkConfiguration {
+ 1:i32 network
+ 2:string mac
+}
+
+struct Instance {
+ 1:i32 id,
+ 2:i32 vmId,
+ 3:i32 hostId,
+ 4:Host hostObj,
+ 5:bool decayed,
+ 6:InstanceState state,
+ 7:i32 userId,
+ 8:User userObj,
+ 9:string name, // User specified
+ 10:i32 type, // User specified
+ 11:MachineType typeObj,
+ 12:list<DiskConfiguration> disks, // User specified
+ 13:list<NetworkConfiguration> nics // User specified
+ 14:map<string, string> hints // User specified
+}
+
+service clustermanagerservice {
+ // Client-facing RPCs
+ Instance createVm(1:Instance instance) throws (1:TashiException e)
+
+ void shutdownVm(1:i32 instanceId) throws (1:TashiException e)
+ void destroyVm(1:i32 instanceId) throws (1:TashiException e)
+
+ void suspendVm(1:i32 instanceId, 2:string destination) throws (1:TashiException e)
+ Instance resumeVm(1:Instance instance, 2:string source) throws (1:TashiException e)
+
+ void migrateVm(1:i32 instanceId, 2:i32 targetHostId) throws (1:TashiException e)
+
+ void pauseVm(1:i32 instanceId) throws (1:TashiException e)
+ void unpauseVm(1:i32 instanceId) throws (1:TashiException e)
+
+ list<MachineType> getMachineTypes() throws (1:TashiException e)
+ list<Host> getHosts() throws (1:TashiException e)
+ list<Network> getNetworks() throws (1:TashiException e)
+ list<User> getUsers() throws (1:TashiException e)
+
+ list<Instance> getInstances() throws (1:TashiException e)
+
+ // NodeManager-facing RPCs
+ i32 registerNodeManager(1:Host host, 2:list<Instance> instances) throws (1:TashiException e)
+ void vmUpdate(1:i32 instanceId, 2:Instance instance, 3:InstanceState old) throws (1:TashiException e)
+
+ // Agent-facing RPCs
+ void activateVm(1:i32 instanceId, 2:Host host) throws (1:TashiException e)
+}
+
+// RPC-specific types
+struct ResumeVmRes {
+ 1:i32 vmId,
+ 2:string suspendCookie
+}
+
+service nodemanagerservice {
+ // ClusterManager-facing RPCs
+ i32 instantiateVm(1:Instance instance) throws (1:TashiException e)
+
+ void shutdownVm(1:i32 vmId) throws (1:TashiException e)
+ void destroyVm(1:i32 vmId) throws (1:TashiException e)
+
+ void suspendVm(1:i32 vmId, 2:string destination, 3:string suspendCookie) throws (1:TashiException e)
+ ResumeVmRes resumeVm(1:Instance instance, 2:string source) throws (1:TashiException e)
+
+ string prepReceiveVm(1:Instance instance, 2:Host source) throws (1:TashiException e)
+ void migrateVm(1:i32 vmId, 2:Host target, 3:string transportCookie) throws (1:TashiException e)
+ void receiveVm(1:Instance instance, 2:string transportCookie) throws (1:TashiException e)
+
+ void pauseVm(1:i32 vmId) throws (1:TashiException e)
+ void unpauseVm(1:i32 vmId) throws (1:TashiException e)
+
+ Instance getVmInfo(1:i32 vmId) throws (1:TashiException e)
+ list<i32> listVms() throws (1:TashiException e)
+
+ // Host getHostInfo() throws (1:TashiException e)
+}
Added: incubator/tashi/import/tashi-intel-r399/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/tashi/util.py?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/tashi/util.py (added)
+++ incubator/tashi/import/tashi-intel-r399/src/tashi/util.py Mon Nov 3 06:45:25 2008
@@ -0,0 +1,227 @@
+import ConfigParser
+import cPickle
+import os
+import select
+import signal
+import struct
+import sys
+import threading
+import time
+import traceback
+
+from tashi.services.ttypes import TashiException, Errors, InstanceState, HostState
+
+def broken(oldFunc):
+ """Decorator that is used to mark a function as temporarily broken"""
+ def newFunc(*args, **kw):
+ raise RuntimeError("%s is broken!" % (oldFunc.__name__))
+ newFunc.__doc__ = "[Broken] " + "" if oldFunc.__doc__ is None else oldFunc.__doc__
+ newFunc.__name__ = oldFunc.__name__
+ newFunc.__module__ = oldFunc.__module__
+ return newFunc
+
+def deprecated(oldFunc):
+ """Decorator that is used to deprecate functions"""
+ def newFunc(*args, **kw):
+ raise RuntimeError("%s has been deprecated!" % (oldFunc.__name__))
+ newFunc.__doc__ = "[Deprecated] " + str(oldFunc.__doc__)
+ newFunc.__name__ = oldFunc.__name__
+ newFunc.__module__ = oldFunc.__module__
+ return newFunc
+
+def logged(oldFunc):
+ """Decorator that is used to log a function's calls -- currently uses sys.stderr"""
+ def newFunc(*args, **kw):
+ logMsg = "%s(%s, %s) -> " % (oldFunc.__name__, str(args).strip("[]"), str(kw).strip("{}").replace(": ", "="))
+ sys.stderr.write(logMsg)
+ sys.stderr.flush()
+ try:
+ res = oldFunc(*args, **kw)
+ except Exception, e:
+ logMsg = "%s\n" % (str(e))
+ sys.stderr.write(logMsg)
+ sys.stderr.flush()
+ raise
+ logMsg = "%s\n" % (str(res))
+ sys.stderr.write(logMsg)
+ sys.stderr.flush()
+ newFunc.__doc__ = oldFunc.__doc__
+ newFunc.__name__ = oldFunc.__name__
+ newFunc.__module__ = oldFunc.__module__
+ return newFunc
+
+def timed(oldFunc):
+ """Decorator that is used to time a function's execution"""
+ def newFunc(*args, **kw):
+ start = time.time()
+ try:
+ res = oldFunc(*args, **kw)
+ finally:
+ finish = time.time()
+ print "%s: %f" % (oldFunc.__name__, finish-start)
+ return res
+ return newFunc
+
+def editAndContinue(file, mod, name):
+ def wrapper(oldFunc):
+ persist = {}
+ persist['lastMod'] = time.time()
+ persist['oldFunc'] = oldFunc
+ persist['func'] = oldFunc
+ def newFunc(*args, **kw):
+ modTime = os.stat(file)[8]
+ if (modTime > persist['lastMod']):
+ persist['lastMod'] = modTime
+ space = {}
+ exec ("import %s\nreload ( %s )" % (mod, mod)) in space
+ persist['func'] = eval(mod + "." + name, space)
+ return persist['func'](*args, **kw)
+ return newFunc
+ return wrapper
+
+class failsafe(object):
+ """Class that attempts to make RPCs, but will fall back to a local object that implements the same methods"""
+ def __attempt__(self, cur, fail):
+ def newFunc(*args, **kw):
+ try:
+ return cur(*args, **kw)
+ except Exception, e:
+ self.__dict__['__current_obj__'] = self.__dict__['__failsafe_obj__']
+ return fail(*args, **kw)
+ return newFunc
+
+ @deprecated
+ def __init__(self, obj):
+ self.__dict__['__failsafe_obj__'] = obj
+ self.__dict__['__current_obj__'] = obj
+
+ def __update_current__(self, obj):
+ self.__dict__['__current_obj__'] = obj
+
+ def __getattr__(self, name):
+ return self.__attempt__(getattr(self.__dict__['__current_obj__'], name), getattr(self.__dict__['__failsafe_obj__'], name))
+
+ def __setattr__(self, name, value):
+ return setattr(self.__dict__['__current_obj__'], name, value)
+
+ def __delattr__(self, name):
+ return delattr(self.__dict__['__current_obj__'], name)
+
+class reference(object):
+ """Class used to create a replacable reference to an object"""
+ @deprecated
+ def __init__(self, obj):
+ self.__dict__['__real_obj__'] = obj
+
+ def __update__(self, obj):
+ self.__dict__['__real_obj__'] = obj
+
+ def __getattr__(self, name):
+ return getattr(self.__dict__['__real_obj__'], name)
+
+ def __setattr__(self, name, value):
+ return setattr(self.__dict__['__real_obj__'], name, value)
+
+ def __delattr__(self, name):
+ return delattr(self.__dict__['__real_obj__'], name)
+
+def isolatedRPC(client, method, *args, **kw):
+ """Opens and closes a thrift transport for a single RPC call"""
+ if (not client._iprot.trans.isOpen()):
+ client._iprot.trans.open()
+ res = getattr(client, method)(*args, **kw)
+ client._iprot.trans.close()
+ return res
+
+def signalHandler(signalNumber):
+ """Used to denote a particular function as the signal handler for a
+ specific signal"""
+ def __decorator(function):
+ signal.signal(signalNumber, function)
+ return function
+ return __decorator
+
+def boolean(value):
+ """Convert a string to a boolean"""
+ lowercaseValue = value.lower()
+ if lowercaseValue in ['yes', 'true', '1']:
+ return True
+ elif lowercaseValue in ['no', 'false', '0']:
+ return False
+ else:
+ raise ValueError
+
+def instantiateImplementation(className, *args):
+ """Create an instance of an object with the given class name and list
+ of args to __init__"""
+ if (className.rfind(".") != -1):
+ package = className[:className.rfind(".")]
+ cmd = "import %s\n" % (package)
+ else:
+ cmd = ""
+ cmd += "obj = %s(*args)\n" % (className)
+ exec cmd in locals()
+ return obj
+
+def convertExceptions(oldFunc):
+ """This converts any exception type into a TashiException so that
+ it can be passed over a Thrift RPC"""
+ def newFunc(*args, **kw):
+ try:
+ return oldFunc(*args, **kw)
+ except TashiException, e:
+ raise
+ except Exception, e:
+ self = args[0]
+ if (self.convertExceptions):
+ raise TashiException(d={'errno':Errors.ConvertedException, 'msg': traceback.format_exc(10)})
+ raise
+ return newFunc
+
+def getConfig(additionalNames=[], additionalFiles=[]):
+ """Creates many permutations of a list of locations to look for config
+ files and then loads them"""
+ config = ConfigParser.ConfigParser()
+ baseLocations = ['./etc/', '/usr/share/tashi/', '/etc/tashi/', os.path.expanduser('~/.tashi/')]
+ names = ['Tashi'] + additionalNames
+ names = reduce(lambda x, y: x + [y+"Defaults", y], names, [])
+ allLocations = reduce(lambda x, y: x + reduce(lambda z, a: z + [y + a + ".cfg"], names, []), baseLocations, []) + additionalFiles
+ configFiles = config.read(allLocations)
+ if (len(configFiles) == 0):
+ raise Exception("No config file could be found: %s" % (str(allLocations)))
+ return (config, configFiles)
+
+def debugConsole(globalDict):
+ """A debugging console that optionally uses pysh"""
+ def realDebugConsole(globalDict):
+ try :
+ import atexit
+ from IPython.Shell import IPShellEmbed
+ def resetConsole():
+ (stdin, stdout) = os.popen2("reset")
+ stdout.read()
+ dbgshell = IPShellEmbed()
+ atexit.register(resetConsole)
+ dbgshell(local_ns=globalDict, global_ns=globalDict)
+ except Exception:
+ CONSOLE_TEXT=">>> "
+ input = " "
+ while (input != ""):
+ sys.stdout.write(CONSOLE_TEXT)
+ input = sys.stdin.readline()
+ try:
+ exec(input) in globalDict
+ except Exception, e:
+ print e
+ if (os.getenv("DEBUG", "0") == "1"):
+ threading.Thread(target=lambda: realDebugConsole(globalDict)).start()
+
+def enumToStringDict(cls):
+ d = {}
+ for i in cls.__dict__:
+ if (type(cls.__dict__[i]) is int):
+ d[cls.__dict__[i]] = i
+ return d
+
+vmStates = enumToStringDict(InstanceState)
+hostStates = enumToStringDict(HostState)
Added: incubator/tashi/import/tashi-intel-r399/src/utils/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/utils/Makefile?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/utils/Makefile (added)
+++ incubator/tashi/import/tashi-intel-r399/src/utils/Makefile Mon Nov 3 06:45:25 2008
@@ -0,0 +1,7 @@
+all: nmd
+
+clean:
+ rm -f ./nmd
+
+nmd: nmd.c
+ ${CC} $< -o $@
Added: incubator/tashi/import/tashi-intel-r399/src/utils/nmd.c
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/src/utils/nmd.c?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/src/utils/nmd.c (added)
+++ incubator/tashi/import/tashi-intel-r399/src/utils/nmd.c Mon Nov 3 06:45:25 2008
@@ -0,0 +1,83 @@
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <dirent.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <assert.h>
+
+#define SLEEP_INTERVAL 10
+#define TASHI_PATH "/scratch/mryan3-d4/tashi/branches/mryan3/"
+#define LOG_FILE "/var/log/nodemanager.log"
+
+void make_invincible()
+{
+ int oom_adj_fd;
+ int r;
+
+ oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
+ assert(oom_adj_fd != -1);
+ r = write(oom_adj_fd, "-17\n", 4);
+ assert(r == 4);
+ close(oom_adj_fd);
+
+}
+
+void make_vulnerable()
+{
+ int oom_adj_fd;
+ int r;
+
+ oom_adj_fd = open("/proc/self/oom_adj", O_WRONLY);
+ assert(oom_adj_fd != -1);
+ r = write(oom_adj_fd, "0\n", 2);
+ assert(r == 2);
+ close(oom_adj_fd);
+}
+
+int main(int argc, char **argv)
+{
+ char* env[2];
+ int status;
+ DIR* d;
+ int pid;
+ int lfd;
+ int forground=0;
+
+ if ((argc > 1) && (strncmp(argv[1], "-f", 3)==0)) {
+ forground=1;
+ }
+ if (!forground) {
+ pid = fork();
+ if (pid != 0) {
+ exit(0);
+ }
+ close(0);
+ close(1);
+ close(2);
+ }
+ make_invincible();
+ env[0] = "PYTHONPATH="TASHI_PATH"/src/";
+ env[1] = NULL;
+ while (1) {
+ pid = fork();
+ if (pid == 0) {
+ make_vulnerable();
+ if (!forground) {
+ lfd = open(LOG_FILE, O_WRONLY|O_APPEND|O_CREAT);
+ if (lfd < 0) {
+ lfd = open("/dev/null", O_WRONLY);
+ }
+ dup2(lfd, 2);
+ dup2(lfd, 1);
+ close(0);
+ }
+ chdir(TASHI_PATH);
+ execle("./bin/nodemanager.py", "./bin/nodemanager.py", NULL, env);
+ exit(-1);
+ }
+ sleep(SLEEP_INTERVAL);
+ waitpid(pid, &status, 0);
+ }
+}
Added: incubator/tashi/import/tashi-intel-r399/svn-pull
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/svn-pull?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/svn-pull (added)
+++ incubator/tashi/import/tashi-intel-r399/svn-pull Mon Nov 3 06:45:25 2008
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+pushd ../../trunk
+tar -czf ../blah.tar.gz --exclude=".svn"
+popd
+tar xvzf ../../blah.tar.gz
+rm ../../blah.tar.gz
\ No newline at end of file
Added: incubator/tashi/import/tashi-intel-r399/test/clustermanager-rpc-data-management/test
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/test/clustermanager-rpc-data-management/test?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/test/clustermanager-rpc-data-management/test (added)
+++ incubator/tashi/import/tashi-intel-r399/test/clustermanager-rpc-data-management/test Mon Nov 3 06:45:25 2008
@@ -0,0 +1,212 @@
+#!/usr/bin/env python
+
+import unittest
+import logging
+import sys
+import signal
+import os.path
+import copy
+import time
+import random
+import subprocess
+from ConfigParser import ConfigParser
+
+from tashi.services.ttypes import *
+from thrift.transport.TSocket import TSocket
+from thrift.protocol.TBinaryProtocol import TBinaryProtocol
+from thrift.transport.TTransport import TBufferedTransport
+
+from tashi.services import clustermanagerservice
+from tashi.util import getConfig
+
+import tashi.client.client
+
+class ClientConnection():
+ '''Creates an rpc proxy'''
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+ self.socket = TSocket(host, int(port))
+ self.socket.setTimeout(5000.0)
+ self.transport = TBufferedTransport(self.socket)
+ self.protocol = TBinaryProtocol(self.transport)
+ self.client = clustermanagerservice.Client(self.protocol)
+ self.client._transport = self.transport
+ self.client._transport.open()
+ def __del__(self):
+ self.client._transport.close()
+
+class TestClient(unittest.TestCase):
+ """macro test cases for single-host tests
+
+ Assumes cwd is 'src/tashi/client/'
+ """
+ def setUp(self):
+ """Create a CM on local host"""
+ logging.info('setting up test')
+
+ (self.config, self.configFiles) = getConfig([])
+
+ self.cwd = os.getcwd()
+ self.srcd = os.path.join(self.cwd, 'src')
+
+ self.environ = copy.copy(os.environ)
+ self.environ['PYTHONPATH'] = self.srcd
+ logging.info('base path = %s' % self.cwd)
+
+ self.devnull = os.open("/dev/null", os.O_WRONLY)
+ self.cm = subprocess.Popen(args=['python', os.path.join(self.cwd,
+ 'bin', 'clustermanager.py'), '--drop',
+ '--create'], executable='python',
+ env=self.environ, stdout=self.devnull)
+ self.cm.wait()
+ self.cm = subprocess.Popen(args=['python', os.path.join(self.cwd,
+ 'bin', 'clustermanager.py')],
+ executable='python', env=self.environ,
+ stdout=self.devnull)
+ # since we are spawning with P_NOWAIT, we need to sleep to ensure that the CM is listening
+ fail_count = 0
+ try:
+ while True:
+ try:
+ self.connection = ClientConnection('localhost', '9882')
+ break
+ except Exception, e:
+ if (fail_count > 20):
+ raise
+ fail_count = fail_count + 1
+ time.sleep(0.1)
+ except Exception, e:
+ logging.warning('client connection failed')
+ ex = None
+ try:
+ logging.warning('setUp killing cluster manager ' + str(self.cm.pid))
+ os.kill(self.cm.pid, signal.SIGKILL)
+ self.cm.wait()
+ except Exception, e:
+ ex = e
+ logging.warning('could not kill cluster manager: ' + str(e))
+ if ex != None:
+ raise ex
+
+ def tearDown(self):
+ '''Kill the CM that was created by setUP'''
+ logging.info('tearing down test')
+ ex = None
+ try:
+ logging.debug("killing cluster manager " + str(self.cm.pid))
+ os.kill(self.cm.pid, signal.SIGKILL)
+ except Exception, e:
+ ex = e
+ logging.error('Could not kill cluster manager: ' + str(e))
+
+ if ex != None:
+ raise ex
+
+ def metaTest(self, (get, add, remove), setup, name):
+ '''Meta-test to insert some number of a particular type of object,
+ check that they were all inserted, remove some of them, and check
+ that the correct subset were removed'''
+ names = ['sleepy', 'sneezy', 'dopey', 'doc',
+ 'grumpy', 'bashful', 'happy']
+ for n in names:
+ obj = setup(n)
+ add(obj)
+ objects = get()
+ self.assertEqual(len(names), len(objects))
+ for o in objects:
+ names.remove(name(o))
+ self.assertEqual(0, len(names))
+ # remove a random subset
+ rm = random.sample(objects, 4)
+ for o in rm:
+ remove(o.id)
+ objects.remove(o)
+ newObjects = get()
+ for o in newObjects:
+ found = False
+ for o2 in objects:
+ if (name(o2) == name(o)):
+ objects.remove(o2)
+ found = True
+ self.assertEqual(found, True)
+ self.assertEqual(0, len(objects))
+
+ def testUserManagement(self):
+ '''test adding/removing/listing users'''
+ self.metaTest((self.connection.client.getUsers, self.connection.client.addUser, self.connection.client.removeUser), lambda n: User(d={'username':n}), lambda o: o.username)
+
+ def testHostManagement(self):
+ '''test adding/removing/listing hosts'''
+ self.metaTest((self.connection.client.getHosts, self.connection.client.addHost, self.connection.client.removeHost), lambda n: Host(d={'hostname':n,'enabled':True}), lambda o: o.hostname)
+
+ def testInstanceConfigurationManagement(self):
+ '''test adding/removing/listing instance configurations'''
+ self.metaTest((self.connection.client.getInstanceConfigurations, self.connection.client.addInstanceConfiguration, self.connection.client.removeInstanceConfiguration), lambda n: InstanceConfiguration(d={'name':n, 'cores':1, 'memory':512, 'parentId':None}), lambda o: o.name)
+
+ def testHardDiskConfigurationManagement(self):
+ '''test adding/removing/listing hard disk configurations'''
+ global idNum
+ idNum = 0
+ mapping = {}
+ revMapping = {}
+ instanceConfiguration = InstanceConfiguration(d={'name':'foobar', 'cores':1, 'memory':512, 'parentId':None})
+ self.connection.client.addInstanceConfiguration(instanceConfiguration)
+ instanceConfigurationId = self.connection.client.getInstanceConfigurations()[0].id
+ def setup(n):
+ global idNum
+ if n in mapping:
+ id = mapping[n]
+ else:
+ id = idNum
+ idNum = idNum + 1
+ mapping[n] = id
+ revMapping[id] = n
+ return HardDiskConfiguration(d={'instanceConfigurationId':instanceConfigurationId, 'index':id, 'persistentImageId':None, 'persistent':False})
+ def name(o):
+ return revMapping[o.index]
+ self.metaTest((self.connection.client.getHardDiskConfigurations, self.connection.client.addHardDiskConfiguration, self.connection.client.removeHardDiskConfiguration), setup, name)
+
+ def testNetworkInterfaceConfigurationManagement(self):
+ '''test adding/removing/listing network interface configurations'''
+ global idNum
+ idNum = 0
+ mapping = {}
+ revMapping = {}
+ instanceConfiguration = InstanceConfiguration(d={'name':'foobar', 'cores':1, 'memory':512, 'parentId':None})
+ self.connection.client.addInstanceConfiguration(instanceConfiguration)
+ instanceConfigurationId = self.connection.client.getInstanceConfigurations()[0].id
+ def setup(n):
+ global idNum
+ if n in mapping:
+ id = mapping[n]
+ else:
+ id = idNum
+ idNum = idNum + 1
+ mapping[n] = id
+ revMapping[id] = n
+ return NetworkInterfaceConfiguration(d={'instanceConfigurationId':instanceConfigurationId, 'index':id, 'macAddress':"52:54:00:2f:2f:%2.2x" % (id)})
+ def name(o):
+ return revMapping[o.index]
+ self.metaTest((self.connection.client.getNetworkInterfaceConfigurations, self.connection.client.addNetworkInterfaceConfiguration, self.connection.client.removeNetworkInterfaceConfiguration), setup, name)
+
+ def testPersistentImageManagement(self):
+ '''test adding/removing/listing persistent images'''
+ user = User(d={'username':'foobar'})
+ self.connection.client.addUser(user)
+ userId = self.connection.client.getUsers()[0].id
+ self.metaTest((self.connection.client.getPersistentImages, self.connection.client.addPersistentImage, self.connection.client.removePersistentImage), lambda n: PersistentImage(d={'name':n, 'userId':userId}), lambda o: o.name)
+
+##############################
+# Test Code
+##############################
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.NOTSET,
+ format="%(asctime)s %(levelname)s:\t %(message)s",
+ stream=sys.stdout)
+
+ suite = unittest.TestLoader().loadTestsFromTestCase(TestClient)
+ res = unittest.TextTestRunner(verbosity=2).run(suite)
+ if (len(res.errors) + len(res.failures) > 0):
+ sys.exit(1)
+ sys.exit(0)
Added: incubator/tashi/import/tashi-intel-r399/test/runall
URL: http://svn.apache.org/viewvc/incubator/tashi/import/tashi-intel-r399/test/runall?rev=710072&view=auto
==============================================================================
--- incubator/tashi/import/tashi-intel-r399/test/runall (added)
+++ incubator/tashi/import/tashi-intel-r399/test/runall Mon Nov 3 06:45:25 2008
@@ -0,0 +1,33 @@
+#! /bin/bash
+
+TEST_DIR=`echo $0 | sed 's/runall$//'`
+FIFO="/tmp/tashi-test"
+
+PASS_COUNT=0
+FAIL_COUNT=0
+RUN_COUNT=0
+
+export TESTING=1
+
+for f in `ls ${TEST_DIR}`; do
+ if [ -d ${TEST_DIR}/${f} ]; then
+ echo "${f}:"
+ mkfifo ${FIFO}
+ cat ${FIFO} | tee ${TEST_DIR}/${f}/log.txt &
+ ${TEST_DIR}/${f}/test > ${FIFO} 2>&1
+ RES=$?
+ if [[ -e ${TEST_DIR}/${f}/cleanup ]]; then
+ ${TEST_DIR}/${f}/cleanup
+ fi
+ if [[ ${RES} -eq 0 ]]; then
+ PASS_COUNT=$((PASS_COUNT+1))
+ else
+ FAIL_COUNT=$((FAIL_COUNT+1))
+ fi
+ RUN_COUNT=$((RUN_COUNT+1))
+ wait
+ rm -f ${FIFO}
+ fi
+done
+
+echo ${PASS_COUNT}/${RUN_COUNT}
Propchange: incubator/tashi/import/tashi-intel-r399/test/runall
------------------------------------------------------------------------------
svn:executable = *