Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/02/08 04:56:24 UTC
svn commit: r1241776 [1/2] - in /incubator/tashi/branches/stable: ./ doc/
etc/ scripts/ src/tashi/ src/tashi/accounting/ src/tashi/agents/
src/tashi/client/ src/tashi/clustermanager/ src/tashi/clustermanager/data/
src/tashi/dfs/ src/tashi/nodemanager/ ...
Author: stroucki
Date: Wed Feb 8 04:56:23 2012
New Revision: 1241776
URL: http://svn.apache.org/viewvc?rev=1241776&view=rev
Log:
Merge stable branch from trunk
Added:
incubator/tashi/branches/stable/INSTALL
- copied unchanged from r1241774, incubator/tashi/trunk/INSTALL
incubator/tashi/branches/stable/doc/INSTALL2
- copied unchanged from r1241774, incubator/tashi/trunk/doc/INSTALL2
incubator/tashi/branches/stable/doc/RELEASES
- copied unchanged from r1241774, incubator/tashi/trunk/doc/RELEASES
incubator/tashi/branches/stable/etc/Accounting.cfg
- copied unchanged from r1241774, incubator/tashi/trunk/etc/Accounting.cfg
incubator/tashi/branches/stable/etc/Agent.cfg
- copied unchanged from r1241774, incubator/tashi/trunk/etc/Agent.cfg
incubator/tashi/branches/stable/src/tashi/accounting/
- copied from r1241774, incubator/tashi/trunk/src/tashi/accounting/
incubator/tashi/branches/stable/src/tashi/accounting/__init__.py
- copied unchanged from r1241774, incubator/tashi/trunk/src/tashi/accounting/__init__.py
incubator/tashi/branches/stable/src/tashi/accounting/accounting.py
- copied unchanged from r1241774, incubator/tashi/trunk/src/tashi/accounting/accounting.py
incubator/tashi/branches/stable/src/tashi/accounting/accountingservice.py
- copied unchanged from r1241774, incubator/tashi/trunk/src/tashi/accounting/accountingservice.py
Removed:
incubator/tashi/branches/stable/scripts/
Modified:
incubator/tashi/branches/stable/ (props changed)
incubator/tashi/branches/stable/Makefile
incubator/tashi/branches/stable/NOTICE
incubator/tashi/branches/stable/doc/DEVELOPMENT
incubator/tashi/branches/stable/etc/NodeManager.cfg
incubator/tashi/branches/stable/etc/TashiDefaults.cfg
incubator/tashi/branches/stable/src/tashi/agents/primitive.py
incubator/tashi/branches/stable/src/tashi/client/tashi-client.py
incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py
incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py
incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py
incubator/tashi/branches/stable/src/tashi/connectionmanager.py
incubator/tashi/branches/stable/src/tashi/dfs/vfs.py
incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py
incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/branches/stable/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/branches/stable/src/tashi/parallel.py
incubator/tashi/branches/stable/src/tashi/rpycservices/rpycservices.py
incubator/tashi/branches/stable/src/tashi/util.py
incubator/tashi/branches/stable/src/tashi/version.py
Propchange: incubator/tashi/branches/stable/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 8 04:56:23 2012
@@ -1,2 +1,5 @@
/incubator/tashi/branches/cmu:1178106-1187632
+/incubator/tashi/branches/stroucki-accounting:1221525-1241770
+/incubator/tashi/branches/stroucki-accounting/branches/stroucki-accounting:1221525-1235607
/incubator/tashi/branches/zoni-dev/trunk:1034098-1177646
+/incubator/tashi/trunk:1203846-1241774
Modified: incubator/tashi/branches/stable/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/Makefile?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/Makefile (original)
+++ incubator/tashi/branches/stable/Makefile Wed Feb 8 04:56:23 2012
@@ -27,6 +27,15 @@ default: bin src/utils/nmd
all: bin src/utils/nmd src/tags doc/html aws
@echo Done
+package: src DISCLAIMER INSTALL LICENSE NOTICE README
+ @echo "Building package in apache-tashi.tar.gz"
+ rm -rf apache-tashi.tar.gz apache-tashi
+ mkdir apache-tashi
+ cp -rp doc etc Makefile src DISCLAIMER INSTALL LICENSE NOTICE README apache-tashi/
+ find apache-tashi -type d -name ".svn"|xargs rm -rf
+ tar zcf apache-tashi.tar.gz apache-tashi
+ rm -rf apache-tashi
+
doc: rmdoc doc/html
@echo Done
@@ -35,7 +44,7 @@ clean: rmnmd rmbin rmtags rmdoc rmaws
@echo Done
version:
- sed -i "s/version = .*/version = \"`date`\"/" src/tashi/version.py
+ sed -i "s/version = .*/version = \"`date +%Y-%m-%d`\"/" src/tashi/version.py
aws: src/tashi/aws/wsdl/AmazonEC2_services_types.py src/tashi/aws/wsdl/AmazonEC2_services_server.py
@@ -51,49 +60,48 @@ src/tashi/aws/wsdl/2009-04-04.ec2.wsdl:
rmaws:
if test -e src/tashi/aws/wsdl/2009-04-04.ec2.wsdl; then echo Removing aws...; rm -f src/tashi/aws/wsdl/2009-04-04.ec2.wsdl; rm -f src/tashi/aws/wsdl/AmazonEC2_*.py; fi
-# Implicit builds
-# src/utils/nmd: src/utils/Makefile src/utils/nmd.c
-# @echo Building nmd...
-# (cd src/utils; make)
-# ln -s ../src/utils/nmd ./bin/nmd
-
src/utils/nmd: src/utils/nmd.py
- ln -s ../src/utils/nmd.py ./bin/nmd.py
+ ln -s ../src/utils/nmd.py ./bin/nmd
#rmnmd:
# if test -e src/utils/nmd; then echo Removing nmd...; (cd src/utils; make clean); rm -f bin/nmd; fi
rmnmd:
- echo Removing nmd...; rm -f bin/nmd.py
+ echo Removing nmd...; rm -f bin/nmd
-bin: bindir bin/clustermanager.py bin/nodemanager.py bin/tashi-client.py bin/primitive.py bin/zoni-cli.py
+bin: bindir bin/clustermanager bin/nodemanager bin/tashi-client bin/primitive bin/zoni-cli bin/accounting
bindir:
if test ! -d bin; then mkdir bin; fi
-rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive rmzoni-cli
+rmbin: rmclustermanager rmnodemanager rmtashi-client rmprimitive rmzoni-cli rmaccounting
if test -d bin; then rmdir bin; fi
bin/getInstances:
if test ! -e bin/getInstances; then (echo "Generating client symlinks..."; cd bin; PYTHONPATH=../src ../src/tashi/client/client.py --makesyms); fi
rmclients:
if test -e bin/getInstances; then (echo Removing client symlinks...; cd bin; PYTHONPATH=../src ../src/tashi/client/client.py --rmsyms; cd ..); fi
-bin/clustermanager.py: src/tashi/clustermanager/clustermanager.py
+bin/accounting: src/tashi/accounting/accounting.py
+ @echo Symlinking in Accounting server...
+ (cd bin; ln -s ../src/tashi/accounting/accounting.py accounting)
+rmaccounting:
+ if test -e bin/accounting; then echo Removing Accounting server symlink...; rm bin/accounting; fi
+bin/clustermanager: src/tashi/clustermanager/clustermanager.py
@echo Symlinking in clustermanager...
- (cd bin; ln -s ../src/tashi/clustermanager/clustermanager.py .)
+ (cd bin; ln -s ../src/tashi/clustermanager/clustermanager.py clustermanager)
rmclustermanager:
- if test -e bin/clustermanager.py; then echo Removing clustermanager symlink...; rm bin/clustermanager.py; fi
-bin/nodemanager.py: src/tashi/nodemanager/nodemanager.py
+ if test -e bin/clustermanager; then echo Removing clustermanager symlink...; rm bin/clustermanager; fi
+bin/nodemanager: src/tashi/nodemanager/nodemanager.py
@echo Symlinking in nodemanager...
- (cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py .)
+ (cd bin; ln -s ../src/tashi/nodemanager/nodemanager.py nodemanager)
rmnodemanager:
- if test -e bin/nodemanager.py; then echo Removing nodemanager symlink...; rm bin/nodemanager.py; fi
-bin/primitive.py: src/tashi/agents/primitive.py
+ if test -e bin/nodemanager; then echo Removing nodemanager symlink...; rm bin/nodemanager; fi
+bin/primitive: src/tashi/agents/primitive.py
@echo Symlinking in primitive...
- (cd bin; ln -s ../src/tashi/agents/primitive.py .)
+ (cd bin; ln -s ../src/tashi/agents/primitive.py primitive)
rmprimitive:
- if test -e bin/primitive.py; then echo Removing primitve-agent symlink...; rm bin/primitive.py; fi
-bin/tashi-client.py:
+ if test -e bin/primitive; then echo Removing primitive-agent symlink...; rm bin/primitive; fi
+bin/tashi-client:
@echo Symlinking in tashi-client...
- (cd bin; ln -s ../src/tashi/client/tashi-client.py .)
+ (cd bin; ln -s ../src/tashi/client/tashi-client.py tashi-client)
rmtashi-client:
- if test -e bin/tashi-client.py; then echo Removing tashi-client symlink...; rm bin/tashi-client.py; fi
+ if test -e bin/tashi-client; then echo Removing tashi-client symlink...; rm bin/tashi-client; fi
src/tags:
@echo Generating tags...
(cd src; ctags-exuberant -R --c++-kinds=+p --fields=+iaS --extra=+q -f ./tags .)
@@ -107,14 +115,15 @@ rmdoc:
if test -d doc/html; then echo Removing HTML docs...; rm -rf ./doc/html; fi
# Zoni
-bin/zoni-cli.py:
+bin/zoni-cli:
@echo Symlinking in zoni-cli...
- (cd bin; ln -s ../src/zoni/client/zoni-cli.py .)
+ (cd bin; ln -s ../src/zoni/client/zoni-cli.py zoni-cli)
+# why put this in /usr/local/bin, unlike everything else?
usr/local/bin/zoni:
@echo Creating /usr/local/bin/zoni
(echo '#!/bin/bash\nPYTHONPATH=$(shell pwd)/src $(shell pwd)/bin/zoni-cli.py $$*' > /usr/local/bin/zoni; chmod 755 /usr/local/bin/zoni)
rmzoni-cli:
- if test -e bin/zoni-cli.py; then echo Removing zoni-cli symlink...; rm bin/zoni-cli.py; fi
+ if test -e bin/zoni-cli; then echo Removing zoni-cli symlink...; rm bin/zoni-cli; fi
if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
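
For illustration, not part of the commit: the bin targets above now install extensionless symlinks named after each service, each pointing at its .py source. A minimal Python sketch of the layout these rules produce, assuming it runs from the top of the tree:

import os

# same effect as the "(cd bin; ln -s <source> <name>)" rules above
links = {
    'accounting': '../src/tashi/accounting/accounting.py',
    'clustermanager': '../src/tashi/clustermanager/clustermanager.py',
    'nodemanager': '../src/tashi/nodemanager/nodemanager.py',
    'primitive': '../src/tashi/agents/primitive.py',
    'tashi-client': '../src/tashi/client/tashi-client.py',
}

if not os.path.isdir('bin'):
    os.mkdir('bin')
for name, source in links.items():
    if not os.path.lexists(os.path.join('bin', name)):
        os.symlink(source, os.path.join('bin', name))
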
Modified: incubator/tashi/branches/stable/NOTICE
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/NOTICE?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/NOTICE (original)
+++ incubator/tashi/branches/stable/NOTICE Wed Feb 8 04:56:23 2012
@@ -1,5 +1,5 @@
Apache Tashi
-Copyright 2008-2011 The Apache Software Foundation
+Copyright 2008-2012 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Modified: incubator/tashi/branches/stable/doc/DEVELOPMENT
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/doc/DEVELOPMENT?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/doc/DEVELOPMENT (original)
+++ incubator/tashi/branches/stable/doc/DEVELOPMENT Wed Feb 8 04:56:23 2012
@@ -1,16 +1,10 @@
Current goals:
- * integrate bug fixes from Tashi sites into SVN.
- * Tashi is working well for us developers, so we should fork
- a stable version as beta release. Xen startup was problematic
- as of September, so advise on caveats there. Probably not include
- Zoni for now.
+ * Add more hardware support for Zoni.
+ * Add virtual hardware to Zoni to allow users to
+ add support for their own hardware?
Future goals:
- * How should Tashi accounting be done? Database entries or
- flat files?
- * Support host auto-registration, but only by affirmative option.
* Consider using libraries like libcloud to abstract VMM interactions.
- * Add more hardware support for Zoni.
Other ideas:
* Make available a console aggregator for user's VMs.
Modified: incubator/tashi/branches/stable/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/etc/NodeManager.cfg?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/stable/etc/NodeManager.cfg Wed Feb 8 04:56:23 2012
@@ -58,7 +58,7 @@ handlers = consoleHandler
propagate = 1
[Vfs]
-prefix = /dfs
+prefix = /tmp
[XenPV]
vmNamePrefix = tashi
@@ -75,10 +75,11 @@ convertExceptions = True
port = 9883
registerHost = False
registerFrequency = 10.0
-infoFile = /var/tmp/nm.dat
clusterManagerHost = localhost ; Clustermanager hostname
clusterManagerPort = 9882
statsInterval = 0.0
+;accountingHost = clustermanager
+;accountingPort = 2228
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
Modified: incubator/tashi/branches/stable/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/etc/TashiDefaults.cfg?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/stable/etc/TashiDefaults.cfg Wed Feb 8 04:56:23 2012
@@ -55,6 +55,8 @@ allowMismatchedVersions = False
maxMemory = 8192
maxCores = 8
allowDuplicateNames = False
+;accountingHost = clustermanager
+;accountingPort = 2228
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
@@ -87,6 +89,10 @@ file = /var/tmp/cm.dat
uri = mysql://root@clustermanager/tashi
password = changeme
+# Accounting portion
+[Accounting]
+publisher = tashi.messaging.GangliaPublisher
+
# NodeManager portion
[NodeManager]
dfs = tashi.dfs.Vfs
@@ -100,7 +106,6 @@ convertExceptions = True
port = 9883
registerHost = False
registerFrequency = 10.0
-infoFile = /var/tmp/nm.dat
# Clustermanager hostname
clusterManagerHost = localhost
clusterManagerPort = 9882
@@ -108,7 +113,7 @@ statsInterval = 0.0
;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
-qemuBin = /usr/local/bin/qemu-system-x86_64
+qemuBin = /usr/bin/kvm
infoDir = /var/tmp/VmControlQemu/
pollDelay = 1.0
migrationRetries = 10
@@ -130,7 +135,7 @@ defaultVmType = pygrub
defaultDiskType=qcow
[Vfs]
-prefix = /dfs
+prefix = /tmp
[LocalityService]
host = localityserverhostname
@@ -149,12 +154,12 @@ clusterManagerTimeout = 5.0
publisher = tashi.messaging.GangliaPublisher
[Primitive]
-hook1 = tashi.agents.DhcpDns
+#hook1 = tashi.agents.DhcpDns
scheduleDelay = 2.0
densePack = False
[MauiWiki]
-hook1 = tashi.agents.DhcpDns
+#hook1 = tashi.agents.DhcpDns
refreshTime = 5
authuser = changeme
authkey = 1111
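
The accounting options above ship commented out, so the services treat them as optional. A sketch of the probe-and-fallback pattern used by the service code later in this commit (the real code uses a bare except; named exceptions are shown here for clarity):

from ConfigParser import ConfigParser, NoSectionError, NoOptionError

config = ConfigParser()
config.read(['TashiDefaults.cfg'])

accountingHost = None
accountingPort = None
try:
    # both raise if the option is absent or commented out
    accountingHost = config.get('ClusterManagerService', 'accountingHost')
    accountingPort = config.getint('ClusterManagerService', 'accountingPort')
except (NoSectionError, NoOptionError):
    pass    # accounting stays disabled
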
Modified: incubator/tashi/branches/stable/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/agents/primitive.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/stable/src/tashi/agents/primitive.py Wed Feb 8 04:56:23 2012
@@ -17,16 +17,11 @@
# specific language governing permissions and limitations
# under the License.
-from socket import gethostname
-import os
-import socket
-import sys
-import threading
import time
-import random
import logging.config
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
+
from tashi.util import getConfig, createClient, instantiateImplementation, boolean
import tashi
@@ -274,10 +269,10 @@ class Primitive(object):
# end for unassigned vms
- except TashiException, e:
+ except TashiException:
self.log.exception("Tashi exception")
- except Exception, e:
+ except Exception:
self.log.warning("Scheduler iteration failed")
Modified: incubator/tashi/branches/stable/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/client/tashi-client.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/stable/src/tashi/client/tashi-client.py Wed Feb 8 04:56:23 2012
@@ -211,6 +211,7 @@ extraViews = {
'getSlots': (getSlots, None),
'getImages': (None, ['id', 'imageName', 'imageSize']),
'copyImage': (None, None),
+'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'destroyMany': (destroyMany, None),
'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
@@ -575,7 +576,8 @@ def main():
if (type(res) == types.ListType):
makeTable(res, keys)
else:
- pprint(res)
+ makeTable([res], keys)
+
except IOError:
pass
except Exception, e:
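
createVm gains a key list above, and a single result is now wrapped as a one-element list so the same table renderer serves both createVm and createMany. A sketch with a hypothetical stand-in for makeTable (the real one is defined elsewhere in tashi-client.py):

# hypothetical minimal makeTable, for illustration only
def makeTable(objects, keys):
    print "\t".join(keys)
    for o in objects:
        print "\t".join([str(getattr(o, k, '')) for k in keys])

# createMany returns a list, createVm a single instance:
#   makeTable(res, keys)      list result
#   makeTable([res], keys)    single result, same rendering path
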
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanager.py Wed Feb 8 04:56:23 2012
@@ -17,13 +17,9 @@
# specific language governing permissions and limitations
# under the License.
-import os
import sys
-import threading
import signal
import logging.config
-from getopt import getopt, GetoptError
-from ConfigParser import ConfigParser
from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
import tashi
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/clustermanagerservice.py Wed Feb 8 04:56:23 2012
@@ -15,15 +15,13 @@
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime
-from random import randint
-from socket import gethostname
import logging
import threading
import time
+from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
+from tashi import boolean, ConnectionManager, vmStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
@@ -42,6 +40,7 @@ class ClusterManagerService(object):
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
+ self.log.setLevel(logging.ERROR)
self.hostLastContactTime = {}
#self.hostLastUpdateTime = {}
self.instanceLastContactTime = {}
@@ -51,27 +50,96 @@ class ClusterManagerService(object):
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
- now = self.__now()
+
+ self.accountingHost = None
+ self.accountingPort = None
+ try:
+ self.accountingHost = self.config.get('ClusterManagerService', 'accountingHost')
+ self.accountingPort = self.config.getint('ClusterManagerService', 'accountingPort')
+ except:
+ pass
+
+ self.__initAccounting()
+ self.__initCluster()
+
+ threading.Thread(target=self.__monitorCluster).start()
+
+ def __initAccounting(self):
+ self.accountBuffer = []
+ self.accountLines = 0
+ self.accountingClient = None
+ try:
+ if (self.accountingHost is not None) and \
+ (self.accountingPort is not None):
+ self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ except:
+ self.log.exception("Could not init accounting")
+
+ def __initCluster(self):
+ # initialize state of VMs if restarting
for instance in self.data.getInstances().itervalues():
instanceId = instance.id
instance = self.data.acquireInstance(instanceId)
instance.decayed = False
if instance.hostId is None:
- self.stateTransition(instance, None, InstanceState.Pending)
+ self.__stateTransition(instance, None, InstanceState.Pending)
else:
- self.stateTransition(instance, None, InstanceState.Orphaned)
+ self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
+
+ # initialize state of hosts if restarting
for host in self.data.getHosts().itervalues():
hostId = host.id
host = self.data.acquireHost(hostId)
host.up = False
host.decayed = False
self.data.releaseHost(host)
- threading.Thread(target=self.monitorCluster).start()
- def stateTransition(self, instance, old, cur):
+
+
+ def __ACCOUNTFLUSH(self):
+ try:
+ if (self.accountingClient is not None):
+ self.accountingClient.record(self.accountBuffer)
+ self.accountLines = 0
+ self.accountBuffer = []
+ except:
+ self.log.exception("Failed to flush accounting data")
+
+
+ def __ACCOUNT(self, text, instance=None, host=None):
+ now = self.__now()
+ instanceText = None
+ hostText = None
+
+ if instance is not None:
+ try:
+ instanceText = 'Instance(%s)' % (instance)
+ except:
+ self.log.exception("Invalid instance data")
+
+ if host is not None:
+ try:
+ hostText = "Host(%s)" % (host)
+ except:
+ self.log.exception("Invalid host data")
+
+ secondary = ','.join(filter(None, (hostText, instanceText)))
+
+ line = "%s|%s|%s" % (now, text, secondary)
+
+ self.accountBuffer.append(line)
+ self.accountLines += 1
+
+ # XXXstroucki think about autoflush by time
+ if (self.accountLines > 0):
+ self.__ACCOUNTFLUSH()
+
+
+
+ def __stateTransition(self, instance, old, cur):
if (old and instance.state != old):
raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
if (instance.state == cur):
@@ -104,7 +172,7 @@ class ClusterManagerService(object):
instance = self.data.acquireInstance(instanceId)
if instance.hostId == host.id:
instance.decayed = True
- self.stateTransition(instance, None, InstanceState.Orphaned)
+ self.__stateTransition(instance, None, InstanceState.Orphaned)
self.data.releaseInstance(instance)
@@ -148,10 +216,16 @@ class ClusterManagerService(object):
for hostId in self.hostLastContactTime.keys():
#self.log.warning("iterate %d" % hostId)
host = self.data.acquireHost(hostId)
- if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+ # XXXstroucki: timing has changed with the message
+ # buffering in the NM, so this wasn't being run any-
+ # more because hosts were always passing the time check.
+ # I should think a bit more about this, but
+ # the "if True" is probably appropriate.
+ #if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+ if True:
host.decayed = True
- self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+ self.log.debug('Fetching state from host %s because it is decayed' % (host.name))
myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
@@ -195,6 +269,15 @@ class ClusterManagerService(object):
# iterate through all VMs I believe are active
for instanceId in self.instanceLastContactTime.keys():
+ # Don't query non-running VMs. eg. if a VM
+ # is suspended, and has no host, then there's
+ # no one to ask
+ if instance.state != InstanceState.Running and \
+ instance.state != InstanceState.Activating and \
+ instance.state != InstanceState.Orphaned:
+ continue
+
+ # XXXstroucki should lock instance here?
if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
@@ -202,7 +285,7 @@ class ClusterManagerService(object):
continue
instance.decayed = True
- self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+ self.log.debug('Fetching state on instance %s because it is decayed' % (instance.name))
if instance.hostId is None: raise AssertionError
# XXXstroucki check if host is down?
@@ -225,21 +308,6 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
-
- def monitorCluster(self):
- while True:
- sleepFor = min(self.expireHostTime, self.allowDecayed)
-
- try:
- self.__checkHosts()
- self.__checkInstances()
- except:
- self.log.exception('monitorCluster iteration failed')
- # XXXrgass too chatty. Remove
- #self.log.info("Sleeping for %d seconds" % sleepFor)
- time.sleep(sleepFor)
-
-
def normalize(self, instance):
instance.id = None
instance.vmId = None
@@ -274,12 +342,14 @@ class ClusterManagerService(object):
instance = self.normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].shutdownVm(instance.vmId)
@@ -291,14 +361,17 @@ class ClusterManagerService(object):
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
+ self.__ACCOUNT("CM VM DESTROY UNSTARTED", instance=instance)
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
- self.stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+ self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
+ self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
# clean state.
- self.stateTransition(instance, None, InstanceState.Destroying)
+ self.__ACCOUNT("CM VM DESTROY", instance=instance)
+ self.__stateTransition(instance, None, InstanceState.Destroying)
if instance.hostId is None:
self.data.removeInstance(instance)
else:
@@ -316,8 +389,9 @@ class ClusterManagerService(object):
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
destination = "suspend/%d_%s" % (instance.id, instance.name)
try:
@@ -329,14 +403,16 @@ class ClusterManagerService(object):
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM RESUME", instance=instance)
return instance
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
+ self.__ACCOUNT("CM VM MIGRATE", instance=instance)
try:
# FIXME: should these be acquire/release host?
targetHost = self.data.getHost(targetHostId)
@@ -345,7 +421,7 @@ class ClusterManagerService(object):
except:
self.data.releaseInstance(instance)
raise
- self.stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
self.data.releaseInstance(instance)
try:
# Prepare the target
@@ -353,16 +429,16 @@ class ClusterManagerService(object):
self.proxy[sourceHost.name].prepSourceVm(instance.vmId)
self.log.info("migrateVm: Calling prepReceiveVm on target host %s" % targetHost.name)
cookie = self.proxy[targetHost.name].prepReceiveVm(instance, sourceHost)
- except Exception, e:
+ except Exception:
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
- self.stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
self.data.releaseInstance(instance)
try:
# Send the VM
self.proxy[sourceHost.name].migrateVm(instance.vmId, targetHost, cookie)
- except Exception, e:
+ except Exception:
self.log.exception('migrateVm failed')
raise
try:
@@ -374,15 +450,16 @@ class ClusterManagerService(object):
try:
# Notify the target
vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
- except Exception, e:
+ except Exception:
self.log.exception('receiveVm failed')
raise
return
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].pauseVm(instance.vmId)
@@ -390,14 +467,15 @@ class ClusterManagerService(object):
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
self.data.releaseInstance(instance)
return
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
self.data.releaseInstance(instance)
+ self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
try:
self.proxy[hostname].unpauseVm(instance.vmId)
@@ -405,7 +483,7 @@ class ClusterManagerService(object):
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
self.data.releaseInstance(instance)
return
@@ -440,6 +518,7 @@ class ClusterManagerService(object):
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
+ self.__ACCOUNT("CM VM SPECIFIC CALL", instance=instance)
try:
res = self.proxy[hostname].vmmSpecificCall(instance.vmId, arg)
except Exception:
@@ -466,8 +545,7 @@ class ClusterManagerService(object):
if oldHost.up == False:
self.__upHost(oldHost)
- self.hostLastContactTime[host.id] = time.time()
- #self.hostLastUpdateTime[host.id] = time.time()
+ self.hostLastContactTime[host.id] = self.__now()
oldHost.version = host.version
oldHost.memory = host.memory
oldHost.cores = host.cores
@@ -479,10 +557,8 @@ class ClusterManagerService(object):
oldHost.state = HostState.Normal
# let the host communicate what it is running
- # XXXrgass - This is too chatty for the console, I think we should remove this.
- # XXXstroucki - My install depends on this, but I output to log files. This should be handled by a separate accounting server in future.
+ # and note that the information is not stale
for instance in instances:
- self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
@@ -500,8 +576,9 @@ class ClusterManagerService(object):
self.log.exception("Could not acquire instance")
raise
- self.instanceLastContactTime[instanceId] = time.time()
+ self.instanceLastContactTime[instanceId] = self.__now()
oldInstance.decayed = False
+ self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
@@ -509,7 +586,7 @@ class ClusterManagerService(object):
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
if (oldInstance.state == InstanceState.Suspending):
- self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
+ self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
self.data.releaseInstance(oldInstance)
@@ -552,13 +629,14 @@ class ClusterManagerService(object):
self.data.releaseHost(dataHost)
instance = self.data.acquireInstance(instanceId)
+ self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
- self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+ self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
- #self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
- self.stateTransition(instance, None, InstanceState.Activating)
+ #self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+ self.__stateTransition(instance, None, InstanceState.Activating)
instance.hostId = host.id
self.data.releaseInstance(instance)
@@ -568,14 +646,14 @@ class ClusterManagerService(object):
vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
else:
vmId = self.proxy[host.name].instantiateVm(instance)
- except Exception, e:
+ except Exception:
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
self.data.removeInstance(instance)
else:
# XXXstroucki what can we do about pending hosts in the scheduler?
# put them at the end of the queue and keep trying?
- self.stateTransition(instance, None, InstanceState.Held)
+ self.__stateTransition(instance, None, InstanceState.Held)
instance.hostId = None
self.data.releaseInstance(instance)
return "failure"
@@ -594,7 +672,7 @@ class ClusterManagerService(object):
else:
if ('__resume_source' not in instance.hints):
# XXXstroucki should we just wait for NM to update?
- #self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+ #self.__stateTransition(instance, InstanceState.Activating, InstanceState.Running)
pass
self.data.releaseInstance(instance)
@@ -606,9 +684,41 @@ class ClusterManagerService(object):
self.log.info("Host %s is already registered, it was updated now" % hostname)
else:
self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+
+ try:
+ host = self.data.getHost(hostId)
+ self.__ACCOUNT("CM HOST REGISTER", host=host)
+ except:
+ self.log.warning("Failed to lookup host %s" % hostId)
+
return hostId
def unregisterHost(self, hostId):
+ try:
+ host = self.data.getHost(hostId)
+ self.__ACCOUNT("CM HOST UNREGISTER", host=host)
+ except:
+ self.log.warning("Failed to lookup host %s" % hostId)
+ return
+
self.data.unregisterHost(hostId)
self.log.info("Host %s was unregistered" % hostId)
return
+
+ # service thread
+ def __monitorCluster(self):
+ while True:
+ sleepFor = min(self.expireHostTime, self.allowDecayed)
+
+ try:
+ self.__checkHosts()
+ self.__checkInstances()
+ except:
+ self.log.exception('monitorCluster iteration failed')
+ # XXXrgass too chatty. Remove
+ # XXXstroucki the risk is that a deadlock in obtaining
+ # data could prevent this loop from continuing.
+ #self.log.info("Sleeping for %d seconds" % sleepFor)
+ time.sleep(sleepFor)
+
+
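
For illustration: __ACCOUNT composes pipe-delimited records, and because accountLines > 0 holds after every append, __ACCOUNTFLUSH currently pushes each record to the accounting server immediately (the XXX notes above ponder batching by time instead). A sketch of the line format, with hypothetical values:

# record layout built by __ACCOUNT: timestamp|event|secondary,
# where secondary joins the optional Host(...) and Instance(...) texts
now = 1328676983.42    # hypothetical self.__now() value
text = "CM VM REQUEST"
secondary = ','.join(filter(None, ("Host(...)", "Instance(...)")))
line = "%s|%s|%s" % (now, text, secondary)
# -> "1328676983.42|CM VM REQUEST|Host(...),Instance(...)"
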
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/fromconfig.py Wed Feb 8 04:56:23 2012
@@ -20,7 +20,7 @@ import threading
import os
import ConfigParser
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
@@ -109,7 +109,7 @@ class FromConfig(DataInterface):
def releaseInstance(self, instance):
try:
if (instance.id not in self.instances): # MPR: should never be true, but good to check
- raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instanceId)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
finally:
self.releaseLock(instance._lock)
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/pickled.py Wed Feb 8 04:56:23 2012
@@ -18,7 +18,7 @@
import cPickle
import os
import threading
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Instance, Host
from tashi.clustermanager.data import FromConfig, DataInterface
class Pickled(FromConfig):
Modified: incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/stable/src/tashi/clustermanager/data/sql.py Wed Feb 8 04:56:23 2012
@@ -17,11 +17,9 @@
import logging
import threading
-import time
-import types
# XXXstroucki getImages needs os?
import os
-from tashi.rpycservices.rpyctypes import *
+from tashi.rpycservices.rpyctypes import Errors, Network, Host, User, Instance, TashiException, LocalImages, DiskConfiguration, NetworkConfiguration
from tashi.clustermanager.data.datainterface import DataInterface
from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
@@ -45,7 +43,7 @@ class SQL(DataInterface):
self.password = self.config.get('SQL', 'password')
self.conn = MySQLdb.connect(host=host, user=user, passwd=self.password, db=db)
else:
- raise ValueException, 'Unknown SQL database engine by URI: %s' % (self.uri)
+ raise TashiException, 'Unknown SQL database engine by URI: %s' % (self.uri)
self.instanceOrder = ['id', 'vmId', 'hostId', 'decayed', 'state', 'userId', 'name', 'cores', 'memory', 'disks', 'nics', 'hints']
self.hostOrder = ['id', 'name', 'up', 'decayed', 'state', 'memory', 'cores', 'version']
@@ -319,7 +317,7 @@ class SQL(DataInterface):
for r in res:
if r[1] == hostname:
id = r[0]
- print "Host %s already registered, update will be done" % id
+ self.log.warning("Host %s already registered, update will be done" % id)
s = ""
host = Host(d={'id': id, 'up': 0, 'decayed': 0, 'state': 1, 'name': hostname, 'memory':memory, 'cores': cores, 'version':version})
l = self.makeHostList(host)
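
ValueException is not a Python builtin (the builtin is ValueError), so the old raise would itself have died with a NameError; the fix reuses TashiException. Elsewhere in this commit TashiException is constructed from a d= dictionary rather than a bare string, e.g. (errno chosen purely for illustration):

from tashi.rpycservices.rpyctypes import TashiException, Errors

uri = "unknown://example"    # hypothetical
raise TashiException(d={'errno': Errors.NoSuchInstanceId,
            'msg': "Unknown SQL database engine by URI: %s" % (uri)})
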
Modified: incubator/tashi/branches/stable/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/connectionmanager.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/branches/stable/src/tashi/connectionmanager.py Wed Feb 8 04:56:23 2012
@@ -15,9 +15,8 @@
# specific language governing permissions and limitations
# under the License.
-import rpyc
from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import *
+#from tashi.rpycservices.rpyctypes import *
class ConnectionManager(object):
def __init__(self, username, password, port, timeout=10000.0):
Modified: incubator/tashi/branches/stable/src/tashi/dfs/vfs.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/dfs/vfs.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/dfs/vfs.py (original)
+++ incubator/tashi/branches/stable/src/tashi/dfs/vfs.py Wed Feb 8 04:56:23 2012
@@ -27,52 +27,67 @@ class Vfs(DfsInterface):
DfsInterface.__init__(self, config)
self.prefix = self.config.get("Vfs", "prefix")
-# why do these three need to be separate?
+ def __dfsToReal(self, dfspath):
+ realpath = os.path.join(self.prefix, dfspath)
+ return realpath
+
def copyTo(self, localSrc, dst):
- shutil.copy(localSrc, os.path.join(self.prefix, dst))
-# just assuming this works
+ realdest = self.__dfsToReal(dst)
+ shutil.copy(localSrc, realdest)
+ # just assuming this works
return None
def copyFrom(self, src, localDst):
- shutil.copy(os.path.join(self.prefix, src), localDst)
-# just assuming this works
+ realsrc = self.__dfsToReal(src)
+ shutil.copy(realsrc, localDst)
+ # just assuming this works
return None
def copy(self, src, dst):
- shutil.copy(os.path.join(self.prefix, src),
- os.path.join(self.prefix, dst))
-# just assuming this works
+ realsrc = self.__dfsToReal(src)
+ realdst = self.__dfsToReal(dst)
+ shutil.copy(realsrc, realdst)
+ # just assuming this works
return None
def list(self, path):
try:
- return os.listdir(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.listdir(realpath)
except OSError, e:
+ # XXXstroucki error 20 = ENOTDIR
if (e.errno == 20):
return [path.split('/')[-1]]
else:
raise
def stat(self, path):
- return os.stat(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.stat(realpath)
def move(self, src, dst):
- shutil.move(os.path.join(self.prefix, src),
- os.path.join(self.prefix, dst))
-# just assuming this works
+ realsrc = self.__dfsToReal(src)
+ realdst = self.__dfsToReal(dst)
+ shutil.move(realsrc, realdst)
+ # just assuming this works
return None
def mkdir(self, path):
- return os.mkdir(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.mkdir(realpath)
def unlink(self, path):
- return os.unlink(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.unlink(realpath)
def rmdir(self, path):
- return os.rmdir(os.path.join(self.prefix, path))
+ realpath = self.__dfsToReal(path)
+ return os.rmdir(realpath)
def open(self, path, perm):
- return open(os.path.join(self.prefix, path), perm)
+ realpath = self.__dfsToReal(path)
+ return open(realpath, perm)
def getLocalHandle(self, path):
- return os.path.join(self.prefix, path)
+ realpath = self.__dfsToReal(path)
+ return realpath
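
All Vfs operations now funnel DFS-relative paths through one helper instead of repeating os.path.join. With the prefix = /tmp setting from the config changes above, a short sketch of the mapping:

import os

prefix = "/tmp"    # [Vfs] prefix, per the cfg changes above

def dfsToReal(dfspath):
    # mirrors Vfs.__dfsToReal
    return os.path.join(prefix, dfspath)

print dfsToReal("images/foo.qcow2")    # /tmp/images/foo.qcow2
print dfsToReal("suspend/12_myvm")     # /tmp/suspend/12_myvm (cf. the CM's suspend/%d_%s)
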
Modified: incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanager.py Wed Feb 8 04:56:23 2012
@@ -22,7 +22,6 @@ import signal
import sys
from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
-from tashi import ConnectionManager
import tashi
from tashi import boolean
Modified: incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanagerservice.py?rev=1241776&r1=1241775&r2=1241776&view=diff
==============================================================================
--- incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/stable/src/tashi/nodemanager/nodemanagerservice.py Wed Feb 8 04:56:23 2012
@@ -15,17 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-import cPickle
import logging
-import os
import socket
-import sys
import threading
import time
-from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, TashiException, Errors, Instance
-from tashi.nodemanager import RPC
-from tashi import boolean, vmStates, logged, ConnectionManager, timed
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import InstanceState, TashiException, Errors, Instance
+from tashi import boolean, vmStates, ConnectionManager
import tashi
@@ -50,201 +47,316 @@ class NodeManagerService(object):
self.log = logging.getLogger(__file__)
self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
- self.infoFile = self.config.get('NodeManagerService', 'infoFile')
self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
- self.id = None
+ self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+ try:
+ self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+ except:
+ self.log.exception("Could not connect to CM")
+ return
+
+ self.accountingHost = None
+ self.accountingPort = None
+ try:
+ self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+ self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
+ except:
+ pass
+
self.notifyCM = []
- self.loadVmInfo()
- vmList = self.vmm.listVms()
- for vmId in vmList:
- if (vmId not in self.instances):
- self.log.warning('vmcontrol backend reports additional vmId %d' % (vmId))
- self.instances[vmId] = Instance(d={'vmId':vmId,'id':-1})
- for vmId in self.instances.keys():
- if (vmId not in vmList):
- self.log.warning('vmcontrol backend does not report %d' % (vmId))
- self.vmStateChange(vmId, None, InstanceState.Exited)
- self.registerHost()
- threading.Thread(target=self.backupVmInfoAndFlushNotifyCM).start()
- threading.Thread(target=self.registerWithClusterManager).start()
- threading.Thread(target=self.statsThread).start()
-
- def loadVmInfo(self):
+
+ self.__initAccounting()
+
+ self.id = None
+ # XXXstroucki this fn could be in this level maybe?
+ self.host = self.vmm.getHostInfo(self)
+
+ # populate self.instances
+ self.__loadVmInfo()
+
+ self.__registerHost()
+
+ self.id = self.cm.registerNodeManager(self.host, self.instances.values())
+
+ # XXXstroucki cut cross check for NM/VMM state
+
+ # start service threads
+ threading.Thread(target=self.__registerWithClusterManager).start()
+ threading.Thread(target=self.__statsThread).start()
+
+ def __initAccounting(self):
+ self.accountBuffer = []
+ self.accountLines = 0
+ self.accountingClient = None
+ try:
+ if (self.accountingHost is not None) and \
+ (self.accountingPort is not None):
+ self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ except:
+ self.log.exception("Could not init accounting")
+
+ def __loadVmInfo(self):
try:
self.instances = self.vmm.getInstances()
- except Exception, e:
+ except Exception:
self.log.exception('Failed to obtain VM info')
self.instances = {}
-
- def saveVmInfo(self):
+
+ # send data to CM
+ # XXXstroucki adapt this for accounting?
+ def __flushNotifyCM(self):
+ start = time.time()
+ # send data to CM, adding message to buffer if
+ # it fails
try:
- data = cPickle.dumps(self.instances)
- f = open(self.infoFile, "w")
- f.write(data)
- f.close()
+ notifyCM = []
+ try:
+ while (len(self.notifyCM) > 0):
+ value = self.notifyCM.pop(0)
+ (instanceId, newInst, old, success) = value
+ try:
+ self.cm.vmUpdate(instanceId, newInst, old)
+ except TashiException, e:
+ notifyCM.append((instanceId, newInst, old, success))
+ if (e.errno != Errors.IncorrectVmState):
+ raise
+ except:
+ notifyCM.append((instanceId, newInst, old, success))
+ raise
+ else:
+ success()
+ finally:
+ if len(notifyCM) > 0:
+ self.notifyCM.extend(notifyCM)
except Exception, e:
- self.log.exception('Failed to save VM info to %s' % (self.infoFile))
-
- def vmStateChange(self, vmId, old, cur):
- instance = self.getInstance(vmId)
- if (old and instance.state != old):
- self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
- if (cur == InstanceState.Exited):
- del self.instances[vmId]
- return True
+ self.log.exception('Failed to send data to the CM')
- if (instance.state == cur):
- # Don't do anything if state is what it should be
- return True
+ #toSleep = start - time.time() + self.registerFrequency
+ #if (toSleep > 0):
+ #time.sleep(toSleep)
- instance.state = cur
- newInst = Instance(d={'state':cur})
- success = lambda: None
+ def __ACCOUNTFLUSH(self):
try:
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cm.vmUpdate(instance.id, newInst, old)
- except Exception, e:
- self.log.exception('RPC failed for vmUpdate on CM')
- self.notifyCM.append((instance.id, newInst, old, success))
- else:
- success()
- return True
-
- def backupVmInfoAndFlushNotifyCM(self):
- cm = None
- cmUseCount = 0
- while True:
- if cmUseCount > 10 or cm is None:
- try:
- # XXXstroucki hope rpyc handles destruction
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cmUseCount = 0
- except Exception, e:
- self.log.warning("Could not get a handle to the clustermanager")
- time.sleep(60)
- continue
+ if (self.accountingClient is not None):
+ self.accountingClient.record(self.accountBuffer)
+ self.accountLines = 0
+ self.accountBuffer = []
+ except:
+ self.log.exception("Failed to flush accounting data")
- cmUseCount = cmUseCount + 1
- start = time.time()
+
+ def __ACCOUNT(self, text, instance=None, host=None):
+ now = time.time()
+ instanceText = None
+ hostText = None
+
+ if instance is not None:
try:
- self.saveVmInfo()
- except Exception, e:
- self.log.exception('Failed to save VM info')
+ instanceText = 'Instance(%s)' % (instance)
+ except:
+ self.log.exception("Invalid instance data")
+
+ if host is not None:
try:
- notifyCM = []
- try:
- while (len(self.notifyCM) > 0):
- (instanceId, newInst, old, success) = self.notifyCM.pop(0)
- try:
- cm.vmUpdate(instanceId, newInst, old)
- except TashiException, e:
- notifyCM.append((instanceId, newInst, old, success))
- if (e.errno != Errors.IncorrectVmState):
- raise
- except:
- notifyCM.append((instanceId, newInst, old, success))
- raise
- else:
- success()
- finally:
- self.notifyCM = self.notifyCM + notifyCM
- except Exception, e:
- self.log.exception('Failed to register with the CM')
- toSleep = start - time.time() + self.registerFrequency
- if (toSleep > 0):
- time.sleep(toSleep)
-
- def registerWithClusterManager(self):
- cm = None
- cmUseCount = 0
+ hostText = "Host(%s)" % (host)
+ except:
+ self.log.exception("Invalid host data")
+
+ secondary = ','.join(filter(None, (hostText, instanceText)))
+
+ line = "%s|%s|%s" % (now, text, secondary)
+
+ self.accountBuffer.append(line)
+ self.accountLines += 1
+
+ # XXXstroucki think about force flush every so often
+ if (self.accountLines > 0):
+ self.__ACCOUNTFLUSH()
+
+
+ # service thread function
+ def __registerWithClusterManager(self):
while True:
- if cmUseCount > 10 or cm is None:
- try:
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- cmUseCount = 0
- except Exception, e:
- self.log.warning("Could not get a handle to the clustermanager")
- time.sleep(60)
- continue
- cmUseCount = cmUseCount + 1
+ #self.__ACCOUNT("TESTING")
start = time.time()
try:
- host = self.vmm.getHostInfo(self)
instances = self.instances.values()
- #import pprint
- #self.log.warning("Instances: " + pprint.saferepr(instances))
- self.id = cm.registerNodeManager(host, instances)
- except Exception, e:
+ self.id = self.cm.registerNodeManager(self.host, instances)
+ except Exception:
self.log.exception('Failed to register with the CM')
+
toSleep = start - time.time() + self.registerFrequency
if (toSleep > 0):
time.sleep(toSleep)
-
- def getInstance(self, vmId):
+
+ # service thread function
+ def __statsThread(self):
+ if (self.statsInterval == 0):
+ return
+ while True:
+ try:
+ publishList = []
+ for vmId in self.instances.keys():
+ try:
+ instance = self.instances.get(vmId, None)
+ if (not instance):
+ continue
+ id = instance.id
+ stats = self.vmm.getStats(vmId)
+ for stat in stats:
+ publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
+ except:
+ self.log.exception('statsThread threw an exception')
+ if (len(publishList) > 0):
+ tashi.publisher.publishList(publishList)
+ except:
+ self.log.exception('statsThread threw an exception')
+ time.sleep(self.statsInterval)
+
+ def __registerHost(self):
+ hostname = socket.gethostname()
+ # populate some defaults
+ # XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
+ memory = 0
+ cores = 0
+ version = "empty"
+ #self.cm.registerHost(hostname, memory, cores, version)
+
+ def __getInstance(self, vmId):
instance = self.instances.get(vmId, None)
- if (instance is None):
- raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
- return instance
+ if instance is not None:
+ return instance
+
+ # refresh self.instances if not found
+ self.__loadVmInfo()
+ instance = self.instances.get(vmId, None)
+ if instance is not None:
+ return instance
+
+
+ raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
- def instantiateVm(self, instance):
- vmId = self.vmm.instantiateVm(instance)
- instance.vmId = vmId
- instance.state = InstanceState.Running
+ # remote
+ # Called from VMM to update self.instances
+ # but only changes are Exited, MigrateTrans and Running
+ # qemu.py calls this in the matchSystemPids thread
+ # xenpv.py: i have no real idea why it is called there
+ def vmStateChange(self, vmId, old, cur):
+ instance = self.__getInstance(vmId)
+
+ if (instance.state == cur):
+ # Don't do anything if state is what it should be
+ return True
+
+ if (old and instance.state != old):
+ # make a note of mismatch, but go on.
+ # the VMM should know best
+ self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
+
+ instance.state = cur
+
+ self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
+
+ newInst = Instance(d={'state':cur})
+ success = lambda: None
+ # send the state change up to the CM
+ self.notifyCM.append((instance.id, newInst, old, success))
+ self.__flushNotifyCM()
+
+ # cache change locally
+ self.instances[vmId] = instance
+
+ if (cur == InstanceState.Exited):
+ # At this point, the VMM will clean up,
+ # so forget about this instance
+ del self.instances[vmId]
+ return True
+
+ return True
+
+ # remote
+ def createInstance(self, instance):
+ vmId = instance.vmId
self.instances[vmId] = instance
- return vmId
+
+
+ # remote
+ def instantiateVm(self, instance):
+ self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
+ try:
+ vmId = self.vmm.instantiateVm(instance)
+ #instance.vmId = vmId
+ #instance.state = InstanceState.Running
+ #self.instances[vmId] = instance
+ return vmId
+ except:
+ self.log.exception("Failed to start instance")
+ # remote
def suspendVm(self, vmId, destination):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM SUSPEND", instance=instance)
+
instance.state = InstanceState.Suspending
+ self.instances[vmId] = instance
threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
- def resumeVmHelper(self, instance, name):
+ # called by resumeVm as thread
+ def __resumeVmHelper(self, instance, name):
self.vmm.resumeVmHelper(instance, name)
instance.state = InstanceState.Running
newInstance = Instance(d={'id':instance.id,'state':instance.state})
success = lambda: None
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- try:
- cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
- except Exception, e:
- self.log.exception('vmUpdate failed in resumeVmHelper')
- self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
- else:
- success()
-
+ self.notifyCM.append((newInstance.id, newInstance, InstanceState.Resuming, success))
+ self.__flushNotifyCM()
+
+ # remote
def resumeVm(self, instance, name):
+ self.__ACCOUNT("NM VM RESUME", instance=instance)
instance.state = InstanceState.Resuming
instance.hostId = self.id
try:
instance.vmId = self.vmm.resumeVm(instance, name)
self.instances[instance.vmId] = instance
- threading.Thread(target=self.resumeVmHelper, args=(instance, name)).start()
+ threading.Thread(target=self.__resumeVmHelper, args=(instance, name)).start()
except:
self.log.exception('resumeVm failed')
raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
return instance.vmId
+ # remote
def prepReceiveVm(self, instance, source):
+ self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
instance.vmId = -1
transportCookie = self.vmm.prepReceiveVm(instance, source.name)
return transportCookie
+ # remote
def prepSourceVm(self, vmId):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM MIGRATE SOURCE PREP", instance=instance)
instance.state = InstanceState.MigratePrep
-
- def migrateVmHelper(self, instance, target, transportCookie):
+ self.instances[vmId] = instance
+
+ # called by migrateVm as thread
+ # XXXstroucki migrate out?
+ def __migrateVmHelper(self, instance, target, transportCookie):
self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
del self.instances[instance.vmId]
-
+
+ # remote
+ # XXXstroucki migrate out?
def migrateVm(self, vmId, target, transportCookie):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM MIGRATE", instance=instance)
instance.state = InstanceState.MigrateTrans
- threading.Thread(target=self.migrateVmHelper, args=(instance, target, transportCookie)).start()
+ self.instances[vmId] = instance
+ threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
return
- def receiveVmHelper(self, instance, transportCookie):
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
+ # called by receiveVm as thread
+ # XXXstroucki migrate in?
+ def __receiveVmHelper(self, instance, transportCookie):
vmId = self.vmm.receiveVm(transportCookie)
instance.state = InstanceState.Running
instance.hostId = self.id
@@ -252,83 +364,69 @@ class NodeManagerService(object):
self.instances[vmId] = instance
newInstance = Instance(d={'id':instance.id,'state':instance.state,'vmId':instance.vmId,'hostId':instance.hostId})
success = lambda: None
- try:
- cm.vmUpdate(newInstance.id, newInstance, InstanceState.MigrateTrans)
- except Exception, e:
- self.log.exception('vmUpdate failed in receiveVmHelper')
- self.notifyCM.append((newInstance.id, newInstance, InstanceState.MigrateTrans, success))
- else:
- success()
-
+ self.notifyCM.append((newInstance.id, newInstance, InstanceState.Running, success))
+ self.__flushNotifyCM()
+
+ # remote
+ # XXXstroucki migrate in?
def receiveVm(self, instance, transportCookie):
instance.state = InstanceState.MigrateTrans
- threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
+ vmId = instance.vmId
+ self.instances[vmId] = instance
+ self.__ACCOUNT("NM VM MIGRATE RECEIVE", instance=instance)
+ threading.Thread(target=self.__receiveVmHelper, args=(instance, transportCookie)).start()
return
-
+
+ # remote
def pauseVm(self, vmId):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM PAUSE", instance=instance)
instance.state = InstanceState.Pausing
+ self.instances[vmId] = instance
self.vmm.pauseVm(vmId)
instance.state = InstanceState.Paused
-
+ self.instances[vmId] = instance
+
+ # remote
def unpauseVm(self, vmId):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM UNPAUSE", instance=instance)
instance.state = InstanceState.Unpausing
+ self.instances[vmId] = instance
self.vmm.unpauseVm(vmId)
instance.state = InstanceState.Running
-
+ self.instances[vmId] = instance
+
+ # remote
def shutdownVm(self, vmId):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM SHUTDOWN", instance=instance)
instance.state = InstanceState.ShuttingDown
+ self.instances[vmId] = instance
self.vmm.shutdownVm(vmId)
-
+
+ # remote
def destroyVm(self, vmId):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
+ self.__ACCOUNT("NM VM DESTROY", instance=instance)
instance.state = InstanceState.Destroying
+ self.instances[vmId] = instance
self.vmm.destroyVm(vmId)
-
+
+ # remote
def getVmInfo(self, vmId):
- instance = self.getInstance(vmId)
+ instance = self.__getInstance(vmId)
return instance
-
+
+ # remote
def vmmSpecificCall(self, vmId, arg):
return self.vmm.vmmSpecificCall(vmId, arg)
-
+
+ # remote
def listVms(self):
return self.instances.keys()
+ # remote
def liveCheck(self):
return "alive"
- def statsThread(self):
- if (self.statsInterval == 0):
- return
- while True:
- try:
- publishList = []
- for vmId in self.instances.keys():
- try:
- instance = self.instances.get(vmId, None)
- if (not instance):
- continue
- id = instance.id
- stats = self.vmm.getStats(vmId)
- for stat in stats:
- publishList.append({"vm_%d_%s" % (id, stat):stats[stat]})
- except:
- self.log.exception('statsThread threw an exception')
- if (len(publishList) > 0):
- tashi.publisher.publishList(publishList)
- except:
- self.log.exception('statsThread threw an exception')
- time.sleep(self.statsInterval)
-
- def registerHost(self):
- cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
- hostname = socket.gethostname()
- # populate some defaults
- # XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
- memory = 0
- cores = 0
- version = "empty"
- #cm.registerHost(hostname, memory, cores, version)
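
For illustration: the NM no longer opens a fresh ConnectionManager for each update. State changes queue onto self.notifyCM, and __flushNotifyCM drains the queue, re-buffering whatever the CM rejects so it is retried on the next flush; the success callback runs only for accepted updates. A condensed sketch of that drain-and-requeue loop (the real code also re-raises TashiExceptions other than IncorrectVmState):

def flushNotifyCM(cm, notifyCM, log):
    kept = []
    try:
        try:
            while notifyCM:
                (instanceId, newInst, old, success) = notifyCM.pop(0)
                try:
                    cm.vmUpdate(instanceId, newInst, old)
                except Exception:
                    kept.append((instanceId, newInst, old, success))
                    raise
                else:
                    success()
        finally:
            # requeue failed updates flat, so the next pass pops tuples
            notifyCM.extend(kept)
    except Exception:
        log.exception('Failed to send data to the CM')
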