Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2011/07/16 10:04:56 UTC

svn commit: r1147396 - in /incubator/tashi/trunk/src/tashi: agents/primitive.py clustermanager/clustermanagerservice.py clustermanager/data/sql.py nodemanager/nodemanagerservice.py nodemanager/vmcontrol/qemu.py rpycservices/rpycservices.py

Author: stroucki
Date: Sat Jul 16 10:04:54 2011
New Revision: 1147396

URL: http://svn.apache.org/viewvc?rev=1147396&view=rev
Log:
qemu.py: Use LVM for scratch space
qemu.py: Let VMM be authoritative for VM state on machine
qemu.py: Ensure instance entry has vmId set
qemu.py: Remove non-existent units from /proc/meminfo parsing
nodemanagerservice.py: Let VMM be authoritative for VM state on machine
primitive.py: Handle all defined hosts, not just those that are up
clustermanagerservice.py: Clean up node cleanup handling
sql.py: Fix deadlock in manipulating instances

Modified:
    incubator/tashi/trunk/src/tashi/agents/primitive.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py

Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Sat Jul 16 10:04:54 2011
@@ -63,7 +63,8 @@ class Primitive(object):
 		load = {}
 		ctr = 0
 		for h in self.client.getHosts():
-			if (h.up == True and h.state == HostState.Normal):
+			#XXXstroucki get all hosts here?
+			#if (h.up == True and h.state == HostState.Normal):
 				hosts[ctr] = h
 				ctr = ctr + 1
 				load[h.id] = []
@@ -128,6 +129,10 @@ class Primitive(object):
 				# cycle list
 				for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
 					h = self.hosts[ctr]
+
+					# XXXstroucki if it's down, find another machine
+					if (h.up == False):
+						continue
 		
 					# if it's reserved, see if we can use it
 					if ((len(h.reserved) > 0) and inst.userId not in h.reserved):

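The primitive.py change moves the usability check out of the host-collection step and into the scheduling loop: every defined host is now tracked, and hosts that are down are simply skipped when placing a VM. A minimal sketch of that pattern, using only the fields touched here (up, reserved) and simplified names, in the same Python 2 style as the agent:

def pickHost(hosts, lastScheduledHost, userId):
	"""Cycle through all defined hosts, starting after the last scheduled one."""
	order = range(lastScheduledHost, len(hosts)) + range(0, lastScheduledHost)
	for ctr in order:
		h = hosts[ctr]
		# down hosts stay in the table but are never chosen
		if h.up == False:
			continue
		# reserved hosts are only usable by the users they are reserved for
		if len(h.reserved) > 0 and userId not in h.reserved:
			continue
		return ctr, h
	return None, None
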
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Sat Jul 16 10:04:54 2011
@@ -69,7 +69,6 @@ class ClusterManagerService(object):
 
 	def stateTransition(self, instance, old, cur):
 		if (old and instance.state != old):
-			self.data.releaseInstance(instance)
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
 		instance.state = cur
 
@@ -108,6 +107,7 @@ class ClusterManagerService(object):
 						sleepFor = min(self.lastContacted[k] + self.expireHostTime - now, sleepFor)
 				for hostId in self.decayedHosts.keys():
 					# XXXstroucki: what if the host undecays here?
+					# XXXstroucki: exceptions every so often; should get a lock
 					if (self.decayedHosts[hostId] < (now-self.allowDecayed)):
 						host = self.data.getHost(hostId)
 						self.log.warning('Fetching state from host %s because it is decayed' % (host.name))
@@ -217,8 +217,6 @@ class ClusterManagerService(object):
 			# XXXstroucki: This is a problem with keeping
 			# clean state.
 			self.stateTransition(instance, None, InstanceState.Destroying)
-			self.data.releaseInstance(instance)
-
 			if instance.hostId is None:
 				self.data.removeInstance(instance)
 			else:
@@ -226,6 +224,7 @@ class ClusterManagerService(object):
 				try:
 					if hostname is not None:
 						self.proxy[hostname].destroyVm(instance.vmId)
+						self.data.releaseInstance(instance)
 				except Exception:
 					self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
 					self.data.removeInstance(instance)
@@ -284,11 +283,12 @@ class ClusterManagerService(object):
 		except Exception, e:
 			self.log.exception('migrateVm failed')
 			raise
-		#instance = self.data.acquireInstance(instance.id)
-		#try:
-		#	instance.hostId = targetHost.id
-		#finally:
-		#	self.data.releaseInstance(instance)
+		try:
+			instance = self.data.acquireInstance(instance.id)
+			instance.hostId = targetHost.id
+		finally:
+			self.data.releaseInstance(instance)
+
 		try:
 			# Notify the target
 			vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
@@ -376,6 +376,15 @@ class ClusterManagerService(object):
 				if (host.version == version and oldHost.state == HostState.VersionMismatch):
 					oldHost.state = HostState.Normal
 				for instance in instances:
+					string = ""
+					string = string + "id %d " % instance.id
+					string = string + "host %d " % host.id
+					string = string + "vmId %d " % instance.vmId
+					string = string + "user %d " % instance.userId
+					string = string + "cores %d " % instance.cores
+					string = string + "memory %d " % instance.memory
+					self.log.info('Accounting: ' + string)
+					#self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
 					try:
 						oldInstance = self.data.acquireInstance(instance.id)
 					except TashiException, e:
@@ -385,7 +394,9 @@ class ClusterManagerService(object):
 							continue
 							#oldInstance = self.data.registerInstance(instance)
 						else:
+							self.log.exception("failed to acquire instance")
 							raise
+
 					try:
 						if (oldInstance.hostId != host.id):
 							self.log.info('Host %s is claiming instance %d actually owned by hostId %s (decay)' % (host.name, oldInstance.id, str(oldInstance.hostId)))
@@ -407,6 +418,7 @@ class ClusterManagerService(object):
 						oldHost.decayed = True
 						self.data.releaseInstance(instance)
 			except Exception, e:
+				self.log.exception("Exception in RegisterNodeManager")
 				oldHost.decayed = True
 				raise
 		finally:
@@ -419,11 +431,14 @@ class ClusterManagerService(object):
 		try:
 			oldInstance = self.data.acquireInstance(instanceId)
 		except TashiException, e:
+			self.data.releaseInstance(oldInstance)
 			if (e.errno == Errors.NoSuchInstanceId):
 				self.log.exception('Got vmUpdate for unknown instanceId %d' % (instanceId))
 				return
 			else:
+				self.log.exception("Could not acquire instance")
 				raise
+
 		if (instance.state == InstanceState.Exited):
 			oldInstance.decayed = False
 			self.updateDecay(self.decayedInstances, oldInstance)

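One of the new comments in the monitoring loop notes that iterating self.decayedHosts occasionally raises because another thread mutates it, and that a lock should be taken. A hedged sketch of what that comment suggests; the lock and helper below are illustrative and not part of this commit:

import threading

decayedHostsLock = threading.Lock()	# illustrative only; not in this commit

def expiredDecayedHosts(decayedHosts, now, allowDecayed):
	"""Snapshot the decayed-host table under a lock, then report expired entries."""
	decayedHostsLock.acquire()
	try:
		snapshot = decayedHosts.items()
	finally:
		decayedHostsLock.release()
	return [hostId for (hostId, decayTime) in snapshot if decayTime < (now - allowDecayed)]
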
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Sat Jul 16 10:04:54 2011
@@ -49,6 +49,7 @@ class SQL(DataInterface):
 		self.instanceLock = threading.Lock()
 		self.instanceIdLock = threading.Lock()
 		self.instanceLocks = {}
+		self.instanceBusy = {}
 		self.hostLock = threading.Lock()
 		self.hostLocks = {}
 		self.maxInstanceId = 1
@@ -63,7 +64,7 @@ class SQL(DataInterface):
 			try:
 				cur.execute(stmt)
 			except:
-				self.log.exception('Exception executing SQL statement')
+				self.log.exception('Exception executing SQL statement %s' % stmt)
 		finally:
 			self.sqlLock.release()
 		return cur
@@ -135,6 +136,7 @@ class SQL(DataInterface):
 			instance._lock = threading.Lock()
 			self.instanceLocks[instance.id] = instance._lock
 			instance._lock.acquire()
+			self.instanceBusy[instance.id] = True
 			l = self.makeInstanceList(instance)
 			self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
 		finally:
@@ -142,7 +144,13 @@ class SQL(DataInterface):
 		return instance
 	
 	def acquireInstance(self, instanceId):
-		self.instanceLock.acquire()
+		busyCheck = True
+		while busyCheck == True:
+			self.instanceLock.acquire()
+			busyCheck = self.instanceBusy.setdefault(instanceId, False)
+			if busyCheck:
+				self.instanceLock.release()
+
 		try:
 			cur = self.executeStatement("SELECT * from instances WHERE id = %d" % (instanceId))
 			l = cur.fetchone()
@@ -152,6 +160,7 @@ class SQL(DataInterface):
 			self.instanceLocks[instance.id] = self.instanceLocks.get(instance.id, threading.Lock())
 			instance._lock = self.instanceLocks[instance.id]
 			instance._lock.acquire()
+			self.instanceBusy[instance.id] = True
 		finally:
 			self.instanceLock.release()
 		return instance
@@ -166,7 +175,11 @@ class SQL(DataInterface):
 				if (e < len(self.instanceOrder)-1):
 					s = s + ", "
 			self.executeStatement("UPDATE instances SET %s WHERE id = %d" % (s, instance.id))
+			self.instanceBusy[instance.id] = False
 			instance._lock.release()
+		except:
+			self.log.exception("Excepted while holding lock")
+			raise
 		finally:
 			self.instanceLock.release()
 	
@@ -174,8 +187,13 @@ class SQL(DataInterface):
 		self.instanceLock.acquire()
 		try:
 			self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
-			instance._lock.release()
+			#XXXstroucki extraneous instance won't have a lock
+			try:
+				instance._lock.release()
+			except:
+				pass
 			del self.instanceLocks[instance.id]
+			del self.instanceBusy[instance.id]
 		finally:
 			self.instanceLock.release()
 	
@@ -292,7 +310,6 @@ class SQL(DataInterface):
 				self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
 				self.hostLock.release()
 				return r[0], True
-
 		id = self.getNewId("hosts")
 		host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
 		l = self.makeHostList(host)

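The sql.py hunks are the deadlock fix named in the log: acquireInstance used to block on a per-instance lock while still holding the table-wide instanceLock, so a releaseInstance on another thread could never get in. The new scheme marks instances busy in instanceBusy and retries without holding the table lock while waiting. A simplified sketch of that idea (the per-instance threading.Lock kept by the real code is omitted here):

import threading

tableLock = threading.Lock()	# plays the role of self.instanceLock
busy = {}			# plays the role of self.instanceBusy: instanceId -> bool

def acquireInstance(instanceId, loadRow):
	# Retry until the instance is not busy; never wait while holding tableLock.
	while True:
		tableLock.acquire()
		if not busy.setdefault(instanceId, False):
			break
		tableLock.release()
	try:
		instance = loadRow(instanceId)	# SELECT * FROM instances WHERE id = ...
		busy[instanceId] = True
	finally:
		tableLock.release()
	return instance

def releaseInstance(instance, writeRow):
	tableLock.acquire()
	try:
		writeRow(instance)		# UPDATE instances SET ... WHERE id = ...
		busy[instance.id] = False
	finally:
		tableLock.release()
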
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Sat Jul 16 10:04:54 2011
@@ -41,7 +41,6 @@ class NodeManagerService(object):
 		self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
 		self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
 		self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
-		self.cmRegisterHost = boolean(config.get("NodeManagerService", "registerHost"))
 		if self.authAndEncrypt:
 			self.username = config.get('AccessClusterManager', 'username')
 			self.password = config.get('AccessClusterManager', 'password')
@@ -65,22 +64,16 @@ class NodeManagerService(object):
 			if (vmId not in vmList):
 				self.log.warning('vmcontrol backend does not report %d' % (vmId))
 				self.vmStateChange(vmId, None, InstanceState.Exited)
-
-		if self.cmRegisterHost:
-			self.registerHost()
-
+		self.registerHost()
 		threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
 		threading.Thread(target=self.registerWithClusterManager).start()
 		threading.Thread(target=self.statsThread).start()
 	
 	def loadVmInfo(self):
 		try:
-			f = open(self.infoFile, "r")
-			data = f.read()
-			f.close()
-			self.instances = cPickle.loads(data)
+			self.instances = self.vmm.getInstances()
 		except Exception, e:
-			self.log.warning('Failed to load VM info from %s' % (self.infoFile))
+			self.log.exception('Failed to obtain VM info')
 			self.instances = {}
 	
 	def saveVmInfo(self):
@@ -321,14 +314,10 @@ class NodeManagerService(object):
 
         def registerHost(self):
                 cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
-		try:
-                	hostname = socket.gethostname()
-			# populate some defaults
-			# XXXstroucki: I think it's better if the nodemanager fills these in properly when sending updates to the clustermanager
-			# XXXstroucki: plus, do you want strange hosts joining your cluster?
-			memory = 0
-			cores = 0
-			version = "empty"
-                	cm.registerHost(hostname, memory, cores, version)
-		except:
-			self.log.exception("Could not auto-register host with CM")
+                hostname = socket.gethostname()
+		# populate some defaults
+		# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+		memory = 0
+		cores = 0
+		version = "empty"
+                #cm.registerHost(hostname, memory, cores, version)

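With this change the VMM backend, not the node manager's pickle file, is the source of truth for which VMs exist on the machine: loadVmInfo now calls the backend's new getInstances() and falls back to an empty table, and auto-registration with the cluster manager is stubbed out pending the rework described in the XXX comment. A condensed view of the new startup flow (a self-standing sketch; names follow the diff):

def loadVmInfo(nm):
	"""Populate nm.instances from the VMM backend, which is now authoritative."""
	try:
		# getInstances() returns {vmId: Instance}, added to qemu.py in this commit
		nm.instances = nm.vmm.getInstances()
	except Exception:
		nm.log.exception('Failed to obtain VM info')
		nm.instances = {}
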
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Sat Jul 16 10:04:54 2011
@@ -105,6 +105,7 @@ class Qemu(VmControlInterface):
 		self.consolePortLock = threading.Lock()
 		self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
 		self.stats = {}
+		# XXXstroucki revise
 		self.scratchDir = self.config.get("Qemu", "scratchDir")
 		if len(self.scratchDir) == 0:
 			self.scratchDir = "/tmp"
@@ -122,10 +123,6 @@ class Qemu(VmControlInterface):
 		def __init__(self, **attrs):
 			self.__dict__.update(attrs)
 
-	def __cleanScratchSpace(self, file):
-		time.sleep(5)
-		os.unlink(file)
-	
 	def getSystemPids(self):
 		"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
 		pids = []
@@ -137,15 +134,20 @@ class Qemu(VmControlInterface):
 			except Exception:
 				pass
 		return pids
-	
+
+	def getInstances(self):
+		"""Will return a dict of instances by vmId to the caller"""
+		return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
+
 	def matchSystemPids(self, controlledVMs):
 		"""This is run in a separate polling thread and it must do things that are thread safe"""
 		vmIds = controlledVMs.keys()
 		pids = self.getSystemPids()
 		for vmId in vmIds:
+			child = controlledVMs[vmId]
+
 			if vmId not in pids:
 				os.unlink(self.INFO_DIR + "/%d"%(vmId))
-				child = controlledVMs[vmId]
 				del controlledVMs[vmId]
 				try:
 					del self.stats[vmId]
@@ -170,11 +172,26 @@ class Qemu(VmControlInterface):
 					for i in child.monitorHistory:
 						f.write(i)
 					f.close()
+				#XXXstroucki remove scratch storage
+				scratch_name = child.instance.name
+				log.info("Removing scratch for " + scratch_name)
+				#cmd = "/sbin/lvremove -f vgscratch/lv" + scratch_name
+				cmd = "/sbin/lvremove -f vgscratch"
+    				result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
 				try:
 					if (not child.migratingOut):
 						self.nm.vmStateChange(vmId, None, InstanceState.Exited)
 				except Exception, e:
 					log.exception("vmStateChange failed")
+			else:
+				try:
+					if (child.migratingOut):
+						self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
+					else:
+						self.nm.vmStateChange(vmId, None, InstanceState.Running)
+				except:
+					#XXXstroucki nm is initialised at different time
+					log.exception("vmStateChange failed")
 						
 	
 	def scanInfoDir(self):
@@ -192,6 +209,10 @@ class Qemu(VmControlInterface):
 				self.vncPortLock.release()
 				child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
 				child.monitor = os.fdopen(child.monitorFd)
+
+				#XXXstroucki ensure instance has vmId
+				child.instance.vmId = vmId
+				
 				self.controlledVMs[child.pid] = child
 				log.info("Adding vmId %d" % (child.pid))
 			except Exception, e:
@@ -308,12 +329,6 @@ class Qemu(VmControlInterface):
 		memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
 		if (memoryStr[2] == "kB"):
 			host.memory = int(memoryStr[1])/1024
-		elif (memoryStr[2] == "mB"):
-			host.memory = int(memoryStr[1])
-		elif (memoryStr[2] == "gB"):
-			host.memory = int(memoryStr[1])*1024
-		elif (memoryStr[2] == " B"):
-			host.memory = int(memoryStr[1])/(1024*1024)
 		else:
 			log.warning('Unable to determine amount of physical memory - reporting 0')
 			host.memory = 0
@@ -374,18 +389,17 @@ class Qemu(VmControlInterface):
 
 		try:
 			if scratchSize > 0:
-				print 'creating scratch file'
 				# create scratch disk
 				# XXXstroucki: needs to be cleaned somewhere
-				scratch_file = os.path.join(self.scratchDir, instance.name + ".scratch")
-				print 'scratch file name is ', scratch_file
-				tempfd = open(scratch_file, "w")
-				tempfd.seek( (scratchSize * 2 ** 30) - 1 )
-				tempfd.write('x')
-				tempfd.close()
+				# XXXstroucki: clean user provided instance name
+				scratch_name = "lv" + instance.name
+				# XXXstroucki hold lock
+				# XXXstroucki check for capacity
+				cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G vgscratch"
+				result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
 				index += 1
 
-				thisDiskList = [ "file=%s" % scratch_file ]
+				thisDiskList = [ "file=/dev/vgscratch/%s" % scratch_name ]
 				thisDiskList.append("if=%s" % diskInterface)
 				thisDiskList.append("index=%d" % index)
 				thisDiskList.append("cache=off")
@@ -453,16 +467,10 @@ class Qemu(VmControlInterface):
 		child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
 		child.ptyFile = None
 		child.vncPort = -1
+		child.instance.vmId = child.pid
 		self.saveChildInfo(child)
 		self.controlledVMs[child.pid] = child
 		log.info("Adding vmId %d" % (child.pid))
-
-		# clean up scratch file
-		if scratchSize > 0 and scratch_file is not None:
-			# do this in the background
-			threading.Thread(target=self.__cleanScratchSpace,args=[scratch_file]).start()
-
-
 		return (child.pid, cmd)
 
 	def getPtyInfo(self, child, issueContinue):

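Scratch space now lives on LVM instead of a sparse file under scratchDir: an lvcreate when the disk list is built, and an lvremove once the QEMU child disappears. A hedged sketch of that lifecycle in one place; the vgscratch volume group and the lv<instance> naming come from the diff, the return-code handling is an addition of this sketch (the commit ignores it), and as the XXX comments say, the instance name still needs sanitizing and capacity needs checking:

import subprocess

VGNAME = "vgscratch"	# volume group used for scratch space in this commit

def createScratch(instanceName, scratchSizeGB):
	# XXX per the diff: clean user-provided instance name, hold a lock, check capacity
	lvName = "lv" + instanceName
	cmd = "/sbin/lvcreate -n%s -L%dG %s" % (lvName, scratchSizeGB, VGNAME)
	rc = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE).wait()
	return "/dev/%s/%s" % (VGNAME, lvName), rc

def removeScratch(instanceName):
	# The committed cleanup currently runs "lvremove -f vgscratch" (the whole group);
	# the commented-out per-instance form would look like this instead:
	cmd = "/sbin/lvremove -f %s/lv%s" % (VGNAME, instanceName)
	return subprocess.Popen(cmd.split(), stdout=subprocess.PIPE).wait()
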
Modified: incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py?rev=1147396&r1=1147395&r2=1147396&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py (original)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py Sat Jul 16 10:04:54 2011
@@ -49,7 +49,9 @@ class client:
 		def connectWrap(*args):
 			args = cPickle.dumps(clean(args))
 			try:
+				# XXXstroucki this is raising exception but not getting handled
 				res = getattr(self.conn.root, name)(args)
+				# XXXstroucki handle exception coming back
 			except Exception, e:
 				self.conn.close()
 				raise e
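
The two new comments in rpycservices.py flag that the remote call can raise without callers handling it, and that errors marshalled back from the server are not translated either. A hedged sketch of a call wrapper along those lines; the function name and logging are illustrative and not Tashi API:

def callRemote(conn, name, payload, log):
	"""Illustrative only: invoke a remote RPyC method and surface failures."""
	try:
		return getattr(conn.root, name)(payload)
	except Exception, e:
		# the connection may be unusable after a transport error
		log.exception("remote call %s failed" % name)
		conn.close()
		raise e
	# XXX a fuller fix would also inspect the result for an exception object
	# marshalled back by the server and re-raise it locally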