You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/07/17 21:06:01 UTC
svn commit: r1362643 [1/3] - in
/incubator/tashi/branches/stroucki-registration: ./ doc/ etc/ src/tashi/
src/tashi/accounting/ src/tashi/agents/ src/tashi/client/
src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/dfs/
src/tashi/messagi...
Author: stroucki
Date: Tue Jul 17 21:05:59 2012
New Revision: 1362643
URL: http://svn.apache.org/viewvc?rev=1362643&view=rev
Log:
resync stroucki-registration from trunk
Added:
incubator/tashi/branches/stroucki-registration/doc/UPDATING
- copied unchanged from r1362642, incubator/tashi/trunk/doc/UPDATING
incubator/tashi/branches/stroucki-registration/doc/sample.qemu-ifup
- copied unchanged from r1362642, incubator/tashi/trunk/doc/sample.qemu-ifup
incubator/tashi/branches/stroucki-registration/src/tashi/utils/
- copied from r1362642, incubator/tashi/trunk/src/tashi/utils/
incubator/tashi/branches/stroucki-registration/src/zoni/extensions/
- copied from r1362642, incubator/tashi/trunk/src/zoni/extensions/
Removed:
incubator/tashi/branches/stroucki-registration/src/tashi/agents/locality-server.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/mauipacket.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/pseudoDes.py
incubator/tashi/branches/stroucki-registration/src/tashi/client/client.py
incubator/tashi/branches/stroucki-registration/src/tashi/client/test.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messageBroker.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messaging.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/soapmessaging.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/tashimessaging.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/threadpool.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/thriftmessaging.py
incubator/tashi/branches/stroucki-registration/src/tashi/thrift/
incubator/tashi/branches/stroucki-registration/src/utils/Makefile
incubator/tashi/branches/stroucki-registration/src/utils/getLocality.py
incubator/tashi/branches/stroucki-registration/src/utils/nmd.c
Modified:
incubator/tashi/branches/stroucki-registration/ (props changed)
incubator/tashi/branches/stroucki-registration/INSTALL
incubator/tashi/branches/stroucki-registration/doc/DEVELOPMENT
incubator/tashi/branches/stroucki-registration/doc/INSTALL2
incubator/tashi/branches/stroucki-registration/etc/NodeManager.cfg
incubator/tashi/branches/stroucki-registration/etc/TashiDefaults.cfg
incubator/tashi/branches/stroucki-registration/etc/ZoniDefaults.cfg
incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accounting.py
incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accountingservice.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/dhcpdns.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/instancehook.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/mauiwiki.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive.py
incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive_zoni.py
incubator/tashi/branches/stroucki-registration/src/tashi/client/tashi-client.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanager.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/datainterface.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/pickled.py
incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/sql.py
incubator/tashi/branches/stroucki-registration/src/tashi/connectionmanager.py
incubator/tashi/branches/stroucki-registration/src/tashi/dfs/vfs.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/gangliapublisher.py
incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messagingloghandler.py
incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanager.py
incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/xenpv.py
incubator/tashi/branches/stroucki-registration/src/tashi/parallel.py
incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpycservices.py
incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpyctypes.py
incubator/tashi/branches/stroucki-registration/src/tashi/util.py
incubator/tashi/branches/stroucki-registration/src/zoni/agents/dhcpdns.py
incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/bootstrapinterface.py
incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/pxe.py
incubator/tashi/branches/stroucki-registration/src/zoni/client/zoni-cli.py
incubator/tashi/branches/stroucki-registration/src/zoni/data/infostore.py
incubator/tashi/branches/stroucki-registration/src/zoni/data/reservation.py
incubator/tashi/branches/stroucki-registration/src/zoni/data/reservationmanagementinterface.py
incubator/tashi/branches/stroucki-registration/src/zoni/data/resourcequerysql.py
incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagement.py
incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagementinterface.py
incubator/tashi/branches/stroucki-registration/src/zoni/extra/util.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/apcswitchedrackpdu.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/delldrac.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/dellswitch.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/f10s50switch.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/hpilo.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/hpswitch.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/hwswitchinterface.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/ipmi.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/raritanpdu.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/systemmanagement.py
incubator/tashi/branches/stroucki-registration/src/zoni/hardware/systemmanagementinterface.py
incubator/tashi/branches/stroucki-registration/src/zoni/install/db/zoniDbSetup.py
incubator/tashi/branches/stroucki-registration/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
incubator/tashi/branches/stroucki-registration/src/zoni/install/pxe/zoniPxeSetup.py
incubator/tashi/branches/stroucki-registration/src/zoni/install/www/zoniWebSetup.py
incubator/tashi/branches/stroucki-registration/src/zoni/services/pcvciservice.py
incubator/tashi/branches/stroucki-registration/src/zoni/services/zonimanager.py
incubator/tashi/branches/stroucki-registration/src/zoni/version.py
Propchange: incubator/tashi/branches/stroucki-registration/
------------------------------------------------------------------------------
Merged /incubator/tashi/branches/luke-zoni:r1292129-1351835,1351890-1351918
Merged /incubator/tashi/branches/stable:r1245857-1301161
Merged /incubator/tashi/branches/stroucki-dropthrift:r1292513-1297655
Merged /incubator/tashi/trunk:r1295398-1362642
Merged /incubator/tashi/branches/luke-zoni-staging:r1351875-1351888
Merged /incubator/tashi/branches/trunk-staging:r1351915-1351919
Merged /incubator/tashi/branches/stroucki-stable:r1297792-1298173
Modified: incubator/tashi/branches/stroucki-registration/INSTALL
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/INSTALL?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/INSTALL (original)
+++ incubator/tashi/branches/stroucki-registration/INSTALL Tue Jul 17 21:05:59 2012
@@ -124,6 +124,7 @@ When defining the host, you must provide
given by the hostname command. If you plan on eventually having several
hosts and networks, feel free to add them now.
+root@grml:/usr/local/tashi# cd bin
root@grml:/usr/local/tashi/bin# DEBUG=1 ./clustermanager
2012-01-26 23:12:33,972 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
2012-01-26 23:12:33,972 [./clustermanager:INFO] Starting cluster manager
@@ -152,15 +153,14 @@ Please press <RETURN> to start IPython.
In [1]: from tashi.rpycservices.rpyctypes import Host, HostState, Network
-In [2]: data.baseDataObject.hosts[1] = Host(d={'id':1,'name':'grml','state': HostState.Normal,'up':False})
+In [2]: data.baseDataObject.hosts[0] = Host(d={'id':0,'name':'grml','state': HostState.Normal,'up':False})
-In [3]: data.baseDataObject.networks[1]=Network(d={'id':0,'name':'default'})
+In [3]: data.baseDataObject.networks[0]=Network(d={'id':0,'name':'My Network'})
In [4]: data.baseDataObject.save()
-In [5]: import os
-
-In [6]: os.kill(os.getpid(), 9)
+In [5]: (^C)
+2012-03-07 20:00:00,456 [./bin/clustermanager:INFO] Exiting cluster manager after receiving a SIGINT signal
Run the cluster manager in the background:
root@grml:/usr/local/tashi/bin# ./clustermanager &
Modified: incubator/tashi/branches/stroucki-registration/doc/DEVELOPMENT
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/doc/DEVELOPMENT?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/doc/DEVELOPMENT (original)
+++ incubator/tashi/branches/stroucki-registration/doc/DEVELOPMENT Tue Jul 17 21:05:59 2012
@@ -8,3 +8,9 @@ Future goals:
Other ideas:
* Make available a console aggregator for user's VMs.
+
+Python caveats:
+ * We'd like to use variables like bin, id, sum, input, etc., but these
+ are built-in, and will be flagged by pydev
+ * pydev does not like python modules with a dash in the name
+
Modified: incubator/tashi/branches/stroucki-registration/doc/INSTALL2
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/doc/INSTALL2?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/doc/INSTALL2 (original)
+++ incubator/tashi/branches/stroucki-registration/doc/INSTALL2 Tue Jul 17 21:05:59 2012
@@ -48,6 +48,16 @@ exit 0
Note that the entire path of a network connection must be configured to
use jumbo frames, if the virtual machines are to use them.
+If you have large numbers of VLANs, and don't want to hardcode them into
+each VM host, you can find a sample qemu-ifup in the doc directory. This
+script will need to be adapted to your local standards by changing the
+basic parameters at the top. This script can then be linked to by the name
+Tashi expects them to have. For example, if you have a VLAN 1001, you will
+create a link from /etc/qemu-ifup.1001 to this script.
+
+The script will handle the creation of the VM interface, and creation of the
+bridge and VLANs if they haven't been created before.
+
---+ Accounting server
An accounting server is available in the distribution. It will log
Modified: incubator/tashi/branches/stroucki-registration/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/etc/NodeManager.cfg?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/stroucki-registration/etc/NodeManager.cfg Tue Jul 17 21:05:59 2012
@@ -80,7 +80,6 @@ clusterManagerPort = 9882
statsInterval = 0.0
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
authAndEncrypt = False
Modified: incubator/tashi/branches/stroucki-registration/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/etc/TashiDefaults.cfg?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/stroucki-registration/etc/TashiDefaults.cfg Tue Jul 17 21:05:59 2012
@@ -54,10 +54,10 @@ allowDecayed = 30.0
allowMismatchedVersions = False
maxMemory = 8192
maxCores = 8
+defaultNetwork = 0
allowDuplicateNames = False
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
baseData = tashi.clustermanager.data.Pickled
@@ -110,11 +110,9 @@ registerFrequency = 10.0
clusterManagerHost = localhost
clusterManagerPort = 9882
statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
qemuBin = /usr/bin/kvm
-infoDir = /var/tmp/VmControlQemu/
pollDelay = 1.0
migrationRetries = 10
monitorTimeout = 60.0
@@ -123,7 +121,10 @@ maxParallelMigrations = 10
useMigrateArgument = False
statsInterval = 0.0
scratchDir = /tmp
-scratchVg = vgscratch
+#scratchVg = vgscratch
+suspendHandler = gzip
+resumeHandler = zcat
+reservedMem = 512
[XenPV]
vmNamePrefix = tashi
Modified: incubator/tashi/branches/stroucki-registration/etc/ZoniDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/etc/ZoniDefaults.cfg?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/etc/ZoniDefaults.cfg (original)
+++ incubator/tashi/branches/stroucki-registration/etc/ZoniDefaults.cfg Tue Jul 17 21:05:59 2012
@@ -30,6 +30,7 @@ LOG_FILE = /var/tmp/zoni_logfile.txt
# Specify data store
INFO_STORE = sql
USER_MANAGEMENT = ldap
+PICKLE_FILE = pickled
# DB host
[dbConnection]
@@ -58,6 +59,10 @@ PXE_SERVER_IP = IP_OF_PXE_SERVER_IN_DOMA
# Must be relative to TFTP_ROOT_DIR
INITRD_ROOT = builds/initrd
KERNEL_ROOT = builds/kernel
+# Extensions from MIMOS
+# put the IP address of your NTP server here
+NTPSVR = 127.0.0.1
+CUSTOM_TEMPLATES_DIR = /var/lib/tftpboot/templates
[www]
WWW_DOCUMENT_ROOT = /var/www
@@ -96,13 +101,6 @@ dhcpServer = xx_dhcpserver_host_or_ip_xx
dhcpKeyName = xx_dhcpservername_xx
dhcpSecretKey = xx_secretkey_xx
-# Domain Config
-[domain]
-domainDescription = "/usr/local/tashi/etc/ZoniDomains.xml"
-ZONI_HOME_DOMAIN = 1 # Default domain for most switches
-ZONI_HOME_NETWORK = 10.10.0.0/20
-ZONI_IPMI_NETWORK = 10.10.16.0/20
-
# Logging
[loggers]
keys=root
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accounting.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accounting.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accounting.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accounting.py Tue Jul 17 21:05:59 2012
@@ -17,8 +17,8 @@
# specific language governing permissions and limitations
# under the License.
+import os
import sys
-import signal
import logging.config
from tashi.rpycservices import rpycservices
@@ -26,13 +26,15 @@ from rpyc.utils.server import ThreadedSe
#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
#from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
+from tashi.util import createClient, instantiateImplementation, debugConsole
+from tashi.utils.config import Config
+
import tashi
class Accounting(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
@@ -43,17 +45,20 @@ class Accounting(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, self.config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
def initAccountingServer(self):
service = instantiateImplementation(self.config.get("Accounting", "service"), self.config)
+ #XXXstroucki: disabled authAndEncrypt for now
#if boolean(self.config.get("Security", "authAndEncrypt")):
if False:
pass
else:
+ # XXXstroucki: ThreadedServer is liable to have
+ # exceptions within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService', 'port')), auto_register=False)
t.logger.setLevel(logging.ERROR)
@@ -62,25 +67,44 @@ class Accounting(object):
debugConsole(globals())
- try:
- t.start()
- except KeyboardInterrupt:
- self.handleSIGTERM(signal.SIGTERM, None)
-
- @signalHandler(signal.SIGTERM)
- def handleSIGTERM(self, signalNumber, stackFrame):
- self.log.info('Exiting cluster manager after receiving a SIGINT signal')
+ t.start()
+ # shouldn't exit by itself
sys.exit(0)
def main():
- (config, configFiles) = getConfig(["Accounting"])
+ config = Config(["Accounting"])
+ configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- accounting = Accounting(config, cmclient)
+ log = logging.getLogger(__name__)
+ log.info('Using configuration file(s) %s' % configFiles)
+
+ accounting = Accounting(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ accounting.initAccountingServer()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting accounting service after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of accounting service")
+ os._exit(-1)
+
+ log.info("Exiting accounting service after service thread exited")
+ os._exit(-1)
- accounting.initAccountingServer()
+ return
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accountingservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accountingservice.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accountingservice.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/accounting/accountingservice.py Tue Jul 17 21:05:59 2012
@@ -55,7 +55,7 @@ class AccountingService(object):
instances = self.cm.getInstances()
for instance in instances:
# XXXstroucki this currently duplicates what the CM was doing.
- self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+ self.log.info('Accounting: id %s host %s vmId %s user %s cores %s memory %s' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
except:
self.log.warning("Accounting iteration failed")
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/agents/dhcpdns.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/agents/dhcpdns.py Tue Jul 17 21:05:59 2012
@@ -22,7 +22,7 @@ import socket
import subprocess
import time
from instancehook import InstanceHook
-from tashi.rpycservices.rpyctypes import Instance, NetworkConfiguration
+from tashi.rpycservices.rpyctypes import Instance
from tashi import boolean
class DhcpDns(InstanceHook):
@@ -55,15 +55,21 @@ class DhcpDns(InstanceHook):
self.ipMax = {}
self.currentIP = {}
self.usedIPs = {}
- for k in self.ipRange:
- ipRange = self.ipRange[k]
- (min, max) = ipRange.split("-")
- min = min.strip()
- max = max.strip()
- ipNum = self.strToIp(min)
- self.ipMin[k] = self.strToIp(min)
- self.ipMax[k] = self.strToIp(max)
- self.currentIP[k] = self.ipMin[k]
+
+ self.initIPs()
+
+ def initIPs(self):
+ self.usedIPs = {}
+ for network in self.ipRange:
+ ipRange = self.ipRange[network]
+ (ipMin, ipMax) = ipRange.split("-")
+ ipMin = ipMin.strip()
+ ipMax = ipMax.strip()
+ ipNum = self.strToIp(ipMin)
+ self.ipMin[network] = self.strToIp(ipMin)
+ self.ipMax[network] = self.strToIp(ipMax)
+ self.currentIP[network] = self.ipMin[network]
+
instances = self.client.getInstances()
for i in instances:
for nic in i.nics:
@@ -72,7 +78,7 @@ class DhcpDns(InstanceHook):
ipNum = self.strToIp(ip)
self.log.info('Added %s->%s during reinitialization' % (i.name, ip))
self.usedIPs[ipNum] = ip
- except Exception, e:
+ except Exception:
pass
def strToIp(self, s):
@@ -87,12 +93,17 @@ class DhcpDns(InstanceHook):
return "%d.%d.%d.%d" % ((ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff)
def allocateIP(self, nic):
+ # XXXstroucki: if the network is not defined having an ip
+ # range, this will throw a KeyError. Should be logged.
network = nic.network
allocatedIP = None
requestedIP = self.strToIp(nic.ip)
wrapToMinAlready = False
if (requestedIP <= self.ipMax[network] and requestedIP >= self.ipMin[network] and (requestedIP not in self.usedIPs)):
allocatedIP = requestedIP
+
+ # nic.ip will be updated later in preCreate if chosen
+ # ip not available
while (allocatedIP == None):
if (self.currentIP[network] > self.ipMax[network] and wrapToMinAlready):
raise UserWarning("No available IP addresses for network %d" % (network))
@@ -127,7 +138,7 @@ class DhcpDns(InstanceHook):
stdin.write("set hardware-type = 00:00:00:01\n") # Ethernet
stdin.write("create\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
def removeDhcp(self, name, ipaddr=None):
@@ -146,7 +157,7 @@ class DhcpDns(InstanceHook):
stdin.write("open\n")
stdin.write("remove\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
def addDns(self, name, ip):
@@ -169,15 +180,15 @@ class DhcpDns(InstanceHook):
stdin.write("update add %s %d IN PTR %s.%s.\n" % (reverseIpStr, self.dnsExpire, name, self.dnsDomain))
stdin.write("\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
finally:
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
while (pid == 0):
time.sleep(0.5)
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
def removeDns(self, name):
cmd = "nsupdate"
@@ -196,15 +207,15 @@ class DhcpDns(InstanceHook):
stdin.write("update delete %s.%s A\n" % (name, self.dnsDomain))
stdin.write("\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
finally:
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
while (pid == 0):
time.sleep(0.5)
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
def doUpdate(self, instance):
newInstance = Instance()
@@ -229,7 +240,7 @@ class DhcpDns(InstanceHook):
dhcpName = instance.name + "-nic%d" % (i)
self.log.info("Adding %s:{%s->%s} to DHCP" % (dhcpName, nic.mac, ip))
self.addDhcp(dhcpName, ip, nic.mac)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to add host %s to DHCP/DNS" % (instance.name))
self.doUpdate(instance)
@@ -242,8 +253,11 @@ class DhcpDns(InstanceHook):
ip = nic.ip
try:
ipNum = self.strToIp(ip)
+ # XXXstroucki: if this fails with KeyError,
+ # we must have double-assigned the same IP
+ # address. How does this happen?
del self.usedIPs[ipNum]
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s, ip %s from pool of usedIPs" % (instance.name, ip))
try:
if (i == 0):
@@ -251,9 +265,13 @@ class DhcpDns(InstanceHook):
else:
dhcpName = instance.name + "-nic%d" % (i)
self.removeDhcp(dhcpName)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s from DHCP" % (instance.name))
try:
+ # XXXstroucki: this can fail if the resolver can't
+ # resolve the dns server name (line 190). Perhaps
+ # the hostname should be then pushed onto a list
+ # to try again next time.
self.removeDns(instance.name)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s from DNS" % (instance.name))
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/agents/instancehook.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/agents/instancehook.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/agents/instancehook.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/agents/instancehook.py Tue Jul 17 21:05:59 2012
@@ -1,5 +1,3 @@
-#! /usr/bin/env python
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+# superclass for instance hooks.
+
class InstanceHook(object):
def __init__(self, config, client, post=False):
if (self.__class__ is InstanceHook):
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/agents/mauiwiki.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/agents/mauiwiki.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/agents/mauiwiki.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/agents/mauiwiki.py Tue Jul 17 21:05:59 2012
@@ -17,20 +17,122 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: wiki is a text based resource manager that maui can
+# use. It also seems to have disappeared from the face of the web.
+# This code is unmaintained.
+
+# XXXstroucki former file mauipacket.py
+#import subprocess
import time
-import hashlib
-import sys
-import subprocess
-import socket, SocketServer
-from socket import gethostname
-import os
+import SocketServer
+from tashi.utils import pseudoDes
+from tashi.rpycservices.rpyctypes import HostState, InstanceState
+
+class MauiPacket:
+ def __init__(self, key=0):
+ self.size = 0
+ self.char = '\n'
+ self.chksum = '0'*16
+ self.timestamp = int(time.time())
+ self.auth = ''
+ self.data = []
+ self.msg = ''
+ self.key=key
+ def readPacket(self, istream):
+ self.msg = ''
+
+ size = istream.read(8)
+ self.msg = self.msg+size
+ self.size = int(size)
+
+ self.char = istream.read(1)
+ self.msg = self.msg + self.char
+
+ packet = istream.read(self.size)
+ self.msg = self.msg + packet
+
+ packet = packet.split()
+
+ for i in range(len(packet)):
+ item = packet[i].split('=')
+ if item[0] == 'CK':
+ self.chksum = item[1]
+ if item[0] == 'TS':
+ self.timestamp = int(item[1])
+ if item[0] == 'AUTH':
+ self.auth = item[1]
+ if item[0] == 'DT':
+ self.data = packet[i:]
+ self.data=self.data[0].split('=',1)[1:] + self.data[1:]
+
+ def checksumMessage(self, message, key=None):
+ if key == None:
+ key = self.key
+ if type(key) == type(''):
+ key = int(key)
+ chksum = pseudoDes.generateKey(message, key)
+ chksum = '%016x' % chksum
+ return chksum
+ def getChecksum(self):
+ cs = self.msg.partition('TS=')
+ cs = cs[1]+cs[2]
+ chksum = self.checksumMessage(cs)
+ return chksum
+ def verifyChecksum(self):
+ chksum = self.getChecksum()
+ if chksum != self.chksum:
+ print 'verifyChecksum: "%s"\t"%s"'%(chksum, self.chksum)
+ print 'verifyChecksum (types): %s\t%s' %(type(chksum), type(self.chksum))
+ return False
+ return True
+ def set(self, data, auth=None, key=None, timestamp=None):
+ if timestamp==None:
+ timestamp = int(time.time())
+ self.data = data
+ if auth !=None:
+ self.auth = auth
+ if key != None:
+ self.key = key
+ self.timestamp=timestamp
+ self.fixup()
+ def fixup(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.size = len(pktstring)
+ def __str__(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.msg = ''
+ self.msg = self.msg + '%08i'%len(pktstring)
+ self.msg = self.msg + self.char
+ self.msg = self.msg + pktstring
+
+ return self.msg
+ def prettyString(self):
+ s = '''Maui Packet
+-----------
+size:\t\t%i
+checksum:\t%s
+timestamp:\t%s
+auth:\t\t%s
+data:
+%s
+-----------'''
+ s = s%(self.size, self.chksum, self.timestamp, self.auth, self.data)
+ return s
+
+# XXXstroucki original file mauiwiki.py
import threading
import logging.config
from tashi.parallel import synchronizedmethod
from tashi.services.ttypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
-from tashi.agents.mauipacket import MauiPacket
+from tashi.util import getConfig, createClient, instantiateImplementation
+#from tashi.agents.mauipacket import MauiPacket
import tashi.util
def jobnameToId(jobname):
@@ -57,24 +159,24 @@ class InstanceHooks():
def postDestroy(self, inst):
for hook in self.hooks:
hook.postDestroy(inst)
- def idToInst(self, id):
+ def idToInst(self, _id):
instances = self.client.getInstances()
print 'instances ', instances
- insts = [i for i in instances if str(i.id)==str(id)]
+ insts = [i for i in instances if str(i.id)==str(_id)]
if len(insts) == 0:
- raise "No instance with ID %s"%id
+ raise "No instance with ID %s"%_id
if len(insts) > 1:
- raise "Multiple instances with ID %s"%id
+ raise "Multiple instances with ID %s"%_id
inst = insts[0]
return inst
- def destroyById(self, id):
- inst = self.idToInst(id)
- self.client.destroyVm(int(id))
+ def destroyById(self, _id):
+ inst = self.idToInst(_id)
+ self.client.destroyVm(int(_id))
self.postDestroy(inst)
- def activateById(self, id, host):
- inst = self.idToInst(id)
+ def activateById(self, _id, host):
+ inst = self.idToInst(_id)
self.preCreate(inst)
- self.client.activateVm(int(id), host)
+ self.client.activateVm(int(_id), host)
def cmplists(a, b):
for i in range(len(a)):
@@ -301,8 +403,8 @@ class TashiConnection(threading.Thread):
if j.updateTime >= updatetime and j.id in joblist]
jl = {}
for job in jobs:
- id = "%s.%i"%(job.name, job.id)
- jl[id] = {'STATE':self.wikiInstanceState(job),
+ _id = "%s.%i"%(job.name, job.id)
+ jl[_id] = {'STATE':self.wikiInstanceState(job),
'UNAME':self.users[job.userId].name,
'GNAME':self.users[job.userId].name,
'UPDATETIME':int(job.updateTime),
@@ -313,14 +415,14 @@ class TashiConnection(threading.Thread):
'RMEM':str(job.memory),
'WCLIMIT':str(self.defaultJobTime)}
if job.hostId != None:
- jl[id]['TASKLIST'] = self.hosts[job.hostId].name
+ jl[_id]['TASKLIST'] = self.hosts[job.hostId].name
return jl
@synchronizedmethod
- def activateById(self, id, host):
- if not self.instances.has_key(id):
+ def activateById(self, _id, host):
+ if not self.instances.has_key(_id):
raise "no such instance"
- self.ihooks.activateById(id, host)
- self.instances[id].state=InstanceState.Activating
+ self.ihooks.activateById(_id, host)
+ self.instances[_id].state=InstanceState.Activating
class MauiListener(SocketServer.StreamRequestHandler):
def setup(self):
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive.py Tue Jul 17 21:05:59 2012
@@ -19,16 +19,18 @@
import time
import logging.config
+import sys
from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.util import createClient, instantiateImplementation, boolean
+from tashi.utils.config import Config
import tashi
class Primitive(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -40,7 +42,7 @@ class Primitive(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
self.hosts = {}
@@ -61,7 +63,7 @@ class Primitive(object):
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
- #if (h.up == True and h.state == HostState.Normal):
+ #if (self.__isReady(h)):
hosts[ctr] = h
ctr = ctr + 1
load[h.id] = []
@@ -75,8 +77,9 @@ class Primitive(object):
# XXXstroucki put held machines behind pending ones
heldInstances = []
for i in instances.itervalues():
+ # Nonrunning VMs will have hostId of None, but
+ # so will Suspended VMs.
if (i.hostId or i.state == InstanceState.Pending):
- # Nonrunning VMs will have hostId of None
load[i.hostId] = load[i.hostId] + [i.id]
elif (i.hostId is None and i.state == InstanceState.Held):
heldInstances = heldInstances + [i.id]
@@ -101,6 +104,11 @@ class Primitive(object):
if name in self.clearHints[hint]:
popit = self.clearHints[hint].index(name)
self.clearHints[hint].pop(popit)
+
+ def __isReady(self, host):
+ if host.up == False or host.state != HostState.Normal:
+ return False
+ return True
def __scheduleInstance(self, inst):
@@ -132,7 +140,7 @@ class Primitive(object):
# has a host preference been expressed?
if (targetHost != None):
for h in self.hosts.values():
- if (h.state == HostState.Normal):
+ if (self.__isReady(h)):
self.__clearHints("targetHost", h.name)
# if this is not the host we are looking for, continue
if ((str(h.id) != targetHost and h.name != targetHost)):
@@ -160,13 +168,8 @@ class Primitive(object):
for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
h = self.hosts[ctr]
- # XXXstroucki if it's down, find another machine
- if (h.up == False):
- continue
-
- # If the host not in normal operating state,
- # find another machine
- if (h.state != HostState.Normal):
+ # XXXstroucki if it's unavailable, find another machine
+ if (self.__isReady(h) == False):
continue
else:
# If the host is back to normal, get rid of the entry in clearHints
@@ -206,7 +209,10 @@ class Primitive(object):
if (not inst.hints.get("__resume_source", None)):
# only run preCreate hooks if newly starting
for hook in self.hooks:
- hook.preCreate(inst)
+ try:
+ hook.preCreate(inst)
+ except:
+ self.log.warning("Failed to run preCreate hook")
self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
rv = "fail"
try:
@@ -241,8 +247,21 @@ class Primitive(object):
def start(self):
oldInstances = {}
+ # XXXstroucki: scheduling races have been observed, where
+ # a vm is scheduled on a host that had not updated its
+ # capacity with the clustermanager, leading to overloaded
+ # hosts. I think the place to insure against this happening
+ # is in the nodemanager. This scheduler will keep an
+ # internal state of cluster loading, but that is best
+ # effort and will be refreshed from CM once the buffer
+ # of vms to be scheduled is exhausted.
+
while True:
try:
+ # XXXstroucki: to get a list of vms to be
+ # scheduled, it asks the CM for a full
+ # cluster state, and will look at those
+ # without a host.
self.__getState()
# Check for VMs that have exited and call
@@ -250,7 +269,7 @@ class Primitive(object):
for i in oldInstances:
# XXXstroucki what about paused and saved VMs?
# XXXstroucki: do we need to look at Held VMs here?
- if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+ if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying or oldInstances[i].state == InstanceState.ShuttingDown)):
self.log.info("VM exited: %s" % (oldInstances[i].name))
for hook in self.hooks:
hook.postDestroy(oldInstances[i])
@@ -280,13 +299,22 @@ class Primitive(object):
time.sleep(self.scheduleDelay)
def main():
- (config, configFiles) = getConfig(["Agent"])
+ config = Config(["Agent"])
+ configFiles = config.getFiles()
+
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, cmclient)
- agent.start()
+ agent = Primitive(config)
+
+ try:
+ agent.start()
+ except KeyboardInterrupt:
+ pass
+
+ log = logging.getLogger(__file__)
+ log.info("Primitive exiting")
+ sys.exit(0)
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive_zoni.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive_zoni.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive_zoni.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/agents/primitive_zoni.py Tue Jul 17 21:05:59 2012
@@ -17,6 +17,11 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: this apparently originated from a copy of the primitive
+# scheduler code sometime in 2010. It aims to keep a pool of tashi servers
+# available, and other servers shut down. Could this be better suited for
+# a hook function of the scheduler?
+
from socket import gethostname
import os
import socket
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/client/tashi-client.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/client/tashi-client.py Tue Jul 17 21:05:59 2012
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,8 +21,10 @@ import os.path
import random
import sys
import types
-from tashi.rpycservices.rpyctypes import *
-from tashi import vmStates, hostStates, boolean, getConfig, stringPartition, createClient
+from tashi.rpycservices.rpyctypes import NetworkConfiguration,\
+ DiskConfiguration, HostState, Instance, Host, TashiException
+from tashi.utils.config import Config
+from tashi import vmStates, hostStates, boolean, stringPartition, createClient
users = {}
networks = {}
@@ -48,7 +50,23 @@ def getUser():
for user in users:
if (users[user].name == userStr):
return users[user].id
- raise ValueError("Unknown user %s" % (userStr))
+ raise TashiException({'msg':"Unknown user %s" % (userStr)})
+
+def checkHid(host):
+ userId = getUser()
+ hosts = client.getHosts()
+ hostId = None
+ try:
+ hostId = int(host)
+ except:
+ for h in hosts:
+ if (h.name == host):
+ hostId = h.id
+ if (hostId is None):
+ raise TashiException({'msg':"Unknown host %s" % (str(host))})
+
+ # XXXstroucki permissions for host related stuff?
+ return hostId
def checkIid(instance):
userId = getUser()
@@ -61,13 +79,13 @@ def checkIid(instance):
if (i.name == instance):
instanceId = i.id
if (instanceId is None):
- raise ValueError("Unknown instance %s" % (str(instance)))
+ raise TashiException({'msg':"Unknown instance %s" % (str(instance))})
for instance in instances:
if (instance.id == instanceId):
# XXXstroucki uid 0 to have superuser access
# how about admin groups?
if (instance.userId != userId and instance.userId != None and userId != 0):
- raise ValueError("You don't own that VM")
+ raise TashiException({'msg':"You don't have permissions on VM %s" % instance.name})
return instanceId
def requiredArg(name):
@@ -78,10 +96,17 @@ def randomMac():
def getDefaultNetwork():
fetchNetworks()
- networkId = 1
+ networkId = 0
for network in networks:
+ if (getattr(networks[network], "default", False) is True):
+ networkId = network
+ break
+
+ # Naming the network "default" is deprecated, and
+ # this functionality will be removed soon
if (networks[network].name == "default"):
networkId = network
+ break
return networkId
def randomNetwork():
@@ -93,7 +118,7 @@ def parseDisks(arg):
disks = []
for strDisk in strDisks:
strDisk = strDisk.strip()
- (l, s, r) = stringPartition(strDisk, ":")
+ (l, __s, r) = stringPartition(strDisk, ":")
if (r == ""):
r = "False"
r = boolean(r)
@@ -109,12 +134,12 @@ def parseNics(arg):
nics = []
for strNic in strNics:
strNic = strNic.strip()
- (l, s, r) = stringPartition(strNic, ":")
+ (l, __s, r) = stringPartition(strNic, ":")
n = l
if (n == ''):
n = getDefaultNetwork()
n = int(n)
- (l, s, r) = stringPartition(r, ":")
+ (l, __s, r) = stringPartition(r, ":")
ip = l
if (ip == ''):
ip = None
@@ -133,7 +158,7 @@ def parseHints(arg):
hints = {}
for strHint in strHints:
strHint = strHint.strip()
- (l, s, r) = stringPartition(strHint, "=")
+ (l, __s, r) = stringPartition(strHint, "=")
hints[l] = r
return hints
except:
@@ -161,6 +186,14 @@ def getSlots(cores, memory):
hosts = getVmLayout()
count = 0
+ if cores < 1:
+ print "Argument to cores must be 1 or greater."
+ return
+
+ if memory <= 0:
+ print "Argument to memory must be greater than 0."
+ return
+
for h in hosts:
if h.up is False or h.state != HostState.Normal:
continue
@@ -197,6 +230,9 @@ def __shutdownOrDestroyMany(method, base
count = 0
for i in instances:
if (i.name.startswith(basename + "-") and i.name[len(basename)+1].isdigit()):
+ # checking permissions here
+ checkIid(i.name)
+
if method == "shutdown":
client.shutdownVm(i.id)
@@ -208,7 +244,7 @@ def __shutdownOrDestroyMany(method, base
count = count + 1
if (count == 0):
- raise ValueError("That is an unused basename")
+ raise TashiException({'msg':"%s is an unused basename" % basename})
return None
def getMyInstances():
@@ -246,13 +282,14 @@ argLists = {
'destroyMany': [('basename', str, lambda: requiredArg('basename'), True)],
'suspendVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'resumeVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
-'migrateVm': [('instance', checkIid, lambda: requiredArg('instance'), True), ('targetHostId', int, lambda: requiredArg('targetHostId'), True)],
+'migrateVm': [('instance', checkIid, lambda: requiredArg('instance'), True), ('dst', checkHid, lambda: requiredArg('dst'), True)],
'pauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'unpauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'getSlots': [('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False)],
'getImages': [],
'copyImage': [('src', str, lambda: requiredArg('src'),True), ('dst', str, lambda: requiredArg('dst'), True)],
'getHosts': [],
+'setHostState': [('host', checkHid, lambda: requiredArg('host'), True), ('state', str, lambda: requiredArg('state'), True)],
'getUsers': [],
'getNetworks': [],
'getInstances': [],
@@ -273,7 +310,7 @@ convertArgs = {
'destroyMany': '[basename]',
'suspendVm': '[instance]',
'resumeVm': '[instance]',
-'migrateVm': '[instance, targetHostId]',
+'migrateVm': '[instance, dst]',
'pauseVm': '[instance]',
'unpauseVm': '[instance]',
'vmmSpecificCall': '[instance, arg]',
@@ -281,6 +318,7 @@ convertArgs = {
'unregisterHost' : '[hostId]',
'getSlots' : '[cores, memory]',
'copyImage' : '[src, dst]',
+'setHostState' : '[host, state]',
}
# Descriptions
@@ -298,6 +336,7 @@ description = {
'unpauseVm': 'Unpauses a paused VM',
'getSlots': 'Get a count of how many VMs could be started in the cluster',
'getHosts': 'Gets a list of hosts running Node Managers',
+'setHostState': 'Set the state of a host, eg. Normal or Drained',
'getUsers': 'Gets a list of users',
'getNetworks': 'Gets a list of available networks for VMs to be placed on',
'getInstances': 'Gets a list of all VMs in the cluster',
@@ -312,19 +351,20 @@ description = {
# Example use strings
examples = {
-'createVm': ['--name foobar --disks i386-hardy.qcow2', '--userId 3 --name foobar --cores 8 --memory 7168 --disks mpi-hardy.qcow2:True,scratch.qcow2:False --nics :1.2.3.4,1::52:54:00:00:56:78 --hints enableDisplay=True'],
-'createMany': ['--basename foobar --disks i386-hardy.qcow2 --count 4'],
-'shutdownVm': ['--instance 12345', '--instance foobar'],
-'destroyVm': ['--instance 12345', '--instance foobar'],
-'shutdownMany': ['--basename foobar'],
-'destroyMany': ['--basename foobar'],
-'suspendVm': ['--instance 12345', '--instance foobar'],
-'resumeVm': ['--instance 12345', '--instance foobar'],
-'migrateVm': ['--instance 12345 --targetHostId 73', '--instance foobar --targetHostId 73'],
-'pauseVm': ['--instance 12345', '--instance foobar'],
-'unpauseVm': ['--instance 12345', '--instance foobar'],
+'createVm': ['--name vmname --disks i386-hardy.qcow2', '--userId 3 --name vmname --cores 8 --memory 7168 --disks mpi-hardy.qcow2:True,scratch.qcow2:False --nics :1.2.3.4,1::52:54:00:00:56:78 --hints enableDisplay=True'],
+'createMany': ['--basename vmname --disks i386-hardy.qcow2 --count 4'],
+'shutdownVm': ['--instance 12345', '--instance vmname'],
+'destroyVm': ['--instance 12345', '--instance vmname'],
+'shutdownMany': ['--basename vmname'],
+'destroyMany': ['--basename vmname'],
+'suspendVm': ['--instance 12345', '--instance vmname'],
+'resumeVm': ['--instance 12345', '--instance vmname'],
+'migrateVm': ['--instance 12345 --dst vmhost1', '--instance vmname --dst 73'],
+'pauseVm': ['--instance 12345', '--instance vmname'],
+'unpauseVm': ['--instance 12345', '--instance vmname'],
'getSlots': ['--cores 1 --memory 128'],
'getHosts': [''],
+'setHostState': ['--host vmhost1 --state Drained'],
'getUsers': [''],
'getNetworks': [''],
'getInstances': [''],
@@ -332,7 +372,7 @@ examples = {
'getVmLayout': [''],
'getImages': [''],
'copyImage': ['--src src.qcow2 --dst dst.qcow2'],
-'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc'],
+'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance vmname --arg stopVnc'],
'cmAdmin': ['(see hints)'],
'unregisterHost' : ['--hostId 2'],
}
@@ -346,7 +386,8 @@ def usage(func = None):
print "Unknown function %s" % (func)
print
functions = argLists
- print "%s is the client program for Tashi, a system for cloud-computing on BigData." % (os.path.basename(sys.argv[0]))
+ print "%s is the client program for Tashi" % (os.path.basename(sys.argv[0]))
+ print "Tashi, a system for cloud-computing on BigData"
print "Visit http://incubator.apache.org/tashi/ for more information."
print
else:
@@ -398,9 +439,9 @@ def transformState(obj):
except:
obj.state = 'Unknown'
-def genKeys(list):
+def genKeys(_list):
keys = {}
- for row in list:
+ for row in _list:
for item in row.__dict__.keys():
keys[item] = item
if ('id' in keys):
@@ -410,25 +451,25 @@ def genKeys(list):
keys = keys.values()
return keys
-def makeTable(list, keys=None):
- (consoleWidth, consoleHeight) = (9999, 9999)
+def makeTable(_list, keys=None):
+ (consoleWidth, __consoleHeight) = (9999, 9999)
try:
# XXXpipe: get number of rows and column on current window
stdout = os.popen("stty size")
- r = stdout.read()
+ __r = stdout.read()
stdout.close()
except:
pass
- for obj in list:
+ for obj in _list:
transformState(obj)
if (keys == None):
- keys = genKeys(list)
+ keys = genKeys(_list)
for (show, k) in show_hide:
if (show):
if (k != "all"):
keys.append(k)
else:
- keys = genKeys(list)
+ keys = genKeys(_list)
else:
if (k in keys):
keys.remove(k)
@@ -437,7 +478,7 @@ def makeTable(list, keys=None):
maxWidth = {}
for k in keys:
maxWidth[k] = len(k)
- for row in list:
+ for row in _list:
for k in keys:
if (k in row.__dict__):
maxWidth[k] = max(maxWidth[k], len(str(row.__dict__[k])))
@@ -470,8 +511,8 @@ def makeTable(list, keys=None):
return 1
else:
return 0
- list.sort(cmp=sortFunction)
- for row in list:
+ _list.sort(cmp=sortFunction)
+ for row in _list:
line = ""
for k in keys:
row.__dict__[k] = row.__dict__.get(k, "")
@@ -537,7 +578,7 @@ def main():
if (len(sys.argv) < 2):
usage()
function = matchFunction(sys.argv[1])
- (config, configFiles) = getConfig(["Client"])
+ config = Config(["Client"])
# build a structure of possible arguments
possibleArgs = {}
@@ -613,6 +654,11 @@ def main():
fargs = []
res = f(*fargs)
+
+ except TashiException, e:
+ print "Failed in calling %s: %s" % (function, e.msg)
+ sys.exit(-1)
+
except Exception, e:
print "Failed in calling %s: %s" % (function, e)
print "Please run tashi-client --examples for syntax information"
@@ -633,15 +679,12 @@ def main():
except Exception, e:
print e
except TashiException, e:
- print "TashiException:"
+ print "Tashi could not complete your request:"
print e.msg
exitCode = e.errno
-# except Exception, e:
-# print e
- # XXXstroucki: exception may be unrelated to usage of function
- # so don't print usage on exception as if there were a problem
- # with the arguments
- #usage(function)
+ except Exception, e:
+ print e
+ print "Please run tashi-client --examples for syntax information"
sys.exit(exitCode)
if __name__ == "__main__":
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanager.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanager.py Tue Jul 17 21:05:59 2012
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,11 +17,12 @@
# specific language governing permissions and limitations
# under the License.
+import os
import sys
-import signal
import logging.config
-from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+from tashi.util import boolean, instantiateImplementation, debugConsole
+from tashi.utils.config import Config
import tashi
from tashi.rpycservices import rpycservices
@@ -46,6 +47,9 @@ def startClusterManager(config):
users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
authenticator = TlsliteVdbAuthenticator.from_dict(users)
+
+ # XXXstroucki ThreadedServer is liable to have exceptions
+ # occur within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
@@ -54,24 +58,17 @@ def startClusterManager(config):
t.service._type = 'ClusterManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- global log
+ t.start()
+ # shouldn't exit by itself
+ return
- log.info('Exiting cluster manager after receiving a SIGINT signal')
- sys.exit(0)
-
def main():
global log
# setup configuration and logging
- (config, configFiles) = getConfig(["ClusterManager"])
+ config = Config(["ClusterManager"])
+ configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("ClusterManager", "publisher"), config)
tashi.publisher = publisher
logging.config.fileConfig(configFiles)
@@ -80,7 +77,32 @@ def main():
# bind the database
log.info('Starting cluster manager')
- startClusterManager(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startClusterManager(config)
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting cluster manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of cluster manager")
+ os._exit(-1)
+
+ log.info("Exiting cluster manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanagerservice.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/clustermanagerservice.py Tue Jul 17 21:05:59 2012
@@ -19,9 +19,8 @@ import logging
import threading
import time
-from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, ConnectionManager, vmStates, version, scrubString
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, Instance, HostState, TashiException
+from tashi import boolean, ConnectionManager, vmStates, hostStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
@@ -49,6 +48,9 @@ class ClusterManagerService(object):
self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
+
+ self.defaultNetwork = self.config.getint('ClusterManagerService', 'defaultNetwork', 0)
+
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
self.accountingHost = None
@@ -62,7 +64,7 @@ class ClusterManagerService(object):
self.__initAccounting()
self.__initCluster()
- threading.Thread(target=self.__monitorCluster).start()
+ threading.Thread(name="monitorCluster", target=self.__monitorCluster).start()
def __initAccounting(self):
self.accountBuffer = []
@@ -232,7 +234,7 @@ class ClusterManagerService(object):
# get a list of VMs running on host
try:
hostProxy = self.proxy[host.name]
- remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+ remoteInstances = [self.__getVmInfo(host.name, vmId) for vmId in hostProxy.listVms()]
except:
self.log.warning('Failure getting instances from host %s' % (host.name))
self.data.releaseHost(host)
@@ -241,6 +243,9 @@ class ClusterManagerService(object):
# register instances I don't know about
for instance in remoteInstances:
if (instance.id not in myInstances):
+ if instance.state == InstanceState.Exited:
+ self.log.warning("%s telling me about exited instance %s, ignoring." % (host.name, instance.id))
+ continue
instance.hostId = host.id
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
@@ -271,15 +276,18 @@ class ClusterManagerService(object):
for instanceId in self.instanceLastContactTime.keys():
# XXXstroucki should lock instance here?
- if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+ try:
+ lastContactTime = self.instanceLastContactTime[instanceId]
+ except KeyError:
+ continue
+
+ if (lastContactTime < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
# Don't query non-running VMs. eg. if a VM
# is suspended, and has no host, then there's
# no one to ask
- if instance.state != InstanceState.Running and \
- instance.state != InstanceState.Activating and \
- instance.state != InstanceState.Orphaned:
+ if instance.state not in [InstanceState.Running, InstanceState.Activating, InstanceState.Orphaned]:
self.data.releaseInstance(instance)
continue
except:
@@ -294,22 +302,34 @@ class ClusterManagerService(object):
# get updated state on VM
try:
- hostProxy = self.proxy[host.name]
- newInstance = hostProxy.getVmInfo(instance.vmId)
+ newInstance = self.__getVmInfo(host.name, instance.vmId)
except:
self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
self.data.releaseInstance(instance)
continue
- # replace existing state with new state
- # XXXstroucki more?
- instance.state = newInstance.state
- self.instanceLastContactTime[instanceId] = self.__now()
- instance.decayed = False
- self.data.releaseInstance(instance)
+ # update the information we have on the vm
+ #before = instance.state
+ rv = self.__vmUpdate(instance, newInstance, None)
+ if (rv == "release"):
+ self.data.releaseInstance(instance)
+
+ if (rv == "remove"):
+ self.data.removeInstance(instance)
+
+ def __getVmInfo(self, host, vmid):
+ hostProxy = self.proxy[host]
+ rv = hostProxy.getVmInfo(vmid)
+ if isinstance(rv, Exception):
+ raise rv
- def normalize(self, instance):
+ if not isinstance(rv, Instance):
+ raise ValueError
+
+ return rv
+
+ def __normalize(self, instance):
instance.id = None
instance.vmId = None
instance.hostId = None
@@ -337,18 +357,20 @@ class ClusterManagerService(object):
del instance.hints[hint]
return instance
+ # extern
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
# XXXstroucki: check for exception here
- instance = self.normalize(instance)
+ instance = self.__normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
-
+
+ # extern
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+ self.__stateTransition(instance, None, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -358,7 +380,8 @@ class ClusterManagerService(object):
self.log.exception('shutdownVm failed for host %s vmId %d' % (instance.name, instance.vmId))
raise
return
-
+
+ # extern
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
@@ -366,7 +389,7 @@ class ClusterManagerService(object):
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
- self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+ self.__stateTransition(instance, None, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
@@ -382,15 +405,21 @@ class ClusterManagerService(object):
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
- self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
+ self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
return
+ # extern
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -400,17 +429,25 @@ class ClusterManagerService(object):
except:
self.log.exception('suspendVm failed for host %s vmId %d' % (hostname, instance.vmId))
raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
- return
+
+ return "%s is suspending." % (instance.name)
+ # extern
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ try:
+ self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM RESUME", instance=instance)
- return instance
+ return "%s is resuming." % (instance.name)
+ # extern
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM MIGRATE", instance=instance)
@@ -422,7 +459,13 @@ class ClusterManagerService(object):
except:
self.data.releaseInstance(instance)
raise
- self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Prepare the target
@@ -434,7 +477,12 @@ class ClusterManagerService(object):
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
- self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ try:
+ self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Send the VM
@@ -450,15 +498,23 @@ class ClusterManagerService(object):
try:
# Notify the target
- vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
+ __vmid = self.proxy[targetHost.name].receiveVm(instance, cookie)
except Exception:
self.log.exception('receiveVm failed')
raise
+
+ self.log.info("migrateVM finished")
return
-
+
+ # extern
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -468,13 +524,24 @@ class ClusterManagerService(object):
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ try:
+ self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
+ # extern
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -484,25 +551,61 @@ class ClusterManagerService(object):
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ try:
+ self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
-
+
+ # extern
def getHosts(self):
return self.data.getHosts().values()
+ # extern
+ def setHostState(self, hostId, state):
+ state = state.lower()
+ hostState = None
+ if state == "normal":
+ hostState = HostState.Normal
+ if state == "drained":
+ hostState = HostState.Drained
+
+ if hostState is None:
+ return "%s is not a valid host state" % state
+
+ host = self.data.acquireHost(hostId)
+ try:
+ host.state = hostState
+ finally:
+ self.data.releaseHost(host)
+
+ return "Host state set to %s." % hostStates[hostState]
+
+ # extern
def getNetworks(self):
- return self.data.getNetworks().values()
-
+ networks = self.data.getNetworks()
+ for network in networks:
+ if self.defaultNetwork == networks[network].id:
+ setattr(networks[network], "default", True)
+
+ return networks.values()
+
+ # extern
def getUsers(self):
return self.data.getUsers().values()
-
+
+ # extern
def getInstances(self):
return self.data.getInstances().values()
+ # extern
def getImages(self):
return self.data.getImages()
+ # extern
def copyImage(self, src, dst):
imageSrc = self.dfs.getLocalHandle("images/" + src)
imageDst = self.dfs.getLocalHandle("images/" + dst)
@@ -516,6 +619,7 @@ class ClusterManagerService(object):
except Exception, e:
self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
+ # extern
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
@@ -604,7 +708,7 @@ class ClusterManagerService(object):
self.log.exception('cmAdmin failed')
raise
-# @timed
+ # extern
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
@@ -637,45 +741,47 @@ class ClusterManagerService(object):
# let the host communicate what it is running
# and note that the information is not stale
for instance in instances:
+ if instance.state == InstanceState.Exited:
+ self.log.warning("%s reporting exited instance %s, ignoring." % (host.name, instance.id))
+ continue
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
return host.id
- def vmUpdate(self, instanceId, instance, oldState):
- try:
- oldInstance = self.data.acquireInstance(instanceId)
- except TashiException, e:
- # shouldn't have a lock to clean up after here
- if (e.errno == Errors.NoSuchInstanceId):
- self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
- return
- except:
- self.log.exception("Could not acquire instance")
- raise
+ def __vmUpdate(self, oldInstance, instance, oldState):
+ # this function assumes a lock is held on the instance
+ # already, and will be released elsewhere
- self.instanceLastContactTime[instanceId] = self.__now()
+ self.instanceLastContactTime[oldInstance.id] = self.__now()
oldInstance.decayed = False
- self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
hostname = self.data.getHost(oldInstance.hostId).name
+
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
+
if (oldInstance.state == InstanceState.Suspending):
self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
- self.data.releaseInstance(oldInstance)
+ return "release"
+
+ if (oldInstance.state == InstanceState.MigrateTrans):
+ # Just await update from target host
+ return "release"
+
else:
del self.instanceLastContactTime[oldInstance.id]
- self.data.removeInstance(oldInstance)
+ return "remove"
+
else:
if (instance.state):
# XXXstroucki does this matter?
if (oldState and oldInstance.state != oldState):
- self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
+ self.log.warning('Doing vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
oldInstance.state = instance.state
if (instance.vmId):
oldInstance.vmId = instance.vmId
@@ -688,11 +794,44 @@ class ClusterManagerService(object):
if (oldNic.mac == nic.mac):
oldNic.ip = nic.ip
+ return "release"
+
+
+ return "success"
+
+ # extern
+ def vmUpdate(self, instanceId, instance, oldState):
+ try:
+ oldInstance = self.data.acquireInstance(instanceId)
+ except TashiException, e:
+ # shouldn't have a lock to clean up after here
+ if (e.errno == Errors.NoSuchInstanceId):
+ self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
+ return
+ except:
+ self.log.exception("Could not acquire instance")
+ raise
+
+ import copy
+ displayInstance = copy.copy(oldInstance)
+ displayInstance.state = instance.state
+ self.__ACCOUNT("CM VM UPDATE", instance=displayInstance)
+
+ rv = self.__vmUpdate(oldInstance, instance, oldState)
+
+ if (rv == "release"):
self.data.releaseInstance(oldInstance)
+ if (rv == "remove"):
+ self.data.removeInstance(oldInstance)
+
return "success"
-
+
+ # extern
def activateVm(self, instanceId, host):
+ # XXXstroucki: check my idea of the host's capacity before
+ # trying.
+
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
@@ -710,7 +849,7 @@ class ClusterManagerService(object):
self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
- self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+ self.__stateTransition(instance, None, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
@@ -756,6 +895,7 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
return "success"
+ # extern
def registerHost(self, hostname, memory, cores, version):
hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
if alreadyRegistered:
@@ -771,6 +911,7 @@ class ClusterManagerService(object):
return hostId
+ # extern
def unregisterHost(self, hostId):
try:
host = self.data.getHost(hostId)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/datainterface.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/datainterface.py Tue Jul 17 21:05:59 2012
@@ -43,7 +43,7 @@ class DataInterface(object):
def getHosts(self):
raise NotImplementedError
- def getHost(self, id):
+ def getHost(self, _id):
raise NotImplementedError
def getImages(self):
@@ -52,19 +52,19 @@ class DataInterface(object):
def getInstances(self):
raise NotImplementedError
- def getInstance(self, id):
+ def getInstance(self, _id):
raise NotImplementedError
def getNetworks(self):
raise NotImplementedError
- def getNetwork(self, id):
+ def getNetwork(self, _id):
raise NotImplementedError
def getUsers(self):
raise NotImplementedError
- def getUser(self, id):
+ def getUser(self, _id):
raise NotImplementedError
def registerHost(self, hostname, memory, cores, version):
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/fromconfig.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/fromconfig.py Tue Jul 17 21:05:59 2012
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+#XXXstroucki: for compatibility with python 2.5
from __future__ import with_statement
+
import logging
import threading
import os
@@ -167,47 +169,47 @@ class FromConfig(DataInterface):
def getHosts(self):
return self.hosts
- def getHost(self, id):
- host = self.hosts.get(id, None)
+ def getHost(self, _id):
+ host = self.hosts.get(_id, None)
if (not host):
- raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (_id)})
return host
def getInstances(self):
return self.instances
- def getInstance(self, id):
- instance = self.instances.get(id, None)
+ def getInstance(self, _id):
+ instance = self.instances.get(_id, None)
if (not instance):
- raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (_id)})
return instance
def getNetworks(self):
return self.networks
- def getNetwork(self, id):
- return self.networks[id]
+ def getNetwork(self, _id):
+ return self.networks[_id]
def getUsers(self):
return self.users
- def getUser(self, id):
- return self.users[id]
+ def getUser(self, _id):
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
self.hostLock.acquire()
- for id in self.hosts.keys():
- if self.hosts[id].name == hostname:
- host = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
- self.hosts[id] = host
+ for _id in self.hosts.keys():
+ if self.hosts[_id].name == hostname:
+ host = Host(d={'id':_id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+ self.hosts[_id] = host
self.save()
self.hostLock.release()
- return id, True
- id = self.getNewId("hosts")
- self.hosts[id] = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+ return _id, True
+ _id = self.getNewId("hosts")
+ self.hosts[_id] = Host(d={'id':_id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
self.save()
self.hostLock.release()
- return id, False
+ return _id, False
def unregisterHost(self, hostId):
self.hostLock.acquire()
@@ -222,10 +224,10 @@ class FromConfig(DataInterface):
maxId = 0
l = []
if(table == "hosts"):
- for id in self.hosts.keys():
- l.append(id)
- if id >= maxId:
- maxId = id
+ for _id in self.hosts.keys():
+ l.append(_id)
+ if _id >= maxId:
+ maxId = _id
l.sort() # sort to enable comparing with range output
# check if some id is released:
t = range(maxId + 1)
@@ -243,9 +245,9 @@ class FromConfig(DataInterface):
# and in what order does it get loaded
fileName = "./etc/Tashi.cfg"
if not os.path.exists(fileName):
- file = open(fileName, "w")
- file.write("[FromConfig]")
- file.close()
+ filehandle = open(fileName, "w")
+ filehandle.write("[FromConfig]")
+ filehandle.close()
parser = ConfigParser.ConfigParser()
parser.read(fileName)
@@ -253,7 +255,7 @@ class FromConfig(DataInterface):
parser.add_section("FromConfig")
hostsInFile = []
- for (name, value) in parser.items("FromConfig"):
+ for (name, __value) in parser.items("FromConfig"):
name = name.lower()
if (name.startswith("host")):
hostsInFile.append(name)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/getentoverride.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/getentoverride.py Tue Jul 17 21:05:59 2012
@@ -19,7 +19,7 @@ import logging
import subprocess
import time
import os
-from tashi.rpycservices.rpyctypes import User, LocalImages
+from tashi.rpycservices.rpyctypes import User, LocalImages, Instance, Host
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation, humanReadable
@@ -66,7 +66,7 @@ class GetentOverride(DataInterface):
return self.baseDataObject.acquireHost(hostId)
def releaseHost(self, host):
- if type(host) is not Instance:
+ if type(host) is not Host:
self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
raise TypeError
@@ -75,20 +75,20 @@ class GetentOverride(DataInterface):
def getHosts(self):
return self.baseDataObject.getHosts()
- def getHost(self, id):
- return self.baseDataObject.getHost(id)
+ def getHost(self, _id):
+ return self.baseDataObject.getHost(_id)
def getInstances(self):
return self.baseDataObject.getInstances()
- def getInstance(self, id):
- return self.baseDataObject.getInstance(id)
+ def getInstance(self, _id):
+ return self.baseDataObject.getInstance(_id)
def getNetworks(self):
return self.baseDataObject.getNetworks()
- def getNetwork(self, id):
- return self.baseDataObject.getNetwork(id)
+ def getNetwork(self, _id):
+ return self.baseDataObject.getNetwork(_id)
def getImages(self):
count = 0
@@ -109,12 +109,12 @@ class GetentOverride(DataInterface):
try:
for l in p.stdout.xreadlines():
ws = l.strip().split(":")
- id = int(ws[2])
+ _id = int(ws[2])
name = ws[0]
user = User()
- user.id = id
+ user.id = _id
user.name = name
- myUsers[id] = user
+ myUsers[_id] = user
self.users = myUsers
self.lastUserUpdate = now
finally:
@@ -124,9 +124,9 @@ class GetentOverride(DataInterface):
self.fetchFromGetent()
return self.users
- def getUser(self, id):
+ def getUser(self, _id):
self.fetchFromGetent()
- return self.users[id]
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
return self.baseDataObject.registerHost(hostname, memory, cores, version)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/ldapoverride.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/ldapoverride.py Tue Jul 17 21:05:59 2012
@@ -19,9 +19,8 @@ import subprocess
import time
#XXXstroucki getImages requires os?
import os
-from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
-from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
-from tashi.rpycservices.rpyctypes import User
+from tashi.rpycservices.rpyctypes import User, LocalImages
+from tashi.util import instantiateImplementation, humanReadable
from tashi.clustermanager.data import DataInterface
class LdapOverride(DataInterface):
@@ -57,20 +56,20 @@ class LdapOverride(DataInterface):
def getHosts(self):
return self.baseDataObject.getHosts()
- def getHost(self, id):
- return self.baseDataObject.getHost(id)
+ def getHost(self, _id):
+ return self.baseDataObject.getHost(_id)
def getInstances(self):
return self.baseDataObject.getInstances()
- def getInstance(self, id):
- return self.baseDataObject.getInstance(id)
+ def getInstance(self, _id):
+ return self.baseDataObject.getInstance(_id)
def getNetworks(self):
return self.baseDataObject.getNetworks()
- def getNetwork(self, id):
- return self.baseDataObject.getNetwork(id)
+ def getNetwork(self, _id):
+ return self.baseDataObject.getNetwork(_id)
def getImages(self):
count = 0
@@ -101,7 +100,7 @@ class LdapOverride(DataInterface):
myUsers[user.id] = user
thisUser = {}
else:
- (key, sep, val) = l.partition(":")
+ (key, __sep, val) = l.partition(":")
key = key.strip()
val = val.strip()
thisUser[key] = val
@@ -116,9 +115,9 @@ class LdapOverride(DataInterface):
self.fetchFromLdap()
return self.users
- def getUser(self, id):
+ def getUser(self, _id):
self.fetchFromLdap()
- return self.users[id]
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
return self.baseDataObject.registerHost(hostname, memory, cores, version)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/pickled.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/pickled.py Tue Jul 17 21:05:59 2012
@@ -37,18 +37,19 @@ class Pickled(FromConfig):
self.hostLock = threading.Lock()
self.hostLocks = {}
self.idLock = threading.Lock()
+ self.dbLock = threading.Lock()
self.load()
def cleanInstances(self):
ci = {}
- for ignore, i in self.instances.items():
+ for __ignore, i in self.instances.items():
i2 = Instance(d=i.__dict__)
ci[i2.id] = i2
return ci
def cleanHosts(self):
ch = {}
- for ignore, h in self.hosts.items():
+ for __ignore, h in self.hosts.items():
h2 = Host(d=h.__dict__)
ch[h2.id] = h2
return ch
@@ -58,27 +59,35 @@ class Pickled(FromConfig):
# XXXstroucki could be better
tempfile = "%s.new" % filename
- file = open(tempfile, "w")
- cPickle.dump((self.cleanHosts(), self.cleanInstances(), self.networks, self.users), file)
- file.close()
- os.rename(tempfile, filename)
+ self.dbLock.acquire()
+ try:
+ filehandle = open(tempfile, "w")
+ cPickle.dump((self.cleanHosts(), self.cleanInstances(), self.networks, self.users), filehandle)
+ filehandle.close()
+ os.rename(tempfile, filename)
+
+ except OSError:
+ self.log.exception("Error saving database")
+
+ finally:
+ self.dbLock.release()
def load(self):
if (os.access(self.file, os.F_OK)):
- file = open(self.file, "r")
- (hosts, instances, networks, users) = cPickle.load(file)
- file.close()
+ filehandle = open(self.file, "r")
+ (hosts, instances, networks, users) = cPickle.load(filehandle)
+ filehandle.close()
else:
(hosts, instances, networks, users) = ({}, {}, {}, {})
self.hosts = hosts
self.instances = instances
self.networks = networks
self.users = users
- for ignore, i in self.instances.items():
+ for __ignore, i in self.instances.items():
if (i.id >= self.maxInstanceId):
self.maxInstanceId = i.id + 1
i._lock = threading.Lock()
self.lockNames[i._lock] = "i%d" % (i.id)
- for ignore, h in self.hosts.items():
+ for __ignore, h in self.hosts.items():
h._lock = threading.Lock()
self.lockNames[h._lock] = "h%d" % (h.id)