Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2011/07/16 10:04:56 UTC
svn commit: r1147396 - in /incubator/tashi/trunk/src/tashi:
agents/primitive.py clustermanager/clustermanagerservice.py
clustermanager/data/sql.py nodemanager/nodemanagerservice.py
nodemanager/vmcontrol/qemu.py rpycservices/rpycservices.py
Author: stroucki
Date: Sat Jul 16 10:04:54 2011
New Revision: 1147396
URL: http://svn.apache.org/viewvc?rev=1147396&view=rev
Log:
qemu.py: Use LVM for scratch space
qemu.py: Let VMM be authoritative for VM state on machine
qemu.py: Ensure instance entry has vmId set
qemu.py: Remove non-existent units from /proc/meminfo parsing
nodemanagerservice.py: Let VMM be authoritative for VM state on machine
primitive.py: handle all defined hosts, not just those that are up
clustermanagerservice.py: clean up node cleanup handling
sql.py: Fix deadlock in manipulating instances
Modified:
incubator/tashi/trunk/src/tashi/agents/primitive.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Sat Jul 16 10:04:54 2011
@@ -63,7 +63,8 @@ class Primitive(object):
load = {}
ctr = 0
for h in self.client.getHosts():
- if (h.up == True and h.state == HostState.Normal):
+ #XXXstroucki get all hosts here?
+ #if (h.up == True and h.state == HostState.Normal):
hosts[ctr] = h
ctr = ctr + 1
load[h.id] = []
@@ -128,6 +129,10 @@ class Primitive(object):
# cycle list
for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
h = self.hosts[ctr]
+
+ # XXXstroucki if it's down, find another machine
+ if (h.up == False):
+ continue
# if it's reserved, see if we can use it
if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
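
The two primitive.py hunks above work together: the host table now keeps every defined host, and availability is checked at scheduling time instead. A condensed sketch of the resulting selection loop, with hypothetical helper names (pickHost, canUse) that are not part of the agent:

# Sketch only: cycle through all known hosts starting after the last one
# scheduled, skipping hosts that are currently down.
def pickHost(hosts, lastScheduledHost, canUse):
    order = range(lastScheduledHost, len(hosts)) + range(0, lastScheduledHost)
    for ctr in order:
        h = hosts[ctr]
        if h.up == False:
            # down host: look for another machine
            continue
        if canUse(h):
            return (ctr, h)
    return (None, None)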
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Sat Jul 16 10:04:54 2011
@@ -69,7 +69,6 @@ class ClusterManagerService(object):
def stateTransition(self, instance, old, cur):
if (old and instance.state != old):
- self.data.releaseInstance(instance)
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
instance.state = cur
@@ -108,6 +107,7 @@ class ClusterManagerService(object):
sleepFor = min(self.lastContacted[k] + self.expireHostTime - now, sleepFor)
for hostId in self.decayedHosts.keys():
# XXXstroucki: what if the host undecays here?
+ # XXXstroucki: exceptions every so often; should get a lock
if (self.decayedHosts[hostId] < (now-self.allowDecayed)):
host = self.data.getHost(hostId)
self.log.warning('Fetching state from host %s because it is decayed' % (host.name))
@@ -217,8 +217,6 @@ class ClusterManagerService(object):
# XXXstroucki: This is a problem with keeping
# clean state.
self.stateTransition(instance, None, InstanceState.Destroying)
- self.data.releaseInstance(instance)
-
if instance.hostId is None:
self.data.removeInstance(instance)
else:
@@ -226,6 +224,7 @@ class ClusterManagerService(object):
try:
if hostname is not None:
self.proxy[hostname].destroyVm(instance.vmId)
+ self.data.releaseInstance(instance)
except Exception:
self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
self.data.removeInstance(instance)
@@ -284,11 +283,12 @@ class ClusterManagerService(object):
except Exception, e:
self.log.exception('migrateVm failed')
raise
- #instance = self.data.acquireInstance(instance.id)
- #try:
- # instance.hostId = targetHost.id
- #finally:
- # self.data.releaseInstance(instance)
+ try:
+ instance = self.data.acquireInstance(instance.id)
+ instance.hostId = targetHost.id
+ finally:
+ self.data.releaseInstance(instance)
+
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
@@ -376,6 +376,15 @@ class ClusterManagerService(object):
if (host.version == version and oldHost.state == HostState.VersionMismatch):
oldHost.state = HostState.Normal
for instance in instances:
+ string = ""
+ string = string + "id %d " % instance.id
+ string = string + "host %d " % host.id
+ string = string + "vmId %d " % instance.vmId
+ string = string + "user %d " % instance.userId
+ string = string + "cores %d " % instance.cores
+ string = string + "memory %d " % instance.memory
+ self.log.info('Accounting: ' + string)
+ #self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
try:
oldInstance = self.data.acquireInstance(instance.id)
except TashiException, e:
@@ -385,7 +394,9 @@ class ClusterManagerService(object):
continue
#oldInstance = self.data.registerInstance(instance)
else:
+ self.log.exception("failed to acquire instance")
raise
+
try:
if (oldInstance.hostId != host.id):
self.log.info('Host %s is claiming instance %d actually owned by hostId %s (decay)' % (host.name, oldInstance.id, str(oldInstance.hostId)))
@@ -407,6 +418,7 @@ class ClusterManagerService(object):
oldHost.decayed = True
self.data.releaseInstance(instance)
except Exception, e:
+ self.log.exception("Exception in RegisterNodeManager")
oldHost.decayed = True
raise
finally:
@@ -419,11 +431,14 @@ class ClusterManagerService(object):
try:
oldInstance = self.data.acquireInstance(instanceId)
except TashiException, e:
+ self.data.releaseInstance(oldInstance)
if (e.errno == Errors.NoSuchInstanceId):
self.log.exception('Got vmUpdate for unknown instanceId %d' % (instanceId))
return
else:
+ self.log.exception("Could not acquire instance")
raise
+
if (instance.state == InstanceState.Exited):
oldInstance.decayed = False
self.updateDecay(self.decayedInstances, oldInstance)
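
With the releaseInstance call removed from stateTransition, the caller that acquired the instance stays responsible for releasing it, even when the state check raises. A hypothetical usage sketch (pauseInstance is illustrative, not a ClusterManagerService method; the other names come from the surrounding code):

# Sketch only: acquire/transition/release, with the caller owning the release.
def pauseInstance(cm, instanceId):
    # cm is the cluster manager service; its data manager hands out instances
    instance = cm.data.acquireInstance(instanceId)
    try:
        # raises TashiException if the instance is not currently Running
        cm.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
        # ... work on the instance while its per-instance lock is held ...
    finally:
        # the caller always releases; stateTransition no longer does it on error
        cm.data.releaseInstance(instance)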
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Sat Jul 16 10:04:54 2011
@@ -49,6 +49,7 @@ class SQL(DataInterface):
self.instanceLock = threading.Lock()
self.instanceIdLock = threading.Lock()
self.instanceLocks = {}
+ self.instanceBusy = {}
self.hostLock = threading.Lock()
self.hostLocks = {}
self.maxInstanceId = 1
@@ -63,7 +64,7 @@ class SQL(DataInterface):
try:
cur.execute(stmt)
except:
- self.log.exception('Exception executing SQL statement')
+ self.log.exception('Exception executing SQL statement %s' % stmt)
finally:
self.sqlLock.release()
return cur
@@ -135,6 +136,7 @@ class SQL(DataInterface):
instance._lock = threading.Lock()
self.instanceLocks[instance.id] = instance._lock
instance._lock.acquire()
+ self.instanceBusy[instance.id] = True
l = self.makeInstanceList(instance)
self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
finally:
@@ -142,7 +144,13 @@ class SQL(DataInterface):
return instance
def acquireInstance(self, instanceId):
- self.instanceLock.acquire()
+ busyCheck = True
+ while busyCheck == True:
+ self.instanceLock.acquire()
+ busyCheck = self.instanceBusy.setdefault(instanceId, False)
+ if busyCheck:
+ self.instanceLock.release()
+
try:
cur = self.executeStatement("SELECT * from instances WHERE id = %d" % (instanceId))
l = cur.fetchone()
@@ -152,6 +160,7 @@ class SQL(DataInterface):
self.instanceLocks[instance.id] = self.instanceLocks.get(instance.id, threading.Lock())
instance._lock = self.instanceLocks[instance.id]
instance._lock.acquire()
+ self.instanceBusy[instance.id] = True
finally:
self.instanceLock.release()
return instance
@@ -166,7 +175,11 @@ class SQL(DataInterface):
if (e < len(self.instanceOrder)-1):
s = s + ", "
self.executeStatement("UPDATE instances SET %s WHERE id = %d" % (s, instance.id))
+ self.instanceBusy[instance.id] = False
instance._lock.release()
+ except:
+ self.log.exception("Excepted while holding lock")
+ raise
finally:
self.instanceLock.release()
@@ -174,8 +187,13 @@ class SQL(DataInterface):
self.instanceLock.acquire()
try:
self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
- instance._lock.release()
+ #XXXstroucki extraneous instance won't have a lock
+ try:
+ instance._lock.release()
+ except:
+ pass
del self.instanceLocks[instance.id]
+ del self.instanceBusy[instance.id]
finally:
self.instanceLock.release()
@@ -292,7 +310,6 @@ class SQL(DataInterface):
self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
self.hostLock.release()
return r[0], True
-
id = self.getNewId("hosts")
host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
l = self.makeHostList(host)
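
The deadlock addressed above came from waiting on a per-instance lock while still holding the table-wide instanceLock. acquireInstance now spins on a per-instance busy flag and drops the table lock between attempts, so releaseInstance can always take the table lock and make progress. A simplified sketch of the pattern (not the actual SQL-backed class, which also reads and writes the instance row while the locks are held):

import threading

class InstanceLockTable(object):
    """Sketch of the busy-flag pattern behind acquireInstance/releaseInstance."""
    def __init__(self):
        self.instanceLock = threading.Lock()   # guards the two dicts below
        self.instanceLocks = {}                # instance id -> per-instance lock
        self.instanceBusy = {}                 # instance id -> bool

    def acquire(self, instanceId):
        # Retry until the instance is not marked busy; never block on the
        # per-instance lock while holding the table-wide lock.
        while True:
            self.instanceLock.acquire()
            if not self.instanceBusy.setdefault(instanceId, False):
                break
            self.instanceLock.release()
        try:
            lock = self.instanceLocks.setdefault(instanceId, threading.Lock())
            lock.acquire()
            self.instanceBusy[instanceId] = True
        finally:
            self.instanceLock.release()

    def release(self, instanceId):
        self.instanceLock.acquire()
        try:
            self.instanceBusy[instanceId] = False
            self.instanceLocks[instanceId].release()
        finally:
            self.instanceLock.release()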
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Sat Jul 16 10:04:54 2011
@@ -41,7 +41,6 @@ class NodeManagerService(object):
self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
- self.cmRegisterHost = boolean(config.get("NodeManagerService", "registerHost"))
if self.authAndEncrypt:
self.username = config.get('AccessClusterManager', 'username')
self.password = config.get('AccessClusterManager', 'password')
@@ -65,22 +64,16 @@ class NodeManagerService(object):
if (vmId not in vmList):
self.log.warning('vmcontrol backend does not report %d' % (vmId))
self.vmStateChange(vmId, None, InstanceState.Exited)
-
- if self.cmRegisterHost:
- self.registerHost()
-
+ self.registerHost()
threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
threading.Thread(target=self.registerWithClusterManager).start()
threading.Thread(target=self.statsThread).start()
def loadVmInfo(self):
try:
- f = open(self.infoFile, "r")
- data = f.read()
- f.close()
- self.instances = cPickle.loads(data)
+ self.instances = self.vmm.getInstances()
except Exception, e:
- self.log.warning('Failed to load VM info from %s' % (self.infoFile))
+ self.log.exception('Failed to obtain VM info')
self.instances = {}
def saveVmInfo(self):
@@ -321,14 +314,10 @@ class NodeManagerService(object):
def registerHost(self):
cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- try:
- hostname = socket.gethostname()
- # populate some defaults
- # XXXstroucki: I think it's better if the nodemanager fills these in properly when sending updates to the clustermanager
- # XXXstroucki: plus, do you want strange hosts joining your cluster?
- memory = 0
- cores = 0
- version = "empty"
- cm.registerHost(hostname, memory, cores, version)
- except:
- self.log.exception("Could not auto-register host with CM")
+ hostname = socket.gethostname()
+ # populate some defaults
+ # XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+ memory = 0
+ cores = 0
+ version = "empty"
+ #cm.registerHost(hostname, memory, cores, version)
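
loadVmInfo above makes the running VMM authoritative for VM state instead of trusting a pickled info file. The contract it relies on is the getInstances() method added to qemu.py below; roughly (standalone sketch, with controlledVMs and nm standing in for the real attributes):

def getInstances(controlledVMs):
    # vmId -> instance, taken from the VMM's table of controlled children
    return dict((vmId, child.instance) for (vmId, child) in controlledVMs.items())

def loadVmInfo(nm):
    # ask the VM controller rather than reading a saved copy of the state
    try:
        nm.instances = nm.vmm.getInstances()
    except Exception:
        nm.log.exception('Failed to obtain VM info')
        nm.instances = {}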
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Sat Jul 16 10:04:54 2011
@@ -105,6 +105,7 @@ class Qemu(VmControlInterface):
self.consolePortLock = threading.Lock()
self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
self.stats = {}
+ # XXXstroucki revise
self.scratchDir = self.config.get("Qemu", "scratchDir")
if len(self.scratchDir) == 0:
self.scratchDir = "/tmp"
@@ -122,10 +123,6 @@ class Qemu(VmControlInterface):
def __init__(self, **attrs):
self.__dict__.update(attrs)
- def __cleanScratchSpace(self, file):
- time.sleep(5)
- os.unlink(file)
-
def getSystemPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
@@ -137,15 +134,20 @@ class Qemu(VmControlInterface):
except Exception:
pass
return pids
-
+
+ def getInstances(self):
+ """Will return a dict of instances by vmId to the caller"""
+ return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
+
def matchSystemPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
vmIds = controlledVMs.keys()
pids = self.getSystemPids()
for vmId in vmIds:
+ child = controlledVMs[vmId]
+
if vmId not in pids:
os.unlink(self.INFO_DIR + "/%d"%(vmId))
- child = controlledVMs[vmId]
del controlledVMs[vmId]
try:
del self.stats[vmId]
@@ -170,11 +172,26 @@ class Qemu(VmControlInterface):
for i in child.monitorHistory:
f.write(i)
f.close()
+ #XXXstroucki remove scratch storage
+ scratch_name = child.instance.name
+ log.info("Removing scratch for " + scratch_name)
+ #cmd = "/sbin/lvremove -f vgscratch/lv" + scratch_name
+ cmd = "/sbin/lvremove -f vgscratch"
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
try:
if (not child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.Exited)
except Exception, e:
log.exception("vmStateChange failed")
+ else:
+ try:
+ if (child.migratingOut):
+ self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
+ else:
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
+ except:
+ #XXXstroucki nm is initialised at different time
+ log.exception("vmStateChange failed")
def scanInfoDir(self):
@@ -192,6 +209,10 @@ class Qemu(VmControlInterface):
self.vncPortLock.release()
child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
child.monitor = os.fdopen(child.monitorFd)
+
+ #XXXstroucki ensure instance has vmId
+ child.instance.vmId = vmId
+
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
except Exception, e:
@@ -308,12 +329,6 @@ class Qemu(VmControlInterface):
memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
if (memoryStr[2] == "kB"):
host.memory = int(memoryStr[1])/1024
- elif (memoryStr[2] == "mB"):
- host.memory = int(memoryStr[1])
- elif (memoryStr[2] == "gB"):
- host.memory = int(memoryStr[1])*1024
- elif (memoryStr[2] == " B"):
- host.memory = int(memoryStr[1])/(1024*1024)
else:
log.warning('Unable to determine amount of physical memory - reporting 0')
host.memory = 0
@@ -374,18 +389,17 @@ class Qemu(VmControlInterface):
try:
if scratchSize > 0:
- print 'creating scratch file'
# create scratch disk
# XXXstroucki: needs to be cleaned somewhere
- scratch_file = os.path.join(self.scratchDir, instance.name + ".scratch")
- print 'scratch file name is ', scratch_file
- tempfd = open(scratch_file, "w")
- tempfd.seek( (scratchSize * 2 ** 30) - 1 )
- tempfd.write('x')
- tempfd.close()
+ # XXXstroucki: clean user provided instance name
+ scratch_name = "lv" + instance.name
+ # XXXstroucki hold lock
+ # XXXstroucki check for capacity
+ cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G vgscratch"
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
index += 1
- thisDiskList = [ "file=%s" % scratch_file ]
+ thisDiskList = [ "file=/dev/vgscratch/%s" % scratch_name ]
thisDiskList.append("if=%s" % diskInterface)
thisDiskList.append("index=%d" % index)
thisDiskList.append("cache=off")
@@ -453,16 +467,10 @@ class Qemu(VmControlInterface):
child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
child.ptyFile = None
child.vncPort = -1
+ child.instance.vmId = child.pid
self.saveChildInfo(child)
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
-
- # clean up scratch file
- if scratchSize > 0 and scratch_file is not None:
- # do this in the background
- threading.Thread(target=self.__cleanScratchSpace,args=[scratch_file]).start()
-
-
return (child.pid, cmd)
def getPtyInfo(self, child, issueContinue):
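
Scratch space in qemu.py now comes from an LVM volume group (vgscratch in this commit) rather than a sparse file under scratchDir: an lvcreate when the VM is started and an lvremove when matchSystemPids notices the VM has exited. A hedged sketch of that life cycle follows; the per-volume lvremove shown here is an assumption (the committed cleanup path still passes only the volume group name), and the instance name is not yet sanitized or capacity-checked, both flagged XXXstroucki above.

import subprocess

def createScratch(instanceName, scratchSizeGb):
    # one logical volume per instance, e.g. /dev/vgscratch/lv<name>
    lvName = "lv" + instanceName
    cmd = "/sbin/lvcreate -n%s -L%dG vgscratch" % (lvName, scratchSizeGb)
    rc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE).wait()
    if rc != 0:
        raise RuntimeError("lvcreate failed for %s" % lvName)
    # handed to qemu as file=/dev/vgscratch/lv<name>
    return "/dev/vgscratch/%s" % lvName

def removeScratch(instanceName):
    # assumption: remove only this instance's logical volume
    cmd = "/sbin/lvremove -f vgscratch/lv%s" % instanceName
    return subprocess.Popen(cmd.split(), stdout=subprocess.PIPE).wait()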
Modified: incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py (original)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py Sat Jul 16 10:04:54 2011
@@ -49,7 +49,9 @@ class client:
def connectWrap(*args):
args = cPickle.dumps(clean(args))
try:
+ # XXXstroucki this is raising exception but not getting handled
res = getattr(self.conn.root, name)(args)
+ # XXXstroucki handle exception coming back
except Exception, e:
self.conn.close()
raise e