You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/07/16 23:52:43 UTC
svn commit: r1362316 [1/2] - in /incubator/tashi/branches/oldstable: ./ etc/
src/tashi/ src/tashi/accounting/ src/tashi/agents/ src/tashi/client/
src/tashi/clustermanager/ src/tashi/clustermanager/data/
src/tashi/messaging/ src/tashi/nodemanager/ src/t...
Author: stroucki
Date: Mon Jul 16 23:52:41 2012
New Revision: 1362316
URL: http://svn.apache.org/viewvc?rev=1362316&view=rev
Log:
Copy stable branch down to oldstable branch in preparation for new stable branch.
Removed:
incubator/tashi/branches/oldstable/src/tashi/client/client.py
incubator/tashi/branches/oldstable/src/tashi/client/test.py
incubator/tashi/branches/oldstable/src/tashi/messaging/messageBroker.py
incubator/tashi/branches/oldstable/src/tashi/messaging/messaging.py
incubator/tashi/branches/oldstable/src/tashi/messaging/soapmessaging.py
incubator/tashi/branches/oldstable/src/tashi/messaging/tashimessaging.py
incubator/tashi/branches/oldstable/src/tashi/messaging/threadpool.py
incubator/tashi/branches/oldstable/src/tashi/messaging/thriftmessaging.py
incubator/tashi/branches/oldstable/src/tashi/thrift/
incubator/tashi/branches/oldstable/src/utils/Makefile
incubator/tashi/branches/oldstable/src/utils/getLocality.py
incubator/tashi/branches/oldstable/src/utils/nmd.c
Modified:
incubator/tashi/branches/oldstable/ (props changed)
incubator/tashi/branches/oldstable/INSTALL
incubator/tashi/branches/oldstable/Makefile
incubator/tashi/branches/oldstable/etc/NodeManager.cfg
incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
incubator/tashi/branches/oldstable/src/tashi/accounting/accounting.py
incubator/tashi/branches/oldstable/src/tashi/accounting/accountingservice.py
incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py
incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py
incubator/tashi/branches/oldstable/src/tashi/util.py
incubator/tashi/branches/oldstable/src/tashi/version.py
incubator/tashi/branches/oldstable/src/utils/nmd.py
incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py
incubator/tashi/branches/oldstable/src/zoni/extra/util.py
incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py
incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py
incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py
incubator/tashi/branches/oldstable/src/zoni/install/db/zoniDbSetup.py
incubator/tashi/branches/oldstable/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
Propchange: incubator/tashi/branches/oldstable/
------------------------------------------------------------------------------
Merged /incubator/tashi/branches/stroucki-tashi11:r1294393-1294415
Merged /incubator/tashi/branches/stroucki-tashi10:r1294393-1294727
Merged /incubator/tashi/branches/stable:r1298172-1362313
Merged /incubator/tashi/branches/stroucki-dropthrift:r1292513-1297655
Merged /incubator/tashi/trunk:r1241775-1245044,1245046-1292487,1292489-1292540,1292542,1292544-1292894,1292896-1293347,1293349-1293400,1293402-1294309,1294311-1294408,1294410-1298109,1301133-1301134,1301156,1304335
Merged /incubator/tashi/branches/stroucki-tashi8:r1294393-1294427
Merged /incubator/tashi/branches/stroucki-accounting:r1241771-1295369
Merged /incubator/tashi/branches/stroucki-tashi2:r1294935-1294944
Merged /incubator/tashi/branches/stroucki-stable:r1297792-1298173
Modified: incubator/tashi/branches/oldstable/INSTALL
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/INSTALL?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/INSTALL (original)
+++ incubator/tashi/branches/oldstable/INSTALL Mon Jul 16 23:52:41 2012
@@ -124,6 +124,7 @@ When defining the host, you must provide
given by the hostname command. If you plan on eventually having several
hosts and networks, feel free to add them now.
+root@grml:/usr/local/tashi# cd bin
root@grml:/usr/local/tashi/bin# DEBUG=1 ./clustermanager
2012-01-26 23:12:33,972 [./clustermanager:INFO] Using configuration file(s) ['/usr/local/tashi/etc/TashiDefaults.cfg']
2012-01-26 23:12:33,972 [./clustermanager:INFO] Starting cluster manager
@@ -158,9 +159,8 @@ In [3]: data.baseDataObject.networks[1]=
In [4]: data.baseDataObject.save()
-In [5]: import os
-
-In [6]: os.kill(os.getpid(), 9)
+In [5]: (^C)
+2012-03-07 20:00:00,456 [./bin/clustermanager:INFO] Exiting cluster manager after receiving a SIGINT signal
Run the cluster manager in the background:
root@grml:/usr/local/tashi/bin# ./clustermanager &
Modified: incubator/tashi/branches/oldstable/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/Makefile?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/Makefile (original)
+++ incubator/tashi/branches/oldstable/Makefile Mon Jul 16 23:52:41 2012
@@ -128,11 +128,11 @@ rmzoni-cli:
if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
-disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')
+disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')",F0401"
pysrc=$(shell find . \! -path '*gen-py*' \! -path '*services*' \! -path '*messagingthrift*' \! -name '__init__.py' -name "*.py")
tidy: $(addprefix tidyfile/,$(pysrc))
- @echo Insuring .py files are nice and tidy!
+ @echo Ensured .py files are nice and tidy!
tidyfile/%: %
@echo Checking tidy for $*
- pylint --report=no --disable-msg-cat=R,C,E --disable-msg=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null;
+ pylint --report=no --disable=R,C,E --disable=$(disabled_warnings) --indent-string="\t" $* 2> /dev/null;
Modified: incubator/tashi/branches/oldstable/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/NodeManager.cfg?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/NodeManager.cfg Mon Jul 16 23:52:41 2012
@@ -80,7 +80,6 @@ clusterManagerPort = 9882
statsInterval = 0.0
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Security]
authAndEncrypt = False
Modified: incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg Mon Jul 16 23:52:41 2012
@@ -57,7 +57,6 @@ maxCores = 8
allowDuplicateNames = False
;accountingHost = clustermanager
;accountingPort = 2228
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[GetentOverride]
baseData = tashi.clustermanager.data.Pickled
@@ -110,7 +109,6 @@ registerFrequency = 10.0
clusterManagerHost = localhost
clusterManagerPort = 9882
statsInterval = 0.0
-;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
[Qemu]
qemuBin = /usr/bin/kvm
Modified: incubator/tashi/branches/oldstable/src/tashi/accounting/accounting.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/accounting/accounting.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/accounting/accounting.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/accounting/accounting.py Mon Jul 16 23:52:41 2012
@@ -17,6 +17,8 @@
# specific language governing permissions and limitations
# under the License.
+import os
+import time
import sys
import signal
import logging.config
@@ -26,13 +28,13 @@ from rpyc.utils.server import ThreadedSe
#from rpyc.utils.authenticators import TlsliteVdbAuthenticator
#from tashi.rpycservices.rpyctypes import *
-from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole, signalHandler
+from tashi.util import getConfig, createClient, instantiateImplementation, boolean, debugConsole
import tashi
class Accounting(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
@@ -62,25 +64,43 @@ class Accounting(object):
debugConsole(globals())
- try:
- t.start()
- except KeyboardInterrupt:
- self.handleSIGTERM(signal.SIGTERM, None)
-
- @signalHandler(signal.SIGTERM)
- def handleSIGTERM(self, signalNumber, stackFrame):
- self.log.info('Exiting cluster manager after receiving a SIGINT signal')
+ t.start()
+ # shouldn't exit by itself
sys.exit(0)
def main():
(config, configFiles) = getConfig(["Accounting"])
publisher = instantiateImplementation(config.get("Accounting", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- accounting = Accounting(config, cmclient)
+ log = logging.getLogger(__name__)
+ log.info('Using configuration file(s) %s' % configFiles)
+
+ accounting = Accounting(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ accounting.initAccountingServer()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting accounting service after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of accounting service")
+ os._exit(-1)
+
+ log.info("Exiting accounting service after service thread exited")
+ os._exit(-1)
- accounting.initAccountingServer()
+ return
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/oldstable/src/tashi/accounting/accountingservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/accounting/accountingservice.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/accounting/accountingservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/accounting/accountingservice.py Mon Jul 16 23:52:41 2012
@@ -5,15 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
-# under the License.
+# under the License.
import logging
import threading
@@ -22,43 +22,43 @@ import time
from tashi import createClient
class AccountingService(object):
- """RPC service for the Accounting service"""
-
- def __init__(self, config):
- self.log = logging.getLogger(__name__)
- self.log.setLevel(logging.INFO)
-
- self.config = config
-
- self.pollSleep = None
-
- # XXXstroucki new python has fallback values
- try:
- self.pollSleep = self.config.getint("AccountingService", "pollSleep")
- except:
- pass
+ """RPC service for the Accounting service"""
- if self.pollSleep is None:
- self.pollSleep = 600
+ def __init__(self, config):
+ self.log = logging.getLogger(__name__)
+ self.log.setLevel(logging.INFO)
- self.cm = createClient(config)
- threading.Thread(target=self.__start).start()
+ self.config = config
+
+ self.pollSleep = None
+
+ # XXXstroucki new python has fallback values
+ try:
+ self.pollSleep = self.config.getint("AccountingService", "pollSleep")
+ except:
+ pass
+
+ if self.pollSleep is None:
+ self.pollSleep = 600
+
+ self.cm = createClient(config)
+ threading.Thread(target=self.__start).start()
# remote
- def record(self, strings):
- for string in strings:
- self.log.info("Remote: %s" % (string))
-
- def __start(self):
- while True:
- try:
- instances = self.cm.getInstances()
- for instance in instances:
- # XXXstroucki this currently duplicates what the CM was doing.
- self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
- except:
- self.log.warning("Accounting iteration failed")
-
-
- # wait to do the next iteration
- time.sleep(self.pollSleep)
+ def record(self, strings):
+ for string in strings:
+ self.log.info("Remote: %s" % (string))
+
+ def __start(self):
+ while True:
+ try:
+ instances = self.cm.getInstances()
+ for instance in instances:
+ # XXXstroucki this currently duplicates what the CM was doing.
+ self.log.info('Accounting: id %s host %s vmId %s user %s cores %s memory %s' % (instance.id, instance.hostId, instance.vmId, instance.userId, instance.cores, instance.memory))
+ except:
+ self.log.warning("Accounting iteration failed")
+
+
+ # wait to do the next iteration
+ time.sleep(self.pollSleep)
Modified: incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py Mon Jul 16 23:52:41 2012
@@ -19,6 +19,7 @@
import time
import logging.config
+import sys
from tashi.rpycservices.rpyctypes import Errors, HostState, InstanceState, TashiException
@@ -26,9 +27,9 @@ from tashi.util import getConfig, create
import tashi
class Primitive(object):
- def __init__(self, config, cmclient):
+ def __init__(self, config):
self.config = config
- self.cm = cmclient
+ self.cm = createClient(config)
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -40,10 +41,10 @@ class Primitive(object):
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, cmclient, False))
+ self.hooks.append(instantiateImplementation(value, config, self.cm, False))
except:
self.log.exception("Failed to load hook %s" % (value))
- self.hosts = {}
+ self.hosts = {}
self.load = {}
self.instances = {}
self.muffle = {}
@@ -62,9 +63,9 @@ class Primitive(object):
for h in self.cm.getHosts():
#XXXstroucki get all hosts here?
#if (h.up == True and h.state == HostState.Normal):
- hosts[ctr] = h
- ctr = ctr + 1
- load[h.id] = []
+ hosts[ctr] = h
+ ctr = ctr + 1
+ load[h.id] = []
load[None] = []
_instances = self.cm.getInstances()
@@ -199,7 +200,7 @@ class Primitive(object):
if myDisk == i.disks[0].uri and i.disks[0].persistent == True:
count += 1
if count > 1:
- minMaxHost = None
+ minMaxHost = None
if (minMaxHost):
# found a host
@@ -250,7 +251,7 @@ class Primitive(object):
for i in oldInstances:
# XXXstroucki what about paused and saved VMs?
# XXXstroucki: do we need to look at Held VMs here?
- if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+ if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying or oldInstances[i].state == InstanceState.ShuttingDown)):
self.log.info("VM exited: %s" % (oldInstances[i].name))
for hook in self.hooks:
hook.postDestroy(oldInstances[i])
@@ -283,10 +284,17 @@ def main():
(config, configFiles) = getConfig(["Agent"])
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
- cmclient = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, cmclient)
- agent.start()
+ agent = Primitive(config)
+
+ try:
+ agent.start()
+ except KeyboardInterrupt:
+ pass
+
+ log = logging.getLogger(__file__)
+ log.info("Primitive exiting")
+ sys.exit(0)
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py Mon Jul 16 23:52:41 2012
@@ -186,12 +186,26 @@ def createMany(instance, count):
instances.append(client.createVm(instance))
return instances
+def shutdownMany(basename):
+ return __shutdownOrDestroyMany("shutdown", basename)
+
def destroyMany(basename):
+ return __shutdownOrDestroyMany("destroy", basename)
+
+def __shutdownOrDestroyMany(method, basename):
instances = client.getInstances()
count = 0
for i in instances:
if (i.name.startswith(basename + "-") and i.name[len(basename)+1].isdigit()):
- client.destroyVm(i.id)
+ if method == "shutdown":
+ client.shutdownVm(i.id)
+
+ elif method == "destroy":
+ client.destroyVm(i.id)
+
+ else:
+ raise ValueError("Unknown method")
+
count = count + 1
if (count == 0):
raise ValueError("That is an unused basename")
@@ -213,6 +227,7 @@ extraViews = {
'copyImage': (None, None),
'createVm': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
+'shutdownMany': (shutdownMany, None),
'destroyMany': (destroyMany, None),
'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
'getInstances': (None, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
@@ -225,6 +240,7 @@ argLists = {
'createMany': [('userId', int, getUser, False), ('basename', str, lambda: requiredArg('basename'), True), ('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False), ('disks', parseDisks, lambda: requiredArg('disks'), True), ('nics', parseNics, randomNetwork, False), ('hints', parseHints, lambda: {}, False), ('count', int, lambda: requiredArg('count'), True)],
'shutdownVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'destroyVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
+'shutdownMany': [('basename', str, lambda: requiredArg('basename'), True)],
'destroyMany': [('basename', str, lambda: requiredArg('basename'), True)],
'suspendVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
'resumeVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
@@ -250,6 +266,7 @@ convertArgs = {
'createMany': '[Instance(d={"userId":userId,"name":basename,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints}), count]',
'shutdownVm': '[instance]',
'destroyVm': '[instance]',
+'shutdownMany': '[basename]',
'destroyMany': '[basename]',
'suspendVm': '[instance]',
'resumeVm': '[instance]',
@@ -268,6 +285,7 @@ description = {
'createMany': 'Utility function that creates many VMs with the same set of parameters',
'shutdownVm': 'Attempts to shutdown a VM nicely',
'destroyVm': 'Immediately destroys a VM -- it is the same as unplugging a physical machine and should be used for non-persistent VMs or when all else fails',
+'shutdownMany': 'Attempts to gracefully shut down a group of VMs created with createMany',
'destroyMany': 'Destroys a group of VMs created with createMany',
'suspendVm': 'Suspends a running VM to disk',
'resumeVm': 'Resumes a suspended VM from disk',
@@ -293,6 +311,7 @@ examples = {
'createMany': ['--basename foobar --disks i386-hardy.qcow2 --count 4'],
'shutdownVm': ['--instance 12345', '--instance foobar'],
'destroyVm': ['--instance 12345', '--instance foobar'],
+'shutdownMany': ['--basename foobar'],
'destroyMany': ['--basename foobar'],
'suspendVm': ['--instance 12345', '--instance foobar'],
'resumeVm': ['--instance 12345', '--instance foobar'],
@@ -321,7 +340,8 @@ def usage(func = None):
print "Unknown function %s" % (func)
print
functions = argLists
- print "%s is the client program for Tashi, a system for cloud-computing on BigData." % (os.path.basename(sys.argv[0]))
+ print "%s is the client program for Tashi" % (os.path.basename(sys.argv[0]))
+ print "Tashi, a system for cloud-computing on BigData"
print "Visit http://incubator.apache.org/tashi/ for more information."
print
else:
@@ -507,6 +527,7 @@ def main():
"""Main function for the client program"""
global INDENT, exitCode, client
exitCode = 0
+ exception = None
INDENT = (os.getenv("INDENT", 4))
if (len(sys.argv) < 2):
usage()
@@ -551,25 +572,47 @@ def main():
if (arg.startswith("--")):
if (arg[2:] in possibleArgs):
(parg, conv, default, required) = possibleArgs[arg[2:]]
- val = conv(args.pop(0))
+ try:
+ val = None
+ lookahead = args[0]
+ if not lookahead.startswith("--"):
+ val = args.pop(0)
+ except:
+ pass
+
+ val = conv(val)
if (val == None):
val = default()
vals[parg] = val
continue
+ # somewhat lame, but i don't want to rewrite the fn at this time
+ exception = ValueError("Unknown argument %s" % (arg))
- raise ValueError("Unknown argument %s" % (arg))
-
-
- f = getattr(client, function, None)
+ f = None
+ try:
+ f = extraViews[function][0]
+ except:
+ pass
if (f is None):
- f = extraViews[function][0]
- if (function in convertArgs):
- fargs = eval(convertArgs[function], globals(), vals)
- else:
- fargs = []
- res = f(*fargs)
+ f = getattr(client, function, None)
+
+ try:
+ if exception is not None:
+ raise exception
+
+ if (function in convertArgs):
+ fargs = eval(convertArgs[function], globals(), vals)
+ else:
+ fargs = []
+
+ res = f(*fargs)
+ except Exception, e:
+ print "Failed in calling %s: %s" % (function, e)
+ print "Please run tashi-client --examples for syntax information"
+ sys.exit(-1)
+
if (res != None):
keys = extraViews.get(function, (None, None))[1]
try:
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py Mon Jul 16 23:52:41 2012
@@ -17,11 +17,12 @@
# specific language governing permissions and limitations
# under the License.
+import os
import sys
-import signal
+import time
import logging.config
-from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
+from tashi.util import boolean, instantiateImplementation, getConfig, debugConsole
import tashi
from tashi.rpycservices import rpycservices
@@ -54,19 +55,11 @@ def startClusterManager(config):
t.service._type = 'ClusterManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- global log
+ t.start()
+ # shouldn't exit by itself
+ return
- log.info('Exiting cluster manager after receiving a SIGINT signal')
- sys.exit(0)
-
def main():
global log
@@ -80,7 +73,32 @@ def main():
# bind the database
log.info('Starting cluster manager')
- startClusterManager(config)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startClusterManager(config)
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting cluster manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of cluster manager")
+ os._exit(-1)
+
+ log.info("Exiting cluster manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py Mon Jul 16 23:52:41 2012
@@ -19,7 +19,7 @@ import logging
import threading
import time
-from tashi.rpycservices import rpycservices
+from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
from tashi import boolean, ConnectionManager, vmStates, version, scrubString
@@ -36,7 +36,7 @@ class ClusterManagerService(object):
else:
self.username = None
self.password = None
- self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')))
+ self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')), authAndEncrypt=self.authAndEncrypt)
self.dfs = dfs
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
@@ -71,7 +71,7 @@ class ClusterManagerService(object):
try:
if (self.accountingHost is not None) and \
(self.accountingPort is not None):
- self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
+ self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
except:
self.log.exception("Could not init accounting")
@@ -126,7 +126,7 @@ class ClusterManagerService(object):
except:
self.log.exception("Invalid host data")
- secondary = ','.join(filter(None, (hostText, instanceText)))
+ secondary = ','.join(filter(None, (hostText, instanceText)))
line = "%s|%s|%s" % (now, text, secondary)
@@ -271,7 +271,12 @@ class ClusterManagerService(object):
for instanceId in self.instanceLastContactTime.keys():
# XXXstroucki should lock instance here?
- if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+ try:
+ lastContactTime = self.instanceLastContactTime[instanceId]
+ except KeyError:
+ continue
+
+ if (lastContactTime < (self.__now() - self.allowDecayed)):
try:
instance = self.data.acquireInstance(instanceId)
# Don't query non-running VMs. eg. if a VM
@@ -348,7 +353,7 @@ class ClusterManagerService(object):
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
+ self.__stateTransition(instance, None, InstanceState.ShuttingDown)
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SHUTDOWN", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -366,7 +371,7 @@ class ClusterManagerService(object):
self.data.removeInstance(instance)
elif (instance.state is InstanceState.Activating):
self.__ACCOUNT("CM VM DESTROY STARTING", instance=instance)
- self.__stateTransition(instance, InstanceState.Activating, InstanceState.Destroying)
+ self.__stateTransition(instance, None, InstanceState.Destroying)
self.data.releaseInstance(instance)
else:
# XXXstroucki: This is a problem with keeping
@@ -382,7 +387,7 @@ class ClusterManagerService(object):
self.proxy[hostname].destroyVm(instance.vmId)
self.data.releaseInstance(instance)
except:
- self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
+ self.log.warning('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
self.data.removeInstance(instance)
@@ -390,7 +395,12 @@ class ClusterManagerService(object):
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM SUSPEND", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -404,7 +414,12 @@ class ClusterManagerService(object):
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ try:
+ self.__stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
source = "suspend/%d_%s" % (instance.id, instance.name)
instance.hints['__resume_source'] = source
self.data.releaseInstance(instance)
@@ -422,7 +437,13 @@ class ClusterManagerService(object):
except:
self.data.releaseInstance(instance)
raise
- self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.MigratePrep)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Prepare the target
@@ -434,7 +455,12 @@ class ClusterManagerService(object):
self.log.exception('prepReceiveVm failed')
raise
instance = self.data.acquireInstance(instance.id)
- self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ try:
+ self.__stateTransition(instance, InstanceState.MigratePrep, InstanceState.MigrateTrans)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
try:
# Send the VM
@@ -458,7 +484,12 @@ class ClusterManagerService(object):
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM PAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -468,13 +499,23 @@ class ClusterManagerService(object):
self.log.exception('pauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ try:
+ self.__stateTransition(instance, InstanceState.Pausing, InstanceState.Paused)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ try:
+ self.__stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
self.__ACCOUNT("CM VM UNPAUSE", instance=instance)
hostname = self.data.getHost(instance.hostId).name
@@ -484,7 +525,12 @@ class ClusterManagerService(object):
self.log.exception('unpauseVm failed on host %s with vmId %d' % (hostname, instance.vmId))
raise
instance = self.data.acquireInstance(instanceId)
- self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ try:
+ self.__stateTransition(instance, InstanceState.Unpausing, InstanceState.Running)
+ except TashiException:
+ self.data.releaseInstance(instance)
+ raise
+
self.data.releaseInstance(instance)
return
@@ -633,7 +679,7 @@ class ClusterManagerService(object):
self.__ACCOUNT("CM VM ACTIVATE", instance=instance)
if ('__resume_source' in instance.hints):
- self.__stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
+ self.__stateTransition(instance, None, InstanceState.Resuming)
else:
# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
#self.__stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
@@ -679,12 +725,12 @@ class ClusterManagerService(object):
self.data.releaseInstance(instance)
return "success"
- def registerHost(self, hostname, memory, cores, version):
- hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
- if alreadyRegistered:
- self.log.info("Host %s is already registered, it was updated now" % hostname)
- else:
- self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
+ def registerHost(self, hostname, memory, cores, version):
+ hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)
+ if alreadyRegistered:
+ self.log.info("Host %s is already registered, it was updated now" % hostname)
+ else:
+ self.log.info("A host was registered - hostname: %s, version: %s, memory: %s, cores: %s" % (hostname, version, memory, cores))
try:
host = self.data.getHost(hostId)
@@ -692,9 +738,9 @@ class ClusterManagerService(object):
except:
self.log.warning("Failed to lookup host %s" % hostId)
- return hostId
+ return hostId
- def unregisterHost(self, hostId):
+ def unregisterHost(self, hostId):
try:
host = self.data.getHost(hostId)
self.__ACCOUNT("CM HOST UNREGISTER", host=host)
@@ -702,9 +748,9 @@ class ClusterManagerService(object):
self.log.warning("Failed to lookup host %s" % hostId)
return
- self.data.unregisterHost(hostId)
- self.log.info("Host %s was unregistered" % hostId)
- return
+ self.data.unregisterHost(hostId)
+ self.log.info("Host %s was unregistered" % hostId)
+ return
# service thread
def __monitorCluster(self):
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/fromconfig.py Mon Jul 16 23:52:41 2012
@@ -16,16 +16,18 @@
# under the License.
from __future__ import with_statement
+import logging
import threading
import os
import ConfigParser
-from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState
+from tashi.rpycservices.rpyctypes import Host, Network, User, TashiException, Errors, HostState, Instance
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.hosts = {}
self.instances = {}
self.networks = {}
@@ -78,6 +80,10 @@ class FromConfig(DataInterface):
return instanceId
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.acquireLock(self.instanceLock)
try:
if (instance.id is not None and instance.id not in self.instances):
@@ -107,6 +113,10 @@ class FromConfig(DataInterface):
return instance
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
try:
if (instance.id not in self.instances): # MPR: should never be true, but good to check
raise TashiException(d={'errno':Errors.NoSuchInstanceId,'msg':"No such instanceId - %d" % (instance.id)})
@@ -114,6 +124,10 @@ class FromConfig(DataInterface):
self.releaseLock(instance._lock)
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.acquireLock(self.instanceLock)
try:
del self.instances[instance.id]
@@ -122,6 +136,10 @@ class FromConfig(DataInterface):
self.releaseLock(self.instanceLock)
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
self.hostLock.acquire()
host = self.hosts.get(hostId, None)
if (host is None):
@@ -134,6 +152,10 @@ class FromConfig(DataInterface):
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
try:
if (host.id not in self.hosts): # MPR: should never be true, but good to check
raise TashiException(d={'errno':Errors.NoSuchHostId,'msg':"No such hostId - %s" % (host.id)})
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py Mon Jul 16 23:52:41 2012
@@ -15,16 +15,18 @@
# specific language governing permissions and limitations
# under the License.
+import logging
import subprocess
import time
import os
-from tashi.rpycservices.rpyctypes import User, LocalImages
+from tashi.rpycservices.rpyctypes import User, LocalImages, Instance, Host
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation, humanReadable
class GetentOverride(DataInterface):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.baseDataObject = instantiateImplementation(config.get("GetentOverride", "baseData"), config)
self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
@@ -33,21 +35,41 @@ class GetentOverride(DataInterface):
self.fetchThreshold = float(config.get("GetentOverride", "fetchThreshold"))
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
return self.baseDataObject.registerInstance(instance)
def acquireInstance(self, instanceId):
return self.baseDataObject.acquireInstance(instanceId)
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
return self.baseDataObject.releaseInstance(instance)
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
return self.baseDataObject.removeInstance(instance)
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
return self.baseDataObject.acquireHost(hostId)
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
return self.baseDataObject.releaseHost(host)
def getHosts(self):
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py Mon Jul 16 23:52:41 2012
@@ -72,16 +72,16 @@ class LdapOverride(DataInterface):
def getNetwork(self, id):
return self.baseDataObject.getNetwork(id)
- def getImages(self):
- count = 0
- myList = []
- for i in self.dfs.list("images"):
- myFile = self.dfs.getLocalHandle("images/" + i)
- if os.path.isfile(myFile):
- image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
- myList.append(image)
- count += 1
- return myList
+ def getImages(self):
+ count = 0
+ myList = []
+ for i in self.dfs.list("images"):
+ myFile = self.dfs.getLocalHandle("images/" + i)
+ if os.path.isfile(myFile):
+ image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+ myList.append(image)
+ count += 1
+ return myList
def fetchFromLdap(self):
now = time.time()
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/pickled.py Mon Jul 16 23:52:41 2012
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import logging
import cPickle
import os
import threading
@@ -24,6 +25,7 @@ from tashi.clustermanager.data import Fr
class Pickled(FromConfig):
def __init__(self, config):
DataInterface.__init__(self, config)
+ self.log = logging.getLogger(__name__)
self.file = self.config.get("Pickled", "file")
self.locks = {}
self.lockNames = {}
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py Mon Jul 16 23:52:41 2012
@@ -130,6 +130,10 @@ class SQL(DataInterface):
return h
def registerInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.instanceLock.acquire()
try:
if (instance.id is not None and instance.id not in self.getInstances()):
@@ -173,6 +177,10 @@ class SQL(DataInterface):
return instance
def releaseInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.instanceLock.acquire()
try:
l = self.makeInstanceList(instance)
@@ -191,6 +199,10 @@ class SQL(DataInterface):
self.instanceLock.release()
def removeInstance(self, instance):
+ if type(instance) is not Instance:
+ self.log.exception("Argument is not of type Instance, but of type %s" % (type(instance)))
+ raise TypeError
+
self.instanceLock.acquire()
try:
self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
@@ -205,6 +217,10 @@ class SQL(DataInterface):
self.instanceLock.release()
def acquireHost(self, hostId):
+ if type(hostId) is not int:
+ self.log.exception("Argument is not of type int, but of type %s" % (type(hostId)))
+ raise TypeError
+
host = self.getHost(hostId)
self.hostLock.acquire()
self.hostLocks[host.id] = self.hostLocks.get(host.id, threading.Lock())
@@ -214,6 +230,10 @@ class SQL(DataInterface):
return host
def releaseHost(self, host):
+ if type(host) is not Host:
+ self.log.exception("Argument is not of type Host, but of type %s" % (type(host)))
+ raise TypeError
+
l = self.makeHostList(host)
s = ""
for e in range(0, len(self.hostOrder)):
@@ -284,16 +304,17 @@ class SQL(DataInterface):
network = Network(d={'id':r[0], 'name':r[1]})
return network
- def getImages(self):
- count = 0
- myList = []
- for i in self.dfs.list("images"):
- myFile = self.dfs.getLocalHandle("images/" + i)
- if os.path.isfile(myFile):
- image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
- myList.append(image)
- count += 1
- return myList
+ def getImages(self):
+ count = 0
+ myList = []
+ for i in self.dfs.list("images"):
+ myFile = self.dfs.getLocalHandle("images/" + i)
+ if os.path.isfile(myFile):
+ image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+ myList.append(image)
+ count += 1
+
+ return myList
def getUsers(self):
cur = self.executeStatement("SELECT * from users")
Modified: incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/connectionmanager.py Mon Jul 16 23:52:41 2012
@@ -16,14 +16,16 @@
# under the License.
from tashi.rpycservices import rpycservices
+from tashi import Connection
#from tashi.rpycservices.rpyctypes import *
class ConnectionManager(object):
- def __init__(self, username, password, port, timeout=10000.0):
+ def __init__(self, username, password, port, timeout=10000.0, authAndEncrypt=False):
self.username = username
self.password = password
self.timeout = timeout
self.port = port
+ self.authAndEncrypt = authAndEncrypt
def __getitem__(self, hostname):
port = self.port
@@ -31,4 +33,4 @@ class ConnectionManager(object):
port = hostname[1]
hostname = hostname[0]
- return rpycservices.client(hostname, port, username=self.username, password=self.password)
+ return Connection(hostname, port, credentials=(self.username, self.password), authAndEncrypt=self.authAndEncrypt)
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py Mon Jul 16 23:52:41 2012
@@ -20,8 +20,10 @@
import logging.config
import signal
import sys
+import os
+import time
-from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
+from tashi.util import instantiateImplementation, getConfig, debugConsole
import tashi
from tashi import boolean
@@ -29,12 +31,8 @@ from tashi.rpycservices import rpycservi
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import TlsliteVdbAuthenticator
-@signalHandler(signal.SIGTERM)
-def handleSIGTERM(signalNumber, stackFrame):
- sys.exit(0)
-
def main():
- global config, dfs, vmm, service, server, log, notifier
+ global config, log
(config, configFiles) = getConfig(["NodeManager"])
publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
@@ -42,6 +40,35 @@ def main():
logging.config.fileConfig(configFiles)
log = logging.getLogger(__name__)
log.info('Using configuration file(s) %s' % configFiles)
+
+ # handle keyboard interrupts (http://code.activestate.com/recipes/496735-workaround-for-missed-sigint-in-multithreaded-prog/)
+ child = os.fork()
+
+ if child == 0:
+ startNodeManager()
+ # shouldn't exit by itself
+ sys.exit(0)
+
+ else:
+ # main
+ try:
+ os.waitpid(child, 0)
+ except KeyboardInterrupt:
+ log.info("Exiting node manager after receiving a SIGINT signal")
+ os._exit(0)
+ except Exception:
+ log.exception("Abnormal termination of node manager")
+ os._exit(-1)
+
+ log.info("Exiting node manager after service thread exited")
+ os._exit(-1)
+
+ return
+
+def startNodeManager():
+ global config, dfs, vmm, service, server, log, notifier
+ publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
+ tashi.publisher = publisher
dfs = instantiateImplementation(config.get("NodeManager", "dfs"), config)
vmm = instantiateImplementation(config.get("NodeManager", "vmm"), config, dfs, None)
service = instantiateImplementation(config.get("NodeManager", "service"), config, vmm)
@@ -59,14 +86,11 @@ def main():
t.service._type = 'NodeManagerService'
debugConsole(globals())
-
- try:
- t.start()
- except KeyboardInterrupt:
- handleSIGTERM(signal.SIGTERM, None)
- except Exception, e:
- sys.stderr.write(str(e) + "\n")
- sys.exit(-1)
+
+ t.start()
+ # shouldn't exit by itself
+ sys.exit(0)
+
if __name__ == "__main__":
main()
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py Mon Jul 16 23:52:41 2012
@@ -5,15 +5,15 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
-# under the License.
+# under the License.
import logging
import socket
@@ -28,10 +28,10 @@ import tashi
class NodeManagerService(object):
"""RPC handler for the NodeManager
-
- Perhaps in the future I can hide the dfs from the
+
+ Perhaps in the future I can hide the dfs from the
VmControlInterface and do all dfs operations here?"""
-
+
def __init__(self, config, vmm):
self.config = config
self.vmm = vmm
@@ -76,6 +76,8 @@ class NodeManagerService(object):
self.__registerHost()
+ # XXXstroucki: should make an effort to retry
+ # otherwise vmm will wait forever
self.id = self.cm.registerNodeManager(self.host, self.instances.values())
# XXXstroucki cut cross check for NM/VMM state
@@ -83,18 +85,18 @@ class NodeManagerService(object):
# start service threads
threading.Thread(target=self.__registerWithClusterManager).start()
threading.Thread(target=self.__statsThread).start()
-
+
def __initAccounting(self):
- self.accountBuffer = []
- self.accountLines = 0
- self.accountingClient = None
- try:
- if (self.accountingHost is not None) and \
- (self.accountingPort is not None):
- self.accountingClient=rpycservices.client(self.accountingHost, self.accountingPort)
- except:
- self.log.exception("Could not init accounting")
-
+ self.accountBuffer = []
+ self.accountLines = 0
+ self.accountingClient = None
+ try:
+ if (self.accountingHost is not None) and \
+ (self.accountingPort is not None):
+ self.accountingClient = ConnectionManager(self.username, self.password, self.accountingPort)[self.accountingHost]
+ except:
+ self.log.exception("Could not init accounting")
+
def __loadVmInfo(self):
try:
self.instances = self.vmm.getInstances()
@@ -112,6 +114,8 @@ class NodeManagerService(object):
notifyCM = []
try:
while (len(self.notifyCM) > 0):
+ # XXXstroucki ValueError: need more than 1 value to unpack
+ # observed here. How?
value = self.notifyCM.pop(0)
(instanceId, newInst, old, success) = value
try:
@@ -135,7 +139,7 @@ class NodeManagerService(object):
#if (toSleep > 0):
#time.sleep(toSleep)
- def __ACCOUNTFLUSH(self):
+ def __ACCOUNTFLUSH(self):
try:
if (self.accountingClient is not None):
self.accountingClient.record(self.accountBuffer)
@@ -145,33 +149,33 @@ class NodeManagerService(object):
self.log.exception("Failed to flush accounting data")
- def __ACCOUNT(self, text, instance=None, host=None):
- now = time.time()
- instanceText = None
- hostText = None
+ def __ACCOUNT(self, text, instance=None, host=None):
+ now = time.time()
+ instanceText = None
+ hostText = None
- if instance is not None:
+ if instance is not None:
try:
- instanceText = 'Instance(%s)' % (instance)
+ instanceText = 'Instance(%s)' % (instance)
except:
self.log.exception("Invalid instance data")
- if host is not None:
+ if host is not None:
try:
- hostText = "Host(%s)" % (host)
+ hostText = "Host(%s)" % (host)
except:
self.log.exception("Invalid host data")
- secondary = ','.join(filter(None, (hostText, instanceText)))
+ secondary = ','.join(filter(None, (hostText, instanceText)))
- line = "%s|%s|%s" % (now, text, secondary)
+ line = "%s|%s|%s" % (now, text, secondary)
- self.accountBuffer.append(line)
- self.accountLines += 1
+ self.accountBuffer.append(line)
+ self.accountLines += 1
# XXXstroucki think about force flush every so often
- if (self.accountLines > 0):
- self.__ACCOUNTFLUSH()
+ if (self.accountLines > 0):
+ self.__ACCOUNTFLUSH()
# service thread function
@@ -213,14 +217,14 @@ class NodeManagerService(object):
self.log.exception('statsThread threw an exception')
time.sleep(self.statsInterval)
- def __registerHost(self):
- hostname = socket.gethostname()
+ def __registerHost(self):
+ hostname = socket.gethostname()
# populate some defaults
# XXXstroucki: I think it's better if the nodemanager fills these in properly when registering with the clustermanager
memory = 0
cores = 0
version = "empty"
- #self.cm.registerHost(hostname, memory, cores, version)
+ #self.cm.registerHost(hostname, memory, cores, version)
def __getInstance(self, vmId):
instance = self.instances.get(vmId, None)
@@ -235,7 +239,7 @@ class NodeManagerService(object):
raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
-
+
# remote
# Called from VMM to update self.instances
# but only changes are Exited, MigrateTrans and Running
@@ -252,11 +256,11 @@ class NodeManagerService(object):
# make a note of mismatch, but go on.
# the VMM should know best
self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
-
+
instance.state = cur
self.__ACCOUNT("NM VM STATE CHANGE", instance=instance)
-
+
newInst = Instance(d={'state':cur})
success = lambda: None
# send the state change up to the CM
@@ -278,8 +282,8 @@ class NodeManagerService(object):
def createInstance(self, instance):
vmId = instance.vmId
self.instances[vmId] = instance
-
-
+
+
# remote
def instantiateVm(self, instance):
self.__ACCOUNT("NM VM INSTANTIATE", instance=instance)
@@ -291,7 +295,7 @@ class NodeManagerService(object):
return vmId
except:
self.log.exception("Failed to start instance")
-
+
# remote
def suspendVm(self, vmId, destination):
instance = self.__getInstance(vmId)
@@ -300,10 +304,12 @@ class NodeManagerService(object):
instance.state = InstanceState.Suspending
self.instances[vmId] = instance
threading.Thread(target=self.vmm.suspendVm, args=(vmId, destination)).start()
-
+
# called by resumeVm as thread
def __resumeVmHelper(self, instance, name):
self.vmm.resumeVmHelper(instance, name)
+ # XXXstroucki should the VMM be responsible for setting
+ # state? It should know better.
instance.state = InstanceState.Running
newInstance = Instance(d={'id':instance.id,'state':instance.state})
success = lambda: None
@@ -323,7 +329,7 @@ class NodeManagerService(object):
self.log.exception('resumeVm failed')
raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
return instance.vmId
-
+
# remote
def prepReceiveVm(self, instance, source):
self.__ACCOUNT("NM VM MIGRATE RECEIVE PREP")
@@ -353,7 +359,7 @@ class NodeManagerService(object):
self.instances[vmId] = instance
threading.Thread(target=self.__migrateVmHelper, args=(instance, target, transportCookie)).start()
return
-
+
# called by receiveVm as thread
# XXXstroucki migrate in?
def __receiveVmHelper(self, instance, transportCookie):
@@ -429,4 +435,3 @@ class NodeManagerService(object):
# remote
def liveCheck(self):
return "alive"
-
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py Mon Jul 16 23:52:41 2012
@@ -132,9 +132,7 @@ class Qemu(VmControlInterface):
def __getHostPids(self):
"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
pids = []
- real_bin = self.QEMU_BIN
- while os.path.islink(real_bin):
- real_bin = os.readlink(self.QEMU_BIN)
+ real_bin = os.path.realpath(self.QEMU_BIN)
for f in os.listdir("/proc"):
try:
@@ -210,7 +208,7 @@ class Qemu(VmControlInterface):
if self.scratchVg is not None:
log.info("Removing any scratch for %s" % (name))
cmd = "/sbin/lvremove --quiet -f %s" % self.scratchVg
- result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
+ result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE, stderr=open(os.devnull, "w"), close_fds=True).wait()
except:
log.warning("Problem cleaning scratch volumes")
pass
@@ -323,12 +321,12 @@ class Qemu(VmControlInterface):
#print "[NEE]: %s" % (needle)
(rlist, wlist, xlist) = select.select([monitorFd], [], [], timeout)
if (len(rlist) == 0):
- log.error("Timeout getting results from monitor for vmId %d" % (child.pid))
+ log.error("Timeout getting results from monitor on FD %s for vmId %d" % (monitorFd, child.pid))
child.errorBit = True
raise RuntimeError
c = os.read(monitorFd, 1)
if (c == ""):
- log.error("Early termination on monitor for vmId %d" % (child.pid))
+ log.error("Early termination on monitor FD %s for vmId %d" % (monitorFd, child.pid))
child.errorBit = True
raise RuntimeError
buf = buf + c
@@ -504,8 +502,14 @@ class Qemu(VmControlInterface):
nicModel = self.__stripSpace(nicModel)
nicString = ""
+ nicNetworks = {}
for i in range(0, len(instance.nics)):
+ # Don't allow more than one interface per vlan
nic = instance.nics[i]
+ if nicNetworks.has_key(nic.network):
+ continue
+ nicNetworks[nic.network] = True
+
nicString = nicString + "-net nic,macaddr=%s,model=%s,vlan=%d -net tap,ifname=%s%d.%d,vlan=%d,script=/etc/qemu-ifup.%d " % (nic.mac, nicModel, nic.network, self.ifPrefix, instance.id, i, nic.network, nic.network)
# ACPI
@@ -645,7 +649,8 @@ class Qemu(VmControlInterface):
# extern
def resumeVmHelper(self, instance, source):
- child = self.__getChildFromPid(instance.vmId)
+ vmId = instance.vmId
+ child = self.__getChildFromPid(vmId)
try:
self.__getPtyInfo(child, True)
except RuntimeError:
@@ -654,8 +659,13 @@ class Qemu(VmControlInterface):
raise
status = "paused"
while ("running" not in status):
- status = self.__enterCommand(child, "info status")
- time.sleep(1)
+ try:
+ status = self.__enterCommand(child, "info status")
+ except RuntimeError:
+ pass
+ time.sleep(60)
+
+ self.nm.vmStateChange(vmId, None, InstanceState.Running)
child.instance.state = InstanceState.Running
self.__saveChildInfo(child)
@@ -846,11 +856,63 @@ class Qemu(VmControlInterface):
def listVms(self):
return self.controlledVMs.keys()
+ def __processVmStats(self, vmId):
+ try:
+ f = open("/proc/%d/stat" % (vmId))
+ procData = f.read()
+ f.close()
+ except:
+ log.warning("Unable to get data for instance %d" % vmId)
+ return
+
+ ws = procData.strip().split()
+ userTicks = float(ws[13])
+ sysTicks = float(ws[14])
+ myTicks = userTicks + sysTicks
+ vsize = (int(ws[22]))/1024.0/1024.0
+ rss = (int(ws[23])*4096)/1024.0/1024.0
+ cpuSeconds = myTicks/self.ticksPerSecond
+ # XXXstroucki be more exact here?
+ last = time.time() - self.statsInterval
+ lastCpuSeconds = self.cpuStats.get(vmId, cpuSeconds)
+ if lastCpuSeconds is None:
+ lastCpuSeconds = cpuSeconds
+ cpuLoad = (cpuSeconds - lastCpuSeconds)/(time.time() - last)
+ self.cpuStats[vmId] = cpuSeconds
+ try:
+ child = self.controlledVMs[vmId]
+ except:
+ log.warning("Unable to obtain information on instance %d" % vmId)
+ return
+
+ (recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
+ for i in range(0, len(child.instance.nics)):
+ netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
+ (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = self.netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
+ (recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
+ self.stats[vmId] = self.stats.get(vmId, {})
+ child = self.controlledVMs.get(vmId, None)
+ if (child):
+ res = self.__enterCommand(child, "info blockstats")
+ for l in res.split("\n"):
+ (device, sep, data) = stringPartition(l, ": ")
+ if (data != ""):
+ for field in data.split(" "):
+ (label, sep, val) = stringPartition(field, "=")
+ if (val != ""):
+ self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
+ self.stats[vmId]['%s_%s' % (device, label)] = int(val)
+ self.stats[vmId]['cpuLoad'] = cpuLoad
+ self.stats[vmId]['rss'] = rss
+ self.stats[vmId]['vsize'] = vsize
+ self.stats[vmId]['recvMBs'] = sendMBs
+ self.stats[vmId]['sendMBs'] = recvMBs
+
# thread
def statsThread(self):
- ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
- netStats = {}
- cpuStats = {}
+ self.ticksPerSecond = float(os.sysconf('SC_CLK_TCK'))
+ self.netStats = {}
+ self.cpuStats = {}
# XXXstroucki be more exact here?
last = time.time() - self.statsInterval
while True:
@@ -866,7 +928,7 @@ class Qemu(VmControlInterface):
ws = ld.split()
recvBytes = float(ws[0])
sendBytes = float(ws[8])
- (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
+ (recvMBs, sendMBs, lastRecvBytes, lastSendBytes) = self.netStats.get(dev, (0.0, 0.0, recvBytes, sendBytes))
if (recvBytes < lastRecvBytes):
# We seem to have overflowed
# XXXstroucki How likely is this to happen?
@@ -882,44 +944,12 @@ class Qemu(VmControlInterface):
lastSendBytes = lastSendBytes - 2**32
recvMBs = (recvBytes-lastRecvBytes)/(now-last)/1024.0/1024.0
sendMBs = (sendBytes-lastSendBytes)/(now-last)/1024.0/1024.0
- netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+ self.netStats[dev] = (recvMBs, sendMBs, recvBytes, sendBytes)
+
+
for vmId in self.controlledVMs:
- f = open("/proc/%d/stat" % (vmId))
- procData = f.read()
- f.close()
- ws = procData.strip().split()
- userTicks = float(ws[13])
- sysTicks = float(ws[14])
- myTicks = userTicks + sysTicks
- vsize = (int(ws[22]))/1024.0/1024.0
- rss = (int(ws[23])*4096)/1024.0/1024.0
- cpuSeconds = myTicks/ticksPerSecond
- lastCpuSeconds = cpuStats.get(vmId, cpuSeconds)
- cpuLoad = (cpuSeconds - lastCpuSeconds)/(now - last)
- cpuStats[vmId] = cpuSeconds
- child = self.controlledVMs[vmId]
- (recvMBs, sendMBs, recvBytes, sendBytes) = (0.0, 0.0, 0.0, 0.0)
- for i in range(0, len(child.instance.nics)):
- netDev = "%s%d.%d" % (self.ifPrefix, child.instance.id, i)
- (tmpRecvMBs, tmpSendMBs, tmpRecvBytes, tmpSendBytes) = netStats.get(netDev, (0.0, 0.0, 0.0, 0.0))
- (recvMBs, sendMBs, recvBytes, sendBytes) = (recvMBs + tmpRecvMBs, sendMBs + tmpSendMBs, recvBytes + tmpRecvBytes, sendBytes + tmpSendBytes)
- self.stats[vmId] = self.stats.get(vmId, {})
- child = self.controlledVMs.get(vmId, None)
- if (child):
- res = self.__enterCommand(child, "info blockstats")
- for l in res.split("\n"):
- (device, sep, data) = stringPartition(l, ": ")
- if (data != ""):
- for field in data.split(" "):
- (label, sep, val) = stringPartition(field, "=")
- if (val != ""):
- self.stats[vmId]['%s_%s_per_s' % (device, label)] = (float(val) - float(self.stats[vmId].get('%s_%s' % (device, label), 0)))/self.statsInterval
- self.stats[vmId]['%s_%s' % (device, label)] = int(val)
- self.stats[vmId]['cpuLoad'] = cpuLoad
- self.stats[vmId]['rss'] = rss
- self.stats[vmId]['vsize'] = vsize
- self.stats[vmId]['recvMBs'] = sendMBs
- self.stats[vmId]['sendMBs'] = recvMBs
+ self.__processVmStats(vmId)
+
except:
log.exception("statsThread threw an exception")
last = now
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py Mon Jul 16 23:52:41 2012
@@ -28,8 +28,8 @@ class VmControlInterface(object):
self.dfs = dfs
self.nm = nm
- def getInstances(self):
- """Will return a dict of instances by vmId to the caller"""
+ def getInstances(self):
+ """Will return a dict of instances by vmId to the caller"""
raise NotImplementedError
def instantiateVm(self, instance):
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py Mon Jul 16 23:52:41 2012
@@ -28,7 +28,7 @@ from vmcontrolinterface import VmControl
from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
from tashi.rpycservices.rpyctypes import Instance, Host
from tashi import boolean, convertExceptions, ConnectionManager, version
-from tashi.util import isolatedRPC, broken
+from tashi.util import broken
import tashi.parallel
from tashi.parallel import synchronized, synchronizedmethod
@@ -168,7 +168,7 @@ class XenPV(VmControlInterface, threadin
vmType = hints.get('vmtype', self.defaultVmType)
print 'starting vm with type: ', vmType
- disk0 = 'tap:%s' % self.disktype
+ disk0 = 'tap:%s' % self.disktype
diskU = 'xvda1'
try:
@@ -313,10 +313,10 @@ extra='xencons=tty'
@synchronizedmethod
def instantiateVm(self, instance):
- try:
- disktype = self.config.get('XenPV', 'defaultDiskType')
- except:
- disktype = 'vhd'
+ try:
+ disktype = self.config.get('XenPV', 'defaultDiskType')
+ except:
+ disktype = 'vhd'
# FIXME: this is NOT the right way to get out hostId
self.hostId = instance.hostId
@@ -346,6 +346,8 @@ extra='xencons=tty'
instance.disks[i].local = newdisk
+ # XXXstroucki if ever supporting multiple nics,
+ # ensure more than one isn't put on the same network.
fn = self.createXenConfig(name,
instance.disks[0].local,
instance.nics[0].mac,
Modified: incubator/tashi/branches/oldstable/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/util.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/util.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/util.py Mon Jul 16 23:52:41 2012
@@ -27,6 +27,7 @@ import time
import traceback
import types
import getpass
+import functools
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
@@ -148,14 +149,6 @@ class reference(object):
def __delattr__(self, name):
return delattr(self.__dict__['__real_obj__'], name)
-def isolatedRPC(client, method, *args, **kw):
- """Opens and closes a thrift transport for a single RPC call"""
- if (not client._iprot.trans.isOpen()):
- client._iprot.trans.open()
- res = getattr(client, method)(*args, **kw)
- client._iprot.trans.close()
- return res
-
def signalHandler(signalNumber):
"""Used to denote a particular function as the signal handler for a
specific signal"""
@@ -192,7 +185,7 @@ def instantiateImplementation(className,
def convertExceptions(oldFunc):
"""This converts any exception type into a TashiException so that
- it can be passed over a Thrift RPC"""
+ it can be passed over an RPC"""
def newFunc(*args, **kw):
try:
return oldFunc(*args, **kw)
@@ -218,20 +211,33 @@ def getConfig(additionalNames=[], additi
raise Exception("No config file could be found: %s" % (str(allLocations)))
return (config, configFiles)
+def __getShellFn():
+ try:
+ from IPython.Shell import IPShellEmbed
+ return (1, IPShellEmbed)
+ except ImportError:
+ import IPython
+ return (2, IPython.embed)
+
def debugConsole(globalDict):
"""A debugging console that optionally uses pysh"""
def realDebugConsole(globalDict):
+ import os
try :
import atexit
- from IPython.Shell import IPShellEmbed
+ (calltype, shellfn) = __getShellFn()
def resetConsole():
# XXXpipe: make input window sane
(stdin, stdout) = os.popen2("reset")
stdout.read()
- dbgshell = IPShellEmbed()
atexit.register(resetConsole)
- dbgshell(local_ns=globalDict, global_ns=globalDict)
- except Exception:
+ if calltype == 1:
+ dbgshell=shellfn(user_ns=globalDict)
+ dbgshell()
+ elif calltype == 2:
+ dbgshell=shellfn
+ dbgshell(user_ns=globalDict)
+ except Exception, e:
CONSOLE_TEXT=">>> "
input = " "
while (input != ""):
@@ -241,6 +247,9 @@ def debugConsole(globalDict):
exec(input) in globalDict
except Exception, e:
sys.stdout.write(str(e) + "\n")
+
+ os._exit(0)
+
if (os.getenv("DEBUG", "0") == "1"):
threading.Thread(target=lambda: realDebugConsole(globalDict)).start()
@@ -260,6 +269,68 @@ def scrubString(s, allowed="ABCDEFGHIJKL
ns = ns + c
return ns
+class Connection:
+ def __init__(self, host, port, authAndEncrypt=False, credentials=None):
+ self.host = host
+ self.port = port
+ self.credentials = credentials
+ self.authAndEncrypt = authAndEncrypt
+ self.connection = None
+ # XXXstroucki some thing may still depend on this (client)
+ self.username = None
+ if credentials is not None:
+ self.username = credentials[0]
+
+ def __connect(self):
+ # create new connection
+
+ username = None
+ password = None
+
+ if self.credentials is not None:
+ username = self.credentials[0]
+ password = self.credentials[1]
+
+ if self.authAndEncrypt:
+ if username is None:
+ username = raw_input("Enter Username:")
+
+ if password is None:
+ password = raw_input("Enter Password:")
+
+ if self.credentials != (username, password):
+ self.credentials = (username, password)
+
+ client = rpycservices.client(self.host, self.port, username=username, password=password)
+ else:
+ client = rpycservices.client(self.host, self.port)
+
+ self.connection = client
+
+
+ def __do(self, name, *args, **kwargs):
+ if self.connection is None:
+ self.__connect()
+
+ remotefn = getattr(self.connection, name, None)
+
+ try:
+ if callable(remotefn):
+ returns = remotefn(*args, **kwargs)
+
+ else:
+ raise TashiException({'msg':'%s not callable' % name})
+
+ except:
+ self.connection = None
+ raise
+
+ return returns
+
+ def __getattr__(self, name):
+ return functools.partial(self.__do, name)
+
+
def createClient(config):
cfgHost = config.get('Client', 'clusterManagerHost')
cfgPort = config.get('Client', 'clusterManagerPort')
@@ -273,14 +344,12 @@ def createClient(config):
authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
if authAndEncrypt:
username = config.get('AccessClusterManager', 'username')
- if username == '':
- username = raw_input('Enter Username:')
password = config.get('AccessClusterManager', 'password')
- if password == '':
- password = getpass.getpass('Enter Password:')
- client = rpycservices.client(host, port, username=username, password=password)
+ client = Connection(host, port, authAndEncrypt, (username, password))
+
else:
- client = rpycservices.client(host, port)
+ client = Connection(host, port)
+
return client
def enumToStringDict(cls):
Modified: incubator/tashi/branches/oldstable/src/tashi/version.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/version.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/version.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/version.py Mon Jul 16 23:52:41 2012
@@ -15,4 +15,4 @@
# specific language governing permissions and limitations
# under the License.
-version = "201202"
+version = "201203"
Modified: incubator/tashi/branches/oldstable/src/utils/nmd.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/utils/nmd.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/utils/nmd.py (original)
+++ incubator/tashi/branches/oldstable/src/utils/nmd.py Mon Jul 16 23:52:41 2012
@@ -16,9 +16,10 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
*/
"""
+# XXXstroucki: why not use something like supervise instead?
import os
import sys
@@ -36,81 +37,81 @@ LOG_FILE="/var/log/nodemanager.log"
*/
"""
def make_invincible():
- # dependent on linux
- try:
- oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
- except IOError:
- pass
- else:
- os.write(oom_adj_fd, "-17\n")
- os.close(oom_adj_fd)
+ # dependent on linux
+ try:
+ oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
+ except IOError:
+ pass
+ else:
+ os.write(oom_adj_fd, "-17\n")
+ os.close(oom_adj_fd)
"""
/* This function resets (on Linux!) its oom scoring to default
*/
"""
def make_vulnerable():
- # dependent on linux
- try:
- oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
- except IOError:
- pass
- else:
- os.write(oom_adj_fd, "0\n")
- os.close(oom_adj_fd)
+ # dependent on linux
+ try:
+ oom_adj_fd = os.open("/proc/self/oom_adj", os.O_WRONLY)
+ except IOError:
+ pass
+ else:
+ os.write(oom_adj_fd, "0\n")
+ os.close(oom_adj_fd)
def main(argv=None):
- if argv is None:
- argv = sys.argv
- try:
- opts, args = getopt.getopt(argv[1:], "f", ["foreground"])
- except getopt.GetoptError, err:
- # print help information and exit:
- print str(err) # will print something like "option -a not recognized"
- # usage()
- return 2
- foreground = False
- for o, a in opts:
- if o in ("-f", "--foreground"):
- foreground = True
- else:
- assert False, "unhandled option"
- if foreground == False:
- pid = os.fork();
- if pid != 0:
- os._exit(0)
- os.close(0)
- os.close(1)
- os.close(2)
-
- # adjust oom preference
- make_invincible()
-
- # configure environment of children
- env = {"PYTHONPATH":TASHI_PATH+"/src"}
- while True:
- pid = os.fork();
- if pid == 0:
- # child
- # nodemanagers are vulnerable, not the supervisor
- make_vulnerable()
- if foreground == False:
- try:
- lfd = os.open(LOG_FILE, os.O_APPEND|os.O_CREAT|os.O_WRONLY)
- except IOError:
- lfd = os.open("/dev/null", os.O_WRONLY)
- # make this fd stdout and stderr
- os.dup2(lfd, 1)
- os.dup2(lfd, 2)
- # close stdin
- os.close(0)
- os.chdir(TASHI_PATH)
- os.execle("./bin/nodemanager.py", "./bin/nodemanager.py", env)
- os._exit(-1)
- # sleep before checking child status
- time.sleep(SLEEP_INTERVAL)
- os.waitpid(pid, 0)
- return 0
+ if argv is None:
+ argv = sys.argv
+ try:
+ opts, args = getopt.getopt(argv[1:], "f", ["foreground"])
+ except getopt.GetoptError, err:
+ # print help information and exit:
+ print str(err) # will print something like "option -a not recognized"
+ # usage()
+ return 2
+ foreground = False
+ for o, a in opts:
+ if o in ("-f", "--foreground"):
+ foreground = True
+ else:
+ assert False, "unhandled option"
+ if foreground == False:
+ pid = os.fork();
+ if pid != 0:
+ os._exit(0)
+ os.close(0)
+ os.close(1)
+ os.close(2)
+
+ # adjust oom preference
+ make_invincible()
+
+ # configure environment of children
+ env = {"PYTHONPATH":TASHI_PATH+"/src"}
+ while True:
+ pid = os.fork();
+ if pid == 0:
+ # child
+ # nodemanagers are vulnerable, not the supervisor
+ make_vulnerable()
+ if foreground == False:
+ try:
+ lfd = os.open(LOG_FILE, os.O_APPEND|os.O_CREAT|os.O_WRONLY)
+ except IOError:
+ lfd = os.open("/dev/null", os.O_WRONLY)
+ # make this fd stdout and stderr
+ os.dup2(lfd, 1)
+ os.dup2(lfd, 2)
+ # close stdin
+ os.close(0)
+ os.chdir(TASHI_PATH)
+ os.execle("./bin/nodemanager.py", "./bin/nodemanager.py", env)
+ os._exit(-1)
+ # sleep before checking child status
+ time.sleep(SLEEP_INTERVAL)
+ os.waitpid(pid, 0)
+ return 0
if __name__ == "__main__":
- sys.exit(main())
+ sys.exit(main())
Modified: incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py (original)
+++ incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py Mon Jul 16 23:52:41 2012
@@ -327,11 +327,11 @@ def main():
if (options.nodeName):
cmdargs["sys_id"] = options.nodeName
- if (options.numCores or options.clockSpeed or options.numMemory or options.numProcs or options.cpuFlags) and not options.showResources:
- usage = "MISSING OPTION: When specifying hardware parameters, you need the -s or --showResources switch"
- print usage
- parser.print_help()
- exit()
+ if (options.numCores or options.clockSpeed or options.numMemory or options.numProcs or options.cpuFlags) and not options.showResources:
+ usage = "MISSING OPTION: When specifying hardware parameters, you need the -s or --showResources switch"
+ print usage
+ parser.print_help()
+ exit()
if options.getResources:
print "ALL resources"
Modified: incubator/tashi/branches/oldstable/src/zoni/extra/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/zoni/extra/util.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/zoni/extra/util.py (original)
+++ incubator/tashi/branches/oldstable/src/zoni/extra/util.py Mon Jul 16 23:52:41 2012
@@ -19,6 +19,7 @@
#
import os
+import sys
import string
import ConfigParser
import time
@@ -218,19 +219,25 @@ def createKey(name):
return val
-
+def __getShellFn():
+ if sys.version_info < (2, 6, 1):
+ from IPython.Shell import IPShellEmbed
+ return IPShellEmbed
+ else:
+ import IPython
+ return IPython.embed
def debugConsole(globalDict):
"""A debugging console that optionally uses pysh"""
def realDebugConsole(globalDict):
try :
import atexit
- from IPython.Shell import IPShellEmbed
+ shellfn = __getShellFn()
def resetConsole():
# XXXpipe: make input window sane
(stdin, stdout) = os.popen2("reset")
stdout.read()
- dbgshell = IPShellEmbed()
+ dbgshell = shellfn()
atexit.register(resetConsole)
dbgshell(local_ns=globalDict, global_ns=globalDict)
except Exception:
Modified: incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py (original)
+++ incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py Mon Jul 16 23:52:41 2012
@@ -147,7 +147,7 @@ class dellDrac(SystemManagementInterface
for val in fout.readlines():
if "OK" in val:
code = 1
- if "CURRENTLY POWER-OFF" in val:
+ if "CURRENTLY POWER-OFF" in val:
self.log.info("Hardware already power off : %s", self.hostname)
code = 1
if code < 1:
@@ -171,7 +171,7 @@ class dellDrac(SystemManagementInterface
for val in fout.readlines():
if "OK" in val:
code = 1
- if "CURRENTLY POWER-OFF" in val:
+ if "CURRENTLY POWER-OFF" in val:
self.log.info("Hardware already power off : %s", self.hostname)
code = 1
if code < 1:
Modified: incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py (original)
+++ incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py Mon Jul 16 23:52:41 2012
@@ -54,7 +54,7 @@ class HwDellSwitch(HwSwitchInterface):
pass
- def setVerbose(self, verbose):
+ def setVerbose(self, verbose):
self.verbose = verbose
def __login(self):
Modified: incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py?rev=1362316&r1=1362315&r2=1362316&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py (original)
+++ incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py Mon Jul 16 23:52:41 2012
@@ -74,10 +74,10 @@ class HwHPSwitch(HwSwitchInterface):
child.sendline(cmd)
opt = child.expect(["Confirm(.*)", "No save(.*)", pexpect.EOF, pexpect.TIMEOUT])
if opt == 0:
- print "saving to flash"
- child.sendline("y\n")
+ print "saving to flash"
+ child.sendline("y\n")
if opt == 1:
- print "no save needed"
+ print "no save needed"
child.sendline('exit')
child.terminate()