You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2011/09/02 20:54:38 UTC
svn commit: r1164711 - in /incubator/tashi/trunk/src/tashi:
agents/primitive.py client/tashi-client.py
clustermanager/clustermanagerservice.py clustermanager/data/sql.py
nodemanager/vmcontrol/qemu.py
Author: stroucki
Date: Fri Sep 2 20:54:38 2011
New Revision: 1164711
URL: http://svn.apache.org/viewvc?rev=1164711&view=rev
Log:
qemu.py: avoid throwing exception when some object handles are not yet set
primitive.py: added empty line
clustermanagerservice.py: create local function __now() rather than using time.time()
keep VMs without host in status pending when restarting CM
fix deadlock when instance had been removed from CM DB and it was being reconciled
when checking for decayed instances, do the time check before locking the instance, to avoid unnecessary locking
convert vmId to printable string just in case it happens to be NaN
sql.py: if no instances exist, maxInstanceId = 0
tashi-client.py: fix counter string length corner case when the instance count is an exact power of ten (10^x)
Modified:
incubator/tashi/trunk/src/tashi/agents/primitive.py
incubator/tashi/trunk/src/tashi/client/tashi-client.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Fri Sep 2 20:54:38 2011
@@ -62,6 +62,7 @@ class Primitive(object):
# load's keys are the host id, or None if not on a host. values are instance ids
load = {}
ctr = 0
+
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
#if (h.up == True and h.state == HostState.Normal):
Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Fri Sep 2 20:54:38 2011
@@ -158,7 +158,8 @@ def getVmLayout():
return hosts.values()
def createMany(instance, count):
- l = len(str(count))
+ # will create instances from 0 to count-1
+ l = len(str(count) - 1)
basename = instance.name
instances = []
for i in range(0, count):
@@ -359,7 +360,6 @@ def makeTable(list, keys=None):
stdout = os.popen("stty size")
r = stdout.read()
stdout.close()
- #(consoleHeight, consoleWidth) = map(lambda x: int(x.strip()), r.split())
except:
pass
for obj in list:
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Fri Sep 2 20:54:38 2011
@@ -51,12 +51,17 @@ class ClusterManagerService(object):
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
- now = time.time()
+ now = self.__now()
for instance in self.data.getInstances().itervalues():
instanceId = instance.id
instance = self.data.acquireInstance(instanceId)
instance.decayed = False
- self.stateTransition(instance, None, InstanceState.Orphaned)
+
+ if instance.hostId is None:
+ self.stateTransition(instance, None, InstanceState.Pending)
+ else:
+ self.stateTransition(instance, None, InstanceState.Orphaned)
+
self.data.releaseInstance(instance)
for host in self.data.getHosts().itervalues():
hostId = host.id
@@ -71,6 +76,9 @@ class ClusterManagerService(object):
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
instance.state = cur
+ def __now(self):
+ return time.time()
+
def __downHost(self, host):
self.log.warning('Host %s is down' % (host.name))
host.up = False
@@ -99,9 +107,8 @@ class ClusterManagerService(object):
# Check if hosts have been heard from recently
# Otherwise, see if it is alive
- now = time.time()
for hostId in self.hostLastContactTime.keys():
- if (self.hostLastContactTime[hostId] < (now-self.expireHostTime)):
+ if (self.hostLastContactTime[hostId] < (self.__now() - self.expireHostTime)):
host = self.data.acquireHost(hostId)
string = None
try:
@@ -114,7 +121,7 @@ class ClusterManagerService(object):
del self.hostLastContactTime[hostId]
else:
self.__upHost(host)
- self.hostLastContactTime[hostId] = now
+ self.hostLastContactTime[hostId] = self.__now()
self.data.releaseHost(host)
@@ -132,13 +139,11 @@ class ClusterManagerService(object):
if myInstancesError == True:
return
- now = time.time()
-
# iterate through all hosts I believe are up
for hostId in self.hostLastContactTime.keys():
#self.log.warning("iterate %d" % hostId)
host = self.data.acquireHost(hostId)
- if (self.hostLastContactTime[hostId] < (now - self.allowDecayed)):
+ if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
host.decayed = True
self.log.info('Fetching state from host %s because it is decayed' % (host.name))
@@ -164,12 +169,20 @@ class ClusterManagerService(object):
# remove instances that shouldn't be running
for instance in myInstancesThisHost:
if (instance.id not in remoteInstanceIds):
- instance = self.data.acquireInstance(instance.id)
+ # XXXstroucki before 20110902 excepted here with host lock
+ try:
+ instance = self.data.acquireInstance(instance.id)
+ except:
+ continue
+
# XXXstroucki destroy?
- del self.instanceLastContactTime[instance.id]
+ try:
+ del self.instanceLastContactTime[instance.id]
+ except:
+ pass
self.data.removeInstance(instance)
- self.hostLastContactTime[hostId] = now
+ self.hostLastContactTime[hostId] = self.__now()
host.decayed = False
self.data.releaseHost(host)
@@ -177,8 +190,12 @@ class ClusterManagerService(object):
# iterate through all VMs I believe are active
for instanceId in self.instanceLastContactTime.keys():
- instance = self.data.acquireInstance(instanceId)
- if (self.instanceLastContactTime[instanceId] < (now - self.allowDecayed)):
+ if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+ try:
+ instance = self.data.acquireInstance(instanceId)
+ except:
+ continue
+
instance.decayed = True
self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
if instance.hostId is None: raise AssertionError
@@ -198,10 +215,10 @@ class ClusterManagerService(object):
# replace existing state with new state
# XXXstroucki more?
instance.state = newInstance.state
- self.instanceLastContactTime[instanceId] = now
+ self.instanceLastContactTime[instanceId] = self.__now()
instance.decayed = False
+ self.data.releaseInstance(instance)
- self.data.releaseInstance(instance)
def monitorCluster(self):
@@ -247,6 +264,7 @@ class ClusterManagerService(object):
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
+ # XXXstroucki: check for exception here
instance = self.normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
@@ -284,7 +302,7 @@ class ClusterManagerService(object):
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
- self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
+ self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Fri Sep 2 20:54:38 2011
@@ -32,7 +32,7 @@ class SQL(DataInterface):
if (self.uri.startswith("sqlite://")):
import sqlite
self.dbEngine = "sqlite"
- self.conn = sqlite.connect(self.uri[9:], autocommit=1)
+ self.conn = sqlite.connect(self.uri[9:], autocommit=1, timeout=1500)
elif (self.uri.startswith("mysql://")):
import MySQLdb
self.dbEngine = "mysql"
@@ -73,6 +73,9 @@ class SQL(DataInterface):
self.instanceIdLock.acquire()
cur = self.executeStatement("SELECT MAX(id) FROM instances")
self.maxInstanceId = cur.fetchone()[0]
+ # XXXstroucki perhaps this can be handled nicer
+ if (self.maxInstanceId is None):
+ self.maxInstanceId = 0
self.maxInstanceId = self.maxInstanceId + 1
instanceId = self.maxInstanceId
self.instanceIdLock.release()
@@ -165,6 +168,7 @@ class SQL(DataInterface):
self.instanceBusy[instance.id] = True
finally:
self.instanceLock.release()
+
return instance
def releaseInstance(self, instance):
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1164711&r1=1164710&r2=1164711&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Fri Sep 2 20:54:38 2011
@@ -141,6 +141,11 @@ class Qemu(VmControlInterface):
def matchSystemPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
+ if self.nm is None:
+ #XXXstroucki log may not be there yet either
+ #self.log.info("NM hook not yet available")
+ return
+
vmIds = controlledVMs.keys()
pids = self.getSystemPids()
for vmId in vmIds:
@@ -219,6 +224,7 @@ class Qemu(VmControlInterface):
log.exception("Failed to load VM info for %d", vmId)
else:
log.info("Loaded VM info for %d", vmId)
+ # XXXstroucki NM may not be available yet here.
self.matchSystemPids(self.controlledVMs)
def pollVMsLoop(self):