You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/02/08 04:56:24 UTC
svn commit: r1241776 [2/2] - in /incubator/tashi/branches/stable: ./ doc/
etc/ scripts/ src/tashi/ src/tashi/accounting/ src/tashi/agents/
src/tashi/client/ src/tashi/clustermanager/ src/tashi/clustermanager/data/
src/tashi/dfs/ src/tashi/nodemanager/ ...
Modified: incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/qemu.py Wed Feb 8 04:56:23 2012
@@ -26,12 +26,11 @@ import socket
import subprocess
import sys
import time
+import shlex
-# for scratch space support
-from os import system
-
-from tashi.rpycservices.rpyctypes import *
-from tashi.util import broken, logged, scrubString, boolean
+#from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import InstanceState, Host
+from tashi.util import scrubString, boolean
from tashi import version, stringPartition
from vmcontrolinterface import VmControlInterface
@@ -47,7 +46,7 @@ def controlConsole(child, port):
try:
listenSocket.listen(5)
ls = listenSocket.fileno()
- input = child.monitorFd
+ #input = child.monitorFd
output = child.monitorFd
#print "listen"
select.select([ls], [], [])
@@ -96,6 +95,10 @@ class Qemu(VmControlInterface):
self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
+ # XXXstroucki amount of reserved memory could be configurable
+ self.reservedMem = 512
+ # XXXstroucki perhaps make this configurable
+ self.ifPrefix = "tashi"
self.controlledVMs = {}
self.usedPorts = []
self.usedPortsLock = threading.Lock()
@@ -115,8 +118,10 @@ class Qemu(VmControlInterface):
os.mkdir(self.INFO_DIR)
except:
pass
- self.scanInfoDir()
- threading.Thread(target=self.pollVMsLoop).start()
+
+ self.__scanInfoDir()
+
+ threading.Thread(target=self.__pollVMsLoop).start()
if (self.statsInterval > 0):
threading.Thread(target=self.statsThread).start()
@@ -124,7 +129,7 @@ class Qemu(VmControlInterface):
def __init__(self, **attrs):
self.__dict__.update(attrs)
- def getSystemPids(self):
+ def __getHostPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
for f in os.listdir("/proc"):
@@ -136,83 +141,105 @@ class Qemu(VmControlInterface):
pass
return pids
+ # extern
def getInstances(self):
"""Will return a dict of instances by vmId to the caller"""
return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
- def matchSystemPids(self, controlledVMs):
+ def __matchHostPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
- if self.nm is None:
- #XXXstroucki log may not be there yet either
- #self.log.info("NM hook not yet available")
- return
vmIds = controlledVMs.keys()
- pids = self.getSystemPids()
+ pids = self.__getHostPids()
+
for vmId in vmIds:
child = controlledVMs[vmId]
+ instance = child.instance
+ name = instance.name
if vmId not in pids:
+ # VM is no longer running, but is still
+ # considered controlled
+
+ # remove info file
os.unlink(self.INFO_DIR + "/%d"%(vmId))
+
+ # XXXstroucki why not use self.controlledVMs
+ # argument, so why modify this fn's formal?
del controlledVMs[vmId]
+
+ # remove any stats (appropriate?)
try:
del self.stats[vmId]
except:
pass
+
if (child.vncPort >= 0):
self.vncPortLock.acquire()
self.vncPorts.remove(child.vncPort)
self.vncPortLock.release()
- log.info("Removing vmId %d" % (vmId))
+
+ log.info("Removing vmId %d because it is no longer running" % (vmId))
+
+ # if the VM was started from this process,
+ # wait on it
if (child.OSchild):
try:
os.waitpid(vmId, 0)
except:
- log.exception("waitpid failed")
+ log.exception("waitpid failed for vmId %d" % (vmId))
+ # recover the child's stderr and monitor
+ # output if possible
if (child.errorBit):
if (child.OSchild):
f = open("/tmp/%d.err" % (vmId), "w")
f.write(child.stderr.read())
f.close()
+
f = open("/tmp/%d.pty" % (vmId), "w")
for i in child.monitorHistory:
f.write(i)
f.close()
- #XXXstroucki remove scratch storage
+
+ # remove scratch storage
try:
if self.scratchVg is not None:
- scratch_name = child.instance.name
- log.info("Removing any scratch for " + scratch_name)
- cmd = "/sbin/lvremove -f %s" % self.scratchVg
- result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+ log.info("Removing any scratch for %s" % (name))
+ cmd = "/sbin/lvremove --quiet -f %s" % self.scratchVg
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
except:
+ log.warning("Problem cleaning scratch volumes")
pass
+ # let the NM know
try:
if (not child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.Exited)
- except Exception, e:
- log.exception("vmStateChange failed")
+ except Exception:
+ log.exception("vmStateChange failed for VM %s" % (name))
else:
+ # VM is still running
try:
+
if (child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
- else:
+ elif (instance.state == InstanceState.Orphaned) or \
+ (instance.state == InstanceState.Activating):
self.nm.vmStateChange(vmId, None, InstanceState.Running)
except:
- #XXXstroucki nm is initialised at different time
- log.exception("vmStateChange failed")
+ log.exception("vmStateChange failed for VM %s" % (name))
-
- def scanInfoDir(self):
+
+ # called once on startup
+ def __scanInfoDir(self):
"""This is not thread-safe and must only be used during class initialization"""
controlledVMs = {}
controlledVMs.update(map(lambda x: (int(x), self.anonClass(OSchild=False, errorBit=False, migratingOut=False)), os.listdir(self.INFO_DIR + "/")))
if (len(controlledVMs) == 0):
- log.info("No vm information found in %s", self.INFO_DIR)
+ log.info("No VM information found in %s" % (self.INFO_DIR))
for vmId in controlledVMs:
try:
- child = self.loadChildInfo(vmId)
+ child = self.__loadChildInfo(vmId)
self.vncPortLock.acquire()
if (child.vncPort >= 0):
self.vncPorts.append(child.vncPort)
@@ -223,40 +250,46 @@ class Qemu(VmControlInterface):
#XXXstroucki ensure instance has vmId
child.instance.vmId = vmId
- self.controlledVMs[child.pid] = child
- log.info("Adding vmId %d" % (child.pid))
- except Exception, e:
+ self.controlledVMs[vmId] = child
+ except Exception:
log.exception("Failed to load VM info for %d", vmId)
else:
log.info("Loaded VM info for %d", vmId)
- # XXXstroucki NM may not be available yet here.
- try:
- self.matchSystemPids(self.controlledVMs)
- except:
- pass
-
- def pollVMsLoop(self):
+ # service thread
+ def __pollVMsLoop(self):
"""Infinite loop that checks for dead VMs"""
+
+ # As of 2011-12-30, nm is None when this is called, and
+ # is set later by the NM. Things further down require
+ # access to the NM, so wait until it is set.
+ # Moved into __pollVMsLoop since putting it in this thread
+ # will allow the init to complete and nm to be actually
+ # set.
+
+ while self.nm is None:
+ log.info("Waiting for NM initialization")
+ time.sleep(2)
+
while True:
try:
time.sleep(self.POLL_DELAY)
- self.matchSystemPids(self.controlledVMs)
+ self.__matchHostPids(self.controlledVMs)
except:
log.exception("Exception in poolVMsLoop")
- def waitForExit(self, vmId):
+ def __waitForExit(self, vmId):
"""This waits until an element is removed from the dictionary -- the polling thread must detect an exit"""
while vmId in self.controlledVMs:
time.sleep(self.POLL_DELAY)
- def getChildFromPid(self, pid):
+ def __getChildFromPid(self, pid):
"""Do a simple dictionary lookup, but raise a unique exception if the key doesn't exist"""
child = self.controlledVMs.get(pid, None)
if (not child):
raise Exception, "Uncontrolled vmId %d" % (pid)
return child
- def consumeAvailable(self, child):
+ def __consumeAvailable(self, child):
"""Consume characters one-by-one until they stop coming"""
monitorFd = child.monitorFd
buf = ""
@@ -299,9 +332,9 @@ class Qemu(VmControlInterface):
child.monitorHistory.append(buf[len(needle):])
return buf[len(needle):]
- def enterCommand(self, child, command, expectPrompt = True, timeout = -1):
+ def __enterCommand(self, child, command, expectPrompt = True, timeout = -1):
"""Enter a command on the qemu monitor"""
- res = self.consumeAvailable(child)
+ res = self.__consumeAvailable(child)
os.write(child.monitorFd, command + "\n")
if (expectPrompt):
# XXXstroucki: receiving a vm can take a long time
@@ -309,7 +342,7 @@ class Qemu(VmControlInterface):
res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
return res
- def loadChildInfo(self, vmId):
+ def __loadChildInfo(self, vmId):
child = self.anonClass(pid=vmId)
info = open(self.INFO_DIR + "/%d"%(child.pid), "r")
(instance, pid, ptyFile) = cPickle.load(info)
@@ -331,11 +364,12 @@ class Qemu(VmControlInterface):
child.vncPort = -1
return child
- def saveChildInfo(self, child):
+ def __saveChildInfo(self, child):
info = open(self.INFO_DIR + "/%d"%(child.pid), "w")
cPickle.dump((child.instance, child.pid, child.ptyFile), info)
info.close()
+ # extern
def getHostInfo(self, service):
host = Host()
host.id = service.id
@@ -344,7 +378,7 @@ class Qemu(VmControlInterface):
memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
if (memoryStr[2] == "kB"):
# XXXstroucki should have parameter for reserved mem
- host.memory = (int(memoryStr[1])/1024) - 512
+ host.memory = (int(memoryStr[1])/1024) - self.reservedMem
else:
log.warning('Unable to determine amount of physical memory - reporting 0')
host.memory = 0
@@ -353,22 +387,39 @@ class Qemu(VmControlInterface):
host.decayed = False
host.version = version
return host
-
- def startVm(self, instance, source):
+
+ def __stripSpace(self, s):
+ return "".join(s.split())
+
+ def __startVm(self, instance, source):
"""Universal function to start a VM -- used by instantiateVM, resumeVM, and prepReceiveVM"""
- # Capture startVm Hints
+ # Capture __startVm Hints
# CPU hints
cpuModel = instance.hints.get("cpumodel")
+
cpuString = ""
if cpuModel:
+ # clean off whitespace
+ cpuModel = self.__stripSpace(cpuModel)
cpuString = "-cpu " + cpuModel
# Clock hints
clockString = instance.hints.get("clock", "dynticks")
+ # clean off whitespace
+ clockString = self.__stripSpace(clockString)
# Disk hints
+ # XXXstroucki: insert commentary on jcipar's performance
+ # measurements
+ # virtio is recommended, but linux will name devices
+ # vdX instead of sdX. This adds a trap for someone who
+ # converts a physical machine or other virtualization
+ # layer's image to run under Tashi.
diskInterface = instance.hints.get("diskInterface", "ide")
+ # clean off whitespace
+ diskInterface = self.__stripSpace(diskInterface)
+
diskString = ""
for index in range(0, len(instance.disks)):
@@ -398,10 +449,10 @@ class Qemu(VmControlInterface):
diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
- # scratch disk (should be integrated better)
+ # scratch disk
scratchSize = instance.hints.get("scratchSpace", "0")
scratchSize = int(scratchSize)
- scratch_file = None
+ scratchName = None
try:
if scratchSize > 0:
@@ -410,18 +461,21 @@ class Qemu(VmControlInterface):
# create scratch disk
# XXXstroucki: needs to be cleaned somewhere
# XXXstroucki: clean user provided instance name
- scratch_name = "lv" + instance.name
+ scratchName = "lv%s" % instance.name
# XXXstroucki hold lock
# XXXstroucki check for capacity
- cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G " + self.scratchVg
+ cmd = "/sbin/lvcreate --quiet -n%s -L %dG %s" % (scratchName, scratchSize, self.scratchVg)
+ # XXXstroucki check result
result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
index += 1
- thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratch_name) ]
+ thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratchName) ]
thisDiskList.append("if=%s" % diskInterface)
thisDiskList.append("index=%d" % index)
thisDiskList.append("cache=off")
+ # XXXstroucki force scratch disk to be
+ # persistent
if (True or disk.persistent):
snapshot = "off"
migrate = "off"
@@ -434,18 +488,21 @@ class Qemu(VmControlInterface):
if (self.useMigrateArgument):
thisDiskList.append("migrate=%s" % migrate)
- diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
+ diskString = "%s-drive %s " % (diskString, ",".join(thisDiskList))
except:
- print 'caught exception'
- raise 'exception'
+ log.exception('caught exception in scratch disk formation')
+ raise
# Nic hints
nicModel = instance.hints.get("nicModel", "virtio")
+ # clean off whitespace
+ nicModel = self.__stripSpace(nicModel)
+
nicString = ""
for i in range(0, len(instance.nics)):
nic = instance.nics[i]
- nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=tashi%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, instance.id, i, nic.network, nic.network)
+ nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
# ACPI
if (boolean(instance.hints.get("noAcpi", False))):
@@ -455,14 +512,16 @@ class Qemu(VmControlInterface):
# Construct the qemu command
strCmd = "%s %s %s -clock %s %s %s -m %d -smp %d -serial null -vnc none -monitor pty" % (self.QEMU_BIN, noAcpiString, cpuString, clockString, diskString, nicString, instance.memory, instance.cores)
- cmd = strCmd.split()
if (source):
- cmd = cmd + ["-incoming", source]
- strCmd = strCmd + " -incoming %s" % (source)
- log.info("QEMU command: %s" % (strCmd))
+ strCmd = '%s -incoming "%s"' % (strCmd, source)
+ # XXXstroucki perhaps we're doing it backwards
+ cmd = shlex.split(strCmd)
+
+ log.info("Executing command: %s" % (strCmd))
(pipe_r, pipe_w) = os.pipe()
pid = os.fork()
if (pid == 0):
+ # child process
pid = os.getpid()
os.setpgid(pid, pid)
os.close(pipe_r)
@@ -477,146 +536,195 @@ class Qemu(VmControlInterface):
os.close(i)
except:
pass
+
# XXXstroucki unfortunately no kvm option yet
+ # to direct COW differences elsewhere, so change
+ # this process' TMPDIR, which kvm will honour
os.environ['TMPDIR'] = self.scratchDir
os.execl(self.QEMU_BIN, *cmd)
sys.exit(-1)
+
+ # parent process
os.close(pipe_w)
child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
child.ptyFile = None
child.vncPort = -1
child.instance.vmId = child.pid
- self.saveChildInfo(child)
+ self.__saveChildInfo(child)
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
return (child.pid, cmd)
- def getPtyInfo(self, child, issueContinue):
+ def __getPtyInfo(self, child, issueContinue):
ptyFile = None
while not ptyFile:
- l = child.stderr.readline()
- if (l == ""):
+ line = child.stderr.readline()
+ if (line == ""):
try:
os.waitpid(child.pid, 0)
except:
log.exception("waitpid failed")
raise Exception, "Failed to start VM -- ptyFile not found"
- if (l.find("char device redirected to ") != -1):
- ptyFile=l[26:].strip()
+ redirLine = "char device redirected to "
+ if (line.find(redirLine) != -1):
+ ptyFile=line[len(redirLine):].strip()
break
child.ptyFile = ptyFile
child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
child.monitor = os.fdopen(child.monitorFd)
- self.saveChildInfo(child)
+ self.__saveChildInfo(child)
if (issueContinue):
# XXXstroucki: receiving a vm can take a long time
- self.enterCommand(child, "c", timeout=None)
+ self.__enterCommand(child, "c", timeout=None)
- def stopVm(self, vmId, target, stopFirst):
+ def __stopVm(self, vmId, target, stopFirst):
"""Universal function to stop a VM -- used by suspendVM, migrateVM """
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
if (stopFirst):
- self.enterCommand(child, "stop")
+ self.__enterCommand(child, "stop")
if (target):
retry = self.migrationRetries
while (retry > 0):
- res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
+ # migrate in foreground respecting cow backed
+ # images
+ # XXXstroucki if we're doing this in the fg
+ # then it may still be ongoing when the timeout
+ # happens, and no way of interrupting it
+ # trying to restart the migration by running
+ # the command again (when qemu is ready to
+ # listen again) is probably not helpful
+ success = False
+ res = self.__enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
retry = retry - 1
if (res.find("migration failed") == -1):
- retry = -1
+ success = True
+ retry = 0
+ break
else:
log.error("Migration (transiently) failed: %s\n", res)
- if (retry == 0):
+ if (retry == 0) and (success is False):
log.error("Migration failed: %s\n", res)
child.errorBit = True
raise RuntimeError
- self.enterCommand(child, "quit", expectPrompt=False)
+ # XXXstroucki what if migration is still ongoing, and
+ # qemu is not listening?
+ self.__enterCommand(child, "quit", expectPrompt=False)
return vmId
-
+
+ # extern
def instantiateVm(self, instance):
- (vmId, cmd) = self.startVm(instance, None)
- child = self.getChildFromPid(vmId)
- self.getPtyInfo(child, False)
- child.cmd = cmd
- self.saveChildInfo(child)
- return vmId
+ try:
+ (vmId, cmd) = self.__startVm(instance, None)
+ child = self.__getChildFromPid(vmId)
+ self.__getPtyInfo(child, False)
+ child.cmd = cmd
+ self.nm.createInstance(child.instance)
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
+ # XXXstroucki Should make sure Running state is saved
+ # otherwise on restart it will appear as Activating
+ # until we update the state in __matchHostPids
+ child.instance.state = InstanceState.Running
+ self.__saveChildInfo(child)
+ return vmId
+ except:
+ log.exception("instantiateVm failed")
+ raise
+ # extern
def suspendVm(self, vmId, target):
- tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
+ tmpTarget = "%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId)
# XXX: Use fifo to improve performance
- vmId = self.stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
+ vmId = self.__stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
self.dfs.copyTo(tmpTarget, target)
+ os.unlink(tmpTarget)
return vmId
+ # extern
def resumeVmHelper(self, instance, source):
- child = self.getChildFromPid(instance.vmId)
+ child = self.__getChildFromPid(instance.vmId)
try:
- self.getPtyInfo(child, True)
- except RuntimeError, e:
+ self.__getPtyInfo(child, True)
+ except RuntimeError:
log.error("Failed to get pty info -- VM likely died")
child.errorBit = True
raise
status = "paused"
while ("running" not in status):
- status = self.enterCommand(child, "info status")
+ status = self.__enterCommand(child, "info status")
time.sleep(1)
+ child.instance.state = InstanceState.Running
+ self.__saveChildInfo(child)
+ # extern
def resumeVm(self, instance, source):
fn = self.dfs.getLocalHandle("%s" % (source))
- (vmId, cmd) = self.startVm(instance, "exec:zcat %s" % (fn))
- child = self.getChildFromPid(vmId)
+ (vmId, cmd) = self.__startVm(instance, "exec:zcat %s" % (fn))
+ child = self.__getChildFromPid(vmId)
child.cmd = cmd
return vmId
-
+
+ def __checkPortListening(self, port):
+ lc = 0
+ # XXXpipe: find whether something is listening yet on the port
+ (stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
+ stdin.close()
+ r = stdout.read()
+ lc = int(r.strip())
+ if (lc < 1):
+ return False
+ else:
+ return True
+
+ # extern
def prepReceiveVm(self, instance, source):
self.usedPortsLock.acquire()
- port = int(random.random()*1000+19000)
- while port in self.usedPorts:
- port = int(random.random()*1000+19000)
+ while True:
+ port = random.randint(19000, 20000)
+ if port not in self.usedPorts:
+ break
+
self.usedPorts.append(port)
self.usedPortsLock.release()
- (vmId, cmd) = self.startVm(instance, "tcp:0.0.0.0:%d" % (port))
+ (vmId, cmd) = self.__startVm(instance, "tcp:0.0.0.0:%d" % (port))
transportCookie = cPickle.dumps((port, vmId, socket.gethostname()))
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
child.cmd = cmd
child.transportCookie = transportCookie
- self.saveChildInfo(child)
- # XXX: Cleanly wait until the port is open
- lc = 0
- while (lc < 1):
-# XXXpipe: find whether something is listening yet on the port
- (stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
- stdin.close()
- r = stdout.read()
- lc = int(r.strip())
- if (lc < 1):
- time.sleep(1.0)
+ self.__saveChildInfo(child)
+ # XXX: Cleanly wait until the port is listening
+ while self.__checkPortListening(port) is not True:
+ time.sleep(1)
+
return transportCookie
+ # extern
def migrateVm(self, vmId, target, transportCookie):
self.migrationSemaphore.acquire()
try:
(port, _vmId, _hostname) = cPickle.loads(transportCookie)
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
child.migratingOut = True
- res = self.stopVm(vmId, "tcp:%s:%d" % (target, port), False)
+ # tell the VM to live-migrate out
+ res = self.__stopVm(vmId, "tcp:%s:%d" % (target, port), False)
# XXX: Some sort of feedback would be nice
# XXX: Should we block?
- self.waitForExit(vmId)
+ # XXXstroucki: isn't this what __waitForExit does?
+ self.__waitForExit(vmId)
finally:
self.migrationSemaphore.release()
return res
+ # extern
def receiveVm(self, transportCookie):
(port, vmId, _hostname) = cPickle.loads(transportCookie)
try:
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
except:
log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.hostname()))
raise
try:
- self.getPtyInfo(child, True)
- except RuntimeError, e:
+ self.__getPtyInfo(child, True)
+ except RuntimeError:
log.error("Failed to get pty info -- VM likely died")
child.errorBit = True
raise
@@ -625,79 +733,121 @@ class Qemu(VmControlInterface):
self.usedPortsLock.release()
return vmId
+ # extern
def pauseVm(self, vmId):
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "stop")
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "stop")
+ # XXXstroucki we have no Stopped state, so consider
+ # the VM still Running?
+ # extern
def unpauseVm(self, vmId):
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "c")
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "c")
+ # XXXstroucki as above, should this be a state change
+ # or not?
+ # extern
def shutdownVm(self, vmId):
"""'system_powerdown' doesn't seem to actually shutdown the VM on some versions of KVM with some versions of Linux"""
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "system_powerdown")
+ # If clean shutdown is desired, should try on VM first,
+ # shutdownVm second and if that doesn't work use
+ # destroyVm
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "system_powerdown")
+ # extern
def destroyVm(self, vmId):
- child = self.getChildFromPid(vmId)
+ child = self.__getChildFromPid(vmId)
child.migratingOut = False
# XXX: the child could have exited between these two points, but I don't know how to fix that since it might not be our child process
os.kill(child.pid, signal.SIGKILL)
+ def __specificStartVnc(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ hostname = socket.gethostname()
+ if (child.vncPort == -1):
+ self.vncPortLock.acquire()
+ port = 0
+ while (port in self.vncPorts):
+ port += 1
+
+ self.vncPorts.append(port)
+ self.vncPortLock.release()
+ self.__enterCommand(child, "change vnc :%d" % (port))
+ child.vncPort = port
+ self.__saveChildInfo(child)
+ port = child.vncPort
+ return "VNC running on %s:%d" % (hostname, port + 5900)
+
+ def __specificStopVnc(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "change vnc none")
+ if (child.vncPort != -1):
+ self.vncPortLock.acquire()
+ self.vncPorts.remove(child.vncPort)
+ self.vncPortLock.release()
+ child.vncPort = -1
+ self.__saveChildInfo(child)
+ return "VNC halted"
+
+ def __specificChangeCdRom(self, vmId, iso):
+ child = self.__getChildFromPid(vmId)
+ imageLocal = self.dfs.getLocalHandle("images/" + iso)
+ self.__enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
+ return "Changed ide1-cd0 to %s" % (iso)
+
+ def __specificStartConsole(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ hostname = socket.gethostname()
+ self.consolePortLock.acquire()
+ # XXXstroucki why not use the existing ports scheme?
+ consolePort = self.consolePort
+ self.consolePort += 1
+ self.consolePortLock.release()
+ threading.Thread(target=controlConsole, args=(child,consolePort)).start()
+ return "Control console listening on %s:%d" % (hostname, consolePort)
+
+ # extern
def vmmSpecificCall(self, vmId, arg):
arg = arg.lower()
+ changeCdText = "changecdrom:"
+
if (arg == "startvnc"):
- child = self.getChildFromPid(vmId)
- hostname = socket.gethostname()
- if (child.vncPort == -1):
- self.vncPortLock.acquire()
- port = 0
- while (port in self.vncPorts):
- port = port + 1
- self.vncPorts.append(port)
- self.vncPortLock.release()
- self.enterCommand(child, "change vnc :%d" % (port))
- child.vncPort = port
- self.saveChildInfo(child)
- port = child.vncPort
- return "VNC started on %s:%d" % (hostname, port+5900)
+ return self.__specificStartVnc(vmId)
+
elif (arg == "stopvnc"):
- child = self.getChildFromPid(vmId)
- self.enterCommand(child, "change vnc none")
- if (child.vncPort != -1):
- self.vncPortLock.acquire()
- self.vncPorts.remove(child.vncPort)
- self.vncPortLock.release()
- child.vncPort = -1
- self.saveChildInfo(child)
- return "VNC halted"
- elif (arg.startswith("changecdrom:")):
- child = self.getChildFromPid(vmId)
- iso = scrubString(arg[12:])
- imageLocal = self.dfs.getLocalHandle("images/" + iso)
- self.enterCommand(child, "change ide1-cd0 %s" % (imageLocal))
- return "Changed ide1-cd0 to %s" % (iso)
+ return self.__specificStopVnc(vmId)
+
+ elif (arg.startswith(changeCdText)):
+ iso = scrubString(arg[len(changeCdText):])
+ return self.__specificChangeCdRom(vmId, iso)
+
elif (arg == "startconsole"):
- child = self.getChildFromPid(vmId)
- hostname = socket.gethostname()
- self.consolePortLock.acquire()
- consolePort = self.consolePort
- self.consolePort = self.consolePort+1
- self.consolePortLock.release()
- threading.Thread(target=controlConsole, args=(child,consolePort)).start()
- return "Control console listenting on %s:%d" % (hostname, consolePort)
+ return self.__specificStartConsole(vmId)
+
elif (arg == "list"):
- return "startVnc\nstopVnc\nchangeCdrom:<image.iso>\nstartConsole"
+ commands = [
+ "startVnc",
+ "stopVnc",
+ "changeCdrom:<image.iso>",
+ "startConsole",
+ ]
+ return "\n".join(commands)
+
else:
- return "Unknown arg %s" % (arg)
+ return "Unknown command %s" % (arg)
+ # extern
def listVms(self):
return self.controlledVMs.keys()
-
+
+ # thread
def statsThread(self):
ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
netStats = {}
cpuStats = {}
+ # XXXstroucki be more exact here?
last = time.time() - self.statsInterval
while True:
now = time.time()
@@ -706,7 +856,7 @@ class Qemu(VmControlInterface):
netData = f.readlines()
f.close()
for l in netData:
- if (l.find("tashi") != -1):
+ if (l.find(self.ifPrefix) != -1):
(dev, sep, ld) = stringPartition(l, ":")
dev = dev.strip()
ws = ld.split()
@@ -714,6 +864,9 @@ class Qemu(VmControlInterface):
sendBytes = float(ws[8])
(recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
if (recvBytes < lastRecvBytes):
+ # We seem to have overflowed
+ # XXXstroucki How likely is this to happen?
+
if (lastRecvBytes > 2**32):
lastRecvBytes = lastRecvBytes - 2**64
else:
@@ -743,13 +896,13 @@ class Qemu(VmControlInterface):
child = self.controlledVMs[vmId]
(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
for i in range(0, len(child.instance.nics)):
- netDev = "tashi%d.%d" % (child.instance.id, i)
+ netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
(tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
self.stats[vmId] = self.stats.get(vmId, {})
child = self.controlledVMs.get(vmId, None)
if (child):
- res = self.enterCommand(child, "info blockstats")
+ res = self.__enterCommand(child, "info blockstats")
for l in res.split("\n"):
(device, sep, data) = stringPartition(l, ": ")
if (data != ""):
@@ -767,6 +920,7 @@ class Qemu(VmControlInterface):
log.exception("statsThread threw an exception")
last = now
time.sleep(self.statsInterval)
-
+
+ # extern
def getStats(self, vmId):
return self.stats.get(vmId, {})
Modified: incubator/tashi/branches/stable/src/tashi/parallel.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/parallel.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/parallel.py (original)
+++ incubator/tashi/branches/stable/src/tashi/parallel.py Wed Feb 8 04:56:23 2012
@@ -116,8 +116,8 @@ def synchronizedmethod(func):
# Test Code
##############################
import unittest
-import sys
-import time
+#import sys
+#import time
class TestThreadPool(unittest.TestCase):
def setUp(self):
Modified: incubator/tashi/branches/stable/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/rpycservices/rpycservices.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/rpycservices/rpycservices.py (original)
+++ incubator/tashi/branches/stable/src/tashi/rpycservices/rpycservices.py Wed Feb 8 04:56:23 2012
@@ -16,11 +16,12 @@
# under the License.
import rpyc
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host, User
import cPickle
clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'getImages', 'copyImage']
nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'prepSourceVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo', 'liveCheck']
+accountingRPCs = ['record']
def clean(args):
"""Cleans the object so cPickle can be used."""
@@ -61,7 +62,7 @@ class client:
"""Returns a function that makes the RPC call. No keyword arguments allowed when calling this function."""
if self.conn.closed == True:
self.conn = self.createConn()
- if name not in clusterManagerRPCs and name not in nodeManagerRPCs:
+ if name not in clusterManagerRPCs and name not in nodeManagerRPCs and name not in accountingRPCs:
return None
def connectWrap(*args):
args = cPickle.dumps(clean(args))
@@ -81,6 +82,8 @@ class ManagerService(rpyc.Service):
# Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
def checkValidUser(self, functionName, clientUsername, args):
"""Checks whether the operation requested by the user is valid based on clientUsername. An exception will be thrown if not valid."""
+ if self._type == 'AccountingService':
+ return
if self._type == 'NodeManagerService':
return
if clientUsername in ['nodeManager', 'agent', 'root']:
@@ -114,4 +117,7 @@ class ManagerService(rpyc.Service):
return makeCall
if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
return makeCall
+ if self._type == 'AccountingService' and name in accountingRPCs:
+ return makeCall
+
raise AttributeError('RPC does not exist')
Modified: incubator/tashi/branches/stable/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/util.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/util.py (original)
+++ incubator/tashi/branches/stable/src/tashi/util.py Wed Feb 8 04:56:23 2012
@@ -16,11 +16,11 @@
# under the License.
import ConfigParser
-import cPickle
+#import cPickle
import os
-import select
+#import select
import signal
-import struct
+#import struct
import sys
import threading
import time
@@ -28,7 +28,6 @@ import traceback
import types
import getpass
-import rpyc
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
@@ -109,7 +108,7 @@ class failsafe(object):
def newFunc(*args, **kw):
try:
return cur(*args, **kw)
- except Exception, e:
+ except:
self.__dict__['__current_obj__'] = self.__dict__['__failsafe_obj__']
return fail(*args, **kw)
return newFunc
@@ -197,9 +196,9 @@ def convertExceptions(oldFunc):
def newFunc(*args, **kw):
try:
return oldFunc(*args, **kw)
- except TashiException, e:
+ except TashiException:
raise
- except Exception, e:
+ except:
self = args[0]
if (self.convertExceptions):
raise TashiException(d={'errno':Errors.ConvertedException, 'msg': traceback.format_exc(10)})
@@ -210,7 +209,7 @@ def getConfig(additionalNames=[], additi
"""Creates many permutations of a list of locations to look for config
files and then loads them"""
config = ConfigParser.ConfigParser()
- baseLocations = ['./etc/', '/usr/share/tashi/', '/etc/tashi/', os.path.expanduser('~/.tashi/')]
+ baseLocations = ['/usr/local/tashi/etc/', '/usr/share/tashi/', '/etc/tashi/', os.path.expanduser('~/.tashi/')]
names = ['Tashi'] + additionalNames
names = reduce(lambda x, y: x + [y+"Defaults", y], names, [])
allLocations = reduce(lambda x, y: x + reduce(lambda z, a: z + [y + a + ".cfg"], names, []), baseLocations, []) + additionalFiles
@@ -264,10 +263,12 @@ def scrubString(s, allowed="ABCDEFGHIJKL
def createClient(config):
cfgHost = config.get('Client', 'clusterManagerHost')
cfgPort = config.get('Client', 'clusterManagerPort')
- cfgTimeout = config.get('Client', 'clusterManagerTimeout')
+ #XXXstroucki nothing uses timeout right now
+ #cfgTimeout = config.get('Client', 'clusterManagerTimeout')
host = os.getenv('TASHI_CM_HOST', cfgHost)
port = os.getenv('TASHI_CM_PORT', cfgPort)
- timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
+ #XXXstroucki nothing uses timeout right now
+ #timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
if authAndEncrypt:
Modified: incubator/tashi/branches/stable/src/tashi/version.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/version.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/version.py (original)
+++ incubator/tashi/branches/stable/src/tashi/version.py Wed Feb 8 04:56:23 2012
@@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-version = "201111"
+version = "201202"