Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2011/05/16 23:41:53 UTC
svn commit: r1103954 - /incubator/tashi/trunk/src/tashi/agents/primitive.py
Author: stroucki
Date: Mon May 16 23:41:53 2011
New Revision: 1103954
URL: http://svn.apache.org/viewvc?rev=1103954&view=rev
Log:
primitive.py: refactor code to eliminate duplication and reduce indentation
primitive.py: round-robin through host list (replacing randomisation)
Modified:
incubator/tashi/trunk/src/tashi/agents/primitive.py
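
For context on the second log item: the change replaces the per-pass random.shuffle of the host list with a rotation that resumes at the slot chosen on the previous pass. Below is a minimal sketch of the rotation idiom in the Python 2 style of the codebase; the hosts dict and lastScheduledHost value are simplified stand-ins for the agent's state.

# Round-robin over a dict of hosts keyed 0..n-1. In Python 2, range()
# returns lists, so concatenating two ranges yields the rotated visiting order.
hosts = {0: "hostA", 1: "hostB", 2: "hostC"}
lastScheduledHost = 1

for ctr in range(lastScheduledHost, len(hosts)) + range(0, lastScheduledHost):
    print ctr, hosts[ctr]
# visits slots 1, 2, 0 -- like the old shuffle, this avoids hammering one
# unsuitable host on every pass, but deterministically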
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=1103954&r1=1103953&r2=1103954&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Mon May 16 23:41:53 2011
@@ -48,139 +48,156 @@ class Primitive(object):
self.hooks.append(instantiateImplementation(value, config, client, False))
except:
self.log.exception("Failed to load hook %s" % (value))
+ self.hosts = {}
+ self.load = {}
+ self.instances = {}
+ self.muffle = {}
+ self.lastScheduledHost = 0
+
+
+ def __getState(self):
+ # Generate a list of hosts and
+ # current loading of VMs per host
+ hosts = {}
+ # load's keys are host ids (None for instances not on a host); values are lists of instance ids
+ load = {}
+ ctr = 0
+ for h in self.client.getHosts():
+ # track load for every host, so instances on down hosts don't KeyError below
+ load[h.id] = []
+ # but only offer up, Normal hosts as scheduling targets
+ if (h.up == True and h.state == HostState.Normal):
+ hosts[ctr] = h
+ ctr = ctr + 1
+
+ load[None] = []
+ _instances = self.client.getInstances()
+ instances = {}
+ for i in _instances:
+ instances[i.id] = i
+ for i in instances.itervalues():
+ # XXXstroucki: do we need to look at Held machines here?
+ if (i.hostId or i.state == InstanceState.Pending):
+ # Nonrunning VMs will have hostId of None
+ load[i.hostId] = load[i.hostId] + [i.id]
+
+ self.hosts = hosts
+ self.load = load
+ self.instances = instances
+
+ def __checkCapacity(self, host, inst):
+ # ensure host can carry new load
+ memUsage = reduce(lambda x, y: x + self.instances[y].memory, self.load[host.id], inst.memory)
+ coreUsage = reduce(lambda x, y: x + self.instances[y].cores, self.load[host.id], inst.cores)
+ if (memUsage <= host.memory and coreUsage <= host.cores):
+ return True
+ return False
+
+ def __scheduleInstance(self, inst):
+ try:
+ minMax = None
+ minMaxHost = None
+ minMaxCtr = None
+
+ targetHost = inst.hints.get("targetHost", None)
+ try:
+ allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
+ except Exception, e:
+ allowElsewhere = False
+ # has a host preference been expressed?
+ if (targetHost != None):
+ for h in self.hosts.values():
+ # if this is not the host we are looking for, continue
+ if ((str(h.id) != targetHost and h.name != targetHost)):
+ continue
+ # we found the targetHost
+ # If a host machine is reserved, only allow if userid is in reserved list
+ if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
+ # Machine is reserved and not available for userId.
+ # XXXstroucki: Should we log something here for analysis?
+ break
+ if self.__checkCapacity(h, inst):
+ minMax = len(self.load[h.id])
+ minMaxHost = h
+
+ # end targetHost != None
+
+ # If we don't have a host yet, find one here
+ if ((targetHost == None or allowElsewhere) and minMaxHost == None):
+ # cycle the host list, resuming at the last scheduled slot (reset if the list shrank)
+ start = self.lastScheduledHost
+ if (start >= len(self.hosts)):
+ start = 0
+ for ctr in range(start, len(self.hosts)) + range(0, start):
+ h = self.hosts[ctr]
+
+ # if it's reserved, see if we can use it
+ if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
+ # reserved for somebody else, so find another machine
+ continue
+
+ # implement dense packing policy:
+ # consider this host if
+ # minMax has not been modified or
+ # the number of vms here is greater than minmax if we're dense packing or
+ # the number of vms here is less than minmax if we're not dense packing
+ if (minMax is None or (self.densePack and len(self.load[h.id]) > minMax) or (not self.densePack and len(self.load[h.id]) < minMax)):
+ if self.__checkCapacity(h, inst):
+ minMax = len(self.load[h.id])
+ minMaxHost = h
+ minMaxCtr = ctr
+
+ if (minMaxHost):
+ # found a host
+ # minMaxCtr is only set on the round-robin path; leave lastScheduledHost alone for targetHost placements
+ if (minMaxCtr is not None):
+ self.lastScheduledHost = minMaxCtr
+ if (not inst.hints.get("__resume_source", None)):
+ # only run preCreate hooks if newly starting
+ for hook in self.hooks:
+ hook.preCreate(inst)
+ self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
+ self.client.activateVm(inst.id, minMaxHost)
+ self.load[minMaxHost.id] = self.load[minMaxHost.id] + [inst.id]
+ # get rid of its possible entry in muffle if VM is scheduled to a host
+ if (inst.name in self.muffle):
+ self.muffle.pop(inst.name)
+ else:
+ # did not find a host
+ if (inst.name not in self.muffle):
+ self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
+ self.muffle[inst.name] = True
+
+ except Exception, e:
+ # XXXstroucki: how can we get here?
+ if (inst.name not in self.muffle):
+ self.log.exception("Failed to schedule or activate %s" % (inst.name))
+ self.muffle[inst.name] = True
+
def start(self):
oldInstances = {}
- muffle = {}
+
while True:
try:
- # Generate a list of hosts and
- # current loading of VMs per host
- hosts = {}
- load = {}
- ctr = 0
- for h in self.client.getHosts():
- hosts[ctr] = h
- ctr = ctr + 1
- load[h.id] = []
-
- # shuffle host list to not continuously schedule on a failed or unsuitable machine
- random.shuffle(hosts)
-
- load[None] = []
- _instances = self.client.getInstances()
- instances = {}
- for i in _instances:
- instances[i.id] = i
- for i in instances.itervalues():
- # XXXstroucki: do we need to look at Held machines here?
- if (i.hostId or i.state == InstanceState.Pending):
- # Nonrunning VMs will have hostId of None
- load[i.hostId] = load[i.hostId] + [i.id]
-
+ self.__getState()
# Check for VMs that have exited and call
# postDestroy hook
for i in oldInstances:
- # XXXstroucki: do we need to look at Held machines here?
- if (i not in instances and oldInstances[i].state != InstanceState.Pending):
+ # XXXstroucki: what about paused and saved VMs?
+ # XXXstroucki: do we need to look at Held VMs here?
+ if (i not in self.instances and oldInstances[i].state == InstanceState.Running):
for hook in self.hooks:
hook.postDestroy(oldInstances[i])
- oldInstances = instances
+ oldInstances = self.instances
- if (len(load.get(None, [])) > 0):
+ if (len(self.load.get(None, [])) > 0):
# Schedule VMs if they are waiting
# sort by id number (FIFO?)
- load[None].sort()
- for i in load[None]:
- inst = instances[i]
- try:
- minMax = None
- minMaxHost = None
- targetHost = inst.hints.get("targetHost", None)
- try:
- allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
- except Exception, e:
- allowElsewhere = False
- # has a host preference been expressed?
- if (targetHost != None):
- for h in hosts.values():
- # if this is not the host we are looking for, continue
- if ((str(h.id) != targetHost and h.name != targetHost)):
- continue
- # we found the targetHost
- # If a host machine is reserved, only allow if userid is in reserved list
- if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
- # Machine is reserved and not available for userId.
- # XXXstroucki: Should we log something here for analysis?
- break
- # make sure that host is up, and in a normal state
- if (h.up == True and h.state == HostState.Normal):
- # ensure host can carry new load
- memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
- coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
- if (memUsage <= h.memory and coreUsage <= h.cores):
- minMax = len(load[h.id])
- minMaxHost = h
-
- # end targethost != none
-
-
- # If we don't have a host yet, find one here
- if ((targetHost == None or allowElsewhere) and minMaxHost == None):
- for h in hosts.values():
- # if the machine is suitable to host a vm, lets look at it
- if (h.up == True and h.state == HostState.Normal):
- pass
- else:
- # otherwise find another machine
- continue
-
- # if it's reserved, see if we can use it
- if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
- # reserved for somebody else, so find another machine
- continue
-
- # implement dense packing policy:
- # consider this host if
- # minMax has not been modified or
- # the number of vms here is greater than minmax if we're dense packing or
- # the number of vms here is less than minmax if we're not dense packing
- if (minMax is None or (self.densePack and len(load[h.id]) > minMax) or (not self.densePack and len(load[h.id]) < minMax)):
- memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
- coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
- if (memUsage <= h.memory and coreUsage <= h.cores):
- minMax = len(load[h.id])
- minMaxHost = h
-
- if (minMaxHost):
- # found a host
- if (not inst.hints.get("__resume_source", None)):
- # only run preCreate hooks if newly starting
- for hook in self.hooks:
- hook.preCreate(inst)
- self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
- self.client.activateVm(i, minMaxHost)
- load[minMaxHost.id] = load[minMaxHost.id] + [i]
- # get rid of its possible entry in muffle if VM is scheduled to a host
- if (inst.name in muffle):
- muffle.pop(inst.name)
- else:
- # did not find a host
- if (inst.name not in muffle):
- self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
- muffle[inst.name] = True
-
-
- except Exception, e:
- # XXXstroucki: how can we get here?
- if (inst.name not in muffle):
- self.log.exception("Failed to schedule or activate %s" % (inst.name))
- muffle[inst.name] = True
-
+ self.load[None].sort()
+ for i in self.load[None]:
+ inst = self.instances[i]
+ self.__scheduleInstance(inst)
+ # end for unassigned vms
except TashiException, e:
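
As a footnote on the factored-out __checkCapacity: it folds the candidate instance's requirements into the host's existing load with reduce(), seeding the fold with the new instance's memory and cores. A runnable Python 2 sketch of the same pattern follows; the Host and Instance classes are simplified stand-ins for Tashi's data types.

# Capacity check: sum memory/cores of instances already on the host,
# starting the sum at the candidate instance's own requirements.
class Host(object):
    def __init__(self, memory, cores):
        self.memory, self.cores = memory, cores

class Instance(object):
    def __init__(self, memory, cores):
        self.memory, self.cores = memory, cores

def checkCapacity(host, hostLoad, instances, inst):
    # hostLoad: list of instance ids already placed on this host
    memUsage = reduce(lambda x, y: x + instances[y].memory, hostLoad, inst.memory)
    coreUsage = reduce(lambda x, y: x + instances[y].cores, hostLoad, inst.cores)
    return memUsage <= host.memory and coreUsage <= host.cores

host = Host(memory=16384, cores=8)
instances = {1: Instance(4096, 2), 2: Instance(2048, 1)}
print checkCapacity(host, [1, 2], instances, Instance(8192, 4))   # True: fits
print checkCapacity(host, [1, 2], instances, Instance(16384, 4))  # False: memory over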