You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/07/17 00:15:36 UTC
svn commit: r1362324 [1/3] - in /incubator/tashi/branches/stable: ./ doc/
etc/ src/tashi/ src/tashi/accounting/ src/tashi/agents/ src/tashi/client/
src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/dfs/
src/tashi/messaging/ src/tashi/n...
Author: stroucki
Date: Tue Jul 17 00:15:34 2012
New Revision: 1362324
URL: http://svn.apache.org/viewvc?rev=1362324&view=rev
Log:
Set new stable version from trunk.
Added:
incubator/tashi/branches/stable/doc/UPDATING
- copied unchanged from r1362321, incubator/tashi/trunk/doc/UPDATING
incubator/tashi/branches/stable/doc/sample.qemu-ifup
- copied unchanged from r1362321, incubator/tashi/trunk/doc/sample.qemu-ifup
incubator/tashi/branches/stable/src/tashi/utils/
- copied from r1362321, incubator/tashi/trunk/src/tashi/utils/
incubator/tashi/branches/stable/src/zoni/extensions/
- copied from r1362321, incubator/tashi/trunk/src/zoni/extensions/
Removed:
incubator/tashi/branches/stable/src/tashi/agents/mauipacket.py
incubator/tashi/branches/stable/src/tashi/agents/pseudoDes.py
Modified:
incubator/tashi/branches/stable/ (props changed)
incubator/tashi/branches/stable/INSTALL
incubator/tashi/branches/stable/doc/DEVELOPMENT
incubator/tashi/branches/stable/doc/INSTALL2
incubator/tashi/branches/stable/etc/TashiDefaults.cfg
incubator/tashi/branches/stable/etc/ZoniDefaults.cfg
incubator/tashi/branches/stable/src/tashi/accounting/accounting.py
incubator/tashi/branches/stable/src/tashi/agents/dhcpdns.py
incubator/tashi/branches/stable/src/tashi/agents/instancehook.py
incubator/tashi/branches/stable/src/tashi/agents/mauiwiki.py
incubator/tashi/branches/stable/src/tashi/agents/primitive.py
incubator/tashi/branches/stable/src/tashi/agents/primitive_zoni.py
incubator/tashi/branches/stable/src/tashi/client/tashi-client.py
incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py
incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/datainterface.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py
incubator/tashi/branches/stable/src/tashi/connectionmanager.py
incubator/tashi/branches/stable/src/tashi/dfs/vfs.py
incubator/tashi/branches/stable/src/tashi/messaging/gangliapublisher.py
incubator/tashi/branches/stable/src/tashi/messaging/messagingloghandler.py
incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py
incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/xenpv.py
incubator/tashi/branches/stable/src/tashi/parallel.py
incubator/tashi/branches/stable/src/tashi/rpycservices/rpycservices.py
incubator/tashi/branches/stable/src/tashi/rpycservices/rpyctypes.py
incubator/tashi/branches/stable/src/tashi/util.py
incubator/tashi/branches/stable/src/zoni/agents/dhcpdns.py
incubator/tashi/branches/stable/src/zoni/bootstrap/bootstrapinterface.py
incubator/tashi/branches/stable/src/zoni/bootstrap/pxe.py
incubator/tashi/branches/stable/src/zoni/client/zoni-cli.py
incubator/tashi/branches/stable/src/zoni/data/infostore.py
incubator/tashi/branches/stable/src/zoni/data/reservation.py
incubator/tashi/branches/stable/src/zoni/data/reservationmanagementinterface.py
incubator/tashi/branches/stable/src/zoni/data/resourcequerysql.py
incubator/tashi/branches/stable/src/zoni/data/usermanagement.py
incubator/tashi/branches/stable/src/zoni/data/usermanagementinterface.py
incubator/tashi/branches/stable/src/zoni/extra/util.py
incubator/tashi/branches/stable/src/zoni/hardware/apcswitchedrackpdu.py
incubator/tashi/branches/stable/src/zoni/hardware/delldrac.py
incubator/tashi/branches/stable/src/zoni/hardware/dellswitch.py
incubator/tashi/branches/stable/src/zoni/hardware/f10s50switch.py
incubator/tashi/branches/stable/src/zoni/hardware/hpilo.py
incubator/tashi/branches/stable/src/zoni/hardware/hpswitch.py
incubator/tashi/branches/stable/src/zoni/hardware/hwswitchinterface.py
incubator/tashi/branches/stable/src/zoni/hardware/ipmi.py
incubator/tashi/branches/stable/src/zoni/hardware/raritanpdu.py
incubator/tashi/branches/stable/src/zoni/hardware/systemmanagement.py
incubator/tashi/branches/stable/src/zoni/hardware/systemmanagementinterface.py
incubator/tashi/branches/stable/src/zoni/install/db/zoniDbSetup.py
incubator/tashi/branches/stable/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
incubator/tashi/branches/stable/src/zoni/install/pxe/zoniPxeSetup.py
incubator/tashi/branches/stable/src/zoni/install/www/zoniWebSetup.py
incubator/tashi/branches/stable/src/zoni/services/pcvciservice.py
incubator/tashi/branches/stable/src/zoni/services/zonimanager.py
incubator/tashi/branches/stable/src/zoni/version.py
Propchange: incubator/tashi/branches/stable/
------------------------------------------------------------------------------
Merged /incubator/tashi/branches/luke-zoni:r1292129-1351835,1351890-1351918
Merged /incubator/tashi/branches/luke-zoni-staging:r1351875-1351888
Merged /incubator/tashi/trunk:r1298110-1301132,1301135-1301155,1301157-1304334,1304336-1362323
Merged /incubator/tashi/branches/trunk-staging:r1351915-1351919
Modified: incubator/tashi/branches/stable/INSTALL
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/INSTALL?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/INSTALL (original)
+++ incubator/tashi/branches/stable/INSTALL Tue Jul 17 00:15:34 2012
@@ -153,9 +153,9 @@ Please press <RETURN> to start IPython.
In [1]: from tashi.rpycservices.rpyctypes import Host, HostState, Network
-In [2]: data.baseDataObject.hosts[1] = Host(d={'id':1,'name':'grml','state': HostState.Normal,'up':False})
+In [2]: data.baseDataObject.hosts[0] = Host(d={'id':0,'name':'grml','state': HostState.Normal,'up':False})
-In [3]: data.baseDataObject.networks[1]=Network(d={'id':0,'name':'default'})
+In [3]: data.baseDataObject.networks[0]=Network(d={'id':0,'name':'My Network'})
In [4]: data.baseDataObject.save()
Modified: incubator/tashi/branches/stable/doc/DEVELOPMENT
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/doc/DEVELOPMENT?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/doc/DEVELOPMENT (original)
+++ incubator/tashi/branches/stable/doc/DEVELOPMENT Tue Jul 17 00:15:34 2012
@@ -8,3 +8,9 @@ Future goals:
Other ideas:
* Make available a console aggregator for user's VMs.
+
+Python caveats:
+ * We would have liked to use variable names like bin, id, sum, input,
+ etc., but these are built-in and will be flagged by pydev
+ * pydev does not like python modules with a dash in the name
+
Modified: incubator/tashi/branches/stable/doc/INSTALL2
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/doc/INSTALL2?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/doc/INSTALL2 (original)
+++ incubator/tashi/branches/stable/doc/INSTALL2 Tue Jul 17 00:15:34 2012
@@ -48,6 +48,16 @@ exit 0
Note that the entire path of a network connection must be configured to
use jumbo frames, if the virtual machines are to use them.
+If you have large numbers of VLANs, and don't want to hardcode them into
+each VM host, you can find a sample qemu-ifup in the doc directory. This
+script will need to be adapted to your local standards by changing the
+basic parameters at the top. This script can then be linked to by the names
+Tashi expects the scripts to have. For example, if you have a VLAN 1001, you will
+create a link from /etc/qemu-ifup.1001 to this script.
+
+The script will handle the creation of the VM interface, and creation of the
+bridge and VLANs if they haven't been created before.
+
---+ Accounting server
An accounting server is available in the distribution. It will log
Modified: incubator/tashi/branches/stable/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/etc/TashiDefaults.cfg?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/stable/etc/TashiDefaults.cfg Tue Jul 17 00:15:34 2012
@@ -54,6 +54,7 @@ allowDecayed = 30.0
allowMismatchedVersions = False
maxMemory = 8192
maxCores = 8
+defaultNetwork = 0
allowDuplicateNames = False
;accountingHost = clustermanager
;accountingPort = 2228
@@ -112,7 +113,6 @@ statsInterval = 0.0
[Qemu]
qemuBin = /usr/bin/kvm
-infoDir = /var/tmp/VmControlQemu/
pollDelay = 1.0
migrationRetries = 10
monitorTimeout = 60.0
@@ -121,7 +121,10 @@ maxParallelMigrations = 10
useMigrateArgument = False
statsInterval = 0.0
scratchDir = /tmp
-scratchVg = vgscratch
+#scratchVg = vgscratch
+suspendHandler = gzip
+resumeHandler = zcat
+reservedMem = 512
[XenPV]
vmNamePrefix = tashi
Modified: incubator/tashi/branches/stable/etc/ZoniDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/etc/ZoniDefaults.cfg?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/etc/ZoniDefaults.cfg (original)
+++ incubator/tashi/branches/stable/etc/ZoniDefaults.cfg Tue Jul 17 00:15:34 2012
@@ -30,6 +30,7 @@ LOG_FILE = /var/tmp/zoni_logfile.txt
# Specify data store
INFO_STORE = sql
USER_MANAGEMENT = ldap
+PICKLE_FILE = pickled
# DB host
[dbConnection]
@@ -58,6 +59,10 @@ PXE_SERVER_IP = IP_OF_PXE_SERVER_IN_DOMA
# Must be relative to TFTP_ROOT_DIR
INITRD_ROOT = builds/initrd
KERNEL_ROOT = builds/kernel
+# Extensions from MIMOS
+# put the IP address of your NTP server here
+NTPSVR = 127.0.0.1
+CUSTOM_TEMPLATES_DIR = /var/lib/tftpboot/templates
[www]
WWW_DOCUMENT_ROOT = /var/www
@@ -96,13 +101,6 @@ dhcpServer = xx_dhcpserver_host_or_ip_xx
dhcpKeyName = xx_dhcpservername_xx
dhcpSecretKey = xx_secretkey_xx
-# Domain Config
-[domain]
-domainDescription = "/usr/local/tashi/etc/ZoniDomains.xml"
-ZONI_HOME_DOMAIN = 1 # Default domain for most switches
-ZONI_HOME_NETWORK = 10.10.0.0/20
-ZONI_IPMI_NETWORK = 10.10.16.0/20
-
# Logging
[loggers]
keys=root
Modified: incubator/tashi/branches/stable/src/tashi/accounting/accounting.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/accounting/accounting.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/accounting/accounting.py (original)
+++ incubator/tashi/branches/stable/src/tashi/accounting/accounting.py Tue Jul 17 00:15:34 2012
@@ -18,9 +18,7 @@
# under the License.
import os
-import time
import sys
-import signal
import logging.config
from tashi.rpycservices import rpycservices
@@ -28,7 +26,9 @@ from rpyc.utils.server import ThreadedSe
#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
#from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole
+from tashi.util import createClient, instantiateImplementation, debugConsole
+from tashi.utils.config import Config
+
import tashi
class Accounting(object):
@@ -45,17 +45,20 @@ class Accounting(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, self.config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
def initAccountingServer(self):
service = instantiateImplementation(self.config.get("Accounting", "service"), self.config)
+ #XXXstroucki: disabled authAndEncrypt for now
#if boolean(self.config.get("Security", "authAndEncrypt")):
if False:
pass
else:
+ # XXXstroucki: ThreadedServer is liable to have
+ # exceptions within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService', 'port')), auto_register=False)
t.logger.setLevel(logging.ERROR)
@@ -69,7 +72,8 @@ class Accounting(object):
sys.exit(0)
def main():
- (config, configFiles) = getConfig(["Accounting"])
+ config = Config(["Accounting"])
+ configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
tashi.publisher = publisher
logging.config.fileConfig(configFiles)
Modified: incubator/tashi/branches/stable/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/agents/dhcpdns.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/branches/stable/src/tashi/agents/dhcpdns.py Tue Jul 17 00:15:34 2012
@@ -22,7 +22,7 @@ import socket
import subprocess
import time
from instancehook import InstanceHook
-from tashi.rpycservices.rpyctypes import Instance, NetworkConfiguration
+from tashi.rpycservices.rpyctypes import Instance
from tashi import boolean
class DhcpDns(InstanceHook):
@@ -55,15 +55,21 @@ class DhcpDns(InstanceHook):
self.ipMax = {}
self.currentIP = {}
self.usedIPs = {}
- for k in self.ipRange:
- ipRange = self.ipRange[k]
- (min, max) = ipRange.split("-")
- min = min.strip()
- max = max.strip()
- ipNum = self.strToIp(min)
- self.ipMin[k] = self.strToIp(min)
- self.ipMax[k] = self.strToIp(max)
- self.currentIP[k] = self.ipMin[k]
+
+ self.initIPs()
+
+ def initIPs(self):
+ self.usedIPs = {}
+ for network in self.ipRange:
+ ipRange = self.ipRange[network]
+ (ipMin, ipMax) = ipRange.split("-")
+ ipMin = ipMin.strip()
+ ipMax = ipMax.strip()
+ ipNum = self.strToIp(ipMin)
+ self.ipMin[network] = self.strToIp(ipMin)
+ self.ipMax[network] = self.strToIp(ipMax)
+ self.currentIP[network] = self.ipMin[network]
+
instances = self.client.getInstances()
for i in instances:
for nic in i.nics:
@@ -72,7 +78,7 @@ class DhcpDns(InstanceHook):
ipNum = self.strToIp(ip)
self.log.info('Added %s->%s during reinitialization' % (i.name, ip))
self.usedIPs[ipNum] = ip
- except Exception, e:
+ except Exception:
pass
def strToIp(self, s):
@@ -87,12 +93,17 @@ class DhcpDns(InstanceHook):
return "%d.%d.%d.%d" % ((ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff)
def allocateIP(self, nic):
+ # XXXstroucki: if the network is not defined having an ip
+ # range, this will throw a KeyError. Should be logged.
network = nic.network
allocatedIP = None
requestedIP = self.strToIp(nic.ip)
wrapToMinAlready = False
if (requestedIP <= self.ipMax[network] and requestedIP >= self.ipMin[network] and (requestedIP not in self.usedIPs)):
allocatedIP = requestedIP
+
+ # nic.ip will be updated later in preCreate if chosen
+ # ip not available
while (allocatedIP == None):
if (self.currentIP[network] > self.ipMax[network] and wrapToMinAlready):
raise UserWarning("No available IP addresses for network %d" % (network))
@@ -127,7 +138,7 @@ class DhcpDns(InstanceHook):
stdin.write("set hardware-type = 00:00:00:01\n") # Ethernet
stdin.write("create\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
def removeDhcp(self, name, ipaddr=None):
@@ -146,7 +157,7 @@ class DhcpDns(InstanceHook):
stdin.write("open\n")
stdin.write("remove\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
def addDns(self, name, ip):
@@ -169,15 +180,15 @@ class DhcpDns(InstanceHook):
stdin.write("update add %s %d IN PTR %s.%s.\n" % (reverseIpStr, self.dnsExpire, name, self.dnsDomain))
stdin.write("\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
finally:
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
while (pid == 0):
time.sleep(0.5)
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
def removeDns(self, name):
cmd = "nsupdate"
@@ -196,15 +207,15 @@ class DhcpDns(InstanceHook):
stdin.write("update delete %s.%s A\n" % (name, self.dnsDomain))
stdin.write("\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
finally:
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
while (pid == 0):
time.sleep(0.5)
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
def doUpdate(self, instance):
newInstance = Instance()
@@ -229,7 +240,7 @@ class DhcpDns(InstanceHook):
dhcpName = instance.name + "-nic%d" % (i)
self.log.info("Adding %s:{%s->%s} to DHCP" % (dhcpName, nic.mac, ip))
self.addDhcp(dhcpName, ip, nic.mac)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to add host %s to DHCP/DNS" % (instance.name))
self.doUpdate(instance)
@@ -242,8 +253,11 @@ class DhcpDns(InstanceHook):
ip = nic.ip
try:
ipNum = self.strToIp(ip)
+ # XXXstroucki: if this fails with KeyError,
+ # we must have double-assigned the same IP
+ # address. How does this happen?
del self.usedIPs[ipNum]
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s, ip %s from pool of usedIPs" % (instance.name, ip))
try:
if (i == 0):
@@ -251,9 +265,13 @@ class DhcpDns(InstanceHook):
else:
dhcpName = instance.name + "-nic%d" % (i)
self.removeDhcp(dhcpName)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s from DHCP" % (instance.name))
try:
+ # XXXstroucki: this can fail if the resolver can't
+ # resolve the dns server name (line 190). Perhaps
+ # the hostname should be then pushed onto a list
+ # to try again next time.
self.removeDns(instance.name)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s from DNS" % (instance.name))
Modified: incubator/tashi/branches/stable/src/tashi/agents/instancehook.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/agents/instancehook.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/agents/instancehook.py (original)
+++ incubator/tashi/branches/stable/src/tashi/agents/instancehook.py Tue Jul 17 00:15:34 2012
@@ -1,5 +1,3 @@
-#! /usr/bin/env python
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+# superclass for instance hooks.
+
class InstanceHook(object):
def __init__(self, config, client, post=False):
if (self.__class__ is InstanceHook):
Modified: incubator/tashi/branches/stable/src/tashi/agents/mauiwiki.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/agents/mauiwiki.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/agents/mauiwiki.py (original)
+++ incubator/tashi/branches/stable/src/tashi/agents/mauiwiki.py Tue Jul 17 00:15:34 2012
@@ -17,20 +17,122 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: wiki is a text based resource manager that maui can
+# use. It also seems to have disappeared from the face of the web.
+# This code is unmaintained.
+
+# XXXstroucki former file mauipacket.py
+#import subprocess
import time
-import hashlib
-import sys
-import subprocess
-import socket, SocketServer
-from socket import gethostname
-import os
+import SocketServer
+from tashi.utils import pseudoDes
+from tashi.rpycservices.rpyctypes import HostState, InstanceState
+
+class MauiPacket:
+ def __init__(self, key=0):
+ self.size = 0
+ self.char = '\n'
+ self.chksum = '0'*16
+ self.timestamp = int(time.time())
+ self.auth = ''
+ self.data = []
+ self.msg = ''
+ self.key=key
+ def readPacket(self, istream):
+ self.msg = ''
+
+ size = istream.read(8)
+ self.msg = self.msg+size
+ self.size = int(size)
+
+ self.char = istream.read(1)
+ self.msg = self.msg + self.char
+
+ packet = istream.read(self.size)
+ self.msg = self.msg + packet
+
+ packet = packet.split()
+
+ for i in range(len(packet)):
+ item = packet[i].split('=')
+ if item[0] == 'CK':
+ self.chksum = item[1]
+ if item[0] == 'TS':
+ self.timestamp = int(item[1])
+ if item[0] == 'AUTH':
+ self.auth = item[1]
+ if item[0] == 'DT':
+ self.data = packet[i:]
+ self.data=self.data[0].split('=',1)[1:] + self.data[1:]
+
+ def checksumMessage(self, message, key=None):
+ if key == None:
+ key = self.key
+ if type(key) == type(''):
+ key = int(key)
+ chksum = pseudoDes.generateKey(message, key)
+ chksum = '%016x' % chksum
+ return chksum
+ def getChecksum(self):
+ cs = self.msg.partition('TS=')
+ cs = cs[1]+cs[2]
+ chksum = self.checksumMessage(cs)
+ return chksum
+ def verifyChecksum(self):
+ chksum = self.getChecksum()
+ if chksum != self.chksum:
+ print 'verifyChecksum: "%s"\t"%s"'%(chksum, self.chksum)
+ print 'verifyChecksum (types): %s\t%s' %(type(chksum), type(self.chksum))
+ return False
+ return True
+ def set(self, data, auth=None, key=None, timestamp=None):
+ if timestamp==None:
+ timestamp = int(time.time())
+ self.data = data
+ if auth !=None:
+ self.auth = auth
+ if key != None:
+ self.key = key
+ self.timestamp=timestamp
+ self.fixup()
+ def fixup(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.size = len(pktstring)
+ def __str__(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.msg = ''
+ self.msg = self.msg + '%08i'%len(pktstring)
+ self.msg = self.msg + self.char
+ self.msg = self.msg + pktstring
+
+ return self.msg
+ def prettyString(self):
+ s = '''Maui Packet
+-----------
+size:\t\t%i
+checksum:\t%s
+timestamp:\t%s
+auth:\t\t%s
+data:
+%s
+-----------'''
+ s = s%(self.size, self.chksum, self.timestamp, self.auth, self.data)
+ return s
+
+# XXXstroucki original file mauiwiki.py
import threading
import logging.config
from tashi.parallel import synchronizedmethod
from tashi.services.ttypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
-from tashi.agents.mauipacket import MauiPacket
+from tashi.util import getConfig, createClient, instantiateImplementation
+#from tashi.agents.mauipacket import MauiPacket
import tashi.util
def jobnameToId(jobname):
@@ -57,24 +159,24 @@ class InstanceHooks():
def postDestroy(self, inst):
for hook in self.hooks:
hook.postDestroy(inst)
- def idToInst(self, id):
+ def idToInst(self, _id):
instances = self.client.getInstances()
print 'instances ', instances
- insts = [i for i in instances if str(i.id)==str(id)]
+ insts = [i for i in instances if str(i.id)==str(_id)]
if len(insts) == 0:
- raise "No instance with ID %s"%id
+ raise "No instance with ID %s"%_id
if len(insts) > 1:
- raise "Multiple instances with ID %s"%id
+ raise "Multiple instances with ID %s"%_id
inst = insts[0]
return inst
- def destroyById(self, id):
- inst = self.idToInst(id)
- self.client.destroyVm(int(id))
+ def destroyById(self, _id):
+ inst = self.idToInst(_id)
+ self.client.destroyVm(int(_id))
self.postDestroy(inst)
- def activateById(self, id, host):
- inst = self.idToInst(id)
+ def activateById(self, _id, host):
+ inst = self.idToInst(_id)
self.preCreate(inst)
- self.client.activateVm(int(id), host)
+ self.client.activateVm(int(_id), host)
def cmplists(a, b):
for i in range(len(a)):
@@ -301,8 +403,8 @@ class TashiConnection(threading.Thread):
if j.updateTime >= updatetime and j.id in joblist]
jl = {}
for job in jobs:
- id = "%s.%i"%(job.name, job.id)
- jl[id] = {'STATE':self.wikiInstanceState(job),
+ _id = "%s.%i"%(job.name, job.id)
+ jl[_id] = {'STATE':self.wikiInstanceState(job),
'UNAME':self.users[job.userId].name,
'GNAME':self.users[job.userId].name,
'UPDATETIME':int(job.updateTime),
@@ -313,14 +415,14 @@ class TashiConnection(threading.Thread):
'RMEM':str(job.memory),
'WCLIMIT':str(self.defaultJobTime)}
if job.hostId != None:
- jl[id]['TASKLIST'] = self.hosts[job.hostId].name
+ jl[_id]['TASKLIST'] = self.hosts[job.hostId].name
return jl
@synchronizedmethod
- def activateById(self, id, host):
- if not self.instances.has_key(id):
+ def activateById(self, _id, host):
+ if not self.instances.has_key(_id):
raise "no such instance"
- self.ihooks.activateById(id, host)
- self.instances[id].state=InstanceState.Activating
+ self.ihooks.activateById(_id, host)
+ self.instances[_id].state=InstanceState.Activating
class MauiListener(SocketServer.StreamRequestHandler):
def setup(self):
Modified: incubator/tashi/branches/stable/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/agents/primitive.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/stable/src/tashi/agents/primitive.py Tue Jul 17 00:15:34 2012
@@ -23,7 +23,8 @@ import sys
from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.util import createClient, instantiateImplementation, boolean
+from tashi.utils.config import Config
import tashi
class Primitive(object):
@@ -62,7 +63,7 @@ class Primitive(object):
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
- #if (h.up == True and h.state == HostState.Normal):
+ #if (self.__isReady(h)):
hosts[ctr] = h
ctr = ctr + 1
load[h.id] = []
@@ -76,8 +77,9 @@ class Primitive(object):
# XXXstroucki put held machines behind pending ones
heldInstances = []
for i in instances.itervalues():
+ # Nonrunning VMs will have hostId of None, but
+ # so will Suspended VMs.
if (i.hostId or i.state == InstanceState.Pending):
- # Nonrunning VMs will have hostId of None
load[i.hostId] = load[i.hostId] + [i.id]
elif (i.hostId is None and i.state == InstanceState.Held):
heldInstances = heldInstances + [i.id]
@@ -102,6 +104,11 @@ class Primitive(object):
if name in self.clearHints[hint]:
popit = self.clearHints[hint].index(name)
self.clearHints[hint].pop(popit)
+
+ def __isReady(self, host):
+ if host.up == False or host.state != HostState.Normal:
+ return False
+ return True
def __scheduleInstance(self, inst):
@@ -133,7 +140,7 @@ class Primitive(object):
# has a host preference been expressed?
if (targetHost != None):
for h in self.hosts.values():
- if (h.state == HostState.Normal):
+ if (self.__isReady(h)):
self.__clearHints("targetHost", h.name)
# if this is not the host we are looking for, continue
if ((str(h.id) != targetHost and h.name != targetHost)):
@@ -161,13 +168,8 @@ class Primitive(object):
for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
h = self.hosts[ctr]
- # XXXstroucki if it's down, find another machine
- if (h.up == False):
- continue
-
- # If the host not in normal operating state,
- # find another machine
- if (h.state != HostState.Normal):
+ # XXXstroucki if it's unavailable, find another machine
+ if (self.__isReady(h) == False):
continue
else:
# If the host is back to normal, get rid of the entry in clearHints
@@ -207,7 +209,10 @@ class Primitive(object):
if (not inst.hints.get("__resume_source", None)):
# only run preCreate hooks if newly starting
for hook in self.hooks:
- hook.preCreate(inst)
+ try:
+ hook.preCreate(inst)
+ except:
+ self.log.warning("Failed to run preCreate hook")
self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
rv = "fail"
try:
@@ -242,8 +247,21 @@ class Primitive(object):
def start(self):
oldInstances = {}
+ # XXXstroucki: scheduling races have been observed, where
+ # a vm is scheduled on a host that had not updated its
+ # capacity with the clustermanager, leading to overloaded
+ # hosts. I think the place to insure against this happening
+ # is in the nodemanager. This scheduler will keep an
+ # internal state of cluster loading, but that is best
+ # effort and will be refreshed from CM once the buffer
+ # of vms to be scheduled is exhausted.
+
while True:
try:
+ # XXXstroucki: to get a list of vms to be
+ # scheduled, it asks the CM for a full
+ # cluster state, and will look at those
+ # without a host.
self.__getState()
# Check for VMs that have exited and call
@@ -281,7 +299,9 @@ class Primitive(object):
time.sleep(self.scheduleDelay)
def main():
- (config, configFiles) = getConfig(["Agent"])
+ config = Config(["Agent"])
+ configFiles = config.getFiles()
+
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
logging.config.fileConfig(configFiles)
Modified: incubator/tashi/branches/stable/src/tashi/agents/primitive_zoni.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/agents/primitive_zoni.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/agents/primitive_zoni.py (original)
+++ incubator/tashi/branches/stable/src/tashi/agents/primitive_zoni.py Tue Jul 17 00:15:34 2012
@@ -17,6 +17,11 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: this apparently originated from a copy of the primitive
+# scheduler code sometime in 2010. It aims to keep a pool of tashi servers
+# available, and other servers shut down. Could this be better suited for
+# a hook function of the scheduler?
+
from socket import gethostname
import os
import socket
Modified: incubator/tashi/branches/stable/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/client/tashi-client.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/stable/src/tashi/client/tashi-client.py Tue Jul 17 00:15:34 2012
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,8 +21,10 @@ import os.path
import random
import sys
import types
-from tashi.rpycservices.rpyctypes import *
-from tashi import vmStates, hostStates, boolean, getConfig, stringPartition, createClient
+from tashi.rpycservices.rpyctypes import NetworkConfiguration,\
+ DiskConfiguration, HostState, Instance, Host, TashiException
+from tashi.utils.config import Config
+from tashi import vmStates, hostStates, boolean, stringPartition, createClient
users = {}
networks = {}
@@ -48,7 +50,23 @@ def getUser():
for user in users:
if (users[user].name == userStr):
return users[user].id
- raise ValueError("Unknown user %s" % (userStr))
+ raise TashiException({'msg':"Unknown user %s" % (userStr)})
+
+def checkHid(host):
+ userId = getUser()
+ hosts = client.getHosts()
+ hostId = None
+ try:
+ hostId = int(host)
+ except:
+ for h in hosts:
+ if (h.name == host):
+ hostId = h.id
+ if (hostId is None):
+ raise TashiException({'msg':"Unknown host %s" % (str(host))})
+
+ # XXXstroucki permissions for host related stuff?
+ return hostId
def checkIid(instance):
userId = getUser()
@@ -61,13 +79,13 @@ def checkIid(instance):
if (i.name == instance):
instanceId = i.id
if (instanceId is None):
- raise ValueError("Unknown instance %s" % (str(instance)))
+ raise TashiException({'msg':"Unknown instance %s" % (str(instance))})
for instance in instances:
if (instance.id == instanceId):
# XXXstroucki uid 0 to have superuser access
# how about admin groups?
if (instance.userId != userId and instance.userId != None and userId != 0):
- raise ValueError("You don't own that VM")
+ raise TashiException({'msg':"You don't have permissions on VM %s" % instance.name})
return instanceId
def requiredArg(name):
@@ -78,10 +96,17 @@ def randomMac():
def getDefaultNetwork():
fetchNetworks()
- networkId = 1
+ networkId = 0
for network in networks:
+ if (getattr(networks[network], "default", False) is True):
+ networkId = network
+ break
+
+ # Naming the network "default" is deprecated, and
+ # this functionality will be removed soon
if (networks[network].name == "default"):
networkId = network
+ break
return networkId
def randomNetwork():
@@ -93,7 +118,7 @@ def parseDisks(arg):
disks = []
for strDisk in strDisks:
strDisk = strDisk.strip()
- (l, s, r) = stringPartition(strDisk, ":")
+ (l, __s, r) = stringPartition(strDisk, ":")
if (r == ""):
r = "False"
r = boolean(r)
@@ -109,12 +134,12 @@ def parseNics(arg):
nics = []
for strNic in strNics:
strNic = strNic.strip()
- (l, s, r) = stringPartition(strNic, ":")
+ (l, __s, r) = stringPartition(strNic, ":")
n = l
if (n == ''):
n = getDefaultNetwork()
n = int(n)
- (l, s, r) = stringPartition(r, ":")
+ (l, __s, r) = stringPartition(r, ":")
ip = l
if (ip == ''):
ip = None
@@ -133,7 +158,7 @@ def parseHints(arg):
hints = {}
for strHint in strHints:
strHint = strHint.strip()
- (l, s, r) = stringPartition(strHint, "=")
+ (l, __s, r) = stringPartition(strHint, "=")
hints[l] = r
return hints
except:
@@ -161,6 +186,14 @@ def getSlots(cores, memory):
hosts = getVmLayout()
count = 0
+ if cores < 1:
+ print "Argument to cores must be 1 or greater."
+ return
+
+ if memory <= 0:
+ print "Argument to memory must be greater than 0."
+ return
+
for h in hosts:
if h.up is False or h.state != HostState.Normal:
continue
@@ -197,6 +230,9 @@ def __shutdownOrDestroyMany(method, base
count = 0
for i in instances:
if (i.name.startswith(basename + "-") and i.name[len(basename)+1].isdigit()):
+ # checking permissions here
+ checkIid(i.name)
+
if method == "shutdown":
client.shutdownVm(i.id)
@@ -208,7 +244,7 @@ def __shutdownOrDestroyMany(method, base
count = count + 1
if (count == 0):
- raise ValueError("That is an unused basename")
+ raise TashiException({'msg':"%s is an unused basename" % basename})
return None
def getMyInstances():
@@ -244,13 +280,14 @@ argLists = {
'destroyMany': [('basename', str, lambda: requiredArg('basename'), True)],
'suspendVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'resumeVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
-'migrateVm': [('instance', checkIid, lambda: requiredArg('instance'), True), ('targetHostId', int, lambda: requiredArg('targetHostId'), True)],
+'migrateVm': [('instance', checkIid, lambda: requiredArg('instance'), True), ('dst', checkHid, lambda: requiredArg('dst'), True)],
'pauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'unpauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'getSlots': [('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False)],
'getImages': [],
'copyImage': [('src', str, lambda: requiredArg('src'),True), ('dst', str, lambda: requiredArg('dst'), True)],
'getHosts': [],
+'setHostState': [('host', checkHid, lambda: requiredArg('host'), True), ('state', str, lambda: requiredArg('state'), True)],
'getUsers': [],
'getNetworks': [],
'getInstances': [],
@@ -270,13 +307,14 @@ convertArgs = {
'destroyMany': '[basename]',
'suspendVm': '[instance]',
'resumeVm': '[instance]',
-'migrateVm': '[instance, targetHostId]',
+'migrateVm': '[instance, dst]',
'pauseVm': '[instance]',
'unpauseVm': '[instance]',
'vmmSpecificCall': '[instance, arg]',
'unregisterHost' : '[hostId]',
'getSlots' : '[cores, memory]',
'copyImage' : '[src, dst]',
+'setHostState' : '[host, state]',
}
# Descriptions
@@ -294,6 +332,7 @@ description = {
'unpauseVm': 'Unpauses a paused VM',
'getSlots': 'Get a count of how many VMs could be started in the cluster',
'getHosts': 'Gets a list of hosts running Node Managers',
+'setHostState': 'Set the state of a host, eg. Normal or Drained',
'getUsers': 'Gets a list of users',
'getNetworks': 'Gets a list of available networks for VMs to be placed on',
'getInstances': 'Gets a list of all VMs in the cluster',
@@ -307,19 +346,20 @@ description = {
# Example use strings
examples = {
-'createVm': ['--name foobar --disks i386-hardy.qcow2', '--userId 3 --name foobar --cores 8 --memory 7168 --disks mpi-hardy.qcow2:True,scratch.qcow2:False --nics :1.2.3.4,1::52:54:00:00:56:78 --hints enableDisplay=True'],
-'createMany': ['--basename foobar --disks i386-hardy.qcow2 --count 4'],
-'shutdownVm': ['--instance 12345', '--instance foobar'],
-'destroyVm': ['--instance 12345', '--instance foobar'],
-'shutdownMany': ['--basename foobar'],
-'destroyMany': ['--basename foobar'],
-'suspendVm': ['--instance 12345', '--instance foobar'],
-'resumeVm': ['--instance 12345', '--instance foobar'],
-'migrateVm': ['--instance 12345 --targetHostId 73', '--instance foobar --targetHostId 73'],
-'pauseVm': ['--instance 12345', '--instance foobar'],
-'unpauseVm': ['--instance 12345', '--instance foobar'],
+'createVm': ['--name vmname --disks i386-hardy.qcow2', '--userId 3 --name vmname --cores 8 --memory 7168 --disks mpi-hardy.qcow2:True,scratch.qcow2:False --nics :1.2.3.4,1::52:54:00:00:56:78 --hints enableDisplay=True'],
+'createMany': ['--basename vmname --disks i386-hardy.qcow2 --count 4'],
+'shutdownVm': ['--instance 12345', '--instance vmname'],
+'destroyVm': ['--instance 12345', '--instance vmname'],
+'shutdownMany': ['--basename vmname'],
+'destroyMany': ['--basename vmname'],
+'suspendVm': ['--instance 12345', '--instance vmname'],
+'resumeVm': ['--instance 12345', '--instance vmname'],
+'migrateVm': ['--instance 12345 --dst vmhost1', '--instance vmname --dst 73'],
+'pauseVm': ['--instance 12345', '--instance vmname'],
+'unpauseVm': ['--instance 12345', '--instance vmname'],
'getSlots': ['--cores 1 --memory 128'],
'getHosts': [''],
+'setHostState': ['--host vmhost1 --state Drained'],
'getUsers': [''],
'getNetworks': [''],
'getInstances': [''],
@@ -327,7 +367,7 @@ examples = {
'getVmLayout': [''],
'getImages': [''],
'copyImage': ['--src src.qcow2 --dst dst.qcow2'],
-'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc'],
+'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance vmname --arg stopVnc'],
'unregisterHost' : ['--hostId 2'],
}
@@ -393,9 +433,9 @@ def transformState(obj):
except:
obj.state = 'Unknown'
-def genKeys(list):
+def genKeys(_list):
keys = {}
- for row in list:
+ for row in _list:
for item in row.__dict__.keys():
keys[item] = item
if ('id' in keys):
@@ -405,25 +445,25 @@ def genKeys(list):
keys = keys.values()
return keys
-def makeTable(list, keys=None):
- (consoleWidth, consoleHeight) = (9999, 9999)
+def makeTable(_list, keys=None):
+ (consoleWidth, __consoleHeight) = (9999, 9999)
try:
# XXXpipe: get number of rows and column on current window
stdout = os.popen("stty size")
- r = stdout.read()
+ __r = stdout.read()
stdout.close()
except:
pass
- for obj in list:
+ for obj in _list:
transformState(obj)
if (keys == None):
- keys = genKeys(list)
+ keys = genKeys(_list)
for (show, k) in show_hide:
if (show):
if (k != "all"):
keys.append(k)
else:
- keys = genKeys(list)
+ keys = genKeys(_list)
else:
if (k in keys):
keys.remove(k)
@@ -432,7 +472,7 @@ def makeTable(list, keys=None):
maxWidth = {}
for k in keys:
maxWidth[k] = len(k)
- for row in list:
+ for row in _list:
for k in keys:
if (k in row.__dict__):
maxWidth[k] = max(maxWidth[k], len(str(row.__dict__[k])))
@@ -465,8 +505,8 @@ def makeTable(list, keys=None):
return 1
else:
return 0
- list.sort(cmp=sortFunction)
- for row in list:
+ _list.sort(cmp=sortFunction)
+ for row in _list:
line = ""
for k in keys:
row.__dict__[k] = row.__dict__.get(k, "")
@@ -532,7 +572,7 @@ def main():
if (len(sys.argv) < 2):
usage()
function = matchFunction(sys.argv[1])
- (config, configFiles) = getConfig(["Client"])
+ config = Config(["Client"])
# build a structure of possible arguments
possibleArgs = {}
@@ -608,6 +648,11 @@ def main():
fargs = []
res = f(*fargs)
+
+ except TashiException, e:
+ print "Failed in calling %s: %s" % (function, e.msg)
+ sys.exit(-1)
+
except Exception, e:
print "Failed in calling %s: %s" % (function, e)
print "Please run tashi-client --examples for syntax information"
@@ -628,15 +673,12 @@ def main():
except Exception, e:
print e
except TashiException, e:
- print "TashiException:"
+ print "Tashi could not complete your request:"
print e.msg
exitCode = e.errno
-# except Exception, e:
-# print e
- # XXXstroucki: exception may be unrelated to usage of function
- # so don't print usage on exception as if there were a problem
- # with the arguments
- #usage(function)
+ except Exception, e:
+ print e
+ print "Please run tashi-client --examples for syntax information"
sys.exit(exitCode)
if __name__ == "__main__":
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py Tue Jul 17 00:15:34 2012
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -19,10 +19,10 @@
import os
import sys
-import time
import logging.config
-from tashi.util import boolean, instantiateImplementation, getConfig, debugConsole
+from tashi.util import boolean, instantiateImplementation, debugConsole
+from tashi.utils.config import Config
import tashi
from tashi.rpycservices import rpycservices
@@ -47,6 +47,9 @@ def startClusterManager(config):
users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
authenticator = TlsliteVdbAuthenticator.from_dict(users)
+
+ # XXXstroucki ThreadedServer is liable to have exceptions
+ # occur within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
@@ -64,7 +67,8 @@ def main():
global log
# setup configuration and logging
- (config, configFiles) = getConfig(["ClusterManager"])
+ config = Config(["ClusterManager"])
+ configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("ClusterManager", "publisher"), config)
tashi.publisher = publisher
logging.config.fileConfig(configFiles)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py Tue Jul 17 00:15:34 2012
@@ -19,9 +19,8 @@ import logging
import threading
import time
-from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, ConnectionManager, vmStates, version, scrubString
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, Instance, HostState, TashiException
+from tashi import boolean, ConnectionManager, vmStates, hostStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
@@ -49,6 +48,9 @@ class ClusterManagerService(object):
self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
+
+ self.defaultNetwork = self.config.getint('ClusterManagerService', 'defaultNetwork', 0)
+
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
self.accountingHost = None
@@ -62,7 +64,7 @@ class ClusterManagerService(object):
self.__initAccounting()
self.__initCluster()
- threading.Thread(target=self.__monitorCluster).start()
+ threading.Thread(name="monitorCluster", target=self.__monitorCluster).start()
def __initAccounting(self):
self.accountBuffer = []
@@ -232,7 +234,7 @@ class ClusterManagerService(object):
# get a list of VMs running on host
try:
hostProxy = self.proxy[host.name]
- remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+ remoteInstances = [self.__getVmInfo(host.name, vmId) for vmId in hostProxy.listVms()]
except:
self.log.warning('Failure getting instances from host %s' % (host.name))
self.data.releaseHost(host)
@@ -241,6 +243,9 @@ class ClusterManagerService(object):
# register instances I don't know about
for instance in remoteInstances:
if (instance.id not in myInstances):
+ if instance.state == InstanceState.Exited:
+ self.log.warning("%s telling me about exited instance %s, ignoring." % (host.name, instance.id))
+ continue
instance.hostId = host.id
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
@@ -282,9 +287,7 @@ class ClusterManagerService(object):
# Don't query non-running VMs. eg. if a VM
# is suspended, and has no host, then there's
# no one to ask
- if instance.state != InstanceState.Running and \
- instance.state != InstanceState.Activating and \
- instance.state != InstanceState.Orphaned:
+ if instance.state not in [InstanceState.Running, InstanceState.Activating, InstanceState.Orphaned]:
self.data.releaseInstance(instance)
continue
except:
@@ -299,22 +302,34 @@ class ClusterManagerService(object):
# get updated state on VM
try:
- hostProxy = self.proxy[host.name]
- newInstance = hostProxy.getVmInfo(instance.vmId)
+ newInstance = self.__getVmInfo(host.name, instance.vmId)
except:
self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
self.data.releaseInstance(instance)
continue
- # replace existing state with new state
- # XXXstroucki more?
- instance.state = newInstance.state
- self.instanceLastContactTime[instanceId] = self.__now()
- instance.decayed = False
- self.data.releaseInstance(instance)
+ # update the information we have on the vm
+ #before = instance.state
+ rv = self.__vmUpdate(instance, newInstance, None)
+ if (rv == "release"):
+ self.data.releaseInstance(instance)
+
+ if (rv == "remove"):
+ self.data.removeInstance(instance)
+
+
+ def __getVmInfo(self, host, vmid):
+ hostProxy = self.proxy[host]
+ rv = hostProxy.getVmInfo(vmid)
+ if isinstance(rv, Exception):
+ raise rv
+ if not isinstance(rv, Instance):
+ raise ValueError
- def normalize(self, instance):
+ return rv
+
+ def __normalize(self, instance):
instance.id = None
instance.vmId = None
instance.hostId = None
@@ -342,15 +357,17 @@ class ClusterManagerService(object):
del instance.hints[hint]
return instance
+ # extern
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
# XXXstroucki: check for exception here
- instance = self.normalize(instance)
+ instance = self.__normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
-
+
+ # extern
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.__stateTransition(instance, None, InstanceState.ShuttingDown)
@@ -363,7 +380,8 @@ class ClusterManagerService(object):
self.log.exception('shutdownVm failed for host %s vmId %d' % (instance.name, instance.vmId))
raise
return
-
+
+ # extern
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
@@ -393,6 +411,7 @@ class ClusterManagerService(object):
return
+ # extern
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
@@ -410,8 +429,10 @@ class ClusterManagerService(object):
except:
self.log.exception('suspendVm failed for host %s vmId %d' % (hostname, instance.vmId))
raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
- return
+
+ return "%s is suspending." % (instance.name)
+ # extern
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
@@ -424,8 +445,9 @@ class ClusterManagerService(object):
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM RESUME", instance=instance)
- return instance
+ return "%s is resuming." % (instance.name)
+ # extern
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM MIGRATE", instance=instance)
@@ -476,12 +498,15 @@ class ClusterManagerService(object):
try:
# Notify the target
- vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
+ __vmid = self.proxy[targetHost.name].receiveVm(instance, cookie)
except Exception:
self.log.exception('receiveVm failed')
raise
+
+ self.log.info("migrateVM finished")
return
-
+
+ # extern
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
@@ -508,6 +533,7 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
return
+ # extern
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
try:
@@ -533,22 +559,53 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
return
-
+
+ # extern
def getHosts(self):
return self.data.getHosts().values()
+ # extern
+ def setHostState(self, hostId, state):
+ state = state.lower()
+ hostState = None
+ if state == "normal":
+ hostState = HostState.Normal
+ if state == "drained":
+ hostState = HostState.Drained
+
+ if hostState is None:
+ return "%s is not a valid host state" % state
+
+ host = self.data.acquireHost(hostId)
+ try:
+ host.state = hostState
+ finally:
+ self.data.releaseHost(host)
+
+ return "Host state set to %s." % hostStates[hostState]
+
+ # extern
def getNetworks(self):
- return self.data.getNetworks().values()
-
+ networks = self.data.getNetworks()
+ for network in networks:
+ if self.defaultNetwork == networks[network].id:
+ setattr(networks[network], "default", True)
+
+ return networks.values()
+
+ # extern
def getUsers(self):
return self.data.getUsers().values()
-
+
+ # extern
def getInstances(self):
return self.data.getInstances().values()
+ # extern
def getImages(self):
return self.data.getImages()
+ # extern
def copyImage(self, src, dst):
imageSrc = self.dfs.getLocalHandle("images/" + src)
imageDst = self.dfs.getLocalHandle("images/" + dst)
@@ -562,6 +619,7 @@ class ClusterManagerService(object):
except Exception, e:
self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
+ # extern
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
@@ -573,7 +631,7 @@ class ClusterManagerService(object):
raise
return res
-# @timed
+ # extern
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
@@ -606,45 +664,47 @@ class ClusterManagerService(object):
# let the host communicate what it is running
# and note that the information is not stale
for instance in instances:
+ if instance.state == InstanceState.Exited:
+ self.log.warning("%s reporting exited instance %s, ignoring." % (host.name, instance.id))
+ continue
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
return host.id
- def vmUpdate(self, instanceId, instance, oldState):
- try:
- oldInstance = self.data.acquireInstance(instanceId)
- except TashiException, e:
- # shouldn't have a lock to clean up after here
- if (e.errno == Errors.NoSuchInstanceId):
- self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
- return
- except:
- self.log.exception("Could not acquire instance")
- raise
+ def __vmUpdate(self, oldInstance, instance, oldState):
+ # this function assumes a lock is held on the instance
+ # already, and will be released elsewhere
- self.instanceLastContactTime[instanceId] = self.__now()
+ self.instanceLastContactTime[oldInstance.id] = self.__now()
oldInstance.decayed = False
- self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
hostname = self.data.getHost(oldInstance.hostId).name
+
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
+
if (oldInstance.state == InstanceState.Suspending):
self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
- self.data.releaseInstance(oldInstance)
+ return "release"
+
+ if (oldInstance.state == InstanceState.MigrateTrans):
+ # Just await update from target host
+ return "release"
+
else:
del self.instanceLastContactTime[oldInstance.id]
- self.data.removeInstance(oldInstance)
+ return "remove"
+
else:
if (instance.state):
# XXXstroucki does this matter?
if (oldState and oldInstance.state != oldState):
- self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
+ self.log.warning('Doing vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
oldInstance.state = instance.state
if (instance.vmId):
oldInstance.vmId = instance.vmId
@@ -657,11 +717,44 @@ class ClusterManagerService(object):
if (oldNic.mac == nic.mac):
oldNic.ip = nic.ip
+ return "release"
+
+
+ return "success"
+
+ # extern
+ def vmUpdate(self, instanceId, instance, oldState):
+ try:
+ oldInstance = self.data.acquireInstance(instanceId)
+ except TashiException, e:
+ # shouldn't have a lock to clean up after here
+ if (e.errno == Errors.NoSuchInstanceId):
+ self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
+ return
+ except:
+ self.log.exception("Could not acquire instance")
+ raise
+
+ import copy
+ displayInstance = copy.copy(oldInstance)
+ displayInstance.state = instance.state
+ self.__ACCOUNT("CM VM UPDATE", instance=displayInstance)
+
+ rv = self.__vmUpdate(oldInstance, instance, oldState)
+
+ if (rv == "release"):
self.data.releaseInstance(oldInstance)
+ if (rv == "remove"):
+ self.data.removeInstance(oldInstance)
+
return "success"
-
+
+ # extern
def activateVm(self, instanceId, host):
+ # XXXstroucki: check my idea of the host's capacity before
+ # trying.
+
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
@@ -725,6 +818,7 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
return "success"
+ # extern
def registerHost(self, hostname, memory, cores, version):
hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
if alreadyRegistered:
@@ -740,6 +834,7 @@ class ClusterManagerService(object):
return hostId
+ # extern
def unregisterHost(self, hostId):
try:
host = self.data.getHost(hostId)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/datainterface.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/datainterface.py Tue Jul 17 00:15:34 2012
@@ -43,7 +43,7 @@ class DataInterface(object):
def getHosts(self):
raise NotImplementedError
- def getHost(self, id):
+ def getHost(self, _id):
raise NotImplementedError
def getImages(self):
@@ -52,19 +52,19 @@ class DataInterface(object):
def getInstances(self):
raise NotImplementedError
- def getInstance(self, id):
+ def getInstance(self, _id):
raise NotImplementedError
def getNetworks(self):
raise NotImplementedError
- def getNetwork(self, id):
+ def getNetwork(self, _id):
raise NotImplementedError
def getUsers(self):
raise NotImplementedError
- def getUser(self, id):
+ def getUser(self, _id):
raise NotImplementedError
def registerHost(self, hostname, memory, cores, version):
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py Tue Jul 17 00:15:34 2012
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+#XXXstroucki: for compatibility with python 2.5
from __future__ import with_statement
+
import logging
import threading
import os
@@ -167,47 +169,47 @@ class FromConfig(DataInterface):
def getHosts(self):
return self.hosts
- def getHost(self, id):
- host = self.hosts.get(id, None)
+ def getHost(self, _id):
+ host = self.hosts.get(_id, None)
if (not host):
- raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (_id)})
return host
def getInstances(self):
return self.instances
- def getInstance(self, id):
- instance = self.instances.get(id, None)
+ def getInstance(self, _id):
+ instance = self.instances.get(_id, None)
if (not instance):
- raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (_id)})
return instance
def getNetworks(self):
return self.networks
- def getNetwork(self, id):
- return self.networks[id]
+ def getNetwork(self, _id):
+ return self.networks[_id]
def getUsers(self):
return self.users
- def getUser(self, id):
- return self.users[id]
+ def getUser(self, _id):
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
self.hostLock.acquire()
- for id in self.hosts.keys():
- if self.hosts[id].name == hostname:
- host = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
- self.hosts[id] = host
+ for _id in self.hosts.keys():
+ if self.hosts[_id].name == hostname:
+ host = Host(d={'id':_id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+ self.hosts[_id] = host
self.save()
self.hostLock.release()
- return id, True
- id = self.getNewId("hosts")
- self.hosts[id] = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+ return _id, True
+ _id = self.getNewId("hosts")
+ self.hosts[_id] = Host(d={'id':_id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
self.save()
self.hostLock.release()
- return id, False
+ return _id, False
def unregisterHost(self, hostId):
self.hostLock.acquire()
@@ -222,10 +224,10 @@ class FromConfig(DataInterface):
maxId = 0
l = []
if(table == "hosts"):
- for id in self.hosts.keys():
- l.append(id)
- if id >= maxId:
- maxId = id
+ for _id in self.hosts.keys():
+ l.append(_id)
+ if _id >= maxId:
+ maxId = _id
l.sort() # sort to enable comparing with range output
# check if some id is released:
t = range(maxId + 1)
@@ -243,9 +245,9 @@ class FromConfig(DataInterface):
# and in what order does it get loaded
fileName = "./etc/Tashi.cfg"
if not os.path.exists(fileName):
- file = open(fileName, "w")
- file.write("[FromConfig]")
- file.close()
+ filehandle = open(fileName, "w")
+ filehandle.write("[FromConfig]")
+ filehandle.close()
parser = ConfigParser.ConfigParser()
parser.read(fileName)
@@ -253,7 +255,7 @@ class FromConfig(DataInterface):
parser.add_section("FromConfig")
hostsInFile = []
- for (name, value) in parser.items("FromConfig"):
+ for (name, __value) in parser.items("FromConfig"):
name = name.lower()
if (name.startswith("host")):
hostsInFile.append(name)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/getentoverride.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/getentoverride.py Tue Jul 17 00:15:34 2012
@@ -75,20 +75,20 @@ class GetentOverride(DataInterface):
def getHosts(self):
return self.baseDataObject.getHosts()
- def getHost(self, id):
- return self.baseDataObject.getHost(id)
+ def getHost(self, _id):
+ return self.baseDataObject.getHost(_id)
def getInstances(self):
return self.baseDataObject.getInstances()
- def getInstance(self, id):
- return self.baseDataObject.getInstance(id)
+ def getInstance(self, _id):
+ return self.baseDataObject.getInstance(_id)
def getNetworks(self):
return self.baseDataObject.getNetworks()
- def getNetwork(self, id):
- return self.baseDataObject.getNetwork(id)
+ def getNetwork(self, _id):
+ return self.baseDataObject.getNetwork(_id)
def getImages(self):
count = 0
@@ -109,12 +109,12 @@ class GetentOverride(DataInterface):
try:
for l in p.stdout.xreadlines():
ws = l.strip().split(":")
- id = int(ws[2])
+ _id = int(ws[2])
name = ws[0]
user = User()
- user.id = id
+ user.id = _id
user.name = name
- myUsers[id] = user
+ myUsers[_id] = user
self.users = myUsers
self.lastUserUpdate = now
finally:
@@ -124,9 +124,9 @@ class GetentOverride(DataInterface):
self.fetchFromGetent()
return self.users
- def getUser(self, id):
+ def getUser(self, _id):
self.fetchFromGetent()
- return self.users[id]
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
return self.baseDataObject.registerHost(hostname, memory, cores, version)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/ldapoverride.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/ldapoverride.py Tue Jul 17 00:15:34 2012
@@ -19,9 +19,8 @@ import subprocess
import time
#XXXstroucki getImages requires os?
import os
-from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
-from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
-from tashi.rpycservices.rpyctypes import User
+from tashi.rpycservices.rpyctypes import User, LocalImages
+from tashi.util import instantiateImplementation, humanReadable
from tashi.clustermanager.data import DataInterface
class LdapOverride(DataInterface):
@@ -57,20 +56,20 @@ class LdapOverride(DataInterface):
def getHosts(self):
return self.baseDataObject.getHosts()
- def getHost(self, id):
- return self.baseDataObject.getHost(id)
+ def getHost(self, _id):
+ return self.baseDataObject.getHost(_id)
def getInstances(self):
return self.baseDataObject.getInstances()
- def getInstance(self, id):
- return self.baseDataObject.getInstance(id)
+ def getInstance(self, _id):
+ return self.baseDataObject.getInstance(_id)
def getNetworks(self):
return self.baseDataObject.getNetworks()
- def getNetwork(self, id):
- return self.baseDataObject.getNetwork(id)
+ def getNetwork(self, _id):
+ return self.baseDataObject.getNetwork(_id)
def getImages(self):
count = 0
@@ -101,7 +100,7 @@ class LdapOverride(DataInterface):
myUsers[user.id] = user
thisUser = {}
else:
- (key, sep, val) = l.partition(":")
+ (key, __sep, val) = l.partition(":")
key = key.strip()
val = val.strip()
thisUser[key] = val
@@ -116,9 +115,9 @@ class LdapOverride(DataInterface):
self.fetchFromLdap()
return self.users
- def getUser(self, id):
+ def getUser(self, _id):
self.fetchFromLdap()
- return self.users[id]
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
return self.baseDataObject.registerHost(hostname, memory, cores, version)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py Tue Jul 17 00:15:34 2012
@@ -37,18 +37,19 @@ class Pickled(FromConfig):
self.hostLock = threading.Lock()
self.hostLocks = {}
self.idLock = threading.Lock()
+ self.dbLock = threading.Lock()
self.load()
def cleanInstances(self):
ci = {}
- for ignore, i in self.instances.items():
+ for __ignore, i in self.instances.items():
i2 = Instance(d=i.__dict__)
ci[i2.id] = i2
return ci
def cleanHosts(self):
ch = {}
- for ignore, h in self.hosts.items():
+ for __ignore, h in self.hosts.items():
h2 = Host(d=h.__dict__)
ch[h2.id] = h2
return ch
@@ -58,27 +59,35 @@ class Pickled(FromConfig):
# XXXstroucki could be better
tempfile = "%s.new" % filename
- file = open(tempfile, "w")
- cPickle.dump((self.cleanHosts(), self.cleanInstances(), self.networks, self.users), file)
- file.close()
- os.rename(tempfile, filename)
+ self.dbLock.acquire()
+ try:
+ filehandle = open(tempfile, "w")
+ cPickle.dump((self.cleanHosts(), self.cleanInstances(), self.networks, self.users), filehandle)
+ filehandle.close()
+ os.rename(tempfile, filename)
+
+ except OSError:
+ self.log.exception("Error saving database")
+
+ finally:
+ self.dbLock.release()
def load(self):
if (os.access(self.file, os.F_OK)):
- file = open(self.file, "r")
- (hosts, instances, networks, users) = cPickle.load(file)
- file.close()
+ filehandle = open(self.file, "r")
+ (hosts, instances, networks, users) = cPickle.load(filehandle)
+ filehandle.close()
else:
(hosts, instances, networks, users) = ({}, {}, {}, {})
self.hosts = hosts
self.instances = instances
self.networks = networks
self.users = users
- for ignore, i in self.instances.items():
+ for __ignore, i in self.instances.items():
if (i.id >= self.maxInstanceId):
self.maxInstanceId = i.id + 1
i._lock = threading.Lock()
self.lockNames[i._lock] = "i%d" % (i.id)
- for ignore, h in self.hosts.items():
+ for __ignore, h in self.hosts.items():
h._lock = threading.Lock()
self.lockNames[h._lock] = "h%d" % (h.id)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py Tue Jul 17 00:15:34 2012
@@ -45,8 +45,8 @@ class SQL(DataInterface):
else:
raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
- self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
- self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
+ self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints', 'groupName']
+ self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version', 'notes', 'reserved']
self.instanceLock = threading.Lock()
self.instanceIdLock = threading.Lock()
self.instanceLocks = {}
@@ -83,8 +83,8 @@ class SQL(DataInterface):
return instanceId
def verifyStructure(self):
- self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL)")
- self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256))")
+ self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL, groupName varchar(256))")
+ self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256), notes varchar(256), reserved varchar(1024))")
self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id int(11) NOT NULL, name varchar(256) NOT NULL)")
self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
@@ -101,7 +101,7 @@ class SQL(DataInterface):
l = []
for e in range(0, len(self.instanceOrder)):
l.append(i.__dict__[self.instanceOrder[e]])
- return map(lambda x: self.sanitizeForSql('"' + str(x) + '"'), l)
+ return map(lambda x: self.sanitizeForSql('"%s"' % str(x)), l)
def makeListInstance(self, l):
i = Instance()
@@ -118,7 +118,7 @@ class SQL(DataInterface):
l = []
for e in range(0, len(self.hostOrder)):
l.append(h.__dict__[self.hostOrder[e]])
- return map(lambda x: self.sanitizeForSql('"' + str(x) + '"'), l)
+ return map(lambda x: self.sanitizeForSql('"%s"' % str(x)), l)
def makeListHost(self, l):
h = Host()
@@ -127,6 +127,10 @@ class SQL(DataInterface):
h.up = boolean(h.up)
h.decayed = boolean(h.decayed)
h.state = int(h.state)
+ if h.reserved is not None:
+ h.reserved = eval(h.reserved) # XXX: eval on DB-stored text is unsafe; prefer ast.literal_eval
+ else:
+ h.reserved = []
return h
def registerInstance(self, instance):
@@ -148,7 +152,8 @@ class SQL(DataInterface):
instance._lock.acquire()
self.instanceBusy[instance.id] = True
l = self.makeInstanceList(instance)
- self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
+ # XXXstroucki nicer?
+ self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
finally:
self.instanceLock.release()
return instance
@@ -254,14 +259,14 @@ class SQL(DataInterface):
def getHost(self, in_id):
try:
- id = int(in_id)
+ _id = int(in_id)
except:
self.log.exception("Argument to getHost was not integer: %s" % in_id)
- cur = self.executeStatement("SELECT * FROM hosts WHERE id = %d" % id)
+ cur = self.executeStatement("SELECT * FROM hosts WHERE id = %d" % _id) # NOTE(review): _id is unbound here if int(in_id) raised above
r = cur.fetchone()
if (r == None):
- raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (_id)})
host = self.makeListHost(r)
return host
@@ -276,16 +281,16 @@ class SQL(DataInterface):
def getInstance(self, in_id):
try:
- id = int(in_id)
+ _id = int(in_id)
except:
self.log.exception("Argument to getInstance was not integer: %s" % in_id)
- cur = self.executeStatement("SELECT * FROM instances WHERE id = %d" % (id))
+ cur = self.executeStatement("SELECT * FROM instances WHERE id = %d" % (_id)) # NOTE(review): _id is unbound here if int(in_id) raised above
# XXXstroucki should only return one row.
# what about migration? should it be enforced?
r = cur.fetchone()
if (not r):
- raise TashiException(d={'errno':Errors.NoSuchInstanceId, 'msg':"No such instanceId - %d" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId, 'msg':"No such instanceId - %d" % (_id)})
instance = self.makeListInstance(r)
return instance
@@ -298,8 +303,8 @@ class SQL(DataInterface):
networks[network.id] = network
return networks
- def getNetwork(self, id):
- cur = self.executeStatement("SELECT * FROM networks WHERE id = %d" % (id))
+ def getNetwork(self, _id):
+ cur = self.executeStatement("SELECT * FROM networks WHERE id = %d" % (_id))
r = cur.fetchone()
network = Network(d={'id':r[0], 'name':r[1]})
return network
@@ -325,8 +330,8 @@ class SQL(DataInterface):
users[user.id] = user
return users
- def getUser(self, id):
- cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (id))
+ def getUser(self, _id):
+ cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (_id))
r = cur.fetchone()
user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
return user
@@ -337,22 +342,23 @@ class SQL(DataInterface):
res = cur.fetchall()
for r in res:
if r[1] == hostname:
- id = r[0]
- self.log.warning("Host %s already registered, update will be done" % id)
+ _id = r[0]
+ self.log.warning("Host %s already registered, update will be done" % _id)
s = ""
- host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+ host = Host(d={'id': _id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version, 'notes':'', 'reserved':[]})
l = self.makeHostList(host)
for e in range(0, len(self.hostOrder)):
s = s + self.hostOrder[e] + "=" + l[e]
if (e < len(self.hostOrder)-1):
s = s + ", "
- self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
+ self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, _id))
self.hostLock.release()
return r[0], True
- id = self.getNewId("hosts")
- host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+ _id = self.getNewId("hosts")
+ host = Host(d={'id': _id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version, 'notes':'', 'reserved':[]})
l = self.makeHostList(host)
- self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
+ # XXXstroucki nicer?
+ self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
self.hostLock.release()
- return id, False
+ return _id, False
@@ -374,10 +380,10 @@ class SQL(DataInterface):
maxId = 0 # the first id would be 1
l = []
for r in res:
- id = r[0]
- l.append(id)
- if id >= maxId:
- maxId = id
+ _id = r[0]
+ l.append(_id)
+ if _id >= maxId:
+ maxId = _id
l.sort() # sort to enable comparing with range output
# check if some id is released:
t = range(maxId + 1)
Modified: incubator/tashi/branches/stable/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/connectionmanager.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/branches/stable/src/tashi/connectionmanager.py Tue Jul 17 00:15:34 2012
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-from tashi.rpycservices import rpycservices
from tashi import Connection
#from tashi.rpycservices.rpyctypes import *
Modified: incubator/tashi/branches/stable/src/tashi/dfs/vfs.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/dfs/vfs.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/dfs/vfs.py (original)
+++ incubator/tashi/branches/stable/src/tashi/dfs/vfs.py Tue Jul 17 00:15:34 2012
@@ -18,7 +18,6 @@
# implementation of dfs interface functions
import shutil
-import os
import os.path
from dfsinterface import DfsInterface
Modified: incubator/tashi/branches/stable/src/tashi/messaging/gangliapublisher.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/messaging/gangliapublisher.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/messaging/gangliapublisher.py (original)
+++ incubator/tashi/branches/stable/src/tashi/messaging/gangliapublisher.py Tue Jul 17 00:15:34 2012
@@ -17,7 +17,6 @@
import os
import time
-import types
from tashi import scrubString
Modified: incubator/tashi/branches/stable/src/tashi/messaging/messagingloghandler.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/messaging/messagingloghandler.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/messaging/messagingloghandler.py (original)
+++ incubator/tashi/branches/stable/src/tashi/messaging/messagingloghandler.py Tue Jul 17 00:15:34 2012
@@ -34,7 +34,9 @@ class MessagingLogHandler(logging.Handle
try:
key = "log_%s_%d_%d" % (self.name, self.msgIndex, int(time.time()*1000))
val = self.format(record)
- tashi.publisher.publish({key:val})
+ #XXXstroucki publisher does not exist
+ (_,_) = (key,val)
+ #tashi.publisher.publish({key:val})
self.msgIndex = self.msgIndex + 1
except Exception, e:
print e
Modified: incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py?rev=1362324&r1=1362323&r2=1362324&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py Tue Jul 17 00:15:34 2012
@@ -18,25 +18,25 @@
# under the License.
import logging.config
-import signal
import sys
import os
-import time
-from tashi.util import instantiateImplementation, getConfig, debugConsole
+from tashi.util import instantiateImplementation, debugConsole
import tashi
from tashi import boolean
from tashi.rpycservices import rpycservices
+from tashi.utils.config import Config
+
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import TlsliteVdbAuthenticator
def main():
global config, log
- (config, configFiles) = getConfig(["NodeManager"])
- publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
- tashi.publisher = publisher
+ config = Config(["NodeManager"])
+ configFiles = config.getFiles()
+
logging.config.fileConfig(configFiles)
log = logging.getLogger(__name__)
log.info('Using configuration file(s) %s' % configFiles)
@@ -78,6 +78,9 @@ def startNodeManager():
users = {}
users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
authenticator = TlsliteVdbAuthenticator.from_dict(users)
+
+ # XXXstroucki: ThreadedServer is liable to have exceptions
+ # occur within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)