Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/02/08 04:44:11 UTC
svn commit: r1241773 [1/3] - in /incubator/tashi/branches/oldstable: ./ etc/
src/ src/tashi/ src/tashi/agents/ src/tashi/client/
src/tashi/clustermanager/ src/tashi/clustermanager/data/
src/tashi/nodemanager/ src/tashi/nodemanager/vmcontrol/ src/tashi/...
Author: stroucki
Date: Wed Feb 8 04:44:09 2012
New Revision: 1241773
URL: http://svn.apache.org/viewvc?rev=1241773&view=rev
Log:
merge oldstable from stablefix (201111)
Added:
incubator/tashi/branches/oldstable/src/tashi/agents/primitive_zoni.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/tashi/agents/primitive_zoni.py
incubator/tashi/branches/oldstable/src/zoni/data/reservation.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/data/reservation.py
incubator/tashi/branches/oldstable/src/zoni/data/reservationmanagementinterface.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/data/reservationmanagementinterface.py
incubator/tashi/branches/oldstable/src/zoni/hardware/f10s50switch.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/hardware/f10s50switch.py
incubator/tashi/branches/oldstable/src/zoni/hardware/hpilo.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/hardware/hpilo.py
incubator/tashi/branches/oldstable/src/zoni/hardware/systemmanagement.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/hardware/systemmanagement.py
incubator/tashi/branches/oldstable/src/zoni/install/dnsdhcp/
- copied from r1203835, incubator/tashi/trunk/src/zoni/install/dnsdhcp/
incubator/tashi/branches/oldstable/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
incubator/tashi/branches/oldstable/src/zoni/services/
- copied from r1203835, incubator/tashi/trunk/src/zoni/services/
incubator/tashi/branches/oldstable/src/zoni/services/__init__.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/services/__init__.py
incubator/tashi/branches/oldstable/src/zoni/services/pcvciservice.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/services/pcvciservice.py
incubator/tashi/branches/oldstable/src/zoni/services/rpycservices.py
- copied, changed from r1203835, incubator/tashi/trunk/src/zoni/services/rpycservices.py
incubator/tashi/branches/oldstable/src/zoni/services/zonimanager.py
- copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/services/zonimanager.py
Modified:
incubator/tashi/branches/oldstable/ (props changed)
incubator/tashi/branches/oldstable/Makefile (contents, props changed)
incubator/tashi/branches/oldstable/NOTICE
incubator/tashi/branches/oldstable/etc/ (props changed)
incubator/tashi/branches/oldstable/etc/NodeManager.cfg
incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg (contents, props changed)
incubator/tashi/branches/oldstable/src/ (props changed)
incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py
incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py
incubator/tashi/branches/oldstable/src/tashi/rpycservices/rpycservices.py
incubator/tashi/branches/oldstable/src/tashi/rpycservices/rpyctypes.py
incubator/tashi/branches/oldstable/src/tashi/util.py
incubator/tashi/branches/oldstable/src/tashi/version.py
incubator/tashi/branches/oldstable/src/zoni/Makefile (props changed)
incubator/tashi/branches/oldstable/src/zoni/__init__.py (props changed)
incubator/tashi/branches/oldstable/src/zoni/agents/dhcpdns.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/bootstrap/bootstrapinterface.py (props changed)
incubator/tashi/branches/oldstable/src/zoni/bootstrap/pxe.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/data/resourcequerysql.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/data/usermanagement.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/data/usermanagementinterface.py (props changed)
incubator/tashi/branches/oldstable/src/zoni/extra/util.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py (props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/hwswitchinterface.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/ipmi.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/raritanpdu.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/hardware/systemmanagementinterface.py (contents, props changed)
incubator/tashi/branches/oldstable/src/zoni/install/db/zoniDbSetup.py
incubator/tashi/branches/oldstable/src/zoni/install/pxe/base-menu
incubator/tashi/branches/oldstable/src/zoni/install/pxe/zoniPxeSetup.py
incubator/tashi/branches/oldstable/src/zoni/install/www/zoniWebSetup.py
incubator/tashi/branches/oldstable/src/zoni/system/registration/register/register_automate
incubator/tashi/branches/oldstable/src/zoni/system/registration/register/register_node
incubator/tashi/branches/oldstable/src/zoni/system/registration/www/include/zoni_functions.php
incubator/tashi/branches/oldstable/src/zoni/system/registration/www/zoni-register.php
incubator/tashi/branches/oldstable/src/zoni/version.py (contents, props changed)
Propchange: incubator/tashi/branches/oldstable/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Wed Feb 8 04:44:09 2012
@@ -0,0 +1,4 @@
+/incubator/tashi/branches/cmu:1178106-1187632
+/incubator/tashi/branches/stablefix:1203848-1241771
+/incubator/tashi/branches/zoni-dev/trunk:1034098-1177646
+/incubator/tashi/trunk:1081943-1203835,1203845
Modified: incubator/tashi/branches/oldstable/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/Makefile?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/Makefile (original)
+++ incubator/tashi/branches/oldstable/Makefile Wed Feb 8 04:44:09 2012
@@ -110,8 +110,12 @@ rmdoc:
bin/zoni-cli.py:
@echo Symlinking in zoni-cli...
(cd bin; ln -s ../src/zoni/client/zoni-cli.py .)
+usr/local/bin/zoni:
+ @echo Creating /usr/local/bin/zoni
+ (echo '#!/bin/bash\nPYTHONPATH=$(shell pwd)/src $(shell pwd)/bin/zoni-cli.py $$*' > /usr/local/bin/zoni; chmod 755 /usr/local/bin/zoni)
rmzoni-cli:
if test -e bin/zoni-cli.py; then echo Removing zoni-cli symlink...; rm bin/zoni-cli.py; fi
+ if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')
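For reference, the new usr/local/bin/zoni target emits a two-line shell wrapper; for a checkout at /home/user/tashi (path illustrative) the generated /usr/local/bin/zoni would read:

    #!/bin/bash
    PYTHONPATH=/home/user/tashi/src /home/user/tashi/bin/zoni-cli.py $*

Note the recipe depends on the make shell's echo interpreting \n (as dash's builtin does); under bash's default echo the two lines would be emitted as one.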
Propchange: incubator/tashi/branches/oldstable/Makefile
('svn:mergeinfo' removed)
Modified: incubator/tashi/branches/oldstable/NOTICE
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/NOTICE?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/NOTICE (original)
+++ incubator/tashi/branches/oldstable/NOTICE Wed Feb 8 04:44:09 2012
@@ -1,5 +1,5 @@
Apache Tashi
-Copyright 2008 The Apache Software Foundation
+Copyright 2008-2011 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Propchange: incubator/tashi/branches/oldstable/etc/
('svn:mergeinfo' removed)
Modified: incubator/tashi/branches/oldstable/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/NodeManager.cfg?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/NodeManager.cfg Wed Feb 8 04:44:09 2012
@@ -73,6 +73,7 @@ defaultBridgeFormat=br%s
[NodeManagerService]
convertExceptions = True
port = 9883
+registerHost = False
registerFrequency = 10.0
infoFile = /var/tmp/nm.dat
clusterManagerHost = localhost ; Clustermanager hostname
Modified: incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg Wed Feb 8 04:44:09 2012
@@ -45,7 +45,8 @@ publisher = tashi.messaging.GangliaPubli
nodeManagerPort = 9883
[ClusterManagerService]
-host = localhost ; Clustermanager hostname
+# Clustermanager hostname
+host = localhost
convertExceptions = True
port = 9882
expireHostTime = 30.0
@@ -97,9 +98,11 @@ publisher = tashi.messaging.GangliaPubli
[NodeManagerService]
convertExceptions = True
port = 9883
+registerHost = False
registerFrequency = 10.0
infoFile = /var/tmp/nm.dat
-clusterManagerHost = localhost ; Clustermanger hostname
+# Clustermanger hostname
+clusterManagerHost = localhost
clusterManagerPort = 9882
statsInterval = 0.0
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
@@ -115,6 +118,8 @@ migrateTimeout = 300.0
maxParallelMigrations = 10
useMigrateArgument = False
statsInterval = 0.0
+scratchDir = /tmp
+scratchVg = vgscratch
[XenPV]
vmNamePrefix = tashi
@@ -135,7 +140,8 @@ staticLayout = /location/of/layout/file
# Client configuration
[Client]
-clusterManagerHost = localhost ; Clustermanager hostname
+# Clustermanager hostname
+clusterManagerHost = localhost
clusterManagerPort = 9882
clusterManagerTimeout = 5.0
@@ -157,17 +163,21 @@ defaultJobTime = 8640000000
[DhcpDns]
dnsEnabled = True
-dnsKeyFile = /location/of/private/key/for/dns
+dnsSecretKey = ABcdEf12GhIJKLmnOpQrsT==
+dnsKeyName = name_of_dns_key_hostname
dnsServer = 1.2.3.4 53
dnsDomain = tashi.example.com
dnsExpire = 300
dhcpEnabled = True
dhcpServer = 1.2.3.4
+# Host key name
dhcpKeyName = OMAPI
dhcpSecretKey = ABcdEf12GhIJKLmnOpQrsT==
+# ipRangeX - where X is the vlan number
ipRange1 = 172.16.128.2-172.16.255.254
reverseDns = True
-clustermanagerhost = localhost ; Clustermanager hostname
+# Clustermanager hostname
+clustermanagerhost = localhost
clustermanagerport = 9886
[GangliaPublisher]
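As the new ipRangeX comment above indicates, each additional VLAN gets its own numbered key; a hypothetical second VLAN would be configured like:

    # ipRange2 covers VLAN 2 (illustrative values)
    ipRange2 = 172.16.64.2-172.16.127.254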
Modified: incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg Wed Feb 8 04:44:09 2012
@@ -26,6 +26,11 @@ INSTALL_BASE_DIR = /usr/local/tashi
[logging]
LOG_FILE = /var/tmp/zoni_logfile.txt
+[management]
+# Specify data store
+INFO_STORE = sql
+USER_MANAGEMENT = ldap
+
# DB host
[dbConnection]
DB_HOST = xx_hostname_or_ip_
@@ -50,8 +55,9 @@ TFTP_UPDATE_FILE = /var/lib/tftpboot/pxe
TFTP_BASE_FILE = /var/lib/tftpboot/pxelinux.cfg/base.zoni
TFTP_BASE_MENU_FILE = /var/lib/tftpboot/pxelinux.cfg/base-menu
PXE_SERVER_IP = IP_OF_PXE_SERVER_IN_DOMAIN_0
-# Relative to TFTP_ROOT_DIR
-INITRD_ROOT = initrd
+# Must be relative to TFTP_ROOT_DIR
+INITRD_ROOT = builds/initrd
+KERNEL_ROOT = builds/kernel
[www]
WWW_DOCUMENT_ROOT = /var/www
@@ -71,7 +77,8 @@ ZONI_IPMI_NETWORK = 10.10.16.0/20
VLAN_MAX = 4095
[hardware]
-HARDWARE_CONTROL = ["ipmi", "drac", "pdu"]
+# Hardware control available for the cluster
+HARDWARE_CONTROL = {"ipmi": {"class":"zoni.hardware.ipmi.Ipmi"}, "drac": {"class":"zoni.hardware.delldrac.dellDrac"}, "pdu":{"class": "zoni.hardware.raritanpdu.raritanDominionPx"}, "dellswitch" : {"class":"zoni.hardware.dellswitch.HwDellSwitch", "accessmode" : "ssh"}}
HARDWARE_PDU = "raritan"
HARDWARE_DRAC = "DELL DRAC"
@@ -79,7 +86,9 @@ HARDWARE_DRAC = "DELL DRAC"
dnsEnabled = True
reverseDns = True
# Key file must be in the same directory or this will get denied
-dnsKeyFile = xx_Kname.+157+36480.private_xx
+#dnsKeyFile = xx_Kname.+157+36480.private_xx
+dnsKeyName = xx_dnsKeyName__xx
+dnsSecretKey = xx_secretkey_xx
dnsServer = xx_dns_server_ip_xx xx_port_xx
dnsDomain = xx_fqdn_xx
dnsExpire = 60
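The HARDWARE_CONTROL value is now a mapping from device type to an importable class path, optionally with extra settings such as accessmode. A minimal sketch of how such a mapping could be resolved into classes -- illustrative only, not Zoni's actual loader:

    import ast

    # as read from ZoniDefaults.cfg (shortened)
    raw = '{"ipmi": {"class": "zoni.hardware.ipmi.Ipmi"}}'
    table = ast.literal_eval(raw)
    for name, spec in table.items():
        modname, classname = spec["class"].rsplit(".", 1)
        module = __import__(modname, fromlist=[classname])
        cls = getattr(module, classname)
        # cls can now be instantiated with device-specific arguments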
Propchange: incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg
('svn:mergeinfo' removed)
Propchange: incubator/tashi/branches/oldstable/src/
('svn:mergeinfo' removed)
Modified: incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py Wed Feb 8 04:44:09 2012
@@ -28,7 +28,8 @@ from tashi import boolean
class DhcpDns(InstanceHook):
def __init__(self, config, client, post=False):
InstanceHook.__init__(self, config, client, post)
- self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
+ self.dnsKeyName = self.config.get('DhcpDns', 'dnsKeyName')
+ self.dnsSecretKey = self.config.get('DhcpDns', 'dnsSecretKey')
self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
self.dnsDomain = self.config.get('DhcpDns', 'dnsDomain')
self.dnsExpire = int(self.config.get('DhcpDns', 'dnsExpire'))
@@ -153,14 +154,12 @@ class DhcpDns(InstanceHook):
self.removeDns(name)
except:
pass
- if (self.dnsKeyFile != ""):
- cmd = "nsupdate -k %s" % (self.dnsKeyFile)
- else:
- cmd = "nsupdate"
+ cmd = "nsupdate"
child = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
try:
(stdin, stdout) = (child.stdin, child.stdout)
stdin.write("server %s\n" % (self.dnsServer))
+ stdin.write("key %s %s\n" % (self.dnsKeyName, self.dnsSecretKey))
stdin.write("update add %s.%s %d A %s\n" % (name, self.dnsDomain, self.dnsExpire, ip))
stdin.write("\n")
if (self.reverseDns):
@@ -181,14 +180,12 @@ class DhcpDns(InstanceHook):
(pid, status) = os.waitpid(child.pid, os.WNOHANG)
def removeDns(self, name):
- if (self.dnsKeyFile != ""):
- cmd = "nsupdate -k %s" % (self.dnsKeyFile)
- else:
- cmd = "nsupdate"
+ cmd = "nsupdate"
child = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
try:
(stdin, stdout) = (child.stdin, child.stdout)
stdin.write("server %s\n" % (self.dnsServer))
+ stdin.write("key %s %s\n" % (self.dnsKeyName, self.dnsSecretKey))
if (self.reverseDns):
ip = socket.gethostbyname(name)
ipSegments = map(int, ip.split("."))
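With this change the agent no longer shells out with nsupdate -k; it authenticates inline via nsupdate's key statement. For a VM named foo receiving 172.16.128.2, the text fed to nsupdate's stdin would look roughly like this (key name and secret are the placeholder values from TashiDefaults.cfg; the trailing blank line the code writes tells nsupdate to send the update, and a matching PTR update follows when reverseDns is enabled):

    server 1.2.3.4 53
    key name_of_dns_key_hostname ABcdEf12GhIJKLmnOpQrsT==
    update add foo.tashi.example.com 300 A 172.16.128.2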
Modified: incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py Wed Feb 8 04:44:09 2012
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -23,6 +23,7 @@ import socket
import sys
import threading
import time
+import random
import logging.config
from tashi.rpycservices.rpyctypes import *
@@ -30,9 +31,9 @@ from tashi.util import getConfig, create
import tashi
class Primitive(object):
- def __init__(self, config, client):
+ def __init__(self, config, cmclient):
self.config = config
- self.client = client
+ self.cm = cmclient
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -44,108 +45,252 @@ class Primitive(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, client, False))
+ self.hooks.append(instantiateImplementation(value, config, cmclient, False))
except:
self.log.exception("Failed to load hook %s" % (value))
+ self.hosts = {}
+ self.load = {}
+ self.instances = {}
+ self.muffle = {}
+ self.lastScheduledHost = 0
+ self.clearHints = {}
+
+
+ def __getState(self):
+ # Generate a list of hosts and
+ # current loading of VMs per host
+ hosts = {}
+ # load's keys are the host id, or None if not on a host. values are instance ids
+ load = {}
+ ctr = 0
+
+ for h in self.cm.getHosts():
+ #XXXstroucki get all hosts here?
+ #if (h.up == True and h.state == HostState.Normal):
+ hosts[ctr] = h
+ ctr = ctr + 1
+ load[h.id] = []
+
+ load[None] = []
+ _instances = self.cm.getInstances()
+ instances = {}
+ for i in _instances:
+ instances[i.id] = i
+
+ # XXXstroucki put held machines behind pending ones
+ heldInstances = []
+ for i in instances.itervalues():
+ if (i.hostId or i.state == InstanceState.Pending):
+ # Nonrunning VMs will have hostId of None
+ load[i.hostId] = load[i.hostId] + [i.id]
+ elif (i.hostId is None and i.state == InstanceState.Held):
+ heldInstances = heldInstances + [i.id]
+
+ load[None] = load[None] + heldInstances
+
+ self.hosts = hosts
+ self.load = load
+ self.instances = instances
+
+ def __checkCapacity(self, host, inst):
+ # ensure host can carry new load
+ memUsage = reduce(lambda x, y: x + self.instances[y].memory, self.load[host.id], inst.memory)
+ coreUsage = reduce(lambda x, y: x + self.instances[y].cores, self.load[host.id], inst.cores)
+ if (memUsage <= host.memory and coreUsage <= host.cores):
+ return True
+
+ return False
+
+ def __clearHints(self, hint, name):
+ # remove the clearHint if the host comes back to normal mode
+ if name in self.clearHints[hint]:
+ popit = self.clearHints[hint].index(name)
+ self.clearHints[hint].pop(popit)
+ def __scheduleInstance(self, inst):
+
+ try:
+
+ minMax = None
+ minMaxHost = None
+ minMaxCtr = None
+
+ densePack = inst.hints.get("densePack", None)
+ if (densePack is None):
+ densePack = self.densePack
+ else:
+ densePack = boolean(densePack)
+
+ # Grab the targetHost config options if passed
+ targetHost = inst.hints.get("targetHost", None)
+ # Check to see if we have already handled this hint
+ clearHints = self.clearHints
+ clearHints["targetHost"] = clearHints.get("targetHost", [])
+ # If we handled the hint, don't look at it anymore
+ if targetHost in clearHints["targetHost"]:
+ targetHost = None
+
+ try:
+ allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
+ except Exception, e:
+ allowElsewhere = False
+ # has a host preference been expressed?
+ if (targetHost != None):
+ for h in self.hosts.values():
+ if (h.state == HostState.Normal):
+ self.__clearHints("targetHost", h.name)
+ # if this is not the host we are looking for, continue
+ if ((str(h.id) != targetHost and h.name != targetHost)):
+ continue
+ # we found the targetHost
+ # If a host machine is reserved, only allow if userid is in reserved list
+ if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
+ # Machine is reserved and not available for userId.
+ # XXXstroucki: Should we log something here for analysis?
+ break
+ if self.__checkCapacity(h, inst):
+ minMax = len(self.load[h.id])
+ minMaxHost = h
+
+
+ # end targethost != none
+
+
+ # If we don't have a host yet, find one here
+ if ((targetHost == None or allowElsewhere) and minMaxHost == None):
+ # cycle list
+ # Adding this to catch if this gets set to None. Fix
+ if self.lastScheduledHost == None:
+ self.lastScheduledHost = 0
+ for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
+ h = self.hosts[ctr]
+
+ # XXXstroucki if it's down, find another machine
+ if (h.up == False):
+ continue
+
+ # If the host not in normal operating state,
+ # find another machine
+ if (h.state != HostState.Normal):
+ continue
+ else:
+ # If the host is back to normal, get rid of the entry in clearHints
+ self.__clearHints("targetHost", h.name)
+
+ # if it's reserved, see if we can use it
+ if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
+ # reserved for somebody else, so find another machine
+ continue
+
+ # implement dense packing policy:
+ # consider this host if
+ # minMax has not been modified or
+ # the number of vms here is greater than minmax if we're dense packing or
+ # the number of vms here is less than minmax if we're not dense packing
+ if (minMax is None or (densePack and len(self.load[h.id]) > minMax) or (not densePack and len(self.load[h.id]) < minMax)):
+ if self.__checkCapacity(h, inst):
+ minMax = len(self.load[h.id])
+ minMaxHost = h
+ minMaxCtr = ctr
+
+ # check that VM image isn't mounted persistent already
+ # Should set a status code to alert user
+ # Tried to update the state of the instance and set persistent=False but
+ # couldn't do it, should work until we find a better way to do this
+ if inst.disks[0].persistent == True:
+ count = 0
+ myDisk = inst.disks[0].uri
+ for i in self.cm.getInstances():
+ if myDisk == i.disks[0].uri and i.disks[0].persistent == True:
+ count += 1
+ if count > 1:
+ minMaxHost = None
+
+ if (minMaxHost):
+ # found a host
+ if (not inst.hints.get("__resume_source", None)):
+ # only run preCreate hooks if newly starting
+ for hook in self.hooks:
+ hook.preCreate(inst)
+ self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
+ rv = "fail"
+ try:
+ rv = self.cm.activateVm(inst.id, minMaxHost)
+ if rv == "success":
+ self.lastScheduledHost = minMaxCtr
+ self.load[minMaxHost.id] = self.load[minMaxHost.id] + [inst.id]
+ # get rid of its possible entry in muffle if VM is scheduled to a host
+ if (inst.name in self.muffle):
+ self.muffle.pop(inst.name)
+ else:
+ self.log.warning("Instance %s failed to activate on host %s" % (inst.name, minMaxHost.name))
+ except TashiException, e :
+ # If we try to activate the VM and get errno 10, host not in normal mode, add it to the list
+ # check for other errors later
+ if e.errno == Errors.HostStateError:
+ self.clearHints["targetHost"] = self.clearHints.get("targetHost", [])
+ self.clearHints["targetHost"].append(targetHost)
+
+ else:
+ # did not find a host
+ if (inst.name not in self.muffle):
+ self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
+ self.muffle[inst.name] = True
+
+ except Exception, e:
+ # XXXstroucki: how can we get here?
+ if (inst.name not in self.muffle):
+ self.log.exception("Failed to schedule or activate %s" % (inst.name))
+ self.muffle[inst.name] = True
+
def start(self):
oldInstances = {}
- muffle = {}
+
while True:
try:
- # Generate a list of VMs/host
- hosts = {}
- load = {}
- for h in self.client.getHosts():
- hosts[h.id] = h
- load[h.id] = []
- load[None] = []
- _instances = self.client.getInstances()
- instances = {}
- for i in _instances:
- instances[i.id] = i
- for i in instances.itervalues():
- if (i.hostId or i.state == InstanceState.Pending):
- load[i.hostId] = load[i.hostId] + [i.id]
- # Check for VMs that have exited
+ self.__getState()
+
+ # Check for VMs that have exited and call
+ # postDestroy hook
for i in oldInstances:
- if (i not in instances and oldInstances[i].state != InstanceState.Pending):
+ # XXXstroucki what about paused and saved VMs?
+ # XXXstroucki: do we need to look at Held VMs here?
+ if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+ self.log.info("VM exited: %s" % (oldInstances[i].name))
for hook in self.hooks:
hook.postDestroy(oldInstances[i])
- # Schedule new VMs
- oldInstances = instances
- if (len(load.get(None, [])) > 0):
- load[None].sort()
- for i in load[None]:
- inst = instances[i]
- try:
- minMax = None
- minMaxHost = None
- targetHost = inst.hints.get("targetHost", None)
- try:
- allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
- except Exception, e:
- allowElsewhere = False
- if (targetHost != None):
- for h in hosts.values():
- if ((str(h.id) == targetHost or h.name == targetHost)):
- # make sure that host is up, in a normal state and is not reserved
- if (h.up == True and h.state == HostState.Normal and len(h.reserved) == 0):
- memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
- coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
- if (memUsage <= h.memory and coreUsage <= h.cores):
- minMax = len(load[h.id])
- minMaxHost = h
-
- # If a host machine is reserved, only allow if userid is in reserved list
- if ((len(h.reserved) > 0) and inst.userId in h.reserved):
- memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
- coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
- if (memUsage <= h.memory and coreUsage <= h.cores):
- minMax = len(load[h.id])
- minMaxHost = h
-
-
- if ((targetHost == None or allowElsewhere) and minMaxHost == None):
- for h in hosts.values():
- if (h.up == True and h.state == HostState.Normal and len(h.reserved) == 0):
- if (minMax is None or (self.densePack and len(load[h.id]) > minMax) or (not self.densePack and len(load[h.id]) < minMax)):
- memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
- coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
- if (memUsage <= h.memory and coreUsage <= h.cores):
- minMax = len(load[h.id])
- minMaxHost = h
- if (minMaxHost):
- if (not inst.hints.get("__resume_source", None)):
- for hook in self.hooks:
- hook.preCreate(inst)
- self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
- self.client.activateVm(i, minMaxHost)
- load[minMaxHost.id] = load[minMaxHost.id] + [i]
- muffle.clear()
- else:
- if (inst.name not in muffle):
- self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
- muffle[inst.name] = True
- except Exception, e:
- if (inst.name not in muffle):
- self.log.exception("Failed to schedule or activate %s" % (inst.name))
- muffle[inst.name] = True
- time.sleep(self.scheduleDelay)
+
+ oldInstances = self.instances
+
+
+ if (len(self.load.get(None, [])) > 0):
+ # Schedule VMs if they are waiting
+
+ # sort by id number (FIFO?)
+ self.load[None].sort()
+ for i in self.load[None]:
+ inst = self.instances[i]
+ self.__scheduleInstance(inst)
+ # end for unassigned vms
+
+
except TashiException, e:
self.log.exception("Tashi exception")
- time.sleep(self.scheduleDelay)
+
except Exception, e:
- self.log.exception("General exception")
- time.sleep(self.scheduleDelay)
+ self.log.warning("Scheduler iteration failed")
+
+
+ # wait to do the next iteration
+ time.sleep(self.scheduleDelay)
def main():
(config, configFiles) = getConfig(["Agent"])
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
- client = createClient(config)
+ cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, client)
+ agent = Primitive(config, cmclient)
agent.start()
if __name__ == "__main__":
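One detail worth calling out in the rewrite: __scheduleInstance resumes its host scan at the last host it scheduled on and wraps around, instead of always restarting at host 0. A toy illustration of the rotation (Python 2, where range returns a list; host count and index are made up):

    last = 2
    nhosts = 4
    order = range(last, nhosts) + range(0, last)
    # order == [2, 3, 0, 1]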
Modified: incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py Wed Feb 8 04:44:09 2012
@@ -157,8 +157,26 @@ def getVmLayout():
hosts[i.hostId].usedCores += i.cores
return hosts.values()
+def getSlots(cores, memory):
+ hosts = getVmLayout()
+ count = 0
+
+ for h in hosts:
+ if h.up is False or h.state != HostState.Normal:
+ continue
+ countbycores = int((h.cores - h.usedCores) / cores)
+ countbymemory = int((h.memory - h.usedMemory) / memory)
+ count += min(countbycores, countbymemory)
+
+ print "%d" % (count),
+ print (lambda:"instances", lambda:"instance")[count == 1](),
+ print "with %d" % (cores),
+ print (lambda:"cores", lambda:"core")[cores == 1](),
+ print "and %d MB memory could be created." % (memory)
+
def createMany(instance, count):
- l = len(str(count))
+ # will create instances from 0 to count-1
+ l = len(str(count - 1))
basename = instance.name
instances = []
for i in range(0, count):
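The new getSlots floors each host's free headroom by both the requested cores and memory and sums the minima across hosts. With illustrative numbers, a host with 6 free cores and 4096 MB free fits 3 slots of a 2-core/1024 MB VM:

    free_cores, free_mem = 6, 4096
    cores, memory = 2, 1024
    slots = min(free_cores // cores, free_mem // memory)
    # slots == min(3, 4) == 3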
@@ -190,6 +208,9 @@ def getMyInstances():
# Used to define default views on functions and to provide extra functionality (getVmLayout)
extraViews = {
+'getSlots': (getSlots, None),
+'getImages': (None, ['id', 'imageName', 'imageSize']),
+'copyImage': (None, None),
'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'destroyMany': (destroyMany, None),
'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
@@ -209,6 +230,9 @@ argLists = {
'migrateVm': [('instance', checkIid, lambda: requiredArg('instance'), True), ('targetHostId', int, lambda: requiredArg('targetHostId'), True)],
'pauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'unpauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
+'getSlots': [('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False)],
+'getImages': [],
+'copyImage': [('src', str, lambda: requiredArg('src'),True), ('dst', str, lambda: requiredArg('dst'), True)],
'getHosts': [],
'getUsers': [],
'getNetworks': [],
@@ -219,7 +243,7 @@ argLists = {
'unregisterHost': [('hostId', int, lambda: requiredArg('hostId'), True)],
}
-# Used to convert the dictionary built from the arguments into an object that can be used by thrift
+# Used to convert the dictionary built from the arguments into an object that can be used by rpyc
convertArgs = {
'createVm': '[Instance(d={"userId":userId,"name":name,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints})]',
'createMany': '[Instance(d={"userId":userId,"name":basename,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints}), count]',
@@ -233,6 +257,8 @@ convertArgs = {
'unpauseVm': '[instance]',
'vmmSpecificCall': '[instance, arg]',
'unregisterHost' : '[hostId]',
+'getSlots' : '[cores, memory]',
+'copyImage' : '[src, dst]',
}
# Descriptions
@@ -247,14 +273,17 @@ description = {
'migrateVm': 'Live-migrates a VM to a different host',
'pauseVm': 'Pauses a running VM',
'unpauseVm': 'Unpauses a paused VM',
+'getSlots': 'Get a count of how many VMs could be started in the cluster',
'getHosts': 'Gets a list of hosts running Node Managers',
'getUsers': 'Gets a list of users',
'getNetworks': 'Gets a list of available networks for VMs to be placed on',
-'getInstances': 'Gets a list of all VMs in Tashi',
+'getInstances': 'Gets a list of all VMs in the cluster',
'getMyInstances': 'Utility function that only lists VMs owned by the current user',
'getVmLayout': 'Utility function that displays what VMs are placed on what hosts',
-'vmmSpecificCall': 'Direct access to VMM-specific functionality',
+'vmmSpecificCall': 'Direct access to VM manager specific functionality',
'unregisterHost' : 'Unregisters host. Registration happens when starting node manager',
+'getImages' : 'Gets a list of available VM images',
+'copyImage' : 'Copies a VM image',
}
# Example use strings
@@ -266,15 +295,18 @@ examples = {
'destroyMany': ['--basename foobar'],
'suspendVm': ['--instance 12345', '--instance foobar'],
'resumeVm': ['--instance 12345', '--instance foobar'],
-'migrateVm': ['--instanc 12345 --targetHostId 73', '--instance foobar --targetHostId 73'],
+'migrateVm': ['--instance 12345 --targetHostId 73', '--instance foobar --targetHostId 73'],
'pauseVm': ['--instance 12345', '--instance foobar'],
-'unpauseVm': ['---instance 12345', '--instance foobar'],
+'unpauseVm': ['--instance 12345', '--instance foobar'],
+'getSlots': ['--cores 1 --memory 128'],
'getHosts': [''],
'getUsers': [''],
'getNetworks': [''],
'getInstances': [''],
'getMyInstances': [''],
'getVmLayout': [''],
+'getImages': [''],
+'copyImage': ['--src src.qcow2 --dst dst.qcow2'],
'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc'],
'unregisterHost' : ['--hostId 2'],
}
@@ -359,7 +391,6 @@ def makeTable(list, keys=None):
stdout = os.popen("stty size")
r = stdout.read()
stdout.close()
- (consoleHeight, consoleWidth) = map(lambda x: int(x.strip()), r.split())
except:
pass
for obj in list:
@@ -480,30 +511,57 @@ def main():
usage()
function = matchFunction(sys.argv[1])
(config, configFiles) = getConfig(["Client"])
- possibleArgs = argLists[function]
+
+ # build a structure of possible arguments
+ possibleArgs = {}
+ argList = argLists[function]
+ for i in range(0, len(argList)):
+ possibleArgs[argList[i][0]]=argList[i]
+
args = sys.argv[2:]
- for arg in args:
- if (arg == "--help" or arg == "--examples"):
- usage(function)
+
+ vals = {}
+
try:
- vals = {}
+ # create client handle
client = createClient(config)
- for parg in possibleArgs:
+
+ # set defaults
+ for parg in possibleArgs.values():
(parg, conv, default, required) = parg
- val = None
- for i in range(0, len(args)):
- arg = args[i]
- if (arg.startswith("--") and arg[2:] == parg):
- val = conv(args[i+1])
- if (val == None):
- val = default()
- vals[parg] = val
- for arg in args:
+ if (required is False):
+ vals[parg] = default()
+
+ while (len(args) > 0):
+ arg = args.pop(0)
+
+ if (arg == "--help" or arg == "--examples"):
+ usage(function)
+ # this exits
+
if (arg.startswith("--hide-")):
show_hide.append((False, arg[7:]))
+ continue
+
if (arg.startswith("--show-")):
show_hide.append((True, arg[7:]))
+ continue
+
+ if (arg.startswith("--")):
+ if (arg[2:] in possibleArgs):
+ (parg, conv, default, required) = possibleArgs[arg[2:]]
+ val = conv(args.pop(0))
+ if (val == None):
+ val = default()
+
+ vals[parg] = val
+ continue
+
+ raise ValueError("Unknown argument %s" % (arg))
+
+
f = getattr(client, function, None)
+
if (f is None):
f = extraViews[function][0]
if (function in convertArgs):
@@ -526,8 +584,8 @@ def main():
print "TashiException:"
print e.msg
exitCode = e.errno
- except Exception, e:
- print e
+# except Exception, e:
+# print e
# XXXstroucki: exception may be unrelated to usage of function
# so don't print usage on exception as if there were a problem
# with the arguments
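Net effect of the parser rewrite: defaults for optional arguments are applied up front, options are consumed left to right, and unrecognized options now fail fast instead of being silently ignored. For example (invocations illustrative):

    tashi-client getSlots --cores 2 --memory 1024   # OK, prints the slot count
    tashi-client getSlots --core 2                  # ValueError: Unknown argument --core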
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py Wed Feb 8 04:44:09 2012
@@ -30,7 +30,9 @@ import tashi
from tashi.rpycservices import rpycservices
from rpyc.utils.server import ThreadedServer
-from rpyc.utils.authenticators import VdbAuthenticator
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
+
+log = None
def startClusterManager(config):
global service, data
@@ -47,11 +49,11 @@ def startClusterManager(config):
users[user.name] = user.passwd
users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
- authenticator = VdbAuthenticator.from_dict(users)
+ authenticator = TlsliteVdbAuthenticator.from_dict(users)
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
- t.logger.quiet = True
+ t.logger.setLevel(logging.ERROR)
t.service.service = service
t.service._type = 'ClusterManagerService'
@@ -64,6 +66,8 @@ def startClusterManager(config):
@signalHandler(signal.SIGTERM)
def handleSIGTERM(signalNumber, stackFrame):
+ global log
+
log.info('Exiting cluster manager after receiving a SIGINT signal')
sys.exit(0)
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py Wed Feb 8 04:44:09 2012
@@ -1,4 +1,4 @@
- # Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
@@ -42,21 +42,26 @@ class ClusterManagerService(object):
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
- self.lastContacted = {}
- self.decayedHosts = {}
- self.decayedInstances = {}
+ self.hostLastContactTime = {}
+ #self.hostLastUpdateTime = {}
+ self.instanceLastContactTime = {}
self.expireHostTime = float(self.config.get('ClusterManagerService', 'expireHostTime'))
self.allowDecayed = float(self.config.get('ClusterManagerService', 'allowDecayed'))
self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
- now = time.time()
+ now = self.__now()
for instance in self.data.getInstances().itervalues():
instanceId = instance.id
instance = self.data.acquireInstance(instanceId)
instance.decayed = False
- self.stateTransition(instance, None, InstanceState.Orphaned)
+
+ if instance.hostId is None:
+ self.stateTransition(instance, None, InstanceState.Pending)
+ else:
+ self.stateTransition(instance, None, InstanceState.Orphaned)
+
self.data.releaseInstance(instance)
for host in self.data.getHosts().itervalues():
hostId = host.id
@@ -64,101 +69,177 @@ class ClusterManagerService(object):
host.up = False
host.decayed = False
self.data.releaseHost(host)
- self.decayLock = threading.Lock()
- threading.Thread(target=self.monitorHosts).start()
+ threading.Thread(target=self.monitorCluster).start()
def stateTransition(self, instance, old, cur):
if (old and instance.state != old):
- self.data.releaseInstance(instance)
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
+ if (instance.state == cur):
+ # don't do anything if we're already at current state
+ return
+
instance.state = cur
+ # pass something down to the NM?
- def updateDecay(self, set, obj):
- now = time.time()
- self.decayLock.acquire()
- if (obj.decayed and obj.id not in set):
- set[obj.id] = now
- elif (not obj.decayed and obj.id in set):
- del set[obj.id]
- self.decayLock.release()
+ def __now(self):
+ return time.time()
+
+ def __downHost(self, host):
+ self.log.warning('Host %s is down' % (host.name))
+ host.up = False
+ host.decayed = False
+
+ self.__orphanInstances(host)
+
+ def __upHost(self, host):
+ self.log.warning('Host %s is up' % (host.name))
+ host.up = True
+ host.decayed = True
+
+ def __orphanInstances(self, host):
+ # expects lock to be held on host
+ instances = [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]
+
+ for instanceId in instances:
+ instance = self.data.acquireInstance(instanceId)
+ if instance.hostId == host.id:
+ instance.decayed = True
+ self.stateTransition(instance, None, InstanceState.Orphaned)
+
+ self.data.releaseInstance(instance)
+
+ def __checkHosts(self):
+ # Check if hosts have been heard from recently
+ # Otherwise, see if it is alive
+
+ for hostId in self.hostLastContactTime.keys():
+ if (self.hostLastContactTime[hostId] < (self.__now() - self.expireHostTime)):
+ host = self.data.acquireHost(hostId)
+ string = None
+ try:
+ string = self.proxy[host.name].liveCheck()
+ except:
+ pass
+
+ if string != "alive":
+ self.__downHost(host)
+ del self.hostLastContactTime[hostId]
+ else:
+ self.__upHost(host)
+ self.hostLastContactTime[hostId] = self.__now()
+
+ self.data.releaseHost(host)
+
+ def __checkInstances(self):
+ # Reconcile instances with nodes
+
+ # obtain a list of instances I know about
+ myInstancesError = False
+ try:
+ myInstances = self.data.getInstances()
+ except:
+ myInstancesError = True
+ self.log.warning('Failure communicating with my database')
+
+ if myInstancesError == True:
+ return
+
+ # iterate through all hosts I believe are up
+ for hostId in self.hostLastContactTime.keys():
+ #self.log.warning("iterate %d" % hostId)
+ host = self.data.acquireHost(hostId)
+ if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+ host.decayed = True
+
+ self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+
+ myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
+
+ # get a list of VMs running on host
+ try:
+ hostProxy = self.proxy[host.name]
+ remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+ except:
+ self.log.warning('Failure getting instances from host %s' % (host.name))
+ self.data.releaseHost(host)
+ continue
+
+ # register instances I don't know about
+ for instance in remoteInstances:
+ if (instance.id not in myInstances):
+ instance.hostId = host.id
+ instance = self.data.registerInstance(instance)
+ self.data.releaseInstance(instance)
+ remoteInstanceIds = [i.id for i in remoteInstances]
+ # remove instances that shouldn't be running
+ for instance in myInstancesThisHost:
+ if (instance.id not in remoteInstanceIds):
+ # XXXstroucki before 20110902 excepted here with host lock
+ try:
+ instance = self.data.acquireInstance(instance.id)
+ except:
+ continue
+
+ # XXXstroucki destroy?
+ try:
+ del self.instanceLastContactTime[instance.id]
+ except:
+ pass
+ self.data.removeInstance(instance)
+
+ self.hostLastContactTime[hostId] = self.__now()
+ host.decayed = False
+
+ self.data.releaseHost(host)
+ #self.log.warning("iterate %d done" % hostId)
+ # iterate through all VMs I believe are active
+ for instanceId in self.instanceLastContactTime.keys():
+ if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+ try:
+ instance = self.data.acquireInstance(instanceId)
+ except:
+ continue
+
+ instance.decayed = True
+ self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+ if instance.hostId is None: raise AssertionError
- def monitorHosts(self):
- # XXX: retry multiple hosts (iterate through them even with an exception)
+ # XXXstroucki check if host is down?
+ host = self.data.getHost(instance.hostId)
+
+ # get updated state on VM
+ try:
+ hostProxy = self.proxy[host.name]
+ newInstance = hostProxy.getVmInfo(instance.vmId)
+ except:
+ self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
+ self.data.releaseInstance(instance)
+ continue
+
+ # replace existing state with new state
+ # XXXstroucki more?
+ instance.state = newInstance.state
+ self.instanceLastContactTime[instanceId] = self.__now()
+ instance.decayed = False
+ self.data.releaseInstance(instance)
+
+
+
+ def monitorCluster(self):
while True:
- now = time.time()
sleepFor = min(self.expireHostTime, self.allowDecayed)
- try:
- for k in self.lastContacted.keys():
- if (self.lastContacted[k] < (now-self.expireHostTime)):
- host = self.data.acquireHost(k)
- try:
- self.log.warning('Host %s has expired after %f seconds' % (host.name, self.expireHostTime))
- for instanceId in [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]:
- instance = self.data.acquireInstance(instanceId)
- instance.decayed = True
- self.stateTransition(instance, None, InstanceState.Orphaned)
- self.data.releaseInstance(instance)
- host.up = False
- host.decayed = False
- finally:
- self.data.releaseHost(host)
- del self.lastContacted[k]
- else:
- sleepFor = min(self.lastContacted[k] + self.expireHostTime - now, sleepFor)
- for hostId in self.decayedHosts.keys():
- # XXXstroucki: what if the host undecays here?
- if (self.decayedHosts[hostId] < (now-self.allowDecayed)):
- host = self.data.getHost(hostId)
- self.log.warning('Fetching state from host %s because it is decayed' % (host.name))
- hostProxy = self.proxy[host.name]
- oldInstances = [i for i in self.data.getInstances().values() if i.hostId == host.id]
- instances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
- instanceIds = [i.id for i in instances]
- for instance in instances:
- if (instance.id not in self.data.getInstances()):
- instance.hostId = host.id
- instance = self.data.registerInstance(instance)
- self.data.releaseInstance(instance)
- for instance in oldInstances:
- if (instance.id not in instanceIds):
- instance = self.data.acquireInstance(instance.id)
- self.data.removeInstance(instance)
- self.decayedHosts[hostId] = now
- else:
- sleepFor = min(self.decayedHosts[hostId] + self.allowDecayed - now, sleepFor)
- for instanceId in self.decayedInstances.keys():
- try:
- if (self.decayedInstances[instanceId] < (now-self.allowDecayed)):
- self.log.warning('Fetching state on instance %d because it is decayed' % (instanceId))
- try:
- instance = self.data.getInstance(instanceId)
- if instance.hostId is None: raise AssertionError
- except TashiException, e:
- if (e.errno == Errors.NoSuchInstanceId):
- del self.decayedInstances[instanceId]
- continue
- else:
- raise
- host = self.data.getHost(instance.hostId)
- hostProxy = self.proxy[host.name]
- instance = hostProxy.getVmInfo(instance.vmId)
- oldInstance = self.data.acquireInstance(instanceId)
- oldInstance.state = instance.state
- self.data.releaseInstance(oldInstance)
- self.decayedInstances[instanceId] = now
- else:
- sleepFor = min(self.decayedInstances[instanceId] + self.allowDecayed - now, sleepFor)
- except (KeyError, TashiException):
- self.log.warning("Don't know about instance %d anymore." % instanceId)
- self.data.removeInstance(instance)
- except Exception, e:
- self.log.exception('Exception in monitorHosts trying to get instance information')
- except Exception, e:
- self.log.exception('Exception in monitorHosts')
+ try:
+ self.__checkHosts()
+ self.__checkInstances()
+ except:
+ self.log.exception('monitorCluster iteration failed')
+ # XXXrgass too chatty. Remove
+ #self.log.info("Sleeping for %d seconds" % sleepFor)
time.sleep(sleepFor)
-
+
+
def normalize(self, instance):
instance.id = None
instance.vmId = None
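The rewritten monitor distinguishes two staleness thresholds per host: allowDecayed marks the cached state stale and triggers a refetch, while expireHostTime triggers a liveCheck and, on failure, marks the host down and orphans its VMs. Schematically (names abridged):

    now = time.time()
    stale   = lastContact < now - allowDecayed     # __checkInstances refetches VM state
    expired = lastContact < now - expireHostTime   # __checkHosts liveChecks; failure => __downHost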
@@ -166,7 +247,7 @@ class ClusterManagerService(object):
instance.decayed = False
instance.name = scrubString(instance.name, allowed="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.")
instance.state = InstanceState.Pending
- # At some point, check userId
+ # XXXstroucki At some point, check userId
if (not self.allowDuplicateNames):
for i in self.data.getInstances().itervalues():
if (i.name == instance.name):
@@ -181,7 +262,7 @@ class ClusterManagerService(object):
raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be <= %d" % (self.maxMemory)})
# Make sure disk spec is valid
# Make sure network spec is valid
- # Ignore hints
+ # Ignore internal hints
for hint in instance.hints:
if (hint.startswith("__")):
del instance.hints[hint]
@@ -189,6 +270,7 @@ class ClusterManagerService(object):
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
+ # XXXstroucki: check for exception here
instance = self.normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
@@ -217,8 +299,6 @@ class ClusterManagerService(object):
# XXXstroucki: This is a problem with keeping
# clean state.
self.stateTransition(instance, None, InstanceState.Destroying)
- self.data.releaseInstance(instance)
-
if instance.hostId is None:
self.data.removeInstance(instance)
else:
@@ -226,8 +306,9 @@ class ClusterManagerService(object):
try:
if hostname is not None:
self.proxy[hostname].destroyVm(instance.vmId)
- except Exception:
- self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
+ self.data.releaseInstance(instance)
+ except:
+ self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
@@ -284,11 +365,12 @@ class ClusterManagerService(object):
except Exception, e:
self.log.exception('migrateVm failed')
raise
- #instance = self.data.acquireInstance(instance.id)
- #try:
- # instance.hostId = targetHost.id
- #finally:
- # self.data.releaseInstance(instance)
+ try:
+ instance = self.data.acquireInstance(instance.id)
+ instance.hostId = targetHost.id
+ finally:
+ self.data.releaseInstance(instance)
+
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
@@ -338,7 +420,23 @@ class ClusterManagerService(object):
def getInstances(self):
return self.data.getInstances().values()
+
+ def getImages(self):
+ return self.data.getImages()
+ def copyImage(self, src, dst):
+ imageSrc = self.dfs.getLocalHandle("images/" + src)
+ imageDst = self.dfs.getLocalHandle("images/" + dst)
+ try:
+ # Attempt to restrict to the image directory
+ if ".." not in imageSrc and ".." not in imageDst:
+ self.dfs.copy(imageSrc, imageDst)
+ self.log.info('DFS image copy: %s->%s' % (imageSrc, imageDst))
+ else:
+ self.log.warning('DFS image copy bad path: %s->%s' % (imageSrc, imageDst))
+ except Exception, e:
+ self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
+
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
@@ -352,93 +450,75 @@ class ClusterManagerService(object):
# @timed
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
+
+ # Handle a new registration
if (host.id == None):
hostList = [h for h in self.data.getHosts().itervalues() if h.name == host.name]
if (len(hostList) != 1):
raise TashiException(d={'errno':Errors.NoSuchHost, 'msg':'A host with name %s is not identifiable' % (host.name)})
host.id = hostList[0].id
+
+ # Check if remote host information matches mine
oldHost = self.data.acquireHost(host.id)
if (oldHost.name != host.name):
self.data.releaseHost(oldHost)
raise TashiException(d={'errno':Errors.NoSuchHostId, 'msg':'Host id and hostname mismatch'})
- try:
- try:
- self.lastContacted[host.id] = time.time()
- oldHost.version = host.version
- oldHost.memory = host.memory
- oldHost.cores = host.cores
- oldHost.up = True
- oldHost.decayed = False
-
-# compare whether CM / NM versions are compatible
- if (host.version != version and not self.allowMismatchedVersions):
- oldHost.state = HostState.VersionMismatch
- if (host.version == version and oldHost.state == HostState.VersionMismatch):
- oldHost.state = HostState.Normal
- for instance in instances:
- try:
- oldInstance = self.data.acquireInstance(instance.id)
- except TashiException, e:
- if (e.errno == Errors.NoSuchInstanceId):
- self.log.info('Host %s reported an instance %d that did not previously exist (decay)' % (host.name, instance.id))
- oldHost.decayed = True
- continue
- #oldInstance = self.data.registerInstance(instance)
- else:
- raise
- try:
- if (oldInstance.hostId != host.id):
- self.log.info('Host %s is claiming instance %d actually owned by hostId %s (decay)' % (host.name, oldInstance.id, str(oldInstance.hostId)))
- oldHost.decayed = True
- continue
- oldInstance.decayed = (oldInstance.state != instance.state)
- self.updateDecay(self.decayedInstances, oldInstance)
- if (oldInstance.decayed):
- self.log.info('State reported as %s instead of %s for instance %d on host %s (decay)' % (vmStates[instance.state], vmStates[oldInstance.state], instance.id, host.name))
- finally:
- self.data.releaseInstance(oldInstance)
- instanceIds = [instance.id for instance in instances]
- for instanceId in [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]:
- if (instanceId not in instanceIds):
- self.log.info('instance %d was not reported by host %s as expected (decay)' % (instanceId, host.name))
- instance = self.data.acquireInstance(instanceId)
- instance.decayed = True
- self.updateDecay(self.decayedInstances, instance)
- oldHost.decayed = True
- self.data.releaseInstance(instance)
- except Exception, e:
- oldHost.decayed = True
- raise
- finally:
- self.updateDecay(self.decayedHosts, oldHost)
- self.data.releaseHost(oldHost)
+ if oldHost.up == False:
+ self.__upHost(oldHost)
+ self.hostLastContactTime[host.id] = time.time()
+ #self.hostLastUpdateTime[host.id] = time.time()
+ oldHost.version = host.version
+ oldHost.memory = host.memory
+ oldHost.cores = host.cores
+
+ # compare whether CM / NM versions are compatible
+ if (host.version != version and not self.allowMismatchedVersions):
+ oldHost.state = HostState.VersionMismatch
+ if (host.version == version and oldHost.state == HostState.VersionMismatch):
+ oldHost.state = HostState.Normal
+
+ # let the host communicate what it is running
+ # XXXrgass - This is too chatty for the console, I think we should remove this.
+ # XXXstroucki - My install depends on this, but I output to log files. This should be handled by a separate accounting server in future.
+ for instance in instances:
+ self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
+ self.instanceLastContactTime.setdefault(instance.id, 0)
+
+ self.data.releaseHost(oldHost)
return host.id
def vmUpdate(self, instanceId, instance, oldState):
try:
oldInstance = self.data.acquireInstance(instanceId)
except TashiException, e:
+ # shouldn't have a lock to clean up after here
if (e.errno == Errors.NoSuchInstanceId):
- self.log.exception('Got vmUpdate for unknown instanceId %d' % (instanceId))
+ self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
return
- else:
- raise
+ except:
+ self.log.exception("Could not acquire instance")
+ raise
+
+ self.instanceLastContactTime[instanceId] = time.time()
+ oldInstance.decayed = False
+
if (instance.state == InstanceState.Exited):
- oldInstance.decayed = False
- self.updateDecay(self.decayedInstances, oldInstance)
+ # determine why a VM has exited
hostname = self.data.getHost(oldInstance.hostId).name
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
- self.log.warning('Unexpected exit on %s of instance %d (vmId %d)' % (hostname, instanceId, oldInstance.vmId))
+ self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
if (oldInstance.state == InstanceState.Suspending):
self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
self.data.releaseInstance(oldInstance)
else:
+ del self.instanceLastContactTime[oldInstance.id]
self.data.removeInstance(oldInstance)
else:
if (instance.state):
+ # XXXstroucki does this matter?
if (oldState and oldInstance.state != oldState):
self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
oldInstance.state = instance.state
@@ -452,13 +532,14 @@ class ClusterManagerService(object):
for oldNic in oldInstance.nics:
if (oldNic.mac == nic.mac):
oldNic.ip = nic.ip
- oldInstance.decayed = False
- self.updateDecay(self.decayedInstances, oldInstance)
+
self.data.releaseInstance(oldInstance)
- return
+
+ return "success"
def activateVm(self, instanceId, host):
dataHost = self.data.acquireHost(host.id)
+
if (dataHost.name != host.name):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostNameMismatch,'msg':"Mismatched target host"})
@@ -468,14 +549,20 @@ class ClusterManagerService(object):
if (dataHost.state != HostState.Normal):
self.data.releaseHost(dataHost)
raise TashiException(d={'errno':Errors.HostStateError,'msg':"Target host state is not normal"})
+
self.data.releaseHost(dataHost)
instance = self.data.acquireInstance(instanceId)
+
if ('__resume_source' in instance.hints):
self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
else:
- self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+ # XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
+ #self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+ self.stateTransition(instance, None, InstanceState.Activating)
+
instance.hostId = host.id
self.data.releaseInstance(instance)
+
try:
if ('__resume_source' in instance.hints):
vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
@@ -486,24 +573,32 @@ class ClusterManagerService(object):
if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
self.data.removeInstance(instance)
else:
+ # XXXstroucki what can we do about pending hosts in the scheduler?
+ # put them at the end of the queue and keep trying?
self.stateTransition(instance, None, InstanceState.Held)
instance.hostId = None
self.data.releaseInstance(instance)
- raise
+ return "failure"
+
instance = self.data.acquireInstance(instanceId)
instance.vmId = vmId
+
if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization
- self.data.releaseInstance(instance)
try:
self.proxy[host.name].destroyVm(vmId)
+ self.data.removeInstance(instance)
except Exception:
self.log.exception('destroyVm failed for host %s vmId %d' % (host.name, instance.vmId))
- raise
+ self.data.releaseInstance(instance)
+ return "failure"
else:
if ('__resume_source' not in instance.hints):
- self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
- self.data.releaseInstance(instance)
- return
+ # XXXstroucki should we just wait for NM to update?
+ #self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+ pass
+
+ self.data.releaseInstance(instance)
+ return "success"
def registerHost(self, hostname, memory, cores, version):
hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py Wed Feb 8 04:44:09 2012
@@ -63,7 +63,7 @@ class DataInterface(object):
def getUser(self, id):
raise NotImplementedError
-
+
def registerHost(self, hostname, memory, cores, version):
raise NotImplementedError
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py Wed Feb 8 04:44:09 2012
@@ -17,14 +17,17 @@
import subprocess
import time
-from tashi.rpycservices.rpyctypes import User
+import os
+from tashi.rpycservices.rpyctypes import User, LocalImages
from tashi.clustermanager.data import DataInterface
-from tashi.util import instantiateImplementation
+from tashi.util import instantiateImplementation, humanReadable
class GetentOverride(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
self.baseDataObject = instantiateImplementation(config.get("GetentOverride", "baseData"), config)
+ self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
+
self.users = {}
self.lastUserUpdate = 0.0
self.fetchThreshold = float(config.get("GetentOverride", "fetchThreshold"))
@@ -64,6 +67,17 @@ class GetentOverride(DataInterface):
def getNetwork(self, id):
return self.baseDataObject.getNetwork(id)
+
+ def getImages(self):
+ count = 0
+ myList = []
+ for i in self.dfs.list("images"):
+ myFile = self.dfs.getLocalHandle("images/" + i)
+ if os.path.isfile(myFile):
+ image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+ myList.append(image)
+ count += 1
+ return myList
def fetchFromGetent(self):
now = time.time()
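
getImages() walks the DFS "images" directory and wraps each regular file in a
LocalImages record carrying an id, the image name and a human-readable size.
A minimal consumer, assuming a data object configured as above:

    # list the images visible to the CM through its DFS module
    for image in data.getImages():
        print "%d\t%s\t%s" % (image.id, image.imageName, image.imageSize)
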
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py Wed Feb 8 04:44:09 2012
@@ -67,7 +67,7 @@ class LdapOverride(DataInterface):
def getNetwork(self, id):
return self.baseDataObject.getNetwork(id)
-
+
def fetchFromLdap(self):
now = time.time()
if (now - self.lastUserUpdate > self.fetchThreshold):
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py Wed Feb 8 04:44:09 2012
@@ -19,20 +19,23 @@ import logging
import threading
import time
import types
+# XXXstroucki getImages needs os?
+import os
from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data.datainterface import DataInterface
-from tashi.util import stringPartition, boolean
+from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
class SQL(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
self.uri = self.config.get("SQL", "uri")
self.log = logging.getLogger(__name__)
+ self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
if (self.uri.startswith("sqlite://")):
import sqlite
self.dbEngine = "sqlite"
- self.conn = sqlite.connect(self.uri[9:], autocommit=1)
+ self.conn = sqlite.connect(self.uri[9:], autocommit=1, timeout=1500)
elif (self.uri.startswith("mysql://")):
import MySQLdb
self.dbEngine = "mysql"
@@ -49,6 +52,7 @@ class SQL(DataInterface):
self.instanceLock = threading.Lock()
self.instanceIdLock = threading.Lock()
self.instanceLocks = {}
+ self.instanceBusy = {}
self.hostLock = threading.Lock()
self.hostLocks = {}
self.maxInstanceId = 1
@@ -63,15 +67,20 @@ class SQL(DataInterface):
try:
cur.execute(stmt)
except:
- self.log.exception('Exception executing SQL statement')
+ self.log.exception('Exception executing SQL statement %s' % stmt)
finally:
self.sqlLock.release()
return cur
def getNewInstanceId(self):
self.instanceIdLock.acquire()
- instanceId = self.maxInstanceId
+ cur = self.executeStatement("SELECT MAX(id) FROM instances")
+ self.maxInstanceId = cur.fetchone()[0]
+ # XXXstroucki perhaps this can be handled nicer
+ if (self.maxInstanceId is None):
+ self.maxInstanceId = 0
self.maxInstanceId = self.maxInstanceId + 1
+ instanceId = self.maxInstanceId
self.instanceIdLock.release()
return instanceId
@@ -135,6 +144,7 @@ class SQL(DataInterface):
instance._lock = threading.Lock()
self.instanceLocks[instance.id] = instance._lock
instance._lock.acquire()
+ self.instanceBusy[instance.id] = True
l = self.makeInstanceList(instance)
self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
finally:
@@ -142,7 +152,13 @@ class SQL(DataInterface):
return instance
def acquireInstance(self, instanceId):
- self.instanceLock.acquire()
+ busyCheck = True
+ while busyCheck == True:
+ self.instanceLock.acquire()
+ busyCheck = self.instanceBusy.setdefault(instanceId, False)
+ if busyCheck:
+ self.instanceLock.release()
+
try:
cur = self.executeStatement("SELECT * from instances WHERE id = %d" % (instanceId))
l = cur.fetchone()
@@ -152,8 +168,10 @@ class SQL(DataInterface):
self.instanceLocks[instance.id] = self.instanceLocks.get(instance.id, threading.Lock())
instance._lock = self.instanceLocks[instance.id]
instance._lock.acquire()
+ self.instanceBusy[instance.id] = True
finally:
self.instanceLock.release()
+
return instance
def releaseInstance(self, instance):
@@ -166,7 +184,11 @@ class SQL(DataInterface):
if (e < len(self.instanceOrder)-1):
s = s + ", "
self.executeStatement("UPDATE instances SET %s WHERE id = %d" % (s, instance.id))
+ self.instanceBusy[instance.id] = False
instance._lock.release()
+ except:
+ self.log.exception("Excepted while holding lock")
+ raise
finally:
self.instanceLock.release()
@@ -174,8 +196,13 @@ class SQL(DataInterface):
self.instanceLock.acquire()
try:
self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
- instance._lock.release()
+ #XXXstroucki extraneous instance won't have a lock
+ try:
+ instance._lock.release()
+ except:
+ pass
del self.instanceLocks[instance.id]
+ del self.instanceBusy[instance.id]
finally:
self.instanceLock.release()
@@ -258,6 +285,17 @@ class SQL(DataInterface):
r = cur.fetchone()
network = Network(d={'id':r[0], 'name':r[1]})
return network
+
+ def getImages(self):
+ count = 0
+ myList = []
+ for i in self.dfs.list("images"):
+ myFile = self.dfs.getLocalHandle("images/" + i)
+ if os.path.isfile(myFile):
+ image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+ myList.append(image)
+ count += 1
+ return myList
def getUsers(self):
cur = self.executeStatement("SELECT * from users")
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py Wed Feb 8 04:44:09 2012
@@ -28,7 +28,7 @@ from tashi import boolean
from tashi.rpycservices import rpycservices
from rpyc.utils.server import ThreadedServer
-from rpyc.utils.authenticators import VdbAuthenticator
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
@signalHandler(signal.SIGTERM)
def handleSIGTERM(signalNumber, stackFrame):
@@ -51,11 +51,11 @@ def main():
if boolean(config.get("Security", "authAndEncrypt")):
users = {}
users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
- authenticator = VdbAuthenticator.from_dict(users)
+ authenticator = TlsliteVdbAuthenticator.from_dict(users)
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)
- t.logger.quiet = True
+ t.logger.setLevel(logging.ERROR)
t.service.service = service
t.service._type = 'NodeManagerService'
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py Wed Feb 8 04:44:09 2012
@@ -71,12 +71,9 @@ class NodeManagerService(object):
def loadVmInfo(self):
try:
- f = open(self.infoFile, "r")
- data = f.read()
- f.close()
- self.instances = cPickle.loads(data)
+ self.instances = self.vmm.getInstances()
except Exception, e:
- self.log.warning('Failed to load VM info from %s' % (self.infoFile))
+ self.log.exception('Failed to obtain VM info')
self.instances = {}
def saveVmInfo(self):
@@ -94,6 +91,12 @@ class NodeManagerService(object):
self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
if (cur == InstanceState.Exited):
del self.instances[vmId]
+ return True
+
+ if (instance.state == cur):
+ # Don't do anything if state is what it should be
+ return True
+
instance.state = cur
newInst = Instance(d={'state':cur})
success = lambda: None
@@ -168,6 +171,8 @@ class NodeManagerService(object):
try:
host = self.vmm.getHostInfo(self)
instances = self.instances.values()
+ #import pprint
+ #self.log.warning("Instances: " + pprint.saferepr(instances))
self.id = cm.registerNodeManager(host, instances)
except Exception, e:
self.log.exception('Failed to register with the CM')
@@ -291,6 +296,9 @@ class NodeManagerService(object):
def listVms(self):
return self.instances.keys()
+
+ def liveCheck(self):
+ return "alive"
def statsThread(self):
if (self.statsInterval == 0):
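
With loadVmInfo now asking the VMM for its instances instead of reading a
pickle file, a restarted NM rebuilds state from what is actually running, and
liveCheck() gives peers a cheap way to tell a dead NM from a slow one. A
hedged probe, assuming the rpyc proxy style used elsewhere in Tashi:

    # ping an NM before trusting (or discarding) its last report
    try:
        alive = (self.proxy[host.name].liveCheck() == "alive")
    except Exception:
        alive = False
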
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py Wed Feb 8 04:44:09 2012
@@ -27,6 +27,9 @@ import subprocess
import sys
import time
+# for scratch space support
+from os import system
+
from tashi.rpycservices.rpyctypes import *
from tashi.util import broken, logged, scrubString, boolean
from tashi import version, stringPartition
@@ -102,6 +105,8 @@ class Qemu(VmControlInterface):
self.consolePortLock = threading.Lock()
self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
self.stats = {}
+ self.scratchVg = self.config.get("Qemu", "scratchVg")
+ # XXXstroucki revise
self.scratchDir = self.config.get("Qemu", "scratchDir")
if len(self.scratchDir) == 0:
self.scratchDir = "/tmp"
@@ -118,7 +123,7 @@ class Qemu(VmControlInterface):
class anonClass:
def __init__(self, **attrs):
self.__dict__.update(attrs)
-
+
def getSystemPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
@@ -130,15 +135,25 @@ class Qemu(VmControlInterface):
except Exception:
pass
return pids
-
+
+ def getInstances(self):
+ """Will return a dict of instances by vmId to the caller"""
+ return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
+
def matchSystemPids(self, controlledVMs):
"""This is run in a separate polling thread and it must do things that are thread safe"""
+ if self.nm is None:
+ #XXXstroucki log may not be there yet either
+ #self.log.info("NM hook not yet available")
+ return
+
vmIds = controlledVMs.keys()
pids = self.getSystemPids()
for vmId in vmIds:
+ child = controlledVMs[vmId]
+
if vmId not in pids:
os.unlink(self.INFO_DIR + "/%d"%(vmId))
- child = controlledVMs[vmId]
del controlledVMs[vmId]
try:
del self.stats[vmId]
@@ -163,11 +178,30 @@ class Qemu(VmControlInterface):
for i in child.monitorHistory:
f.write(i)
f.close()
+ #XXXstroucki remove scratch storage
+ try:
+ if self.scratchVg is not None:
+ scratch_name = child.instance.name
+ log.info("Removing any scratch for " + scratch_name)
+ cmd = "/sbin/lvremove -f %s" % self.scratchVg
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+ except:
+ pass
+
try:
if (not child.migratingOut):
self.nm.vmStateChange(vmId, None, InstanceState.Exited)
except Exception, e:
log.exception("vmStateChange failed")
+ else:
+ try:
+ if (child.migratingOut):
+ self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
+ else:
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
+ except:
+ #XXXstroucki nm is initialised at different time
+ log.exception("vmStateChange failed")
def scanInfoDir(self):
@@ -185,13 +219,21 @@ class Qemu(VmControlInterface):
self.vncPortLock.release()
child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
child.monitor = os.fdopen(child.monitorFd)
+
+ #XXXstroucki ensure instance has vmId
+ child.instance.vmId = vmId
+
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
except Exception, e:
log.exception("Failed to load VM info for %d", vmId)
else:
log.info("Loaded VM info for %d", vmId)
- self.matchSystemPids(self.controlledVMs)
+ # XXXstroucki NM may not be available yet here.
+ try:
+ self.matchSystemPids(self.controlledVMs)
+ except:
+ pass
def pollVMsLoop(self):
"""Infinite loop that checks for dead VMs"""
@@ -262,7 +304,8 @@ class Qemu(VmControlInterface):
res = self.consumeAvailable(child)
os.write(child.monitorFd, command + "\n")
if (expectPrompt):
- self.consumeUntil(child, command)
+ # XXXstroucki: receiving a vm can take a long time
+ self.consumeUntil(child, command, timeout=timeout)
res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
return res
@@ -300,13 +343,8 @@ class Qemu(VmControlInterface):
cmd = "head -n 1 /proc/meminfo"
memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
if (memoryStr[2] == "kB"):
- host.memory = int(memoryStr[1])/1024
- elif (memoryStr[2] == "mB"):
- host.memory = int(memoryStr[1])
- elif (memoryStr[2] == "gB"):
- host.memory = int(memoryStr[1])*1024
- elif (memoryStr[2] == " B"):
- host.memory = int(memoryStr[1])/(1024*1024)
+ # XXXstroucki should have parameter for reserved mem
+ host.memory = (int(memoryStr[1])/1024) - 512
else:
log.warning('Unable to determine amount of physical memory - reporting 0')
host.memory = 0
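
The 512 MB subtracted above reserves memory for the host itself; the XXX
comment flags that this should be configurable. One way to do that, with
reservedMemory as an assumed option name not defined by this commit:

    # sketch only: fall back to 512 MB when the option is absent
    try:
        reserved = int(self.config.get("Qemu", "reservedMemory"))
    except Exception:
        reserved = 512
    host.memory = (int(memoryStr[1]) / 1024) - reserved
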
@@ -351,6 +389,8 @@ class Qemu(VmControlInterface):
snapshot = "on"
migrate = "on"
+ thisDiskList.append("cache=off")
+
thisDiskList.append("snapshot=%s" % snapshot)
if (self.useMigrateArgument):
@@ -358,9 +398,50 @@ class Qemu(VmControlInterface):
diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
+ # scratch disk (should be integrated better)
+ scratchSize = instance.hints.get("scratchSpace", "0")
+ scratchSize = int(scratchSize)
+ scratch_file = None
+ try:
+ if scratchSize > 0:
+ if self.scratchVg is None:
+ raise Exception, "No scratch volume group defined"
+ # create scratch disk
+ # XXXstroucki: needs to be cleaned somewhere
+ # XXXstroucki: clean user provided instance name
+ scratch_name = "lv" + instance.name
+ # XXXstroucki hold lock
+ # XXXstroucki check for capacity
+ cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G " + self.scratchVg
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+ index += 1
+
+ thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratch_name) ]
+ thisDiskList.append("if=%s" % diskInterface)
+ thisDiskList.append("index=%d" % index)
+ thisDiskList.append("cache=off")
+
+ if (True or disk.persistent):
+ snapshot = "off"
+ migrate = "off"
+ else:
+ snapshot = "on"
+ migrate = "on"
+
+ thisDiskList.append("snapshot=%s" % snapshot)
+
+ if (self.useMigrateArgument):
+ thisDiskList.append("migrate=%s" % migrate)
+
+ diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
+
+ except:
+ print 'caught exception while creating scratch disk'
+ raise
+
# Nic hints
- nicModel = instance.hints.get("nicModel", "e1000")
+ nicModel = instance.hints.get("nicModel", "virtio")
nicString = ""
for i in range(0, len(instance.nics)):
nic = instance.nics[i]
@@ -404,6 +485,7 @@ class Qemu(VmControlInterface):
child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
child.ptyFile = None
child.vncPort = -1
+ child.instance.vmId = child.pid
self.saveChildInfo(child)
self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
@@ -427,7 +509,8 @@ class Qemu(VmControlInterface):
child.monitor = os.fdopen(child.monitorFd)
self.saveChildInfo(child)
if (issueContinue):
- self.enterCommand(child, "c")
+ # XXXstroucki: receiving a vm can take a long time
+ self.enterCommand(child, "c", timeout=None)
def stopVm(self, vmId, target, stopFirst):
"""Universal function to stop a VM -- used by suspendVM, migrateVM """
@@ -437,7 +520,7 @@ class Qemu(VmControlInterface):
if (target):
retry = self.migrationRetries
while (retry > 0):
- res = self.enterCommand(child, "migrate %s" % (target), timeout=self.migrateTimeout)
+ res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
retry = retry - 1
if (res.find("migration failed") == -1):
retry = -1
@@ -459,7 +542,6 @@ class Qemu(VmControlInterface):
return vmId
def suspendVm(self, vmId, target):
- child = self.getChildFromPid(vmId)
tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
# XXX: Use fifo to improve performance
vmId = self.stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
@@ -678,6 +760,7 @@ class Qemu(VmControlInterface):
self.stats[vmId]['%s_%s' % (device, label)] = int(val)
self.stats[vmId]['cpuLoad'] = cpuLoad
self.stats[vmId]['rss'] = rss
+ self.stats[vmId]['vsize'] = vsize
self.stats[vmId]['recvMBs'] = sendMBs
self.stats[vmId]['sendMBs'] = recvMBs
except:
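
Scratch-space support pairs an lvcreate when the VM starts with an lvremove
when its QEMU process disappears. Factored out, the lifecycle looks roughly
like the following; the helper names are illustrative, and the commit's own
XXX notes about locking, capacity checks and scrubbing the user-supplied
instance name still apply:

    import subprocess

    def createScratch(vg, name, sizeGB):
        # XXX name must be scrubbed before reaching the shell command
        cmd = ["/sbin/lvcreate", "-n", name, "-L", "%dG" % sizeGB, vg]
        return subprocess.Popen(cmd).wait()

    def removeScratch(vg, name):
        # remove only this instance's LV, never the whole volume group
        cmd = ["/sbin/lvremove", "-f", "%s/%s" % (vg, name)]
        return subprocess.Popen(cmd).wait()
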
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py Wed Feb 8 04:44:09 2012
@@ -27,6 +27,10 @@ class VmControlInterface(object):
self.config = config
self.dfs = dfs
self.nm = nm
+
+ def getInstances(self):
+ """Will return a dict of instances by vmId to the caller"""
+ raise NotImplementedError
def instantiateVm(self, instance):
"""Takes an InstanceConfiguration, creates a VM based on it,
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py Wed Feb 8 04:44:09 2012
@@ -111,6 +111,7 @@ class XenPV(VmControlInterface, threadin
self.transientDir = self.config.get('XenPV', 'transientDir')
self.defaultVmType = self.config.get('XenPV', 'defaultVmType')
self.disktype = self.config.get('XenPV', 'defaultDiskType')
+ # XXXstroucki default disktype vhd?
self.newvms = listVms(self.vmNamePrefix)
self.hostId = -1
self.sleeptime = 5
@@ -133,7 +134,7 @@ class XenPV(VmControlInterface, threadin
# If the vm had transient disks, delete them
for i in range(len(a.disks)):
if a.disks[i].persistent == False:
- diskname = self.transientDisk(a.id, i, disktype)
+ diskname = self.transientDisk(a.id, i, self.disktype)
try:
os.unlink(diskname)
except:
@@ -150,7 +151,7 @@ class XenPV(VmControlInterface, threadin
time.sleep(self.sleeptime)
self.cron()
########################################
-# This is an ugly function, but the muti-line string literal makes it
+# This is an ugly function, but the multi-line string literal makes it
# a lot easier
########################################
def createXenConfig(self, vmName,
@@ -167,12 +168,7 @@ class XenPV(VmControlInterface, threadin
vmType = hints.get('vmtype', self.defaultVmType)
print 'starting vm with type: ', vmType
- try:
- disktype = self.config.get('XenPV', 'defaultDiskType')
- except:
- disktype = 'vhd'
-
- disk0 = 'tap:%s'%disktype
+ disk0 = 'tap:%s' % self.disktype
diskU = 'xvda1'
try:
@@ -216,7 +212,7 @@ kernel = "%s"
ramdisk)
elif vmType == 'hvm':
- disk0 = 'tap:%s'%disktype
+ disk0 = 'tap:%s' % self.disktype
diskU = 'hda1'