You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/07/17 21:06:01 UTC
svn commit: r1362643 [2/3] - in
/incubator/tashi/branches/stroucki-registration: ./ doc/ etc/ src/tashi/
src/tashi/accounting/ src/tashi/agents/ src/tashi/client/
src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/dfs/
src/tashi/messagi...
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/sql.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/clustermanager/data/sql.py Tue Jul 17 21:05:59 2012
@@ -45,8 +45,8 @@ class SQL(DataInterface):
else:
raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
- self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
- self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
+ self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints', 'groupName']
+ self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version', 'notes', 'reserved']
self.instanceLock = threading.Lock()
self.instanceIdLock = threading.Lock()
self.instanceLocks = {}
@@ -83,8 +83,8 @@ class SQL(DataInterface):
return instanceId
def verifyStructure(self):
- self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL)")
- self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256))")
+ self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL, groupName varchar(256))")
+ self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256), notes varchar(256), reserved varchar(1024))")
self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id int(11) NOT NULL, name varchar(256) NOT NULL)")
self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
@@ -101,7 +101,7 @@ class SQL(DataInterface):
l = []
for e in range(0, len(self.instanceOrder)):
l.append(i.__dict__[self.instanceOrder[e]])
- return map(lambda x: self.sanitizeForSql('"' + str(x) + '"'), l)
+ return map(lambda x: self.sanitizeForSql('"%s"' % str(x)), l)
def makeListInstance(self, l):
i = Instance()
@@ -118,7 +118,7 @@ class SQL(DataInterface):
l = []
for e in range(0, len(self.hostOrder)):
l.append(h.__dict__[self.hostOrder[e]])
- return map(lambda x: self.sanitizeForSql('"' + str(x) + '"'), l)
+ return map(lambda x: self.sanitizeForSql('"%s"' % str(x)), l)
def makeListHost(self, l):
h = Host()
@@ -127,6 +127,10 @@ class SQL(DataInterface):
h.up = boolean(h.up)
h.decayed = boolean(h.decayed)
h.state = int(h.state)
+ if h.reserved is not None:
+ h.reserved = eval(h.reserved)
+ else:
+ h.reserved = []
return h
def registerInstance(self, instance):
@@ -148,7 +152,8 @@ class SQL(DataInterface):
instance._lock.acquire()
self.instanceBusy[instance.id] = True
l = self.makeInstanceList(instance)
- self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
+ # XXXstroucki nicer?
+ self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
finally:
self.instanceLock.release()
return instance
@@ -254,14 +259,14 @@ class SQL(DataInterface):
def getHost(self, in_id):
try:
- id = int(in_id)
+ _id = int(in_id)
except:
self.log.exception("Argument to getHost was not integer: %s" % in_id)
- cur = self.executeStatement("SELECT * FROM hosts WHERE id = %d" % id)
+ cur = self.executeStatement("SELECT * FROM hosts WHERE id = %d" % _id)
r = cur.fetchone()
if (r == None):
- raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (_id)})
host = self.makeListHost(r)
return host
@@ -276,16 +281,16 @@ class SQL(DataInterface):
def getInstance(self, in_id):
try:
- id = int(in_id)
+ _id = int(in_id)
except:
self.log.exception("Argument to getInstance was not integer: %s" % in_id)
- cur = self.executeStatement("SELECT * FROM instances WHERE id = %d" % (id))
+ cur = self.executeStatement("SELECT * FROM instances WHERE id = %d" % (_id))
# XXXstroucki should only return one row.
# what about migration? should it be enforced?
r = cur.fetchone()
if (not r):
- raise TashiException(d={'errno':Errors.NoSuchInstanceId, 'msg':"No such instanceId - %d" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId, 'msg':"No such instanceId - %d" % (_id)})
instance = self.makeListInstance(r)
return instance
@@ -298,8 +303,8 @@ class SQL(DataInterface):
networks[network.id] = network
return networks
- def getNetwork(self, id):
- cur = self.executeStatement("SELECT * FROM networks WHERE id = %d" % (id))
+ def getNetwork(self, _id):
+ cur = self.executeStatement("SELECT * FROM networks WHERE id = %d" % (_id))
r = cur.fetchone()
network = Network(d={'id':r[0], 'name':r[1]})
return network
@@ -325,8 +330,8 @@ class SQL(DataInterface):
users[user.id] = user
return users
- def getUser(self, id):
- cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (id))
+ def getUser(self, _id):
+ cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (_id))
r = cur.fetchone()
user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
return user
@@ -337,22 +342,23 @@ class SQL(DataInterface):
res = cur.fetchall()
for r in res:
if r[1] == hostname:
- id = r[0]
- self.log.warning("Host %s already registered, update will be done" % id)
+ _id = r[0]
+ self.log.warning("Host %s already registered, update will be done" % _id)
s = ""
- host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+ host = Host(d={'id': _id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
l = self.makeHostList(host)
for e in range(0, len(self.hostOrder)):
s = s + self.hostOrder[e] + "=" + l[e]
if (e < len(self.hostOrder)-1):
s = s + ", "
- self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, id))
+ self.executeStatement("UPDATE hosts SET %s WHERE id = %d" % (s, _id))
self.hostLock.release()
return r[0], True
- id = self.getNewId("hosts")
- host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
+ _id = self.getNewId("hosts")
+ host = Host(d={'id': _id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version, 'notes':'', 'reserved':[]})
l = self.makeHostList(host)
- self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
+ # XXXstroucki nicer?
+ self.executeStatement("INSERT INTO hosts VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
self.hostLock.release()
return id, False
@@ -374,10 +380,10 @@ class SQL(DataInterface):
maxId = 0 # the first id would be 1
l = []
for r in res:
- id = r[0]
- l.append(id)
- if id >= maxId:
- maxId = id
+ _id = r[0]
+ l.append(_id)
+ if _id >= maxId:
+ maxId = _id
l.sort() # sort to enable comparing with range output
# check if some id is released:
t = range(maxId + 1)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/connectionmanager.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/connectionmanager.py Tue Jul 17 21:05:59 2012
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-from tashi.rpycservices import rpycservices
from tashi import Connection
#from tashi.rpycservices.rpyctypes import *
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/dfs/vfs.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/dfs/vfs.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/dfs/vfs.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/dfs/vfs.py Tue Jul 17 21:05:59 2012
@@ -18,7 +18,6 @@
# implementation of dfs interface functions
import shutil
-import os
import os.path
from dfsinterface import DfsInterface
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/messaging/gangliapublisher.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/messaging/gangliapublisher.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/messaging/gangliapublisher.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/messaging/gangliapublisher.py Tue Jul 17 21:05:59 2012
@@ -17,7 +17,6 @@
import os
import time
-import types
from tashi import scrubString
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messagingloghandler.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messagingloghandler.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messagingloghandler.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/messaging/messagingloghandler.py Tue Jul 17 21:05:59 2012
@@ -34,7 +34,9 @@ class MessagingLogHandler(logging.Handle
try:
key = "log_%s_%d_%d" % (self.name, self.msgIndex, int(time.time()*1000))
val = self.format(record)
- tashi.publisher.publish({key:val})
+ #XXXstroucki publisher does not exist
+ (_,_) = (key,val)
+ #tashi.publisher.publish({key:val})
self.msgIndex = self.msgIndex + 1
except Exception, e:
print e
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanager.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanager.py Tue Jul 17 21:05:59 2012
@@ -18,30 +18,57 @@
# under the License.
import logging.config
-import signal
import sys
+import os
-from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
+from tashi.util import instantiateImplementation, debugConsole
import tashi
from tashi import boolean
from tashi.rpycservices import rpycservices
+from tashi.utils.config import Config
+
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import TlsliteVdbAuthenticator
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- sys.exit(0)
-
def main():
- global config, dfs, vmm, service, server, log, notifier
+ global config, log
- (config, configFiles) = getConfig(["NodeManager"])
- publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
- tashi.publisher = publisher
+ config = Config(["NodeManager"])
+ configFiles = config.getFiles()
+
logging.config.fileConfig(configFiles)
log = logging.getLogger(__name__)
log.info('Using configuration file(s) %s' % configFiles)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startNodeManager()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting node manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of node manager")
+ os._exit(-1)
+
+ log.info("Exiting node manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+def startNodeManager():
+ global config, dfs, vmm, service, server, log, notifier
+ publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
+ tashi.publisher = publisher
dfs = instantiateImplementation(config.get("NodeManager", "dfs"), config)
vmm = instantiateImplementation(config.get("NodeManager", "vmm"), config, dfs, None)
service = instantiateImplementation(config.get("NodeManager", "service"), config, vmm)
@@ -51,6 +78,9 @@ def main():
users = {}
users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
authenticator = TlsliteVdbAuthenticator.from_dict(users)
+
+ # XXXstroucki: ThreadedServer is liable to have exceptions
+ # occur within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)
@@ -59,14 +89,11 @@ def main():
t.service._type = 'NodeManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
- except Exception, e:
- sys.stderr.write(str(e) + "\n")
- sys.exit(-1)
+
+ t.start()
+ # shouldn't exit by itself
+ sys.exit(0)
+
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanagerservice.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/nodemanagerservice.py Tue Jul 17 21:05:59 2012
@@ -20,11 +20,8 @@ import socket
import threading
import time
-from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import InstanceState, TashiException, Errors, Instance
from tashi import boolean, vmStates, ConnectionManager
-import tashi
-
class NodeManagerService(object):
"""RPC handler for the NodeManager
@@ -33,35 +30,32 @@ class NodeManagerService(object):
VmControlInterface and do all dfs operations here?"""
def __init__(self, config, vmm):
+ # XXXstroucki: vmm will wait for this constructor to complete
self.config = config
self.vmm = vmm
- self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
- self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
- self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+ self.cmHost = self.config.get("NodeManagerService", "clusterManagerHost")
+ self.cmPort = int(self.config.get("NodeManagerService", "clusterManagerPort"))
+ self.authAndEncrypt = boolean(self.config.get('Security', 'authAndEncrypt'))
if self.authAndEncrypt:
- self.username = config.get('AccessClusterManager', 'username')
- self.password = config.get('AccessClusterManager', 'password')
+ self.username = self.config.get('AccessClusterManager', 'username')
+ self.password = self.config.get('AccessClusterManager', 'password')
else:
self.username = None
self.password = None
self.log = logging.getLogger(__file__)
- self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
- self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
- self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
- self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+ self.convertExceptions = boolean(self.config.get('NodeManagerService', 'convertExceptions'))
+ self.registerFrequency = float(self.config.get('NodeManagerService', 'registerFrequency'))
+ self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval', default = 0))
+ self.registerHost = boolean(self.config.get('NodeManagerService', 'registerHost'))
try:
self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
except:
self.log.exception("Could not connect to CM")
+ # XXXstroucki: raise?
return
- self.accountingHost = None
- self.accountingPort = None
- try:
- self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
- self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
- except:
- pass
+ self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+ self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
self.notifyCM = []
@@ -76,13 +70,13 @@ class NodeManagerService(object):
self.__registerHost()
+ # XXXstroucki: should make an effort to retry
+ # This can time out now with an exception
self.id = self.cm.registerNodeManager(self.host, self.instances.values())
- # XXXstroucki cut cross check for NM/VMM state
-
# start service threads
- threading.Thread(target=self.__registerWithClusterManager).start()
- threading.Thread(target=self.__statsThread).start()
+ threading.Thread(name="registerWithClusterManager", target=self.__registerWithClusterManager).start()
+ threading.Thread(name="statsThread", target=self.__statsThread).start()
def __initAccounting(self):
self.accountBuffer = []
@@ -105,7 +99,6 @@ class NodeManagerService(object):
# send data to CM
# XXXstroucki adapt this for accounting?
def __flushNotifyCM(self):
- start = time.time()
# send data to CM, adding message to buffer if
# it fails
try:
@@ -115,7 +108,10 @@ class NodeManagerService(object):
# XXXstroucki ValueError: need more than 1 value to unpack
# observed here. How?
value = self.notifyCM.pop(0)
- (instanceId, newInst, old, success) = value
+ try:
+ (instanceId, newInst, old, success) = value
+ except:
+ self.log.exception("problem with value: %s" % value)
try:
self.cm.vmUpdate(instanceId, newInst, old)
except TashiException, e:
@@ -178,14 +174,25 @@ class NodeManagerService(object):
# service thread function
def __registerWithClusterManager(self):
+ happy = False
while True:
#self.__ACCOUNT("TESTING")
start = time.time()
try:
instances = self.instances.values()
self.id = self.cm.registerNodeManager(self.host, instances)
+ if not happy:
+ happy = True
+ self.log.info("Registered with the CM")
+
except Exception:
self.log.exception('Failed to register with the CM')
+ happy = False
+
+ # make sure we flush our notification buffers
+ # if we have good comms with the CM
+ if happy:
+ self.__flushNotifyCM()
toSleep = start - time.time() + self.registerFrequency
if (toSleep > 0):
@@ -203,14 +210,16 @@ class NodeManagerService(object):
instance = self.instances.get(vmId, None)
if (not instance):
continue
- id = instance.id
+ _id = instance.id
stats = self.vmm.getStats(vmId)
for stat in stats:
- publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
+ publishList.append({"vm_%d_%s" % (_id, stat):stats[stat]})
except:
self.log.exception('statsThread threw an exception')
if (len(publishList) > 0):
- tashi.publisher.publishList(publishList)
+ # XXXstroucki: no publisher currently
+ pass
+ #tashi.publisher.publishList(publishList)
except:
self.log.exception('statsThread threw an exception')
time.sleep(self.statsInterval)
@@ -218,7 +227,8 @@ class NodeManagerService(object):
def __registerHost(self):
hostname = socket.gethostname()
# populate some defaults
- # XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+ # XXXstroucki: I think it's better if the nodemanager fills these in
+ # properly when registering with the clustermanager
memory = 0
cores = 0
version = "empty"
@@ -244,8 +254,16 @@ class NodeManagerService(object):
# qemu.py calls this in the matchSystemPids thread
# xenpv.py: i have no real idea why it is called there
def vmStateChange(self, vmId, old, cur):
- instance = self.__getInstance(vmId)
+ try:
+ instance = self.__getInstance(vmId)
+ except TashiException, e:
+ if e.errno == Errors.NoSuchVmId:
+ self.log.warning("Asked to change state for unknown VM. Has it not completed starting yet?")
+ return False
+ else:
+ raise
+ before = instance.state
if (instance.state == cur):
# Don't do anything if state is what it should be
return True
@@ -261,9 +279,18 @@ class NodeManagerService(object):
newInst = Instance(d={'state':cur})
success = lambda: None
- # send the state change up to the CM
- self.notifyCM.append((instance.id, newInst, old, success))
- self.__flushNotifyCM()
+
+ # if this instance was in MigrateTrans, and has exited
+ # then don't tell the CM; it is the source instance
+ # exiting, and the CM should have updated its information
+ # to the target instance's info.
+ # Otherwise, send the state change up to the CM
+
+ if before == InstanceState.MigrateTrans and cur == InstanceState.Exited:
+ pass
+ else:
+ self.notifyCM.append((instance.id, newInst, old, success))
+ self.__flushNotifyCM()
# cache change locally
self.instances[vmId] = instance
@@ -272,7 +299,6 @@ class NodeManagerService(object):
# At this point, the VMM will clean up,
# so forget about this instance
del self.instances[vmId]
- return True
return True
@@ -284,6 +310,8 @@ class NodeManagerService(object):
# remote
def instantiateVm(self, instance):
+ # XXXstroucki: check my capacity before instantiating
+
self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
try:
vmId = self.vmm.instantiateVm(instance)
@@ -306,6 +334,8 @@ class NodeManagerService(object):
# called by resumeVm as thread
def __resumeVmHelper(self, instance, name):
self.vmm.resumeVmHelper(instance, name)
+ # XXXstroucki should the VMM be responsible for setting
+ # state? It should know better.
instance.state = InstanceState.Running
newInstance = Instance(d={'id':instance.id,'state':instance.state})
success = lambda: None
@@ -344,7 +374,9 @@ class NodeManagerService(object):
# XXXstroucki migrate out?
def __migrateVmHelper(self, instance, target, transportCookie):
self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
- del self.instances[instance.vmId]
+ # removal from self.instances done by communication from
+ # VMM as part of above migrateVm function
+ return
# remote
# XXXstroucki migrate out?
@@ -353,7 +385,7 @@ class NodeManagerService(object):
self.__ACCOUNT("NM VM MIGRATE", instance=instance)
instance.state = InstanceState.MigrateTrans
self.instances[vmId] = instance
- threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
+ threading.Thread(name="migrateVmHelper", target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
return
# called by receiveVm as thread
@@ -366,15 +398,16 @@ class NodeManagerService(object):
self.instances[vmId] = instance
newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
success = lambda: None
- self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+ self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
self.__flushNotifyCM()
# remote
# XXXstroucki migrate in?
def receiveVm(self, instance, transportCookie):
instance.state = InstanceState.MigrateTrans
- vmId = instance.vmId
- self.instances[vmId] = instance
+ # XXXstroucki new vmId is not known yet until VM is received
+ #vmId = instance.vmId
+ #self.instances[vmId] = instance
self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
return
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/qemu.py Tue Jul 17 21:05:59 2012
@@ -50,12 +50,12 @@ def controlConsole(child, port):
output = child.monitorFd
#print "listen"
select.select([ls], [], [])
- (s, clientAddr) = listenSocket.accept()
+ (s, __clientAddr) = listenSocket.accept()
while s:
if (output != -1):
- (rl, wl, el) = select.select([s, output], [], [])
+ (rl, __wl, __el) = select.select([s, output], [], [])
else:
- (rl, wl, el) = select.select([s], [], [])
+ (rl, __wl, __el) = select.select([s], [], [])
if (len(rl) > 0):
if (rl[0] == s):
#print "from s"
@@ -87,17 +87,19 @@ class Qemu(VmControlInterface):
def __init__(self, config, dfs, nm):
VmControlInterface.__init__(self, config, dfs, nm)
- self.QEMU_BIN = self.config.get("Qemu", "qemuBin")
- self.INFO_DIR = self.config.get("Qemu", "infoDir")
- self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay"))
- self.migrationRetries = int(self.config.get("Qemu", "migrationRetries"))
- self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout"))
- self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
- self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
- self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
- # XXXstroucki amount of reserved memory could be configurable
- self.reservedMem = 512
- # XXXstroucki perhaps make this configurable
+ self.QEMU_BIN = self.config.get("Qemu", "qemuBin", default = "/usr/bin/kvm")
+ self.INFO_DIR = self.config.get("Qemu", "infoDir", default = "/var/tmp/VmControlQemu/")
+ self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay", default = 1))
+ self.migrationRetries = int(self.config.get("Qemu", "migrationRetries", default = 10))
+ self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout", default = 60))
+ self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout", default = 300))
+ self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument", default = False))
+ self.statsInterval = float(self.config.get("Qemu", "statsInterval", default = 0))
+ reservedMem = self.config.get("Qemu", "reservedMem", default = 512)
+ reservedMem = int(reservedMem)
+
+ self.reservedMem = reservedMem
+
self.ifPrefix = "tashi"
self.controlledVMs = {}
self.usedPorts = []
@@ -106,13 +108,20 @@ class Qemu(VmControlInterface):
self.vncPortLock = threading.Lock()
self.consolePort = 10000
self.consolePortLock = threading.Lock()
- self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
+ maxParallelMigrations = self.config.get("Qemu", "maxParallelMigrations")
+ maxParallelMigrations = int(maxParallelMigrations)
+ if maxParallelMigrations < 1:
+ maxParallelMigrations = 1
+
+ self.migrationSemaphore = threading.Semaphore(maxParallelMigrations)
self.stats = {}
+
+ self.suspendHandler = self.config.get("Qemu", "suspendHandler", default = "gzip")
+ self.resumeHandler = self.config.get("Qemu", "resumeHandler", default = "zcat")
+
self.scratchVg = self.config.get("Qemu", "scratchVg")
- # XXXstroucki revise
- self.scratchDir = self.config.get("Qemu", "scratchDir")
- if len(self.scratchDir) == 0:
- self.scratchDir = "/tmp"
+
+ self.scratchDir = self.config.get("Qemu", "scratchDir", default = "/tmp")
try:
os.mkdir(self.INFO_DIR)
@@ -129,17 +138,20 @@ class Qemu(VmControlInterface):
def __init__(self, **attrs):
self.__dict__.update(attrs)
+ def __dereferenceLink(self, spec):
+ newspec = os.path.realpath(spec)
+ return newspec
+
+
def __getHostPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
- real_bin = self.QEMU_BIN
- while os.path.islink(real_bin):
- real_bin = os.readlink(self.QEMU_BIN)
+ real_bin = self.__dereferenceLink(self.QEMU_BIN)
for f in os.listdir("/proc"):
try:
- bin = os.readlink("/proc/%s/exe" % (f))
- if (bin.find(real_bin) != -1):
+ binary = os.readlink("/proc/%s/exe" % (f))
+ if (binary.find(real_bin) != -1):
pids.append(int(f))
except Exception:
pass
@@ -150,14 +162,25 @@ class Qemu(VmControlInterface):
"""Will return a dict of instances by vmId to the caller"""
return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
- def __matchHostPids(self, controlledVMs):
+ def __matchHostPids(self):
"""This is run in a separate polling thread and it must do things that are thread safe"""
- vmIds = controlledVMs.keys()
+ vmIds = self.controlledVMs.keys()
pids = self.__getHostPids()
for vmId in vmIds:
- child = controlledVMs[vmId]
+ child = self.controlledVMs[vmId]
+
+ # check to see if the child was just started.
+ # Only try to check on it if startup was more
+ # than 5 seconds in the past
+ if "startTime" in child.__dict__:
+ if child.startTime + 5 < time.time():
+ del child.startTime
+ else:
+ log.info("Not processing vmId %d because it is newly started" % (vmId))
+ continue
+
instance = child.instance
name = instance.name
@@ -168,9 +191,9 @@ class Qemu(VmControlInterface):
# remove info file
os.unlink(self.INFO_DIR + "/%d"%(vmId))
- # XXXstroucki why not use self.controlledVMs
- # argument, so why modify this fn's formal?
- del controlledVMs[vmId]
+ # XXXstroucki python should handle
+ # locking here (?)
+ del self.controlledVMs[vmId]
# remove any stats (appropriate?)
try:
@@ -191,7 +214,7 @@ class Qemu(VmControlInterface):
try:
os.waitpid(vmId, 0)
except:
- log.exception("waitpid failed for vmId" % (vmId))
+ log.exception("waitpid failed for vmId %s" % (vmId))
# recover the child's stderr and monitor
# output if possible
if (child.errorBit):
@@ -208,17 +231,21 @@ class Qemu(VmControlInterface):
# remove scratch storage
try:
if self.scratchVg is not None:
+ scratchName = "lv%s" % name
log.info("Removing any scratch for %s" % (name))
- cmd = "/sbin/lvremove --quiet -f %s" % self.scratchVg
- result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
+ cmd = "/sbin/lvremove --quiet -f %s/%s" % (self.scratchVg, scratchName)
+ __result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
except:
log.warning("Problem cleaning scratch volumes")
pass
# let the NM know
try:
- if (not child.migratingOut):
- self.nm.vmStateChange(vmId, None, InstanceState.Exited)
+ # XXXstroucki: we don't want to treat
+ # the source VM of a migration exiting
+ # as an actual
+ # exit, but the NM should probably know.
+ self.nm.vmStateChange(vmId, None, InstanceState.Exited)
except Exception:
log.exception("vmStateChange failed for VM %s" % (name))
else:
@@ -277,7 +304,7 @@ class Qemu(VmControlInterface):
while True:
try:
time.sleep(self.POLL_DELAY)
- self.__matchHostPids(self.controlledVMs)
+ self.__matchHostPids()
except:
log.exception("Exception in poolVMsLoop")
@@ -298,7 +325,7 @@ class Qemu(VmControlInterface):
monitorFd = child.monitorFd
buf = ""
try:
- (rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
+ (rlist, __wlist, __xlist) = select.select([monitorFd], [], [], 0.0)
while (len(rlist) > 0):
c = os.read(monitorFd, 1)
if (c == ""):
@@ -306,7 +333,7 @@ class Qemu(VmControlInterface):
child.errorBit = True
raise RuntimeError
buf = buf + c
- (rlist, wlist, xlist) = select.select([monitorFd], [], [], 0.0)
+ (rlist, __wlist, __xlist) = select.select([monitorFd], [], [], 0.0)
finally:
child.monitorHistory.append(buf)
return buf
@@ -321,14 +348,14 @@ class Qemu(VmControlInterface):
while (buf[-(len(needle)):] != needle):
#print "[BUF]: %s" % (buf)
#print "[NEE]: %s" % (needle)
- (rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
+ (rlist, __wlist, __xlist) = select.select([monitorFd], [], [], timeout)
if (len(rlist) == 0):
- log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
+ log.error("Timeout getting results from monitor on FD %s for vmId %d" % (monitorFd, child.pid))
child.errorBit = True
raise RuntimeError
c = os.read(monitorFd, 1)
if (c == ""):
- log.error("Early termination on monitor for vmId %d" % (child.pid))
+ log.error("Early termination on monitor FD %s for vmId %d" % (monitorFd, child.pid))
child.errorBit = True
raise RuntimeError
buf = buf + c
@@ -430,6 +457,7 @@ class Qemu(VmControlInterface):
disk = instance.disks[index]
uri = scrubString(disk.uri)
imageLocal = self.dfs.getLocalHandle("images/" + uri)
+ imageLocal = self.__dereferenceLink(imageLocal)
thisDiskList = [ "file=%s" % imageLocal ]
thisDiskList.append("if=%s" % diskInterface)
thisDiskList.append("index=%d" % index)
@@ -470,7 +498,7 @@ class Qemu(VmControlInterface):
# XXXstroucki check for capacity
cmd = "/sbin/lvcreate --quiet -n%s -L %dG %s" % (scratchName, scratchSize, self.scratchVg)
# XXXstroucki check result
- result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+ __result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
index += 1
thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratchName) ]
@@ -560,9 +588,15 @@ class Qemu(VmControlInterface):
child.ptyFile = None
child.vncPort = -1
child.instance.vmId = child.pid
+
+ # Add a token to this new child object so that
+ # we don't mistakenly clean up when matchHostPids
+ # runs and the child process hasn't exec'ed yet.
+ child.startTime = time.time()
+
self.__saveChildInfo(child)
- self.controlledVMs[child.pid] = child
log.info("Adding vmId %d" % (child.pid))
+ self.controlledVMs[child.pid] = child
return (child.pid, cmd)
def __getPtyInfo(self, child, issueContinue):
@@ -603,10 +637,15 @@ class Qemu(VmControlInterface):
# trying to restart the migration by running
# the command again (when qemu is ready to
# listen again) is probably not helpful
+ # XXXstroucki: failures observed:
+ # "migration failed"
+ # "Block format 'qcow' used by device '' does not support feature 'live migration'"
success = False
+ # see if migration can be speeded up
+ res = self.__enterCommand(child, "migrate_set_speed 1g", timeout=self.migrateTimeout)
res = self.__enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
retry = retry - 1
- if (res.find("migration failed") == -1):
+ if (res.find("Block migration completed") != -1):
success = True
retry = 0
break
@@ -623,6 +662,8 @@ class Qemu(VmControlInterface):
# extern
def instantiateVm(self, instance):
+ # XXXstroucki: check capacity before instantiating
+
try:
(vmId, cmd) = self.__startVm(instance, None)
child = self.__getChildFromPid(vmId)
@@ -642,16 +683,23 @@ class Qemu(VmControlInterface):
# extern
def suspendVm(self, vmId, target):
- tmpTarget = "/%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId)
# XXX: Use fifo to improve performance
- vmId = self.__stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
- self.dfs.copyTo(tmpTarget, target)
- os.unlink(tmpTarget)
+ # XXXstroucki: we could create a fifo on the local fs,
+ # then start a thread to copy it to dfs. But if we're
+ # reading from dfs directly on resume, why not write
+ # directly here?
+
+ #tmpTarget = "/%s/tashi_qemu_suspend_%d_%d" % (self.scratchDir, os.getpid(), vmId)
+ fn = self.dfs.getLocalHandle("%s" % target)
+ vmId = self.__stopVm(vmId, "\"exec:%s > %s\"" % (self.suspendHandler, fn), True)
+ #self.dfs.copyTo(tmpTarget, target)
+ #os.unlink(tmpTarget)
return vmId
# extern
def resumeVmHelper(self, instance, source):
- child = self.__getChildFromPid(instance.vmId)
+ vmId = instance.vmId
+ child = self.__getChildFromPid(vmId)
try:
self.__getPtyInfo(child, True)
except RuntimeError:
@@ -660,21 +708,25 @@ class Qemu(VmControlInterface):
raise
status = "paused"
while ("running" not in status):
- status = self.__enterCommand(child, "info status")
- time.sleep(1)
+ try:
+ status = self.__enterCommand(child, "info status")
+ except RuntimeError:
+ pass
+ time.sleep(60)
+
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
child.instance.state = InstanceState.Running
self.__saveChildInfo(child)
# extern
def resumeVm(self, instance, source):
fn = self.dfs.getLocalHandle("%s" % (source))
- (vmId, cmd) = self.__startVm(instance, "exec:zcat %s" % (fn))
+ (vmId, cmd) = self.__startVm(instance, "exec:%s < %s" % (self.resumeHandler, fn))
child = self.__getChildFromPid(vmId)
child.cmd = cmd
return vmId
def __checkPortListening(self, port):
- lc = 0
# XXXpipe: find whether something is listening yet on the port
(stdin, stdout) = os.popen2("netstat -ln | grep 0.0.0.0:%d | wc -l" % (port))
stdin.close()
@@ -730,7 +782,9 @@ class Qemu(VmControlInterface):
try:
child = self.__getChildFromPid(vmId)
except:
- log.error("Failed to get child info; transportCookie = %s; hostname = %s" % (str(cPickle.loads(transportCookie)), socket.hostname()))
+ # XXXstroucki: Does hostname contain the peer hostname?
+ log.error("Failed to get child info; transportCookie = %s; hostname = %s" %
+ (str(cPickle.loads(transportCookie)), _hostname))
raise
try:
self.__getPtyInfo(child, True)
@@ -818,6 +872,11 @@ class Qemu(VmControlInterface):
threading.Thread(target=controlConsole, args=(child,consolePort)).start()
return "Control console listening on %s:%d" % (hostname, consolePort)
+ def __specificReset(self, vmId):
+ child = self.__getChildFromPid(vmId)
+ self.__enterCommand(child, "system_reset")
+ return "Sent reset signal to instance"
+
# extern
def vmmSpecificCall(self, vmId, arg):
arg = arg.lower()
@@ -836,12 +895,16 @@ class Qemu(VmControlInterface):
elif (arg == "startconsole"):
return self.__specificStartConsole(vmId)
+ elif (arg == "reset"):
+ return self.__specificReset(vmId)
+
elif (arg == "list"):
commands = [
"startVnc",
"stopVnc",
"changeCdrom:<image.iso>",
"startConsole",
+ "reset",
]
return "\n".join(commands)
@@ -867,10 +930,14 @@ class Qemu(VmControlInterface):
myTicks = userTicks + sysTicks
vsize = (int(ws[22]))/1024.0/1024.0
rss = (int(ws[23])*4096)/1024.0/1024.0
- cpuSeconds = myTicks/ticksPerSecond
- lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
- cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
- cpuStats[vmId] = cpuSeconds
+ cpuSeconds = myTicks/self.ticksPerSecond
+ # XXXstroucki be more exact here?
+ last = time.time() - self.statsInterval
+ lastCpuSeconds = self.cpuStats.get(vmId, cpuSeconds)
+ if lastCpuSeconds is None:
+ lastCpuSeconds = cpuSeconds
+ cpuLoad = (cpuSeconds - lastCpuSeconds)/(time.time() - last)
+ self.cpuStats[vmId] = cpuSeconds
try:
child = self.controlledVMs[vmId]
except:
@@ -880,17 +947,17 @@ class Qemu(VmControlInterface):
(recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
for i in range(0, len(child.instance.nics)):
netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
- (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
+ (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = self.netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
(recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
self.stats[vmId] = self.stats.get(vmId, {})
child = self.controlledVMs.get(vmId, None)
if (child):
res = self.__enterCommand(child, "info blockstats")
for l in res.split("\n"):
- (device, sep, data) = stringPartition(l, ": ")
+ (device, __sep, data) = stringPartition(l, ": ")
if (data != ""):
for field in data.split(" "):
- (label, sep, val) = stringPartition(field, "=")
+ (label, __sep, val) = stringPartition(field, "=")
if (val != ""):
self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
self.stats[vmId]['%s_%s' % (device, label)] = int(val)
@@ -902,9 +969,9 @@ class Qemu(VmControlInterface):
# thread
def statsThread(self):
- ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
- netStats = {}
- cpuStats = {}
+ self.ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
+ self.netStats = {}
+ self.cpuStats = {}
# XXXstroucki be more exact here?
last = time.time() - self.statsInterval
while True:
@@ -915,12 +982,12 @@ class Qemu(VmControlInterface):
f.close()
for l in netData:
if (l.find(self.ifPrefix) != -1):
- (dev, sep, ld) = stringPartition(l, ":")
+ (dev, __sep, ld) = stringPartition(l, ":")
dev = dev.strip()
ws = ld.split()
recvBytes = float(ws[0])
sendBytes = float(ws[8])
- (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
+ (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = self.netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
if (recvBytes < lastRecvBytes):
# We seem to have overflowed
# XXXstroucki How likely is this to happen?
@@ -936,7 +1003,7 @@ class Qemu(VmControlInterface):
lastSendBytes = lastSendBytes - 2**32
recvMBs = (recvBytes-lastRecvBytes)/(now-last)/1024.0/1024.0
sendMBs = (sendBytes-lastSendBytes)/(now-last)/1024.0/1024.0
- netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+ self.netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
for vmId in self.controlledVMs:
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/nodemanager/vmcontrol/xenpv.py Tue Jul 17 21:05:59 2012
@@ -15,37 +15,35 @@
# specific language governing permissions and limitations
# under the License.
-import os
import os.path
import cPickle
-import subprocess # FIXME: should switch os.system to this
+import subprocess
import time
import threading
import logging
import socket
from vmcontrolinterface import VmControlInterface
-from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
+from tashi.rpycservices.rpyctypes import InstanceState
from tashi.rpycservices.rpyctypes import Instance, Host
-from tashi import boolean, convertExceptions, ConnectionManager, version
-from tashi.util import isolatedRPC, broken
+from tashi import version
+from tashi.util import broken
-import tashi.parallel
-from tashi.parallel import synchronized, synchronizedmethod
+from tashi.parallel import synchronizedmethod
log = logging.getLogger(__file__)
# FIXME: these should throw errors on failure
def domIdToName(domid):
# XXXpipe: get domain name from id
- f = os.popen("/usr/sbin/xm domname %i"%domid)
+ f = os.popen("/usr/sbin/xm domname %i"% domid)
name = f.readline().strip()
f.close()
return name
def domNameToId(domname):
# XXXpipe: get domain id from name
- f = os.popen("/usr/sbin/xm domid %s"%domname)
+ f = os.popen("/usr/sbin/xm domid %s"% domname)
name = f.readline().strip()
f.close()
return int(name)
@@ -55,10 +53,10 @@ def nameToId(domname, prefix='tashi'):
if domname[0:(len(prefix))] != prefix:
return None
try:
- id = int(domname[len(prefix):])
+ _id = int(domname[len(prefix):])
except:
return None
- return id
+ return _id
# Try to do a listVms call using info from xend
@@ -80,12 +78,12 @@ def listVms(prefix='tashi'):
vminfo[fields[i]] = line[i]
# if the name begins with our prefix, get the id,
# otherwise skip this record
- id = nameToId(vminfo['name'], prefix)
- if id == None:
+ _id = nameToId(vminfo['name'], prefix)
+ if _id == None:
continue
# fill in the instance object
- instance.id = int(id)
+ instance.id = int(_id)
instance.vmId = int(vminfo['vmId'])
instance.state = InstanceState.Running
if(vminfo['state'][2] !='-'):
@@ -142,8 +140,8 @@ class XenPV(VmControlInterface, threadin
self.nm.vmStateChange(a.vmId, a.state, InstanceState.Exited)
for vmId in vmlist.keys():
if not self.newvms.has_key(vmId):
+ # FIXME: log this
print 'WARNING: found vm that should be managed, but is not'
- # FIXME: log that
def run(self):
@@ -155,7 +153,7 @@ class XenPV(VmControlInterface, threadin
# a lot easier
########################################
def createXenConfig(self, vmName,
- image, macAddr, netID, memory, cores, hints, id):
+ image, macAddr, netID, memory, cores, hints, _id):
bootstr = None
rootconfig = None
diskconfig = None
@@ -171,6 +169,7 @@ class XenPV(VmControlInterface, threadin
disk0 = 'tap:%s' % self.disktype
diskU = 'xvda1'
+ # XXXstroucki: use soft config
try:
bridgeformat = self.config.get('XenPV', 'defaultBridgeFormat')
except:
@@ -360,9 +359,9 @@ extra='xencons=tty'
r = os.system(cmd)
# self.deleteXenConfig(name)
if r != 0:
+ # FIXME: log/handle error
print 'WARNING: "%s" returned %i' % ( cmd, r)
raise Exception, 'WARNING: "%s" returned %i' % ( cmd, r)
- # FIXME: log/handle error
vmId = domNameToId(name)
self.newvms[vmId] = instance
instance.vmId = vmId
@@ -387,7 +386,7 @@ extra='xencons=tty'
instance = self.newvms[vmId]
instance.suspendCookie = suspendCookie
infof = self.dfs.open(infofile, "w")
- name = domIdToName(vmId)
+ #name = domIdToName(vmId)
cPickle.dump(instance, infof)
infof.close()
@@ -416,7 +415,7 @@ extra='xencons=tty'
self.dfs.unlink(infofile)
self.dfs.copyFrom(source, tmpfile)
- r = os.system("/usr/sbin/xm restore %s"%(tmpfile))
+ __r = os.system("/usr/sbin/xm restore %s"%(tmpfile))
os.unlink(tmpfile)
# FIXME: if the vmName function changes, suspended vms will become invalid
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/parallel.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/parallel.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/parallel.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/parallel.py Tue Jul 17 21:05:59 2012
@@ -34,7 +34,8 @@ class ThreadPool(Queue.Queue):
def __init__(self, size=8, maxsize=0):
Queue.Queue.__init__(self, maxsize)
for i in range(size):
- thread = threading.Thread(target=self._worker)
+ name = "parallel.ThreadPool#%s" % (i)
+ thread = threading.Thread(name=name, target=self._worker)
thread.setDaemon(True)
thread.start()
def _worker(self):
@@ -129,9 +130,9 @@ class TestThreadPool(unittest.TestCase):
time.sleep(sleep)
queue.put(None)
tt = time.time()
- for i in range(4):
+ for _ in range(4):
slowfunc()
- for i in range(4):
+ for _ in range(4):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 4, 1)
@@ -143,9 +144,9 @@ class TestThreadPool(unittest.TestCase):
time.sleep(sleep)
queue.put(None)
tt = time.time()
- for i in range(8):
+ for _ in range(8):
slowthreadfunc()
- for i in range(8):
+ for _ in range(8):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 1, 1)
@@ -158,9 +159,9 @@ class TestThreadPool(unittest.TestCase):
time.sleep(sleep)
queue.put(None)
tt = time.time()
- for i in range(8):
+ for _ in range(8):
slowpoolfunc()
- for i in range(8):
+ for _ in range(8):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 2, 1)
@@ -175,9 +176,9 @@ class TestThreadPool(unittest.TestCase):
queue.put(None)
sc = slowclass()
tt = time.time()
- for i in range(4):
+ for _ in range(4):
sc.beslow()
- for i in range(4):
+ for _ in range(4):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 4, 1)
@@ -193,9 +194,9 @@ class TestThreadPool(unittest.TestCase):
queue.put(None)
sc = slowclass()
tt = time.time()
- for i in range(4):
+ for _ in range(4):
sc.beslow()
- for i in range(4):
+ for _ in range(4):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 1, 1)
@@ -211,9 +212,9 @@ class TestThreadPool(unittest.TestCase):
queue.put(None)
sc = slowclass()
tt = time.time()
- for i in range(16):
+ for _ in range(16):
sc.beslow()
- for i in range(16):
+ for _ in range(16):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 2, 1)
@@ -228,9 +229,9 @@ class TestThreadPool(unittest.TestCase):
def slowthreadfunc():
addtoqueue()
tt = time.time()
- for i in range(4):
+ for _ in range(4):
slowthreadfunc()
- for i in range(4):
+ for _ in range(4):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 4, 1)
@@ -254,10 +255,10 @@ class TestThreadPool(unittest.TestCase):
def slowthreadfunc2():
atc.addtoqueue2()
tt = time.time()
- for i in range(4):
+ for _ in range(4):
slowthreadfunc1()
slowthreadfunc2()
- for i in range(8):
+ for _ in range(8):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 8, 1)
@@ -279,10 +280,10 @@ class TestThreadPool(unittest.TestCase):
def slowthreadfunc2():
atc.addtoqueue2()
tt = time.time()
- for i in range(4):
+ for _ in range(4):
slowthreadfunc1()
slowthreadfunc2()
- for i in range(8):
+ for _ in range(8):
queue.get()
tt = time.time() - tt
self.assertAlmostEqual(tt, 1, 1)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpycservices.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpycservices.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpycservices.py Tue Jul 17 21:05:59 2012
@@ -19,7 +19,7 @@ import rpyc
from tashi.rpycservices.rpyctypes import Instance, Host, User
import cPickle
-clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'getImages', 'copyImage']
+clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm', 'registerHost', 'getImages', 'copyImage', 'setHostState']
nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'prepSourceVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo', 'liveCheck']
accountingRPCs = ['record']
@@ -65,6 +65,9 @@ class client:
if name not in clusterManagerRPCs and name not in nodeManagerRPCs and name not in accountingRPCs:
return None
def connectWrap(*args):
+ # XXXstroucki: why not talk directly, instead
+ # of using rpyc? We're already using pickle to move
+ # args.
args = cPickle.dumps(clean(args))
try:
res = getattr(self.conn.root, name)(args)
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpyctypes.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpyctypes.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpyctypes.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/rpycservices/rpyctypes.py Tue Jul 17 21:05:59 2012
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: shouldn't this be tashitypes.py instead?
+
class Errors(object):
ConvertedException = 1
NoSuchInstanceId = 2
Modified: incubator/tashi/branches/stroucki-registration/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/tashi/util.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/tashi/util.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/tashi/util.py Tue Jul 17 21:05:59 2012
@@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+#XXXstroucki: for compatibility with python 2.5
+from __future__ import with_statement
+
import ConfigParser
#import cPickle
import os
@@ -22,15 +25,15 @@ import os
import signal
#import struct
import sys
-import threading
+#import threading
import time
import traceback
import types
-import getpass
import functools
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
+from tashi.utils.timeout import *
def broken(oldFunc):
"""Decorator that is used to mark a function as temporarily broken"""
@@ -86,14 +89,14 @@ def timed(oldFunc):
return res
return newFunc
-def editAndContinue(file, mod, name):
+def editAndContinue(filespec, mod, name):
def wrapper(oldFunc):
persist = {}
persist['lastMod'] = time.time()
persist['oldFunc'] = oldFunc
persist['func'] = oldFunc
def newFunc(*args, **kw):
- modTime = os.stat(file)[8]
+ modTime = os.stat(filespec)[8]
if (modTime > persist['lastMod']):
persist['lastMod'] = modTime
space = {}
@@ -149,14 +152,6 @@ class reference(object):
def __delattr__(self, name):
return delattr(self.__dict__['__real_obj__'], name)
-def isolatedRPC(client, method, *args, **kw):
- """Opens and closes a thrift transport for a single RPC call"""
- if (not client._iprot.trans.isOpen()):
- client._iprot.trans.open()
- res = getattr(client, method)(*args, **kw)
- client._iprot.trans.close()
- return res
-
def signalHandler(signalNumber):
"""Used to denote a particular function as the signal handler for a
specific signal"""
@@ -171,6 +166,13 @@ def boolean(value):
return value
if (type(value) == types.IntType):
return (value != 0)
+
+ # See if it can be expressed as a string
+ try:
+ value = str(value)
+ except:
+ raise ValueError
+
lowercaseValue = value.lower()
if lowercaseValue in ['yes', 'true', '1']:
return True
@@ -187,13 +189,14 @@ def instantiateImplementation(className,
cmd = "import %s\n" % (package)
else:
cmd = ""
- cmd += "obj = %s(*args)\n" % (className)
+ cmd += "_obj = %s(*args)\n" % (className)
exec cmd in locals()
- return obj
+ # XXXstroucki: this is correct, even though pydev complains
+ return _obj
def convertExceptions(oldFunc):
"""This converts any exception type into a TashiException so that
- it can be passed over a Thrift RPC"""
+ it can be passed over an RPC"""
def newFunc(*args, **kw):
try:
return oldFunc(*args, **kw)
@@ -220,38 +223,45 @@ def getConfig(additionalNames=[], additi
return (config, configFiles)
def __getShellFn():
- if sys.version_info < (2, 6, 1):
+ try:
from IPython.Shell import IPShellEmbed
- return IPShellEmbed()
- else:
+ return (1, IPShellEmbed)
+ except ImportError:
import IPython
- return IPython.embed()
+ return (2, IPython.embed)
def debugConsole(globalDict):
"""A debugging console that optionally uses pysh"""
def realDebugConsole(globalDict):
try :
import atexit
- shellfn = __getShellFn()
+ (calltype, shellfn) = __getShellFn()
def resetConsole():
# XXXpipe: make input window sane
- (stdin, stdout) = os.popen2("reset")
+ (__stdin, stdout) = os.popen2("reset")
stdout.read()
- dbgshell = shellfn()
atexit.register(resetConsole)
- dbgshell(local_ns=globalDict, global_ns=globalDict)
- except Exception:
+ if calltype == 1:
+ dbgshell=shellfn(user_ns=globalDict)
+ dbgshell()
+ elif calltype == 2:
+ dbgshell=shellfn
+ dbgshell(user_ns=globalDict)
+ except Exception, e:
CONSOLE_TEXT=">>> "
- input = " "
- while (input != ""):
+ inputline = " "
+ while (inputline != ""):
sys.stdout.write(CONSOLE_TEXT)
- input = sys.stdin.readline()
+ inputline = sys.stdin.readline()
try:
- exec(input) in globalDict
+ exec(inputline) in globalDict
except Exception, e:
sys.stdout.write(str(e) + "\n")
+
+ os._exit(0)
+
if (os.getenv("DEBUG", "0") == "1"):
- threading.Thread(target=lambda: realDebugConsole(globalDict)).start()
+ threading.Thread(name="debugConsole", target=lambda: realDebugConsole(globalDict)).start()
def stringPartition(s, field):
index = s.find(field)
@@ -270,6 +280,7 @@ def scrubString(s, allowed="ABCDEFGHIJKL
return ns
class Connection:
+
def __init__(self, host, port, authAndEncrypt=False, credentials=None):
self.host = host
self.port = port
@@ -312,11 +323,24 @@ class Connection:
if self.connection is None:
self.__connect()
- remotefn = getattr(self.connection, name, None)
+ threadname = "%s:%s" % (self.host, self.port)
+ # XXXstroucki: Use 10 second timeout, ok?
+ # XXXstroucki: does this fn touch the network?
+ t = TimeoutThread(getattr, (self.connection, name, None))
+ threading.Thread(name=threadname, target=t.run).start()
+
+ try:
+ remotefn = t.wait(timeout=10)
+ except TimeoutException:
+ self.connection = None
+ raise
try:
if callable(remotefn):
- returns = remotefn(*args, **kwargs)
+ # XXXstroucki: Use 10 second timeout, ok?
+ t = TimeoutThread(remotefn, args, kwargs)
+ threading.Thread(name=threadname, target=t.run).start()
+ returns = t.wait(timeout=10.0)
else:
raise TashiException({'msg':'%s not callable' % name})
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/agents/dhcpdns.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/agents/dhcpdns.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/agents/dhcpdns.py Tue Jul 17 21:05:59 2012
@@ -27,6 +27,7 @@ import time
#from instancehook import InstanceHook
#from tashi.services.ttypes import Instance, NetworkConfiguration
#from tashi import boolean
+from tashi.rpycservices.rpyctypes import Instance
class DhcpDns():
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/bootstrapinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/bootstrapinterface.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/bootstrapinterface.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/bootstrapinterface.py Tue Jul 17 21:05:59 2012
@@ -18,10 +18,6 @@
# $Id$
#
-import sys
-import os
-import optparse
-
class BootStrapInterface(object):
""" Interface description for booting
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/pxe.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/pxe.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/pxe.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/bootstrap/pxe.py Tue Jul 17 21:05:59 2012
@@ -22,7 +22,6 @@ import os
import sys
import string
import datetime
-import subprocess
import MySQLdb
import traceback
import logging
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/client/zoni-cli.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/client/zoni-cli.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/client/zoni-cli.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/client/zoni-cli.py Tue Jul 17 21:05:59 2012
@@ -20,46 +20,42 @@
#
# $Id$
#
-import os
-import sys
+
import optparse
import socket
import logging.config
import getpass
+import os
+import sys
import re
+import string
import subprocess
-
-
#from zoni import *
#from zoni.data.resourcequerysql import ResourceQuerySql
-import zoni
-from zoni.data.resourcequerysql import *
+#import zoni
+#from zoni.data.resourcequerysql import *
-from zoni.data.usermanagementinterface import UserManagementInterface
-from zoni.data.usermanagementinterface import UserManagementInterface
-
-from zoni.bootstrap.bootstrapinterface import BootStrapInterface
from zoni.bootstrap.pxe import Pxe
-from zoni.hardware.systemmanagementinterface import SystemManagementInterface
from zoni.hardware.ipmi import Ipmi
-from zoni.hardware.hwswitchinterface import HwSwitchInterface
from zoni.hardware.dellswitch import HwDellSwitch
from zoni.hardware.raritanpdu import raritanDominionPx
from zoni.hardware.delldrac import dellDrac
+import zoni.hardware.systemmanagement
+from zoni.data import usermanagement
from zoni.agents.dhcpdns import DhcpDns
-from zoni.hardware.systemmanagement import SystemManagement
-
+from zoni.extra.util import validIp, validMac
+from zoni.version import version, revision
-from zoni.extra.util import *
-from zoni.version import *
-
-from tashi.util import instantiateImplementation, signalHandler
+from tashi.util import instantiateImplementation, getConfig
#import zoni.data.usermanagement
#from usermanagement import UserManagement
+# Extensions from MIMOS
+from zoni.extensions.m_extensions import *
+
def parseTable():
pass
@@ -71,7 +67,7 @@ def main():
(configs, configFiles) = getConfig()
logging.config.fileConfig(configFiles)
- log = logging.getLogger(os.path.basename(__file__))
+ #log = logging.getLogger(os.path.basename(__file__))
#logit(configs['logFile'], "Starting Zoni client")
#logit(configs['logFile'], "Loading config file")
@@ -99,6 +95,8 @@ def main():
group.add_option("--powerOn", "--poweron", dest="POWERON", help="Power on node", action="store_true", default=False)
group.add_option("--powerReset", "--powerreset", dest="POWERRESET", help="Power reset node", action="store_true", default=False)
group.add_option("--console", dest="CONSOLE", help="Console mode", action="store_true", default=False)
+ # Extensions from MIMOS - specific only for HP Blades and HP c7000 Blade Enclosures
+ group.add_option("--powerOnNet", "--poweronnet", dest="POWERONENET", help="Power on Node into PXE (Currently support on HP Blades through HP c7000 Blade Enclosure)", action="store_true", default=False)
parser.add_option_group(group)
# Query Interface
@@ -195,6 +193,18 @@ def main():
group.add_option("--removeDhcp", dest="removeDhcp", help="Remove a DHCP entry", action="store_true", default=False)
parser.add_option_group(group)
+ # Extensions from MIMOS
+ group = optparse.OptionGroup(parser, "Zoni MIMOS Extensions", "Special Functions created by MIMOS Lab:")
+ group.add_option("--addRole", "--addrole", dest="addRole", help="Create a disk based installation default file for a node based on its role or function, e.g. one|oned|cc|clc|walrus|sc|nc|preseed|kickstart", default=None, action="store")
+ group.add_option("--removeRole", "--removerole", dest="removeRole", help="Remove the default file of a node", action="store_true", default=False)
+ group.add_option("--showRoleMap", dest="showRoleMap", help="Show Role to Host Mapping", action="store_true", default=False)
+ group.add_option("--showKernel", dest="showKernelInfo", help="Show Kernel Info", action="store_true", default=False)
+ group.add_option("--showInitrd", dest="showInitrdInfo", help="Show Initrd Info", action="store_true", default=False)
+ group.add_option("--registerKernelInitrd", dest="registerKernelInitrd", help="Register Kernel and Initrd - vmlinuz:vmlinuz-ver:vmlinuz-arch:initrd:initrd-arch:imagename")
+ group.add_option("--getKernelInitrdID", dest="getKernelInitrdID", help="Get corresponding Kernel and Initrd Info - vmlinuz:initrd:arch")
+ group.add_option("--getConfig", dest="getConfig", help="Get a value from ZoniDefault.cfg - e.g. tftpRootDir, initrdRoot, kernelRoot, fsImagesBaseDir, etc.", default=None, action="store")
+ parser.add_option_group(group)
+
(options, args) = parser.parse_args()
@@ -208,6 +218,8 @@ def main():
data = instantiateImplementation("zoni.data.resourcequerysql.ResourceQuerySql", configs, options.verbosity)
reservation = instantiateImplementation("zoni.data.reservation.reservationMysql", configs, data, options.verbosity)
#query = zoni.data.resourcequerysql.ResourceQuerySql(configs, options.verbosity)
+ # Extensions from MIMOS
+ mimos = instantiateImplementation("zoni.extensions.m_extensions.mimos",configs)
# Get host info
host=None
@@ -231,12 +243,21 @@ def main():
if "drac_name" in host:
hw= dellDrac(configs, options.nodeName, host)
else:
- mesg = "Host (" + options.nodeName + ") does not have a DRAC card!!\n"
+ mesg = "Host (%s) does not have a DRAC card!!\n" % options.nodeName
sys.stdout.write(mesg)
exit(1)
+
+ ## Extensions from MIMOS - For Dell Blades - calling Dell Blades via the Blade Enclosure. Some DRAC commands in the blade enclosure are slightly different from those in the actual blade; this allows a bit more flexibility and standard calls to the blades
+ if options.hardwareType == "dracblade":
+ hw = dellBlade(configs, options.nodeName, host)
+
+ ## Extensions from MIMOS - For HP Blades - calling HP Blades via the HP c7000 Blade Enclosure instead of directly to the blade server itself; this allows a bit more flexibility and standard calls to the blades
+ if options.hardwareType == "hpilo":
+ hw = hpILO(configs, options.nodeName, host)
+
if (options.REBOOTNODE or options.POWERCYCLE or options.POWEROFF or options.POWEROFFSOFT or \
options.POWERON or options.POWERSTATUS or options.CONSOLE or \
- options.POWERRESET) and options.nodeName:
+ options.POWERONNET or options.POWERRESET) and options.nodeName: # Extensions from MIMOS - added POWERONNET
if options.verbosity:
hw.setVerbose(True)
@@ -265,6 +286,10 @@ def main():
if options.CONSOLE:
hw.activateConsole()
exit()
+ ## Extensions from MIMOS - For HP Blade via c7000 Blade Enclosure
+ if options.POWERONNET:
+ hw.powerOnNet()
+ exit()
hw.getPowerStatus()
exit()
else:
@@ -432,7 +457,7 @@ def main():
userId = usermgt.getUserId(options.userName)
if userId:
- reservationId = reservation.createReservation(userId, options.reservationDuration, options.myNotes + " " + str(string.join(args[0:len(args)])))
+ __reservationId = reservation.createReservation(userId, options.reservationDuration, options.myNotes + " " + str(string.join(args[0:len(args)])))
else:
print "user doesn't exist"
@@ -739,7 +764,7 @@ def main():
try:
socket.gethostbyname(hostName)
sys.stdout.write("[Success]\n")
- except Exception, e:
+ except Exception:
sys.stdout.write("[Fail]\n")
else:
mesg = "ERROR: Malformed IP Address\n"
@@ -762,7 +787,7 @@ def main():
try:
socket.gethostbyname(hostName)
sys.stdout.write("[Fail]\n")
- except Exception, e:
+ except Exception:
sys.stdout.write("[Success]\n")
if options.removeDhcp:
dhcpdns.removeDhcp(hostName)
@@ -824,5 +849,29 @@ def main():
mesg = "[SUCCESS]\n"
sys.stdout.write(mesg)
+ ## Extensions from MIMOS - functions are defined in m_extensions.py
+ if ( options.addRole and options.nodeName ) or ( options.removeRole and options.nodeName ):
+ if options.addRole:
+ mimos.assignRoletoHost(host,options.addRole)
+ mimos.addRoletoNode(configs,host,options.nodeName,options.addRole)
+ if options.removeRole:
+ mimos.unassignRolefromHost(host)
+ mimos.removeRolefromNode(configs,host,options.nodeName)
+ if ( options.addRole and not options.nodeName ) or ( options.removeRole and not options.nodeName ):
+ mesg = "Roles: Missing Parameter(s)!"
+ log.error(mesg)
+ if options.showRoleMap:
+ mimos.showRoletoHost(configs)
+ if options.showKernelInfo:
+ mimos.showKernelInfo()
+ if options.showInitrdInfo:
+ mimos.showInitrdInfo()
+ if options.registerKernelInitrd:
+ mimos.registerKernelInitrd(configs,options.registerKernelInitrd)
+ if options.getKernelInitrdID:
+ mimos.getKernelInitrdID(options.getKernelInitrdID)
+ if options.getConfig:
+ mimos.getConfig(configs,options.getConfig)
+
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/data/infostore.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/data/infostore.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/data/infostore.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/data/infostore.py Tue Jul 17 21:05:59 2012
@@ -18,10 +18,6 @@
# $Id$
#
-import sys
-import os
-import optparse
-
class InfoStore (object):
""" Interface description for query system resources
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/data/reservation.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/data/reservation.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/data/reservation.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/data/reservation.py Tue Jul 17 21:05:59 2012
@@ -18,7 +18,6 @@
# $Id:$
#
-import sys
import os
import string
import logging
@@ -88,9 +87,9 @@ class reservationMysql(ReservationManage
def delReservation (self, userId):
raise NotImplementedError
- def defineReservation():
+ def defineReservation(self):
raise NotImplementedError
- def showReservation():
+ def showReservation(self):
raise NotImplementedError
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/data/reservationmanagementinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/data/reservationmanagementinterface.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/data/reservationmanagementinterface.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/data/reservationmanagementinterface.py Tue Jul 17 21:05:59 2012
@@ -18,9 +18,6 @@
# $Id:$
#
-import sys
-import os
-
class ReservationManagementInterface(object):
""" Interface description for reservation management
@@ -50,9 +47,9 @@ class ReservationManagementInterface(obj
def delReservation (self, userId):
raise NotImplementedError
- def defineReservation():
+ def defineReservation(self):
raise NotImplementedError
- def showReservation():
+ def showReservation(self):
raise NotImplementedError
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/data/resourcequerysql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/data/resourcequerysql.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/data/resourcequerysql.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/data/resourcequerysql.py Tue Jul 17 21:05:59 2012
@@ -22,16 +22,13 @@ import os
import sys
import string
import MySQLdb
-import subprocess
import traceback
import logging
-import threading
import time
import usermanagement
from zoni.data.infostore import InfoStore
-from zoni.extra.util import checkSuper, createKey
-from zoni.agents.dhcpdns import DhcpDns
+from zoni.extra.util import createKey
class ResourceQuerySql(InfoStore):
def __init__(self, config, verbose=None):
@@ -80,7 +77,7 @@ class ResourceQuerySql(InfoStore):
vlans = []
for val in vlanInfo.split(","):
try:
- ret = self.getVlanId(val.split(":")[0])
+ __ret = self.getVlanId(val.split(":")[0])
vlans.append(val)
except Exception, e:
print e
@@ -96,7 +93,7 @@ class ResourceQuerySql(InfoStore):
domainKey = createKey(name)
query = "insert into domaininfo (domain_name, domain_desc, domain_key) values ('%s','%s', '%s')" % (name, desc, domainKey)
try:
- result = self.insertDb(query)
+ __result = self.insertDb(query)
mesg = "Adding domain %s(%s)" % (name, desc)
self.log.info(mesg)
except Exception, e:
@@ -112,7 +109,7 @@ class ResourceQuerySql(InfoStore):
vlanType = i.split(":")[1]
query = "insert into domainmembermap values (%d, %d, '%s')" % (domainId, vlanId, vlanType)
try:
- result = self.insertDb(query)
+ __result = self.insertDb(query)
except Exception, e:
print e
@@ -133,16 +130,16 @@ class ResourceQuerySql(InfoStore):
mesg = "Removing domain %s" % (name)
self.log.info(mesg)
query = "delete from domaininfo where domain_name = '%s'" % (name)
- result = self.__deleteDb(query)
+ __result = self.__deleteDb(query)
# Need to remove any vlans attached to this domain
query = "delete from domainmembermap where domain_id = '%s'" % (domainId)
- result = self.__deleteDb(query)
+ __result = self.__deleteDb(query)
def showDomains(self):
usermgt = eval("usermanagement.%s" % (self.config['userManagement']) + "()")
query = "select r.reservation_id, r.user_id, d.domain_name, d.domain_desc from domaininfo d, allocationinfo a, reservationinfo r where d.domain_id = a.domain_id and a.reservation_id = r.reservation_id"
result = self.selectDb(query)
- desc = result.description
+ #desc = result.description
if result.rowcount > 0:
print "%s\t%s\t%s\t%s" % (result.description[0][0], result.description[1][0], result.description[2][0], result.description[3][0])
print "------------------------------------------------------------"
@@ -173,7 +170,7 @@ class ResourceQuerySql(InfoStore):
return -1
query = "insert into vlaninfo (vlan_num, vlan_desc) values ('%s','%s')" % (vnumber, desc)
try:
- result = self.insertDb(query)
+ __result = self.insertDb(query)
mesg = "Adding vlan %s(%s)" % (vnumber, desc)
self.log.info(mesg)
except Exception, e:
@@ -290,8 +287,6 @@ class ResourceQuerySql(InfoStore):
def showResources(self, cmdargs):
-
- queryopt = ""
defaultFields = "mac_addr, location, num_procs, num_cores, clock_speed, mem_total "
#defaultFields = "*"
@@ -303,6 +298,11 @@ class ResourceQuerySql(InfoStore):
query = "select " + defaultFields + "from sysinfo " + queryopt
result = self.selectDb(query)
+ # Extensions from MIMOS - allow showResources to fail gracefully if the Zoni DB is not populated yet
+ if result.rowcount < 1:
+ print "Zoni Hardware/System Database is empty."
+ exit(1)
+
line = ""
for i in defaultFields.split(","):
#line += string.strip(str(i)) + "\t"
@@ -310,20 +310,21 @@ class ResourceQuerySql(InfoStore):
# header
print line
- sum = {}
+ _sum = {}
for row in result.fetchall():
line = ""
- sum['totProc'] = sum.get('totProc', 0)
- sum['totProc'] += int(row[2])
- sum['totCores'] = sum.get('totCores', 0)
- sum['totCores'] += int(row[3])
- sum['totMemory'] = sum.get('totMemory', 0)
- sum['totMemory'] += int(row[5])
+ _sum['totProc'] = _sum.get('totProc', 0)
+ _sum['totProc'] += int(row[2])
+ _sum['totCores'] = _sum.get('totCores', 0)
+ _sum['totCores'] += int(row[3])
+ _sum['totMemory'] = _sum.get('totMemory', 0)
+ _sum['totMemory'] += int(row[5])
for val in row:
line += str(val).center(20)
print line
- print "\n%s systems registered - %d procs | %d cores | %d bytes RAM" % (str(result.rowcount), sum['totProc'], sum['totCores'], sum['totMemory'],)
+ print "\n%s systems registered - %d procs | %d cores | %d bytes RAM" % \
+ (str(result.rowcount), _sum['totProc'], _sum['totCores'], _sum['totMemory'],)
def getAvailableResources(self):
# Maybe should add a status flag?
@@ -508,7 +509,7 @@ class ResourceQuerySql(InfoStore):
result = self.selectDb(query)
print "NODE ALLOCATION\n"
- sum = {}
+ _sum = {}
if self.verbose:
print "%-5s%-10s%-10s%-10s%-13s%-12s%-10s%-34s%-20s%s" % ("Res", "User", "Host", "Domain", "Cores/Mem","Expiration", "Hostname", "Boot Image Name", "Vlan Member", "Notes")
else:
@@ -533,10 +534,10 @@ class ResourceQuerySql(InfoStore):
allocation_id = i[11]
userName = usermgt.getUserName(uid)
combined_notes = str(rnotes) + "|" + str(anotes)
- sum['totCores'] = sum.get('totCores', 0)
- sum['totCores'] += cores
- sum['totMemory'] = sum.get('totMemory', 0)
- sum['totMemory'] += memory
+ _sum['totCores'] = _sum.get('totCores', 0)
+ _sum['totCores'] += cores
+ _sum['totMemory'] = _sum.get('totMemory', 0)
+ _sum['totMemory'] += memory
if self.verbose:
query = "select v.vlan_num, m.vlan_type from vlaninfo v, vlanmembermap m where v.vlan_id = m.vlan_id and allocation_id = '%d' order by vlan_num asc" % allocation_id
vlanRes = self.selectDb(query)
@@ -550,7 +551,7 @@ class ResourceQuerySql(InfoStore):
print "%-5s%-10s%-10s%-10s%-2s/%-10s%-12s%-10s%-34s%-20s%s" % (resId, userName, host, domain, cores, memory,expire, hostname, image_name, vlanMember,combined_notes)
else:
print "%-10s%-10s%-10s%-2s/%-10s%-12s%s" % (userName, host, domain, cores, memory,expire, combined_notes)
- print "\n%s systems allocated - %d cores| %d bytes RAM" % (str(result.rowcount), sum['totCores'], sum['totMemory'])
+ print "\n%s systems allocated - %d cores| %d bytes RAM" % (str(result.rowcount), _sum['totCores'], _sum['totMemory'])
def showReservation(self, userId=None):
#from IPython.Shell import IPShellEmbed
@@ -612,7 +613,7 @@ class ResourceQuerySql(InfoStore):
query = "select image_name from imageinfo"
result = self.selectDb(query)
row = result.fetchall()
- desc = result.description
+ #desc = result.description
imagelist = []
for i in row:
@@ -625,7 +626,7 @@ class ResourceQuerySql(InfoStore):
query = "select image_name, dist, dist_ver from imageinfo"
result = self.selectDb(query)
row = result.fetchall()
- desc = result.description
+ #desc = result.description
for i in row:
print i
@@ -736,7 +737,7 @@ class ResourceQuerySql(InfoStore):
host['hw_port'] = int(i[6])
# Get drac info
- query = "select h.hw_id, h.hw_name, h.hw_model, h.hw_ipaddr, h.hw_userid, h.hw_password, p.port_num from hardwareinfo h, portmap p where p.hw_id = h.hw_id and hw_type = 'drac' and sys_id = " + str(host['sys_id'])
+ query = "select h.hw_id, h.hw_name, h.hw_model, h.hw_ipaddr, h.hw_userid, h.hw_password, 0, p.port_num from hardwareinfo h, portmap p where p.hw_id = h.hw_id and hw_type = 'drac' and sys_id = " + str(host['sys_id'])
result = self.selectDb(query)
if result.rowcount > 0:
for i in result.fetchall():
@@ -746,7 +747,10 @@ class ResourceQuerySql(InfoStore):
host['drac_ipaddr'] = i[3]
host['drac_userid'] = i[4]
host['drac_password'] = i[5]
- host['drac_port'] = int(i[6])
+ # Extensions from MIMOS - for Dell Blade
+ # XXXstroucki removed hw_blenc from query
+ # host['drac_enclosure'] = i[6]
+ host['drac_port'] = int(i[7])
# Get PDU info
query = "select h.hw_id, h.hw_name, h.hw_model, h.hw_ipaddr, h.hw_userid, h.hw_password, p.port_num from hardwareinfo h, portmap p where p.hw_id = h.hw_id and h.hw_type = 'pdu' and p.sys_id = " + str(host['sys_id'])
@@ -760,6 +764,19 @@ class ResourceQuerySql(InfoStore):
host['pdu_password'] = i[5]
host['pdu_port'] = int(i[6])
+ # Extensions from MIMOS - for HP Blade iLO
+ query = "select h.hw_id, h.hw_name, h.hw_model, h.hw_ipaddr, h.hw_userid, h.hw_password, 0, p.port_num from hardwareinfo h, portmap p where p.hw_id = h.hw_id and hw_type = 'hpilo' and sys_id = " + str(host['sys_id'])
+ result = self.selectDb(query)
+ for i in result.fetchall():
+ host['ilo_id'] = int(i[0])
+ host['ilo_name'] = i[1]
+ host['ilo_model'] = i[2]
+ host['ilo_ipaddr'] = i[3]
+ host['ilo_userid'] = i[4]
+ host['ilo_password'] = i[5]
+ # XXXstroucki removed hw_blenc from query
+ #host['ilo_enclosure'] = i[6]
+ host['ilo_port'] = int(i[7])
#print "host is ", host
return host
@@ -786,7 +803,7 @@ class ResourceQuerySql(InfoStore):
cursor.execute (query)
self.conn.commit()
row = cursor.fetchall()
- desc = cursor.description
+ #desc = cursor.description
except MySQLdb.OperationalError, e:
msg = "%s : %s" % (e[1], query)
self.log.error(msg)
@@ -867,12 +884,12 @@ class ResourceQuerySql(InfoStore):
return cursor
- def updateReservation (self, reservationId, userId=None, reservationDuration=None, vlanIsolate=None, allocationNotes=None):
+ def updateReservation (self, reservationId, userId=None, resDuration=None, vlanIsolate=None, allocationNotes=None):
mesg = "Updating reservation %s" % (str(reservationId))
self.log.info(mesg)
- if reservationDuration:
+ if resDuration:
if len(resDuration) == 8:
expireDate = resDuration
elif len(resDuration) < 4:
@@ -887,7 +904,7 @@ class ResourceQuerySql(InfoStore):
mesg = "Updating reservationDuration :" + resDuration
self.log.info(mesg)
- query = "update reservationinfo set reservation_exiration = \"" + expireDate_ + "\" where reservation_id = \"" + str(reservationId) + "\""
+ query = "update reservationinfo set reservation_expiration = \"" + expireDate + "\" where reservation_id = \"" + str(reservationId) + "\""
self.__updateDb(query)
if allocationNotes:
@@ -1043,7 +1060,7 @@ class ResourceQuerySql(InfoStore):
vId = self.getVlanId(v)
query = "delete from vlanmembermap where allocation_id = '%s' and vlan_id = '%s'" % (allocationId, vId)
- result = self.insertDb(query)
+ __result = self.insertDb(query)
mesg = "Removing vlan %s from node %s" % (v, nodeName)
self.log.info(mesg)
@@ -1092,8 +1109,14 @@ class ResourceQuerySql(InfoStore):
name = imageName.split(":")[0]
if len(imageName.split(":")) > 2:
dist = imageName.split(":")[1]
- if len(imageName.split(":")) >= 3:
+ # Extensions from MIMOS - allow adding 2 more pieces of info - kernel_id and initrd_id
+ #if len(imageName.split(":")) >= 3:
+ if len(imageName.split(":")) > 3:
dist_ver = imageName.split(":")[2]
+ if len(imageName.split(":")) > 4:
+ kernel_id = imageName.split(":")[3]
+ if len(imageName.split(":")) >= 5:
+ initrd_id = imageName.split(":")[4]
query = "select * from imageinfo where image_name = \"" + name + "\""
result = self.selectDb(query)
@@ -1110,7 +1133,9 @@ class ResourceQuerySql(InfoStore):
sys.stderr.write(mesg)
return
- query = "insert into imageinfo (image_name, dist, dist_ver) values(\"" + name + "\", \"" + dist + "\", \"" + dist_ver + "\")"
+ # Extensions from MIMOS - to take care of the addition of kernel_id and initrd_id
+ #query = "insert into imageinfo (image_name, dist, dist_ver) values(\"" + name + "\", \"" + dist + "\", \"" + dist_ver + "\")"
+ query = "insert into imageinfo (image_name, dist, dist_ver, kernel_id, initrd_id) values ('%s', '%s', '%s', '%s', '%s')" % (name, dist, dist_ver, kernel_id, initrd_id)
self.insertDb(query)
@@ -1126,7 +1151,7 @@ class ResourceQuerySql(InfoStore):
# imagemap db should be sys_id instead of mac_addr
# change later
- cur_image = host['pxe_image_name']
+ #cur_image = host['pxe_image_name']
# Get the id of the new image
query = "select image_id from imageinfo where image_name = " + "\"" + image + "\""
row = self.__queryDb(query)
@@ -1207,7 +1232,7 @@ class ResourceQuerySql(InfoStore):
return cap
# print out data in a consistent format
- def __showIt(data):
+ def __showIt(self, data):
pass
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagement.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagement.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagement.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagement.py Tue Jul 17 21:05:59 2012
@@ -18,7 +18,6 @@
# $Id$
#
-import sys
import os
from usermanagementinterface import UserManagementInterface
Modified: incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagementinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagementinterface.py?rev=1362643&r1=1362642&r2=1362643&view=diff
==============================================================================
--- incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagementinterface.py (original)
+++ incubator/tashi/branches/stroucki-registration/src/zoni/data/usermanagementinterface.py Tue Jul 17 21:05:59 2012
@@ -18,9 +18,6 @@
# $Id$
#
-import sys
-import os
-
class UserManagementInterface(object):
""" Interface description for user management