You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/06/19 22:05:54 UTC
svn commit: r1351880 [1/3] - in /incubator/tashi/branches/luke-zoni-staging:
./ doc/ etc/ src/tashi/ src/tashi/accounting/ src/tashi/agents/
src/tashi/client/ src/tashi/clustermanager/ src/tashi/clustermanager/data/
src/tashi/dfs/ src/tashi/messaging/ ...
Author: stroucki
Date: Tue Jun 19 22:05:52 2012
New Revision: 1351880
URL: http://svn.apache.org/viewvc?rev=1351880&view=rev
Log:
merge Luke's branch with the trunk
Added:
incubator/tashi/branches/luke-zoni-staging/doc/sample.qemu-ifup
- copied unchanged from r1351876, incubator/tashi/trunk/doc/sample.qemu-ifup
incubator/tashi/branches/luke-zoni-staging/src/tashi/utils/
- copied from r1351876, incubator/tashi/trunk/src/tashi/utils/
Removed:
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/locality-server.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/mauipacket.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/pseudoDes.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/client/client.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/client/test.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/messageBroker.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/messaging.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/soapmessaging.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/tashimessaging.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/threadpool.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/thriftmessaging.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/thrift/
incubator/tashi/branches/luke-zoni-staging/src/utils/Makefile
incubator/tashi/branches/luke-zoni-staging/src/utils/getLocality.py
incubator/tashi/branches/luke-zoni-staging/src/utils/nmd.c
Modified:
incubator/tashi/branches/luke-zoni-staging/ (props changed)
incubator/tashi/branches/luke-zoni-staging/INSTALL
incubator/tashi/branches/luke-zoni-staging/Makefile
incubator/tashi/branches/luke-zoni-staging/doc/DEVELOPMENT
incubator/tashi/branches/luke-zoni-staging/doc/INSTALL2
incubator/tashi/branches/luke-zoni-staging/etc/NodeManager.cfg
incubator/tashi/branches/luke-zoni-staging/etc/TashiDefaults.cfg
incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accounting.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accountingservice.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/dhcpdns.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/instancehook.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/mauiwiki.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive_zoni.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/client/tashi-client.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanager.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/datainterface.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/pickled.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/sql.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/connectionmanager.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/dfs/vfs.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/gangliapublisher.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/messaging/messagingloghandler.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/nodemanager/nodemanager.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/nodemanager/vmcontrol/xenpv.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/parallel.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/rpycservices/rpycservices.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/rpycservices/rpyctypes.py
incubator/tashi/branches/luke-zoni-staging/src/tashi/util.py
incubator/tashi/branches/luke-zoni-staging/src/utils/nmd.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/agents/dhcpdns.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/bootstrap/bootstrapinterface.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/bootstrap/pxe.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/client/zoni-cli.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/data/infostore.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/data/reservation.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/data/reservationmanagementinterface.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/data/resourcequerysql.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/data/usermanagement.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/data/usermanagementinterface.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/extra/util.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/apcswitchedrackpdu.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/delldrac.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/dellswitch.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/f10s50switch.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/hpilo.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/hpswitch.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/hwswitchinterface.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/ipmi.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/raritanpdu.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/systemmanagement.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/hardware/systemmanagementinterface.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/install/db/zoniDbSetup.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/install/pxe/zoniPxeSetup.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/install/www/zoniWebSetup.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/services/pcvciservice.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/services/zonimanager.py
incubator/tashi/branches/luke-zoni-staging/src/zoni/version.py
Propchange: incubator/tashi/branches/luke-zoni-staging/
------------------------------------------------------------------------------
Merged /incubator/tashi/branches/stroucki-tashi11:r1294393-1294415
Merged /incubator/tashi/branches/stroucki-tashi10:r1294393-1294727
Merged /incubator/tashi/branches/stable:r1241774-1301161
Merged /incubator/tashi/branches/stroucki-dropthrift:r1292513-1297655
Merged /incubator/tashi/trunk:r1291412-1351876
Merged /incubator/tashi/branches/stroucki-tashi8:r1294393-1294427
Merged /incubator/tashi/branches/stroucki-accounting:r1241771-1295369
Merged /incubator/tashi/branches/stroucki-irpbugs:r1245857-1292894
Merged /incubator/tashi/branches/stroucki-tashi2:r1294935-1294944
Merged /incubator/tashi/branches/stablefix:r1203848-1241770
Merged /incubator/tashi/branches/stroucki-stable:r1297792-1298173
Modified: incubator/tashi/branches/luke-zoni-staging/INSTALL
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/INSTALL?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/INSTALL (original)
+++ incubator/tashi/branches/luke-zoni-staging/INSTALL Tue Jun 19 22:05:52 2012
@@ -124,6 +124,7 @@ When defining the host, you must provide
given by the hostname command. If you plan on eventually having several
hosts and networks, feel free to add them now.
+root@grml:/usr/local/tashi# cd bin
root@grml:/usr/local/tashi/bin# DEBUG=1 ./clustermanager
2012-01-26 23:12:33,972 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
2012-01-26 23:12:33,972 [./clustermanager:INFO] Starting cluster manager
@@ -152,15 +153,14 @@ Please press <RETURN> to start IPython.
In [1]: from tashi.rpycservices.rpyctypes import Host, HostState, Network
-In [2]: data.baseDataObject.hosts[1] = Host(d={'id':1,'name':'grml','state': HostState.Normal,'up':False})
+In [2]: data.baseDataObject.hosts[0] = Host(d={'id':0,'name':'grml','state': HostState.Normal,'up':False})
-In [3]: data.baseDataObject.networks[1]=Network(d={'id':0,'name':'default'})
+In [3]: data.baseDataObject.networks[0]=Network(d={'id':0,'name':'My Network'})
In [4]: data.baseDataObject.save()
-In [5]: import os
-
-In [6]: os.kill(os.getpid(), 9)
+In [5]: (^C)
+2012-03-07 20:00:00,456 [./bin/clustermanager:INFO] Exiting cluster manager after receiving a SIGINT signal
Run the cluster manager in the background:
root@grml:/usr/local/tashi/bin# ./clustermanager &
Modified: incubator/tashi/branches/luke-zoni-staging/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/Makefile?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/Makefile (original)
+++ incubator/tashi/branches/luke-zoni-staging/Makefile Tue Jun 19 22:05:52 2012
@@ -33,6 +33,7 @@ package: src DISCLAIMER INSTALL LICENSE
mkdir apache-tashi
cp -rp doc etc Makefile src DISCLAIMER INSTALL LICENSE NOTICE README apache-tashi/
find apache-tashi -type d -name ".svn"|xargs rm -rf
+ -chgrp -R incubator apache-tashi
tar zcf apache-tashi.tar.gz apache-tashi
rm -rf apache-tashi
@@ -117,7 +118,7 @@ rmdoc:
# Zoni
bin/zoni-cli:
@echo Symlinking in zoni-cli...
- (cd bin; ln -s ../src/zoni/client/zoni-cli.py .)
+ (cd bin; ln -s ../src/zoni/client/zoni-cli.py zoni-client)
# why necessarily put this in /usr/local/bin like nothing else?
usr/local/bin/zoni:
@echo Creating /usr/local/bin/zoni
@@ -127,11 +128,11 @@ rmzoni-cli:
if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
-disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')
+disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')",F0401"
pysrc=$(shell find . \! -path '*gen-py*' \! -path '*services*' \! -path '*messagingthrift*' \! -name '__init__.py' -name "*.py")
tidy: $(addprefix tidyfile/,$(pysrc))
- @echo Insuring .py files are nice and tidy!
+ @echo Ensured .py files are nice and tidy!
tidyfile/%: %
@echo Checking tidy for $*
- pylint --report=no --disable-msg-cat=R,C,E --disable-msg=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null;
+ pylint --report=no --disable=R,C,E --disable=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null;
Modified: incubator/tashi/branches/luke-zoni-staging/doc/DEVELOPMENT
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/doc/DEVELOPMENT?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/doc/DEVELOPMENT (original)
+++ incubator/tashi/branches/luke-zoni-staging/doc/DEVELOPMENT Tue Jun 19 22:05:52 2012
@@ -8,3 +8,9 @@ Future goals:
Other ideas:
* Make available a console aggregator for user's VMs.
+
+Python caveats:
+ * Variable names we've liked to use, such as bin, id, sum, input, etc., are
+ built-in, and will be flagged by pydev
+ * pydev does not like python modules with a dash in the name
+
Modified: incubator/tashi/branches/luke-zoni-staging/doc/INSTALL2
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/doc/INSTALL2?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/doc/INSTALL2 (original)
+++ incubator/tashi/branches/luke-zoni-staging/doc/INSTALL2 Tue Jun 19 22:05:52 2012
@@ -48,6 +48,16 @@ exit 0
Note that the entire path of a network connection must be configured to
use jumbo frames, if the virtual machines are to use them.
+If you have large numbers of VLANs, and don't want to hardcode them into
+each VM host, you can find a sample qemu-ifup in the doc directory. This
+script will need to be adapted to your local standards by changing the
+basic parameters at the top. This script can then be linked to by the name
+Tashi expects it to have. For example, if you have a VLAN 1001, you will
+create a link from /etc/qemu-ifup.1001 to this script.
+
+The script will handle the creation of the VM interface, and creation of the
+bridge and VLANs if they haven't been created before.
+
---+ Accounting server
An accounting server is available in the distribution. It will log
Modified: incubator/tashi/branches/luke-zoni-staging/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/etc/NodeManager.cfg?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/luke-zoni-staging/etc/NodeManager.cfg Tue Jun 19 22:05:52 2012
@@ -80,7 +80,6 @@ clusterManagerPort = 9882
statsInterval = 0.0
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
authAndEncrypt = False
Modified: incubator/tashi/branches/luke-zoni-staging/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/etc/TashiDefaults.cfg?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/luke-zoni-staging/etc/TashiDefaults.cfg Tue Jun 19 22:05:52 2012
@@ -54,10 +54,10 @@ allowDecayed = 30.0
allowMismatchedVersions = False
maxMemory = 8192
maxCores = 8
+defaultNetwork = 0
allowDuplicateNames = False
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
baseData = tashi.clustermanager.data.Pickled
@@ -110,11 +110,9 @@ registerFrequency = 10.0
clusterManagerHost = localhost
clusterManagerPort = 9882
statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
qemuBin = /usr/bin/kvm
-infoDir = /var/tmp/VmControlQemu/
pollDelay = 1.0
migrationRetries = 10
monitorTimeout = 60.0
@@ -124,6 +122,9 @@ useMigrateArgument = False
statsInterval = 0.0
scratchDir = /tmp
scratchVg = vgscratch
+suspendHandler = gzip
+resumeHandler = zcat
+reservedMem = 512
[XenPV]
vmNamePrefix = tashi
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accounting.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accounting.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accounting.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accounting.py Tue Jun 19 22:05:52 2012
@@ -17,8 +17,8 @@
# specific language governing permissions and limitations
# under the License.
+import os
import sys
-import signal
import logging.config
from tashi.rpycservices import rpycservices
@@ -26,13 +26,15 @@ from rpyc.utils.server import ThreadedSe
#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
#from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
+from tashi.util import createClient, instantiateImplementation, debugConsole
+from tashi.utils.config import Config
+
import tashi
class Accounting(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
@@ -43,17 +45,20 @@ class Accounting(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, self.config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
def initAccountingServer(self):
service = instantiateImplementation(self.config.get("Accounting", "service"), self.config)
+ #XXXstroucki: disabled authAndEncrypt for now
#if boolean(self.config.get("Security", "authAndEncrypt")):
if False:
pass
else:
+ # XXXstroucki: ThreadedServer is liable to have
+ # exceptions within if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(self.config.get('AccountingService', 'port')), auto_register=False)
t.logger.setLevel(logging.ERROR)
@@ -62,25 +67,44 @@ class Accounting(object):
debugConsole(globals())
- try:
- t.start()
- except KeyboardInterrupt:
- self.handleSIGTERM(signal.SIGTERM, None)
-
- @signalHandler(signal.SIGTERM)
- def handleSIGTERM(self, signalNumber, stackFrame):
- self.log.info('Exiting cluster manager after receiving a SIGINT signal')
+ t.start()
+ # shouldn't exit by itself
sys.exit(0)
def main():
- (config, configFiles) = getConfig(["Accounting"])
+ config = Config(["Accounting"])
+ configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- accounting = Accounting(config, cmclient)
+ log = logging.getLogger(__name__)
+ log.info('Using configuration file(s) %s' % configFiles)
+
+ accounting = Accounting(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ accounting.initAccountingServer()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting accounting service after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of accounting service")
+ os._exit(-1)
+
+ log.info("Exiting accounting service after service thread exited")
+ os._exit(-1)
- accounting.initAccountingServer()
+ return
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accountingservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accountingservice.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accountingservice.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/accounting/accountingservice.py Tue Jun 19 22:05:52 2012
@@ -5,15 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
-# under the License.
+# under the License.
import logging
import threading
@@ -22,43 +22,43 @@ import time
from tashi import createClient
class AccountingService(object):
- """RPC service for the Accounting service"""
-
- def __init__(self, config):
- self.log = logging.getLogger(__name__)
- self.log.setLevel(logging.INFO)
-
- self.config = config
-
- self.pollSleep = None
-
- # XXXstroucki new python has fallback values
- try:
- self.pollSleep = self.config.getint("AccountingService", "pollSleep")
- except:
- pass
+ """RPC service for the Accounting service"""
- if self.pollSleep is None:
- self.pollSleep = 600
+ def __init__(self, config):
+ self.log = logging.getLogger(__name__)
+ self.log.setLevel(logging.INFO)
- self.cm = createClient(config)
- threading.Thread(target=self.__start).start()
+ self.config = config
+
+ self.pollSleep = None
+
+ # XXXstroucki new python has fallback values
+ try:
+ self.pollSleep = self.config.getint("AccountingService", "pollSleep")
+ except:
+ pass
+
+ if self.pollSleep is None:
+ self.pollSleep = 600
+
+ self.cm = createClient(config)
+ threading.Thread(target=self.__start).start()
# remote
- def record(self, strings):
- for string in strings:
- self.log.info("Remote: %s" % (string))
-
- def __start(self):
- while True:
- try:
- instances = self.cm.getInstances()
- for instance in instances:
- # XXXstroucki this currently duplicates what the CM was doing.
- self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
- except:
- self.log.warning("Accounting iteration failed")
-
-
- # wait to do the next iteration
- time.sleep(self.pollSleep)
+ def record(self, strings):
+ for string in strings:
+ self.log.info("Remote: %s" % (string))
+
+ def __start(self):
+ while True:
+ try:
+ instances = self.cm.getInstances()
+ for instance in instances:
+ # XXXstroucki this currently duplicates what the CM was doing.
+ self.log.info('Accounting: id %s host %s vmId %s user %s cores %s memory %s' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+ except:
+ self.log.warning("Accounting iteration failed")
+
+
+ # wait to do the next iteration
+ time.sleep(self.pollSleep)
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/dhcpdns.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/dhcpdns.py Tue Jun 19 22:05:52 2012
@@ -22,7 +22,7 @@ import socket
import subprocess
import time
from instancehook import InstanceHook
-from tashi.rpycservices.rpyctypes import Instance, NetworkConfiguration
+from tashi.rpycservices.rpyctypes import Instance
from tashi import boolean
class DhcpDns(InstanceHook):
@@ -55,15 +55,21 @@ class DhcpDns(InstanceHook):
self.ipMax = {}
self.currentIP = {}
self.usedIPs = {}
- for k in self.ipRange:
- ipRange = self.ipRange[k]
- (min, max) = ipRange.split("-")
- min = min.strip()
- max = max.strip()
- ipNum = self.strToIp(min)
- self.ipMin[k] = self.strToIp(min)
- self.ipMax[k] = self.strToIp(max)
- self.currentIP[k] = self.ipMin[k]
+
+ self.initIPs()
+
+ def initIPs(self):
+ self.usedIPs = {}
+ for network in self.ipRange:
+ ipRange = self.ipRange[network]
+ (ipMin, ipMax) = ipRange.split("-")
+ ipMin = ipMin.strip()
+ ipMax = ipMax.strip()
+ ipNum = self.strToIp(ipMin)
+ self.ipMin[network] = self.strToIp(ipMin)
+ self.ipMax[network] = self.strToIp(ipMax)
+ self.currentIP[network] = self.ipMin[network]
+
instances = self.client.getInstances()
for i in instances:
for nic in i.nics:
@@ -72,7 +78,7 @@ class DhcpDns(InstanceHook):
ipNum = self.strToIp(ip)
self.log.info('Added %s->%s during reinitialization' % (i.name, ip))
self.usedIPs[ipNum] = ip
- except Exception, e:
+ except Exception:
pass
def strToIp(self, s):
@@ -87,12 +93,17 @@ class DhcpDns(InstanceHook):
return "%d.%d.%d.%d" % ((ip>>24)&0xff, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff)
def allocateIP(self, nic):
+ # XXXstroucki: if the network is not defined having an ip
+ # range, this will throw a KeyError. Should be logged.
network = nic.network
allocatedIP = None
requestedIP = self.strToIp(nic.ip)
wrapToMinAlready = False
if (requestedIP <= self.ipMax[network] and requestedIP >= self.ipMin[network] and (requestedIP not in self.usedIPs)):
allocatedIP = requestedIP
+
+ # nic.ip will be updated later in preCreate if chosen
+ # ip not available
while (allocatedIP == None):
if (self.currentIP[network] > self.ipMax[network] and wrapToMinAlready):
raise UserWarning("No available IP addresses for network %d" % (network))
@@ -127,7 +138,7 @@ class DhcpDns(InstanceHook):
stdin.write("set hardware-type = 00:00:00:01\n") # Ethernet
stdin.write("create\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
def removeDhcp(self, name, ipaddr=None):
@@ -146,7 +157,7 @@ class DhcpDns(InstanceHook):
stdin.write("open\n")
stdin.write("remove\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
def addDns(self, name, ip):
@@ -169,15 +180,15 @@ class DhcpDns(InstanceHook):
stdin.write("update add %s %d IN PTR %s.%s.\n" % (reverseIpStr, self.dnsExpire, name, self.dnsDomain))
stdin.write("\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
finally:
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
while (pid == 0):
time.sleep(0.5)
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
def removeDns(self, name):
cmd = "nsupdate"
@@ -196,15 +207,15 @@ class DhcpDns(InstanceHook):
stdin.write("update delete %s.%s A\n" % (name, self.dnsDomain))
stdin.write("\n")
stdin.close()
- output = stdout.read()
+ __output = stdout.read()
stdout.close()
finally:
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
while (pid == 0):
time.sleep(0.5)
os.kill(child.pid, signal.SIGTERM)
- (pid, status) = os.waitpid(child.pid, os.WNOHANG)
+ (pid, __status) = os.waitpid(child.pid, os.WNOHANG)
def doUpdate(self, instance):
newInstance = Instance()
@@ -229,7 +240,7 @@ class DhcpDns(InstanceHook):
dhcpName = instance.name + "-nic%d" % (i)
self.log.info("Adding %s:{%s->%s} to DHCP" % (dhcpName, nic.mac, ip))
self.addDhcp(dhcpName, ip, nic.mac)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to add host %s to DHCP/DNS" % (instance.name))
self.doUpdate(instance)
@@ -242,8 +253,11 @@ class DhcpDns(InstanceHook):
ip = nic.ip
try:
ipNum = self.strToIp(ip)
+ # XXXstroucki: if this fails with KeyError,
+ # we must have double-assigned the same IP
+ # address. How does this happen?
del self.usedIPs[ipNum]
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s, ip %s from pool of usedIPs" % (instance.name, ip))
try:
if (i == 0):
@@ -251,9 +265,13 @@ class DhcpDns(InstanceHook):
else:
dhcpName = instance.name + "-nic%d" % (i)
self.removeDhcp(dhcpName)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s from DHCP" % (instance.name))
try:
+ # XXXstroucki: this can fail if the resolver can't
+ # resolve the dns server name (line 190). Perhaps
+ # the hostname should be then pushed onto a list
+ # to try again next time.
self.removeDns(instance.name)
- except Exception, e:
+ except Exception:
self.log.exception("Failed to remove host %s from DNS" % (instance.name))
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/instancehook.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/instancehook.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/instancehook.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/instancehook.py Tue Jun 19 22:05:52 2012
@@ -1,5 +1,3 @@
-#! /usr/bin/env python
-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -17,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+# superclass for instance hooks.
+
class InstanceHook(object):
def __init__(self, config, client, post=False):
if (self.__class__ is InstanceHook):
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/mauiwiki.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/mauiwiki.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/mauiwiki.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/mauiwiki.py Tue Jun 19 22:05:52 2012
@@ -17,20 +17,122 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: wiki is a text based resource manager that maui can
+# use. It also seems to have disappeared from the face of the web.
+# This code is unmaintained.
+
+# XXXstroucki former file mauipacket.py
+#import subprocess
import time
-import hashlib
-import sys
-import subprocess
-import socket, SocketServer
-from socket import gethostname
-import os
+import SocketServer
+from tashi.utils import pseudoDes
+from tashi.rpycservices.rpyctypes import HostState, InstanceState
+
+class MauiPacket:
+ def __init__(self, key=0):
+ self.size = 0
+ self.char = '\n'
+ self.chksum = '0'*16
+ self.timestamp = int(time.time())
+ self.auth = ''
+ self.data = []
+ self.msg = ''
+ self.key=key
+ def readPacket(self, istream):
+ self.msg = ''
+
+ size = istream.read(8)
+ self.msg = self.msg+size
+ self.size = int(size)
+
+ self.char = istream.read(1)
+ self.msg = self.msg + self.char
+
+ packet = istream.read(self.size)
+ self.msg = self.msg + packet
+
+ packet = packet.split()
+
+ for i in range(len(packet)):
+ item = packet[i].split('=')
+ if item[0] == 'CK':
+ self.chksum = item[1]
+ if item[0] == 'TS':
+ self.timestamp = int(item[1])
+ if item[0] == 'AUTH':
+ self.auth = item[1]
+ if item[0] == 'DT':
+ self.data = packet[i:]
+ self.data=self.data[0].split('=',1)[1:] + self.data[1:]
+
+ def checksumMessage(self, message, key=None):
+ if key == None:
+ key = self.key
+ if type(key) == type(''):
+ key = int(key)
+ chksum = pseudoDes.generateKey(message, key)
+ chksum = '%016x' % chksum
+ return chksum
+ def getChecksum(self):
+ cs = self.msg.partition('TS=')
+ cs = cs[1]+cs[2]
+ chksum = self.checksumMessage(cs)
+ return chksum
+ def verifyChecksum(self):
+ chksum = self.getChecksum()
+ if chksum != self.chksum:
+ print 'verifyChecksum: "%s"\t"%s"'%(chksum, self.chksum)
+ print 'verifyChecksum (types): %s\t%s' %(type(chksum), type(self.chksum))
+ return False
+ return True
+ def set(self, data, auth=None, key=None, timestamp=None):
+ if timestamp==None:
+ timestamp = int(time.time())
+ self.data = data
+ if auth !=None:
+ self.auth = auth
+ if key != None:
+ self.key = key
+ self.timestamp=timestamp
+ self.fixup()
+ def fixup(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.size = len(pktstring)
+ def __str__(self):
+ datastring = "TS=%i AUTH=%s DT=%s"%(self.timestamp, self.auth, (' '.join(self.data)))
+ self.chksum = self.checksumMessage(datastring)
+
+ pktstring = 'CK=%s %s'%(self.chksum, datastring)
+ self.msg = ''
+ self.msg = self.msg + '%08i'%len(pktstring)
+ self.msg = self.msg + self.char
+ self.msg = self.msg + pktstring
+
+ return self.msg
+ def prettyString(self):
+ s = '''Maui Packet
+-----------
+size:\t\t%i
+checksum:\t%s
+timestamp:\t%s
+auth:\t\t%s
+data:
+%s
+-----------'''
+ s = s%(self.size, self.chksum, self.timestamp, self.auth, self.data)
+ return s
+
+# XXXstroucki original file mauiwiki.py
import threading
import logging.config
from tashi.parallel import synchronizedmethod
from tashi.services.ttypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
-from tashi.agents.mauipacket import MauiPacket
+from tashi.util import getConfig, createClient, instantiateImplementation
+#from tashi.agents.mauipacket import MauiPacket
import tashi.util
def jobnameToId(jobname):
@@ -57,24 +159,24 @@ class InstanceHooks():
def postDestroy(self, inst):
for hook in self.hooks:
hook.postDestroy(inst)
- def idToInst(self, id):
+ def idToInst(self, _id):
instances = self.client.getInstances()
print 'instances ', instances
- insts = [i for i in instances if str(i.id)==str(id)]
+ insts = [i for i in instances if str(i.id)==str(_id)]
if len(insts) == 0:
- raise "No instance with ID %s"%id
+ raise "No instance with ID %s"%_id
if len(insts) > 1:
- raise "Multiple instances with ID %s"%id
+ raise "Multiple instances with ID %s"%_id
inst = insts[0]
return inst
- def destroyById(self, id):
- inst = self.idToInst(id)
- self.client.destroyVm(int(id))
+ def destroyById(self, _id):
+ inst = self.idToInst(_id)
+ self.client.destroyVm(int(_id))
self.postDestroy(inst)
- def activateById(self, id, host):
- inst = self.idToInst(id)
+ def activateById(self, _id, host):
+ inst = self.idToInst(_id)
self.preCreate(inst)
- self.client.activateVm(int(id), host)
+ self.client.activateVm(int(_id), host)
def cmplists(a, b):
for i in range(len(a)):
@@ -301,8 +403,8 @@ class TashiConnection(threading.Thread):
if j.updateTime >= updatetime and j.id in joblist]
jl = {}
for job in jobs:
- id = "%s.%i"%(job.name, job.id)
- jl[id] = {'STATE':self.wikiInstanceState(job),
+ _id = "%s.%i"%(job.name, job.id)
+ jl[_id] = {'STATE':self.wikiInstanceState(job),
'UNAME':self.users[job.userId].name,
'GNAME':self.users[job.userId].name,
'UPDATETIME':int(job.updateTime),
@@ -313,14 +415,14 @@ class TashiConnection(threading.Thread):
'RMEM':str(job.memory),
'WCLIMIT':str(self.defaultJobTime)}
if job.hostId != None:
- jl[id]['TASKLIST'] = self.hosts[job.hostId].name
+ jl[_id]['TASKLIST'] = self.hosts[job.hostId].name
return jl
@synchronizedmethod
- def activateById(self, id, host):
- if not self.instances.has_key(id):
+ def activateById(self, _id, host):
+ if not self.instances.has_key(_id):
raise "no such instance"
- self.ihooks.activateById(id, host)
- self.instances[id].state=InstanceState.Activating
+ self.ihooks.activateById(_id, host)
+ self.instances[_id].state=InstanceState.Activating
class MauiListener(SocketServer.StreamRequestHandler):
def setup(self):
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive.py Tue Jun 19 22:05:52 2012
@@ -19,16 +19,18 @@
import time
import logging.config
+import sys
from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean
+from tashi.util import createClient, instantiateImplementation, boolean
+from tashi.utils.config import Config
import tashi
class Primitive(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -40,10 +42,10 @@ class Primitive(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
- self.hosts = {}
+ self.hosts = {}
self.load = {}
self.instances = {}
self.muffle = {}
@@ -62,9 +64,9 @@ class Primitive(object):
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
#if (h.up == True and h.state == HostState.Normal):
- hosts[ctr] = h
- ctr = ctr + 1
- load[h.id] = []
+ hosts[ctr] = h
+ ctr = ctr + 1
+ load[h.id] = []
load[None] = []
_instances = self.cm.getInstances()
@@ -75,8 +77,9 @@ class Primitive(object):
# XXXstroucki put held machines behind pending ones
heldInstances = []
for i in instances.itervalues():
+ # Nonrunning VMs will have hostId of None, but
+ # so will Suspended VMs.
if (i.hostId or i.state == InstanceState.Pending):
- # Nonrunning VMs will have hostId of None
load[i.hostId] = load[i.hostId] + [i.id]
elif (i.hostId is None and i.state == InstanceState.Held):
heldInstances = heldInstances + [i.id]
@@ -199,14 +202,17 @@ class Primitive(object):
if myDisk == i.disks[0].uri and i.disks[0].persistent == True:
count += 1
if count > 1:
- minMaxHost = None
+ minMaxHost = None
if (minMaxHost):
# found a host
if (not inst.hints.get("__resume_source", None)):
# only run preCreate hooks if newly starting
for hook in self.hooks:
- hook.preCreate(inst)
+ try:
+ hook.preCreate(inst)
+ except:
+ self.log.warning("Failed to run preCreate hook")
self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))
rv = "fail"
try:
@@ -241,8 +247,21 @@ class Primitive(object):
def start(self):
oldInstances = {}
+ # XXXstroucki: scheduling races have been observed, where
+ # a vm is scheduled on a host that had not updated its
+ # capacity with the clustermanager, leading to overloaded
+ # hosts. I think the place to guard against this happening
+ # is in the nodemanager. This scheduler will keep an
+ # internal state of cluster loading, but that is best
+ # effort and will be refreshed from CM once the buffer
+ # of vms to be scheduled is exhausted.
+
while True:
try:
+ # XXXstroucki: to get a list of vms to be
+ # scheduled, it asks the CM for a full
+ # cluster state, and will look at those
+ # without a host.
self.__getState()
# Check for VMs that have exited and call
@@ -250,7 +269,7 @@ class Primitive(object):
for i in oldInstances:
# XXXstroucki what about paused and saved VMs?
# XXXstroucki: do we need to look at Held VMs here?
- if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+ if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying or oldInstances[i].state == InstanceState.ShuttingDown)):
self.log.info("VM exited: %s" % (oldInstances[i].name))
for hook in self.hooks:
hook.postDestroy(oldInstances[i])
@@ -280,13 +299,22 @@ class Primitive(object):
time.sleep(self.scheduleDelay)
def main():
- (config, configFiles) = getConfig(["Agent"])
+ config = Config(["Agent"])
+ configFiles = config.getFiles()
+
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, cmclient)
- agent.start()
+ agent = Primitive(config)
+
+ try:
+ agent.start()
+ except KeyboardInterrupt:
+ pass
+
+ log = logging.getLogger(__file__)
+ log.info("Primitive exiting")
+ sys.exit(0)
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive_zoni.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive_zoni.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive_zoni.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/agents/primitive_zoni.py Tue Jun 19 22:05:52 2012
@@ -17,6 +17,11 @@
# specific language governing permissions and limitations
# under the License.
+# XXXstroucki: this apparently originated from a copy of the primitive
+# scheduler code sometime in 2010. It aims to keep a pool of tashi servers
+# available, and other servers shut down. Could this be better suited for
+# a hook function of the scheduler?
+
from socket import gethostname
import os
import socket
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/client/tashi-client.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/client/tashi-client.py Tue Jun 19 22:05:52 2012
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,8 +21,10 @@ import os.path
import random
import sys
import types
-from tashi.rpycservices.rpyctypes import *
-from tashi import vmStates, hostStates, boolean, getConfig, stringPartition, createClient
+from tashi.rpycservices.rpyctypes import NetworkConfiguration,\
+ DiskConfiguration, HostState, Instance, Host, TashiException
+from tashi.utils.config import Config
+from tashi import vmStates, hostStates, boolean, stringPartition, createClient
users = {}
networks = {}
@@ -50,6 +52,22 @@ def getUser():
return users[user].id
raise ValueError("Unknown user %s" % (userStr))
+def checkHid(host):
+ userId = getUser()
+ hosts = client.getHosts()
+ hostId = None
+ try:
+ hostId = int(host)
+ except:
+ for h in hosts:
+ if (h.name == host):
+ hostId = h.id
+ if (hostId is None):
+ raise ValueError("Unknown host %s" % (str(host)))
+
+ # XXXstroucki permissions for host related stuff?
+ return hostId
+
def checkIid(instance):
userId = getUser()
instances = client.getInstances()
@@ -78,10 +96,17 @@ def randomMac():
def getDefaultNetwork():
fetchNetworks()
- networkId = 1
+ networkId = 0
for network in networks:
+ if (getattr(networks[network], "default", False) is True):
+ networkId = network
+ break
+
+ # Naming the network "default" is deprecated, and
+ # this functionality will be removed soon
if (networks[network].name == "default"):
networkId = network
+ break
return networkId
def randomNetwork():
@@ -93,7 +118,7 @@ def parseDisks(arg):
disks = []
for strDisk in strDisks:
strDisk = strDisk.strip()
- (l, s, r) = stringPartition(strDisk, ":")
+ (l, __s, r) = stringPartition(strDisk, ":")
if (r == ""):
r = "False"
r = boolean(r)
@@ -109,12 +134,12 @@ def parseNics(arg):
nics = []
for strNic in strNics:
strNic = strNic.strip()
- (l, s, r) = stringPartition(strNic, ":")
+ (l, __s, r) = stringPartition(strNic, ":")
n = l
if (n == ''):
n = getDefaultNetwork()
n = int(n)
- (l, s, r) = stringPartition(r, ":")
+ (l, __s, r) = stringPartition(r, ":")
ip = l
if (ip == ''):
ip = None
@@ -133,7 +158,7 @@ def parseHints(arg):
hints = {}
for strHint in strHints:
strHint = strHint.strip()
- (l, s, r) = stringPartition(strHint, "=")
+ (l, __s, r) = stringPartition(strHint, "=")
hints[l] = r
return hints
except:
@@ -186,12 +211,26 @@ def createMany(instance, count):
instances.append(client.createVm(instance))
return instances
+def shutdownMany(basename):
+ return __shutdownOrDestroyMany("shutdown", basename)
+
def destroyMany(basename):
+ return __shutdownOrDestroyMany("destroy", basename)
+
+def __shutdownOrDestroyMany(method, basename):
instances = client.getInstances()
count = 0
for i in instances:
if (i.name.startswith(basename + "-") and i.name[len(basename)+1].isdigit()):
- client.destroyVm(i.id)
+ if method == "shutdown":
+ client.shutdownVm(i.id)
+
+ elif method == "destroy":
+ client.destroyVm(i.id)
+
+ else:
+ raise ValueError("Unknown method")
+
count = count + 1
if (count == 0):
raise ValueError("That is an unused basename")
@@ -213,6 +252,7 @@ extraViews = {
'copyImage': (None, None),
'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
+'shutdownMany': (shutdownMany, None),
'destroyMany': (destroyMany, None),
'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
'getInstances': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
@@ -225,6 +265,7 @@ argLists = {
'createMany': [('userId', int, getUser, False), ('basename', str, lambda: requiredArg('basename'), True), ('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False), ('disks', parseDisks, lambda: requiredArg('disks'), True), ('nics', parseNics, randomNetwork, False), ('hints', parseHints, lambda: {}, False), ('count', int, lambda: requiredArg('count'), True)],
'shutdownVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'destroyVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
+'shutdownMany': [('basename', str, lambda: requiredArg('basename'), True)],
'destroyMany': [('basename', str, lambda: requiredArg('basename'), True)],
'suspendVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'resumeVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
@@ -235,6 +276,7 @@ argLists = {
'getImages': [],
'copyImage': [('src', str, lambda: requiredArg('src'),True), ('dst', str, lambda: requiredArg('dst'), True)],
'getHosts': [],
+'setHostState': [('host', checkHid, lambda: requiredArg('host'), True), ('state', str, lambda: requiredArg('state'), True)],
'getUsers': [],
'getNetworks': [],
'getInstances': [],
@@ -250,6 +292,7 @@ convertArgs = {
'createMany': '[Instance(d={"userId":userId,"name":basename,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints}), count]',
'shutdownVm': '[instance]',
'destroyVm': '[instance]',
+'shutdownMany': '[basename]',
'destroyMany': '[basename]',
'suspendVm': '[instance]',
'resumeVm': '[instance]',
@@ -260,6 +303,7 @@ convertArgs = {
'unregisterHost' : '[hostId]',
'getSlots' : '[cores, memory]',
'copyImage' : '[src, dst]',
+'setHostState' : '[host, state]',
}
# Descriptions
@@ -268,6 +312,7 @@ description = {
'createMany': 'Utility function that creates many VMs with the same set of parameters',
'shutdownVm': 'Attempts to shutdown a VM nicely',
'destroyVm': 'Immediately destroys a VM -- it is the same as unplugging a physical machine and should be used for non-persistent VMs or when all else fails',
+'shutdownMany': 'Attempts to gracefully shut down a group of VMs created with createMany',
'destroyMany': 'Destroys a group of VMs created with createMany',
'suspendVm': 'Suspends a running VM to disk',
'resumeVm': 'Resumes a suspended VM from disk',
@@ -276,6 +321,7 @@ description = {
'unpauseVm': 'Unpauses a paused VM',
'getSlots': 'Get a count of how many VMs could be started in the cluster',
'getHosts': 'Gets a list of hosts running Node Managers',
+'setHostState': 'Set the state of a host, eg. Normal or Drained',
'getUsers': 'Gets a list of users',
'getNetworks': 'Gets a list of available networks for VMs to be placed on',
'getInstances': 'Gets a list of all VMs in the cluster',
@@ -293,6 +339,7 @@ examples = {
'createMany': ['--basename foobar --disks i386-hardy.qcow2 --count 4'],
'shutdownVm': ['--instance 12345', '--instance foobar'],
'destroyVm': ['--instance 12345', '--instance foobar'],
+'shutdownMany': ['--basename foobar'],
'destroyMany': ['--basename foobar'],
'suspendVm': ['--instance 12345', '--instance foobar'],
'resumeVm': ['--instance 12345', '--instance foobar'],
@@ -301,6 +348,7 @@ examples = {
'unpauseVm': ['--instance 12345', '--instance foobar'],
'getSlots': ['--cores 1 --memory 128'],
'getHosts': [''],
+'setHostState': ['--host fnord --state Drained'],
'getUsers': [''],
'getNetworks': [''],
'getInstances': [''],
@@ -321,7 +369,8 @@ def usage(func = None):
print "Unknown function %s" % (func)
print
functions = argLists
- print "%s is the client program for Tashi, a system for cloud-computing on BigData." % (os.path.basename(sys.argv[0]))
+ print "%s is the client program for Tashi" % (os.path.basename(sys.argv[0]))
+ print "Tashi, a system for cloud-computing on BigData"
print "Visit http://incubator.apache.org/tashi/ for more information."
print
else:
@@ -373,9 +422,9 @@ def transformState(obj):
except:
obj.state = 'Unknown'
-def genKeys(list):
+def genKeys(_list):
keys = {}
- for row in list:
+ for row in _list:
for item in row.__dict__.keys():
keys[item] = item
if ('id' in keys):
@@ -385,25 +434,25 @@ def genKeys(list):
keys = keys.values()
return keys
-def makeTable(list, keys=None):
- (consoleWidth, consoleHeight) = (9999, 9999)
+def makeTable(_list, keys=None):
+ (consoleWidth, __consoleHeight) = (9999, 9999)
try:
# XXXpipe: get number of rows and column on current window
stdout = os.popen("stty size")
- r = stdout.read()
+ __r = stdout.read()
stdout.close()
except:
pass
- for obj in list:
+ for obj in _list:
transformState(obj)
if (keys == None):
- keys = genKeys(list)
+ keys = genKeys(_list)
for (show, k) in show_hide:
if (show):
if (k != "all"):
keys.append(k)
else:
- keys = genKeys(list)
+ keys = genKeys(_list)
else:
if (k in keys):
keys.remove(k)
@@ -412,7 +461,7 @@ def makeTable(list, keys=None):
maxWidth = {}
for k in keys:
maxWidth[k] = len(k)
- for row in list:
+ for row in _list:
for k in keys:
if (k in row.__dict__):
maxWidth[k] = max(maxWidth[k], len(str(row.__dict__[k])))
@@ -445,8 +494,8 @@ def makeTable(list, keys=None):
return 1
else:
return 0
- list.sort(cmp=sortFunction)
- for row in list:
+ _list.sort(cmp=sortFunction)
+ for row in _list:
line = ""
for k in keys:
row.__dict__[k] = row.__dict__.get(k, "")
@@ -507,11 +556,12 @@ def main():
"""Main function for the client program"""
global INDENT, exitCode, client
exitCode = 0
+ exception = None
INDENT = (os.getenv("INDENT", 4))
if (len(sys.argv) < 2):
usage()
function = matchFunction(sys.argv[1])
- (config, configFiles) = getConfig(["Client"])
+ config = Config(["Client"])
# build a structure of possible arguments
possibleArgs = {}
@@ -551,30 +601,54 @@ def main():
if (arg.startswith("--")):
if (arg[2:] in possibleArgs):
(parg, conv, default, required) = possibleArgs[arg[2:]]
- val = conv(args.pop(0))
+ try:
+ val = None
+ lookahead = args[0]
+ if not lookahead.startswith("--"):
+ val = args.pop(0)
+ except:
+ pass
+
+ val = conv(val)
if (val == None):
val = default()
vals[parg] = val
continue
+ # somewhat lame, but I don't want to rewrite the fn at this time
+ exception = ValueError("Unknown argument %s" % (arg))
- raise ValueError("Unknown argument %s" % (arg))
-
-
- f = getattr(client, function, None)
+ f = None
+ try:
+ f = extraViews[function][0]
+ except:
+ pass
if (f is None):
- f = extraViews[function][0]
- if (function in convertArgs):
- fargs = eval(convertArgs[function], globals(), vals)
- else:
- fargs = []
- res = f(*fargs)
+ f = getattr(client, function, None)
+
+ try:
+ if exception is not None:
+ raise exception
+
+ if (function in convertArgs):
+ fargs = eval(convertArgs[function], globals(), vals)
+ else:
+ fargs = []
+
+ res = f(*fargs)
+ except Exception, e:
+ print "Failed in calling %s: %s" % (function, e)
+ print "Please run tashi-client --examples for syntax information"
+ sys.exit(-1)
+
if (res != None):
keys = extraViews.get(function, (None, None))[1]
try:
if (type(res) == types.ListType):
makeTable(res, keys)
+ elif (type(res) == types.StringType):
+ print res
else:
makeTable([res], keys)
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanager.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanager.py Tue Jun 19 22:05:52 2012
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -17,11 +17,12 @@
# specific language governing permissions and limitations
# under the License.
+import os
import sys
-import signal
import logging.config
-from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+from tashi.util import boolean, instantiateImplementation, debugConsole
+from tashi.utils.config import Config
import tashi
from tashi.rpycservices import rpycservices
@@ -46,6 +47,9 @@ def startClusterManager(config):
users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
authenticator = TlsliteVdbAuthenticator.from_dict(users)
+
+ # XXXstroucki exceptions may occur inside ThreadedServer
+ # if an endpoint is lost.
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
else:
t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
@@ -54,24 +58,17 @@ def startClusterManager(config):
t.service._type = 'ClusterManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- global log
+ t.start()
+ # shouldn't exit by itself
+ return
- log.info('Exiting cluster manager after receiving a SIGINT signal')
- sys.exit(0)
-
def main():
global log
# setup configuration and logging
- (config, configFiles) = getConfig(["ClusterManager"])
+ config = Config(["ClusterManager"])
+ configFiles = config.getFiles()
publisher = instantiateImplementation(config.get("ClusterManager", "publisher"), config)
tashi.publisher = publisher
logging.config.fileConfig(configFiles)
@@ -80,7 +77,32 @@ def main():
# bind the database
log.info('Starting cluster manager')
- startClusterManager(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startClusterManager(config)
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting cluster manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of cluster manager")
+ os._exit(-1)
+
+ log.info("Exiting cluster manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanagerservice.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/clustermanagerservice.py Tue Jun 19 22:05:52 2012
@@ -19,9 +19,8 @@ import logging
import threading
import time
-from tashi.rpycservices import rpycservices
-from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
-from tashi import boolean, ConnectionManager, vmStates, version, scrubString
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, Instance, HostState, TashiException
+from tashi import boolean, ConnectionManager, vmStates, hostStates, version, scrubString
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
@@ -36,7 +35,7 @@ class ClusterManagerService(object):
else:
self.username = None
self.password = None
- self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')))
+ self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
@@ -49,6 +48,9 @@ class ClusterManagerService(object):
self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
+
+ self.defaultNetwork = self.config.getint('ClusterManagerService', 'defaultNetwork', 0)
+
self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
self.accountingHost = None
@@ -62,7 +64,7 @@ class ClusterManagerService(object):
self.__initAccounting()
self.__initCluster()
- threading.Thread(target=self.__monitorCluster).start()
+ threading.Thread(name="monitorCluster", target=self.__monitorCluster).start()
def __initAccounting(self):
self.accountBuffer = []
@@ -71,7 +73,7 @@ class ClusterManagerService(object):
try:
if (self.accountingHost is not None) and \
(self.accountingPort is not None):
- self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
except:
self.log.exception("Could not init accounting")
@@ -126,7 +128,7 @@ class ClusterManagerService(object):
except:
self.log.exception("Invalid host data")
- secondary = ','.join(filter(None, (hostText, instanceText)))
+ secondary = ','.join(filter(None, (hostText, instanceText)))
line = "%s|%s|%s" % (now, text, secondary)
@@ -232,7 +234,7 @@ class ClusterManagerService(object):
# get a list of VMs running on host
try:
hostProxy = self.proxy[host.name]
- remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+ remoteInstances = [self.__getVmInfo(host.name, vmId) for vmId in hostProxy.listVms()]
except:
self.log.warning('Failure getting instances from host %s' % (host.name))
self.data.releaseHost(host)
@@ -241,6 +243,9 @@ class ClusterManagerService(object):
# register instances I don't know about
for instance in remoteInstances:
if (instance.id not in myInstances):
+ if instance.state == InstanceState.Exited:
+ self.log.warning("%s telling me about exited instance %s, ignoring." % (host.name, instance.id))
+ continue
instance.hostId = host.id
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
@@ -269,18 +274,22 @@ class ClusterManagerService(object):
# iterate through all VMs I believe are active
for instanceId in self.instanceLastContactTime.keys():
- # Don't query non-running VMs. eg. if a VM
- # is suspended, and has no host, then there's
- # no one to ask
- if instance.state != InstanceState.Running and \
- instance.state != InstanceState.Activating and \
- instance.state != InstanceState.Orphaned:
- continue
# XXXstroucki should lock instance here?
- if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+ try:
+ lastContactTime = self.instanceLastContactTime[instanceId]
+ except KeyError:
+ continue
+
+ if (lastContactTime < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
+ # Don't query non-running VMs. eg. if a VM
+ # is suspended, and has no host, then there's
+ # no one to ask
+ if instance.state not in [InstanceState.Running, InstanceState.Activating, InstanceState.Orphaned]:
+ self.data.releaseInstance(instance)
+ continue
except:
continue
@@ -293,22 +302,34 @@ class ClusterManagerService(object):
# get updated state on VM
try:
- hostProxy = self.proxy[host.name]
- newInstance = hostProxy.getVmInfo(instance.vmId)
+ newInstance = self.__getVmInfo(host.name, instance.vmId)
except:
self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
self.data.releaseInstance(instance)
continue
- # replace existing state with new state
- # XXXstroucki more?
- instance.state = newInstance.state
- self.instanceLastContactTime[instanceId] = self.__now()
- instance.decayed = False
- self.data.releaseInstance(instance)
+ # update the information we have on the vm
+ #before = instance.state
+ rv = self.__vmUpdate(instance, newInstance, None)
+ if (rv == "release"):
+ self.data.releaseInstance(instance)
+
+ if (rv == "remove"):
+ self.data.removeInstance(instance)
+
+ def __getVmInfo(self, host, vmid):
+ hostProxy = self.proxy[host]
+ rv = hostProxy.getVmInfo(vmid)
+ if isinstance(rv, Exception):
+ raise rv
- def normalize(self, instance):
+ if not isinstance(rv, Instance):
+ raise ValueError
+
+ return rv
+
+ def __normalize(self, instance):
instance.id = None
instance.vmId = None
instance.hostId = None
@@ -336,18 +357,20 @@ class ClusterManagerService(object):
del instance.hints[hint]
return instance
+ # extern
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
# XXXstroucki: check for exception here
- instance = self.normalize(instance)
+ instance = self.__normalize(instance)
instance = self.data.registerInstance(instance)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM REQUEST", instance=instance)
return instance
-
+
+ # extern
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+ self.__stateTransition(instance, None, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -357,7 +380,8 @@ class ClusterManagerService(object):
self.log.exception('shutdownVm failed for host %s vmId %d' % (instance.name, instance.vmId))
raise
return
-
+
+ # extern
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
@@ -365,7 +389,7 @@ class ClusterManagerService(object):
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
- self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+ self.__stateTransition(instance, None, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
@@ -381,15 +405,21 @@ class ClusterManagerService(object):
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
- self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
+ self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
return
+ # extern
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -401,15 +431,22 @@ class ClusterManagerService(object):
raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
return
+ # extern
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ try:
+ self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM RESUME", instance=instance)
return instance
+ # extern
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
self.__ACCOUNT("CM VM MIGRATE", instance=instance)
@@ -421,7 +458,13 @@ class ClusterManagerService(object):
except:
self.data.releaseInstance(instance)
raise
- self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Prepare the target
@@ -433,7 +476,12 @@ class ClusterManagerService(object):
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
- self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ try:
+ self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Send the VM
@@ -449,15 +497,23 @@ class ClusterManagerService(object):
try:
# Notify the target
- vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
+ __vmid = self.proxy[targetHost.name].receiveVm(instance, cookie)
except Exception:
self.log.exception('receiveVm failed')
raise
+
+ self.log.info("migrateVM finished")
return
-
+
+ # extern
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -467,13 +523,24 @@ class ClusterManagerService(object):
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ try:
+ self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
+ # extern
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -483,25 +550,61 @@ class ClusterManagerService(object):
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ try:
+ self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
-
+
+ # extern
def getHosts(self):
return self.data.getHosts().values()
+ # extern
+ def setHostState(self, hostId, state):
+ state = state.lower()
+ hostState = None
+ if state == "normal":
+ hostState = HostState.Normal
+ if state == "drained":
+ hostState = HostState.Drained
+
+ if hostState is None:
+ return "%s is not a valid host state" % state
+
+ host = self.data.acquireHost(hostId)
+ try:
+ host.state = hostState
+ finally:
+ self.data.releaseHost(host)
+
+ return "Host state set to %s." % hostStates[hostState]
+
+ # extern
def getNetworks(self):
- return self.data.getNetworks().values()
-
+ networks = self.data.getNetworks()
+ for network in networks:
+ if self.defaultNetwork == networks[network].id:
+ setattr(networks[network], "default", True)
+
+ return networks.values()
+
+ # extern
def getUsers(self):
return self.data.getUsers().values()
-
+
+ # extern
def getInstances(self):
return self.data.getInstances().values()
+ # extern
def getImages(self):
return self.data.getImages()
+ # extern
def copyImage(self, src, dst):
imageSrc = self.dfs.getLocalHandle("images/" + src)
imageDst = self.dfs.getLocalHandle("images/" + dst)
@@ -515,6 +618,7 @@ class ClusterManagerService(object):
except Exception, e:
self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
+ # extern
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
@@ -526,7 +630,7 @@ class ClusterManagerService(object):
raise
return res
-# @timed
+ # extern
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
@@ -559,45 +663,47 @@ class ClusterManagerService(object):
# let the host communicate what it is running
# and note that the information is not stale
for instance in instances:
+ if instance.state == InstanceState.Exited:
+ self.log.warning("%s reporting exited instance %s, ignoring." % (host.name, instance.id))
+ continue
self.instanceLastContactTime.setdefault(instance.id, 0)
self.data.releaseHost(oldHost)
return host.id
- def vmUpdate(self, instanceId, instance, oldState):
- try:
- oldInstance = self.data.acquireInstance(instanceId)
- except TashiException, e:
- # shouldn't have a lock to clean up after here
- if (e.errno == Errors.NoSuchInstanceId):
- self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
- return
- except:
- self.log.exception("Could not acquire instance")
- raise
+ def __vmUpdate(self, oldInstance, instance, oldState):
+ # this function assumes a lock is held on the instance
+ # already, and will be released elsewhere
- self.instanceLastContactTime[instanceId] = self.__now()
+ self.instanceLastContactTime[oldInstance.id] = self.__now()
oldInstance.decayed = False
- self.__ACCOUNT("CM VM UPDATE", instance=oldInstance)
if (instance.state == InstanceState.Exited):
# determine why a VM has exited
hostname = self.data.getHost(oldInstance.hostId).name
+
if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
+
if (oldInstance.state == InstanceState.Suspending):
self.__stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
oldInstance.hostId = None
oldInstance.vmId = None
- self.data.releaseInstance(oldInstance)
+ return "release"
+
+ if (oldInstance.state == InstanceState.MigrateTrans):
+ # Just await update from target host
+ return "release"
+
else:
del self.instanceLastContactTime[oldInstance.id]
- self.data.removeInstance(oldInstance)
+ return "remove"
+
else:
if (instance.state):
# XXXstroucki does this matter?
if (oldState and oldInstance.state != oldState):
- self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
+ self.log.warning('Doing vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
oldInstance.state = instance.state
if (instance.vmId):
oldInstance.vmId = instance.vmId
@@ -610,11 +716,44 @@ class ClusterManagerService(object):
if (oldNic.mac == nic.mac):
oldNic.ip = nic.ip
+ return "release"
+
+
+ return "success"
+
+ # extern
+ def vmUpdate(self, instanceId, instance, oldState):
+ try:
+ oldInstance = self.data.acquireInstance(instanceId)
+ except TashiException, e:
+ # shouldn't have a lock to clean up after here
+ if (e.errno == Errors.NoSuchInstanceId):
+ self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
+ return
+ except:
+ self.log.exception("Could not acquire instance")
+ raise
+
+ import copy
+ displayInstance = copy.copy(oldInstance)
+ displayInstance.state = instance.state
+ self.__ACCOUNT("CM VM UPDATE", instance=displayInstance)
+
+ rv = self.__vmUpdate(oldInstance, instance, oldState)
+
+ if (rv == "release"):
self.data.releaseInstance(oldInstance)
+ if (rv == "remove"):
+ self.data.removeInstance(oldInstance)
+
return "success"
-
+
+ # extern
def activateVm(self, instanceId, host):
+ # XXXstroucki: check my idea of the host's capacity before
+ # trying.
+
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
@@ -632,7 +771,7 @@ class ClusterManagerService(object):
self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
- self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+ self.__stateTransition(instance, None, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
@@ -678,12 +817,13 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
return "success"
- def registerHost(self, hostname, memory, cores, version):
- hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
- if alreadyRegistered:
- self.log.info("Host %s is already registered, it was updated now" % hostname)
- else:
- self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+ # extern
+ def registerHost(self, hostname, memory, cores, version):
+ hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
+ if alreadyRegistered:
+ self.log.info("Host %s is already registered, it was updated now" % hostname)
+ else:
+ self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
try:
host = self.data.getHost(hostId)
@@ -691,9 +831,10 @@ class ClusterManagerService(object):
except:
self.log.warning("Failed to lookup host %s" % hostId)
- return hostId
+ return hostId
- def unregisterHost(self, hostId):
+ # extern
+ def unregisterHost(self, hostId):
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST UNREGISTER", host=host)
@@ -701,9 +842,9 @@ class ClusterManagerService(object):
self.log.warning("Failed to lookup host %s" % hostId)
return
- self.data.unregisterHost(hostId)
- self.log.info("Host %s was unregistered" % hostId)
- return
+ self.data.unregisterHost(hostId)
+ self.log.info("Host %s was unregistered" % hostId)
+ return
# service thread
def __monitorCluster(self):
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/datainterface.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/datainterface.py Tue Jun 19 22:05:52 2012
@@ -43,25 +43,28 @@ class DataInterface(object):
def getHosts(self):
raise NotImplementedError
- def getHost(self, id):
+ def getHost(self, _id):
+ raise NotImplementedError
+
+ def getImages(self):
raise NotImplementedError
def getInstances(self):
raise NotImplementedError
- def getInstance(self, id):
+ def getInstance(self, _id):
raise NotImplementedError
def getNetworks(self):
raise NotImplementedError
- def getNetwork(self, id):
+ def getNetwork(self, _id):
raise NotImplementedError
def getUsers(self):
raise NotImplementedError
- def getUser(self, id):
+ def getUser(self, _id):
raise NotImplementedError
def registerHost(self, hostname, memory, cores, version):
Modified: incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/fromconfig.py?rev=1351880&r1=1351879&r2=1351880&view=diff
==============================================================================
--- incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/branches/luke-zoni-staging/src/tashi/clustermanager/data/fromconfig.py Tue Jun 19 22:05:52 2012
@@ -15,17 +15,21 @@
# specific language governing permissions and limitations
# under the License.
+#XXXstroucki: for compatibility with python 2.5
from __future__ import with_statement
+
+import logging
import threading
import os
import ConfigParser
-from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState, Instance
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.hosts = {}
self.instances = {}
self.networks = {}
@@ -78,6 +82,10 @@ class FromConfig(DataInterface):
return instanceId
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.acquireLock(self.instanceLock)
try:
if (instance.id is not None and instance.id not in self.instances):
@@ -107,6 +115,10 @@ class FromConfig(DataInterface):
return instance
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
try:
if (instance.id not in self.instances): # MPR: should never be true, but good to check
raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
@@ -114,6 +126,10 @@ class FromConfig(DataInterface):
self.releaseLock(instance._lock)
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.acquireLock(self.instanceLock)
try:
del self.instances[instance.id]
@@ -122,6 +138,10 @@ class FromConfig(DataInterface):
self.releaseLock(self.instanceLock)
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
self.hostLock.acquire()
host = self.hosts.get(hostId, None)
if (host is None):
@@ -134,6 +154,10 @@ class FromConfig(DataInterface):
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
try:
if (host.id not in self.hosts): # MPR: should never be true, but good to check
raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})
@@ -145,47 +169,47 @@ class FromConfig(DataInterface):
def getHosts(self):
return self.hosts
- def getHost(self, id):
- host = self.hosts.get(id, None)
+ def getHost(self, _id):
+ host = self.hosts.get(_id, None)
if (not host):
- raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (_id)})
return host
def getInstances(self):
return self.instances
- def getInstance(self, id):
- instance = self.instances.get(id, None)
+ def getInstance(self, _id):
+ instance = self.instances.get(_id, None)
if (not instance):
- raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (id)})
+ raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (_id)})
return instance
def getNetworks(self):
return self.networks
- def getNetwork(self, id):
- return self.networks[id]
+ def getNetwork(self, _id):
+ return self.networks[_id]
def getUsers(self):
return self.users
- def getUser(self, id):
- return self.users[id]
+ def getUser(self, _id):
+ return self.users[_id]
def registerHost(self, hostname, memory, cores, version):
self.hostLock.acquire()
- for id in self.hosts.keys():
- if self.hosts[id].name == hostname:
- host = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
- self.hosts[id] = host
+ for _id in self.hosts.keys():
+ if self.hosts[_id].name == hostname:
+ host = Host(d={'id':_id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+ self.hosts[_id] = host
self.save()
self.hostLock.release()
- return id, True
- id = self.getNewId("hosts")
- self.hosts[id] = Host(d={'id':id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
+ return _id, True
+ _id = self.getNewId("hosts")
+ self.hosts[_id] = Host(d={'id':_id,'name':hostname,'state':HostState.Normal,'memory':memory,'cores':cores,'version':version})
self.save()
self.hostLock.release()
- return id, False
+ return _id, False
def unregisterHost(self, hostId):
self.hostLock.acquire()
@@ -200,10 +224,10 @@ class FromConfig(DataInterface):
maxId = 0
l = []
if(table == "hosts"):
- for id in self.hosts.keys():
- l.append(id)
- if id >= maxId:
- maxId = id
+ for _id in self.hosts.keys():
+ l.append(_id)
+ if _id >= maxId:
+ maxId = _id
l.sort() # sort to enable comparing with range output
# check if some id is released:
t = range(maxId + 1)
@@ -221,9 +245,9 @@ class FromConfig(DataInterface):
# and in what order does it get loaded
fileName = "./etc/Tashi.cfg"
if not os.path.exists(fileName):
- file = open(fileName, "w")
- file.write("[FromConfig]")
- file.close()
+ filehandle = open(fileName, "w")
+ filehandle.write("[FromConfig]")
+ filehandle.close()
parser = ConfigParser.ConfigParser()
parser.read(fileName)
@@ -231,7 +255,7 @@ class FromConfig(DataInterface):
parser.add_section("FromConfig")
hostsInFile = []
- for (name, value) in parser.items("FromConfig"):
+ for (name, __value) in parser.items("FromConfig"):
name = name.lower()
if (name.startswith("host")):
hostsInFile.append(name)