Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2011/09/02 20:54:38 UTC

svn commit: r1164711 - in /incubator/tashi/trunk/src/tashi: agents/primitive.py client/tashi-client.py clustermanager/clustermanagerservice.py clustermanager/data/sql.py nodemanager/vmcontrol/qemu.py

Author: stroucki
Date: Fri Sep  2 20:54:38 2011
New Revision: 1164711

URL: http://svn.apache.org/viewvc?rev=1164711&view=rev
Log:
qemu.py: avoid throwing an exception when some object handles are not yet set
primitive.py: added empty line
clustermanagerservice.py: create local function __now() rather than using time.time()
   keep VMs without a host in Pending state when restarting the CM
   fix deadlock when an instance had been removed from the CM DB while it was being reconciled
   when checking for decayed instances, do the time check before locking the instance, to avoid unnecessary locking
   convert vmId to a printable string in case it is not a number
sql.py: if no instances exist, maxInstanceId = 0
tashi-client.py: fix counter string length corner case if instance count = 10^x

Modified:
    incubator/tashi/trunk/src/tashi/agents/primitive.py
    incubator/tashi/trunk/src/tashi/client/tashi-client.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py

Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Fri Sep  2 20:54:38 2011
@@ -62,6 +62,7 @@ class Primitive(object):
 		# load's keys are the host id, or None if not on a host. values are instance ids
 		load = {}
 		ctr = 0
+
 		for h in self.cm.getHosts():
 			#XXXstroucki get all hosts here?
 			#if (h.up == True and h.state == HostState.Normal):

Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Fri Sep  2 20:54:38 2011
@@ -158,7 +158,8 @@ def getVmLayout():
 	return hosts.values()
 
 def createMany(instance, count):
-	l = len(str(count))
+	# will create instances from 0 to count-1
+	l = len(str(count - 1))
 	basename = instance.name
 	instances = []
 	for i in range(0, count):
@@ -359,7 +360,6 @@ def makeTable(list, keys=None):
 		stdout = os.popen("stty size")
 		r = stdout.read()
 		stdout.close()
-		#(consoleHeight, consoleWidth) = map(lambda x: int(x.strip()), r.split())
 	except:
 		pass
 	for obj in list:
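
A note on the createMany() change above: the corner case named in the log ("instance count = 10^x") comes from numbering instances 0 through count-1, so the suffix width must be derived from count-1, not count. A minimal sketch of that calculation (suffix_width is a hypothetical helper, not part of the client):

    def suffix_width(count):
        # instances are numbered 0 .. count-1, so the widest suffix is count-1
        return len(str(count - 1))

    assert suffix_width(9) == 1     # "0".."8"
    assert suffix_width(10) == 1    # "0".."9"; len(str(10)) would wrongly give 2
    assert suffix_width(100) == 2   # "00".."99"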

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Fri Sep  2 20:54:38 2011
@@ -51,12 +51,17 @@ class ClusterManagerService(object):
 		self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
 		self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
 		self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
-		now = time.time()
+		now = self.__now()
 		for instance in self.data.getInstances().itervalues():
 			instanceId = instance.id
 			instance = self.data.acquireInstance(instanceId)
 			instance.decayed = False
-			self.stateTransition(instance, None, InstanceState.Orphaned)
+
+			if instance.hostId is None:
+				self.stateTransition(instance, None, InstanceState.Pending)
+			else:
+				self.stateTransition(instance, None, InstanceState.Orphaned)
+
 			self.data.releaseInstance(instance)
 		for host in self.data.getHosts().itervalues():
 			hostId = host.id
@@ -71,6 +76,9 @@ class ClusterManagerService(object):
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
 		instance.state = cur
 
+	def __now(self):
+		return time.time()
+
 	def __downHost(self, host):
 		self.log.warning('Host %s is down' % (host.name))
 		host.up = False
@@ -99,9 +107,8 @@ class ClusterManagerService(object):
 		# Check if hosts have been heard from recently
 		# Otherwise, see if it is alive
 
-		now = time.time()
 		for hostId in self.hostLastContactTime.keys():
-			if (self.hostLastContactTime[hostId] < (now-self.expireHostTime)):
+			if (self.hostLastContactTime[hostId] < (self.__now() - self.expireHostTime)):
 				host = self.data.acquireHost(hostId)
 				string = None
 				try:
@@ -114,7 +121,7 @@ class ClusterManagerService(object):
 					del self.hostLastContactTime[hostId]
 				else:
 					self.__upHost(host)
-					self.hostLastContactTime[hostId] = now
+					self.hostLastContactTime[hostId] = self.__now()
 
 				self.data.releaseHost(host)
 
@@ -132,13 +139,11 @@ class ClusterManagerService(object):
 		if myInstancesError == True:
 			return
 
-		now = time.time()
-
 		# iterate through all hosts I believe are up
 		for hostId in self.hostLastContactTime.keys():
 			#self.log.warning("iterate %d" % hostId)
 			host = self.data.acquireHost(hostId)
-			if (self.hostLastContactTime[hostId] < (now - self.allowDecayed)):
+			if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
 				host.decayed = True
 
 				self.log.info('Fetching state from host %s because it is decayed' % (host.name))
@@ -164,12 +169,20 @@ class ClusterManagerService(object):
 				# remove instances that shouldn't be running
 				for instance in myInstancesThisHost:
 					if (instance.id not in remoteInstanceIds):
-						instance = self.data.acquireInstance(instance.id)
+						# XXXstroucki before 20110902 excepted here with host lock
+						try:
+							instance = self.data.acquireInstance(instance.id)
+						except:
+							continue
+
 						# XXXstroucki destroy?
-						del self.instanceLastContactTime[instance.id]
+						try:
+							del self.instanceLastContactTime[instance.id]
+						except:
+							pass
 						self.data.removeInstance(instance)
 
-				self.hostLastContactTime[hostId] = now
+				self.hostLastContactTime[hostId] = self.__now()
 				host.decayed = False
 
 			self.data.releaseHost(host)
@@ -177,8 +190,12 @@ class ClusterManagerService(object):
 		
 		# iterate through all VMs I believe are active
 		for instanceId in self.instanceLastContactTime.keys():
-			instance = self.data.acquireInstance(instanceId)
-			if (self.instanceLastContactTime[instanceId] < (now - self.allowDecayed)):
+			if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+				try:
+					instance = self.data.acquireInstance(instanceId)
+				except:
+					continue
+
 				instance.decayed = True
 				self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
 				if instance.hostId is None: raise AssertionError
@@ -198,10 +215,10 @@ class ClusterManagerService(object):
 				# replace existing state with new state
 				# XXXstroucki more?
 				instance.state = newInstance.state
-				self.instanceLastContactTime[instanceId] = now
+				self.instanceLastContactTime[instanceId] = self.__now()
 				instance.decayed = False
+				self.data.releaseInstance(instance)
 
-			self.data.releaseInstance(instance)
 
 
 	def monitorCluster(self):
@@ -247,6 +264,7 @@ class ClusterManagerService(object):
 	
 	def createVm(self, instance):
 		"""Function to add a VM to the list of pending VMs"""
+		# XXXstroucki: check for exception here
 		instance = self.normalize(instance)
 		instance = self.data.registerInstance(instance)
 		self.data.releaseInstance(instance)
@@ -284,7 +302,7 @@ class ClusterManagerService(object):
 						self.proxy[hostname].destroyVm(instance.vmId)
 						self.data.releaseInstance(instance)
 				except:
-					self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
+					self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
 					self.data.removeInstance(instance)
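
A note on the decayed-instance loop above: the staleness test only reads a timestamp, so doing it before acquiring the per-instance lock means instances that are still fresh are never locked at all. A minimal sketch of that ordering with hypothetical names (ALLOW_DECAYED, last_contact and locks are placeholders, not the service's attributes):

    import threading
    import time

    ALLOW_DECAYED = 30.0      # seconds; placeholder threshold
    last_contact = {}         # instanceId -> last contact timestamp
    locks = {}                # instanceId -> threading.Lock()

    def refresh_decayed():
        for instance_id, seen in list(last_contact.items()):
            # cheap, lock-free staleness check first
            if seen >= time.time() - ALLOW_DECAYED:
                continue
            lock = locks.get(instance_id)
            if lock is None:
                # instance vanished in the meantime; nothing to reconcile
                continue
            with lock:
                # ... fetch state from the host, clear the decayed flag ...
                last_contact[instance_id] = time.time()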
 
 

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Fri Sep  2 20:54:38 2011
@@ -32,7 +32,7 @@ class SQL(DataInterface):
 		if (self.uri.startswith("sqlite://")):
 			import sqlite
 			self.dbEngine = "sqlite"
-			self.conn = sqlite.connect(self.uri[9:], autocommit=1)
+			self.conn = sqlite.connect(self.uri[9:], autocommit=1, timeout=1500)
 		elif (self.uri.startswith("mysql://")):
 			import MySQLdb
 			self.dbEngine = "mysql"
@@ -73,6 +73,9 @@ class SQL(DataInterface):
 		self.instanceIdLock.acquire()
 		cur = self.executeStatement("SELECT MAX(id) FROM instances")
 		self.maxInstanceId = cur.fetchone()[0]
+		# XXXstroucki perhaps this can be handled nicer
+		if (self.maxInstanceId is None):
+			self.maxInstanceId = 0
 		self.maxInstanceId = self.maxInstanceId + 1
 		instanceId = self.maxInstanceId
 		self.instanceIdLock.release()
@@ -165,6 +168,7 @@ class SQL(DataInterface):
 			self.instanceBusy[instance.id] = True
 		finally:
 			self.instanceLock.release()
+
 		return instance
 	
 	def releaseInstance(self, instance):
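
A note on the maxInstanceId hunk above: SELECT MAX(id) over an empty table yields NULL, which the Python DB-API returns as None, so the counter has to fall back to 0 before being incremented. A minimal sketch of that handling (next_instance_id is a hypothetical helper, not part of the class):

    def next_instance_id(cursor):
        # cursor has already executed: SELECT MAX(id) FROM instances
        max_id = cursor.fetchone()[0]
        if max_id is None:
            # empty table: MAX(id) is NULL, i.e. None in Python
            max_id = 0
        return max_id + 1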

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Fri Sep  2 20:54:38 2011
@@ -141,6 +141,11 @@ class Qemu(VmControlInterface):
 
 	def matchSystemPids(self, controlledVMs):
 		"""This is run in a separate polling thread and it must do things that are thread safe"""
+		if self.nm is None:
+			#XXXstroucki log may not be there yet either
+			#self.log.info("NM hook not yet available")
+			return
+
 		vmIds = controlledVMs.keys()
 		pids = self.getSystemPids()
 		for vmId in vmIds:
@@ -219,6 +224,7 @@ class Qemu(VmControlInterface):
 				log.exception("Failed to load VM info for %d", vmId)
 			else:
 				log.info("Loaded VM info for %d", vmId)
+		# XXXstroucki NM may not be available yet here.
 		self.matchSystemPids(self.controlledVMs)
 	
 	def pollVMsLoop(self):
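
A note on the matchSystemPids() guard above: it covers the window where the polling thread is already running but self.nm has not been attached yet, so the poller skips a cycle instead of raising. A minimal sketch of that pattern with hypothetical names (Poller and poll are placeholders, not Tashi classes):

    import threading
    import time

    class Poller(object):
        def __init__(self):
            self.nm = None                  # attached later by the caller
            t = threading.Thread(target=self._loop)
            t.daemon = True
            t.start()

        def _loop(self):
            while True:
                if self.nm is None:
                    # collaborator not wired up yet; skip this cycle quietly
                    time.sleep(1)
                    continue
                self.nm.poll()              # hypothetical callback
                time.sleep(1)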