You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/05/17 03:06:59 UTC
svn commit: r1339460 - in /incubator/tashi/trunk: etc/ src/tashi/
src/tashi/clustermanager/data/ src/tashi/nodemanager/
src/tashi/nodemanager/vmcontrol/ src/tashi/utils/
Author: stroucki
Date: Thu May 17 03:06:58 2012
New Revision: 1339460
URL: http://svn.apache.org/viewvc?rev=1339460&view=rev
Log:
revise default configuration
replace getConfig with Config object supporting defaults
log in the nodemanager when registration with the CM succeeds (at startup or after a failure)
allow the memory reserved for the host to be configured (Qemu reservedMem)
use Alexey Tumanov's timeout monitor (Thanks Alexey!)
open new utils directory for Tashi utility functions and objects
Squashed commit of the following:
commit 2b45b80f6d586321a1d270637253594857b169bf
Author: Michael Stroucken <mx...@cmu.edu>
Date: Wed May 16 22:59:38 2012 -0400
timeout: comment change
commit 5673bc72b159c595fb6004102ba1356ee3cb4c9a
Author: Michael Stroucken <mx...@cmu.edu>
Date: Wed May 16 22:46:16 2012 -0400
util: minor fix
timeout: correct indentation. Oh god, python!
timeout: short-circuit if the monitored thread has already finished
commit 2d7f27a78b65beff61b405f1e6f23807e9107844
Author: Michael Stroucken <mx...@cmu.edu>
Date: Wed May 16 22:45:14 2012 -0400
nodemanager: log the fact that we are in successful communication with CM
commit c00fc6e918f6748abe12d324c53aacb11c7a3d89
Author: Michael Stroucken <mx...@cmu.edu>
Date: Wed May 16 17:46:32 2012 -0400
timeout, util: use Alexey's thread safe timeout thread wrapper
commit 1648e93088312d326c13e04e4349c05798b1ce4e
Author: Michael Stroucken <mx...@cmu.edu>
Date: Wed May 16 17:04:58 2012 -0400
config: fix syntax
commit a8453f46cd54a702fded60498ee9adee32141668
Author: Michael Stroucken <mx...@cmu.edu>
Date: Wed May 16 17:04:26 2012 -0400
util: Move imports to top
commit 0a7882c034cd88df8d7b10adb865cd533469f735
Author: Michael Stroucken <mx...@cmu.edu>
Date: Tue May 15 15:53:02 2012 -0400
util: see if we can trigger an exception where it will be obvious
commit b8bbfb3f0f00ed31502d7d6daaa4e7329c880d98
Author: Michael Stroucken <mx...@cmu.edu>
Date: Tue May 15 15:39:43 2012 -0400
nodemanager: use tashi config handler
qemu: allow reserved memory for host to be configured
qemu: use tashi config handler, remove its own
config: handle getint
TashiDefaults: remove infoDir option
commit 27f772389ac20e2d81c6dbc7d9a2185fc9a8aa67
Author: Michael Stroucken <mx...@cmu.edu>
Date: Tue May 15 11:45:48 2012 -0400
util, fromconfig: notes on python 2.5 compatibility
commit cabbd45ae4b6c6d32dc468b6eab69ad79e60021f
Author: Michael Stroucken <mx...@cmu.edu>
Date: Tue May 15 09:24:19 2012 -0400
config: forgot colon
commit b5f969a5020da3843d69e144565e107dc5285c94
Author: Michael Stroucken <mx...@cmu.edu>
Date: Mon May 14 19:34:37 2012 -0400
util: Timeout network communications at 10s
commit 4fb565ef20370124f3e063f9b63f6f1d60f2b08d
Author: Michael Stroucken <mx...@cmu.edu>
Date: Mon May 14 19:09:37 2012 -0400
utils/config: provide a wrapper for python's config handler that allows defaults
utils/timeout: provide Luis Pedro Coelho's timeout class for use with Tashi
Added:
incubator/tashi/trunk/src/tashi/utils/
incubator/tashi/trunk/src/tashi/utils/__init__.py
incubator/tashi/trunk/src/tashi/utils/config.py
incubator/tashi/trunk/src/tashi/utils/timeout.py
Modified:
incubator/tashi/trunk/etc/TashiDefaults.cfg
incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/trunk/src/tashi/util.py
Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Thu May 17 03:06:58 2012
@@ -113,7 +113,6 @@ statsInterval = 0.0
[Qemu]
qemuBin = /usr/bin/kvm
-infoDir = /var/tmp/VmControlQemu/
pollDelay = 1.0
migrationRetries = 10
monitorTimeout = 60.0
@@ -123,6 +122,9 @@ useMigrateArgument = False
statsInterval = 0.0
scratchDir = /tmp
scratchVg = vgscratch
+suspendHandler = gzip
+resumeHandler = zcat
+reservedMem = 512
[XenPV]
vmNamePrefix = tashi
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Thu May 17 03:06:58 2012
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+#XXXstroucki: for compatibility with python 2.5
from __future__ import with_statement
+
import logging
import threading
import os
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Thu May 17 03:06:58 2012
@@ -23,20 +23,22 @@ import sys
import os
import time
-from tashi.util import instantiateImplementation, getConfig, debugConsole
+from tashi.util import instantiateImplementation, debugConsole
import tashi
from tashi import boolean
from tashi.rpycservices import rpycservices
+from tashi.utils.config import Config
+
from rpyc.utils.server import ThreadedServer
from rpyc.utils.authenticators import TlsliteVdbAuthenticator
def main():
global config, log
- (config, configFiles) = getConfig(["NodeManager"])
- publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
- tashi.publisher = publisher
+ config = Config(["NodeManager"])
+ configFiles = config.getFiles()
+
logging.config.fileConfig(configFiles)
log = logging.getLogger(__name__)
log.info('Using configuration file(s) %s' % configFiles)
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Thu May 17 03:06:58 2012
@@ -33,35 +33,32 @@ class NodeManagerService(object):
VmControlInterface and do all dfs operations here?"""
def __init__(self, config, vmm):
+ # XXXstroucki: vmm will wait for this constructor to complete
self.config = config
self.vmm = vmm
- self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
- self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
- self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+ self.cmHost = self.config.get("NodeManagerService", "clusterManagerHost")
+ self.cmPort = int(self.config.get("NodeManagerService", "clusterManagerPort"))
+ self.authAndEncrypt = boolean(self.config.get('Security', 'authAndEncrypt'))
if self.authAndEncrypt:
- self.username = config.get('AccessClusterManager', 'username')
- self.password = config.get('AccessClusterManager', 'password')
+ self.username = self.config.get('AccessClusterManager', 'username')
+ self.password = self.config.get('AccessClusterManager', 'password')
else:
self.username = None
self.password = None
self.log = logging.getLogger(__file__)
- self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
- self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
- self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
- self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+ self.convertExceptions = boolean(self.config.get('NodeManagerService', 'convertExceptions'))
+ self.registerFrequency = float(self.config.get('NodeManagerService', 'registerFrequency'))
+ self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval', default = 0))
+ self.registerHost = boolean(self.config.get('NodeManagerService', 'registerHost'))
try:
self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
except:
self.log.exception("Could not connect to CM")
+ # XXXstroucki: raise?
return
- self.accountingHost = None
- self.accountingPort = None
- try:
- self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
- self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
- except:
- pass
+ self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+ self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
self.notifyCM = []
@@ -77,11 +74,9 @@ class NodeManagerService(object):
self.__registerHost()
# XXXstroucki: should make an effort to retry
- # otherwise vmm will wait forever
+ # This can time out now with an exception
self.id = self.cm.registerNodeManager(self.host, self.instances.values())
- # XXXstroucki cut cross check for NM/VMM state
-
# start service threads
threading.Thread(target=self.__registerWithClusterManager).start()
threading.Thread(target=self.__statsThread).start()
@@ -180,14 +175,20 @@ class NodeManagerService(object):
# service thread function
def __registerWithClusterManager(self):
+ happy = False
while True:
#self.__ACCOUNT("TESTING")
start = time.time()
try:
instances = self.instances.values()
self.id = self.cm.registerNodeManager(self.host, instances)
+ if not happy:
+ happy = True
+ self.log.info("Registered with the CM")
+
except Exception:
self.log.exception('Failed to register with the CM')
+ happy = False
toSleep = start - time.time() + self.registerFrequency
if (toSleep > 0):
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Thu May 17 03:06:58 2012
@@ -87,17 +87,19 @@ class Qemu(VmControlInterface):
def __init__(self, config, dfs, nm):
VmControlInterface.__init__(self, config, dfs, nm)
- self.QEMU_BIN = self.config.get("Qemu", "qemuBin")
- self.INFO_DIR = self.config.get("Qemu", "infoDir")
- self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay"))
- self.migrationRetries = int(self.config.get("Qemu", "migrationRetries"))
- self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout"))
- self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
- self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
- self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
- # XXXstroucki amount of reserved memory could be configurable
- self.reservedMem = 512
- # XXXstroucki perhaps make this configurable
+ self.QEMU_BIN = self.config.get("Qemu", "qemuBin", default = "/usr/bin/kvm")
+ self.INFO_DIR = self.config.get("Qemu", "infoDir", default = "/var/tmp/VmControlQemu/")
+ self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay", default = 1))
+ self.migrationRetries = int(self.config.get("Qemu", "migrationRetries", default = 10))
+ self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout", default = 60))
+ self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout", default = 300))
+ self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument", default = False))
+ self.statsInterval = float(self.config.get("Qemu", "statsInterval", default = 0))
+ reservedMem = self.config.get("Qemu", "reservedMem", default = 512)
+ reservedMem = int(reservedMem)
+
+ self.reservedMem = reservedMem
+
self.ifPrefix = "tashi"
self.controlledVMs = {}
self.usedPorts = []
@@ -106,15 +108,20 @@ class Qemu(VmControlInterface):
self.vncPortLock = threading.Lock()
self.consolePort = 10000
self.consolePortLock = threading.Lock()
- self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
+ maxParallelMigrations = self.config.get("Qemu", "maxParallelMigrations")
+ maxParallelMigrations = int(maxParallelMigrations)
+ if maxParallelMigrations < 1:
+ maxParallelMigrations = 1
+
+ self.migrationSemaphore = threading.Semaphore(maxParallelMigrations)
self.stats = {}
- self.suspendHandler = self.__config("Qemu", "suspendHandler", default = "gzip")
- self.resumeHandler = self.__config("Qemu", "resumeHandler", default = "zcat")
+ self.suspendHandler = self.config.get("Qemu", "suspendHandler", default = "gzip")
+ self.resumeHandler = self.config.get("Qemu", "resumeHandler", default = "zcat")
- self.scratchVg = self.__config("Qemu", "scratchVg")
+ self.scratchVg = self.config.get("Qemu", "scratchVg")
- self.scratchDir = self.__config("Qemu", "scratchDir", default = "/tmp")
+ self.scratchDir = self.config.get("Qemu", "scratchDir", default = "/tmp")
try:
os.mkdir(self.INFO_DIR)
@@ -131,19 +138,6 @@ class Qemu(VmControlInterface):
def __init__(self, **attrs):
self.__dict__.update(attrs)
- def __config(self, section, option, default = None):
- # soft version of self.config.get. returns default value
- # if not found in configuration
- import ConfigParser
-
- value = default
- try:
- value = self.config.get(section, option)
- except ConfigParser.NoOptionError:
- pass
-
- return value
-
def __dereferenceLink(self, spec):
newspec = os.path.realpath(spec)
return newspec
Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Thu May 17 03:06:58 2012
@@ -15,6 +15,9 @@
# specific language governing permissions and limitations
# under the License.
+#XXXstroucki: for compatibility with python 2.5
+from __future__ import with_statement
+
import ConfigParser
#import cPickle
import os
@@ -31,6 +34,7 @@ import functools
from tashi.rpycservices import rpycservices
from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
+from tashi.utils.timeout import *
def broken(oldFunc):
"""Decorator that is used to mark a function as temporarily broken"""
@@ -163,6 +167,13 @@ def boolean(value):
return value
if (type(value) == types.IntType):
return (value != 0)
+
+ # See if it can be expressed as a string
+ try:
+ value = str(value)
+ except:
+ raise ValueError
+
lowercaseValue = value.lower()
if lowercaseValue in ['yes', 'true', '1']:
return True
@@ -270,6 +281,7 @@ def scrubString(s, allowed="ABCDEFGHIJKL
return ns
class Connection:
+
def __init__(self, host, port, authAndEncrypt=False, credentials=None):
self.host = host
self.port = port
@@ -312,11 +324,23 @@ class Connection:
if self.connection is None:
self.__connect()
- remotefn = getattr(self.connection, name, None)
+ # XXXstroucki: Use 10 second timeout, ok?
+ # XXXstroucki: does this fn touch the network?
+ t = TimeoutThread(getattr, (self.connection, name, None))
+ threading.Thread(target=t.run).start()
+
+ try:
+ remotefn = t.wait(timeout=10)
+ except TimeoutException:
+ self.connection = None
+ raise
try:
if callable(remotefn):
- returns = remotefn(*args, **kwargs)
+ # XXXstroucki: Use 10 second timeout, ok?
+ t = TimeoutThread(remotefn, args, kwargs)
+ threading.Thread(target=t.run).start()
+ returns = t.wait(timeout=10.0)
else:
raise TashiException({'msg':'%s not callable' % name})
Added: incubator/tashi/trunk/src/tashi/utils/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/utils/__init__.py?rev=1339460&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/utils/__init__.py (added)
+++ incubator/tashi/trunk/src/tashi/utils/__init__.py Thu May 17 03:06:58 2012
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
Added: incubator/tashi/trunk/src/tashi/utils/config.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/utils/config.py?rev=1339460&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/utils/config.py (added)
+++ incubator/tashi/trunk/src/tashi/utils/config.py Thu May 17 03:06:58 2012
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Wrapper class for python configuration
+
class Config:
    """Wrapper around a parsed Tashi configuration that supports defaults.

    get() and getint() return the configured value when present, and the
    caller-supplied default (or None) when the option -- or its whole
    section -- is missing, instead of raising.
    """

    def __init__(self, additionalNames=None, additionalFiles=None):
        """Load the configuration via tashi.util.getConfig.

        additionalNames / additionalFiles are passed through to getConfig;
        None means "no extras" (fresh lists are created per call so that
        no mutable default is shared between instances).
        """
        # Imported locally, presumably to avoid an import cycle with
        # tashi.util -- TODO confirm.
        from tashi.util import getConfig
        if additionalNames is None:
            additionalNames = []
        if additionalFiles is None:
            additionalFiles = []
        (config, files) = getConfig(additionalNames = additionalNames, additionalFiles = additionalFiles)
        self.config = config
        self.files = files

    def getFiles(self):
        """Return the list of configuration files that were read."""
        return self.files

    def get(self, section, option, default = None):
        """Soft version of ConfigParser.get.

        Returns the configured value, or `default` (None unless given)
        when either the section or the option does not exist.
        """
        # Module was renamed between Python 2 and 3.
        try:
            import ConfigParser as configparser
        except ImportError:
            import configparser

        try:
            return self.config.get(section, option)
        except (configparser.NoSectionError, configparser.NoOptionError):
            return default

    def getint(self, section, option, default = None):
        """Soft version of ConfigParser.getint.

        Returns the configured value converted with int(), or `default`
        unchanged (not converted) when the section or option is missing.
        Raises ValueError if a configured value is not an integer.
        """
        # A private sentinel distinguishes "missing" from any real value.
        missing = object()
        value = self.get(section, option, default = missing)
        if value is missing:
            return default
        return int(value)
Added: incubator/tashi/trunk/src/tashi/utils/timeout.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/utils/timeout.py?rev=1339460&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/utils/timeout.py (added)
+++ incubator/tashi/trunk/src/tashi/utils/timeout.py Thu May 17 03:06:58 2012
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# module to provide a thread timeout monitor
+# by Alexey Tumanov and Michael Stroucken
+
+import threading
+
class TimeoutException(Exception):
    """Raised by TimeoutThread.wait when the monitored call does not finish in time."""
    def __init__(self, string):
        Exception.__init__(self, 'Timeout: %s' % string)

class TimeoutThread:
    """Run a callable in one thread and wait for its result, with a timeout, from another.

    Typical use:
        t = TimeoutThread(fn, args, kwargs)
        threading.Thread(target=t.run).start()
        result = t.wait(timeout=10)

    wait() returns the callable's return value, re-raises any exception the
    callable raised, or raises TimeoutException if it has not finished in time.
    """

    def __init__(self, function, args = (), kwargs = None):
        self.cv = threading.Condition()
        self.function = function
        self.args = args
        # Fresh dict per instance; a `{}` default would be shared.
        if kwargs is None:
            kwargs = {}
        self.kwargs = kwargs
        self.finished = False
        self.rval = None
        # Exception raised by `function`, if any; re-raised by wait().
        self.exception = None

    def wait(self, timeout=None):
        """Block until run() completes or `timeout` seconds elapse.

        timeout=None waits indefinitely. Returns the function's return
        value, re-raises the function's exception, or raises
        TimeoutException if the function is still running.
        """
        # Explicit acquire/release (not `with`) keeps Python 2.5 compatibility.
        self.cv.acquire()
        try:
            if not self.finished:
                if timeout is not None:
                    self.cv.wait(timeout)
                else:
                    self.cv.wait()
            finished = self.finished
            rval = self.rval
            exception = self.exception
        finally:
            self.cv.release()

        if not finished:
            # Only reachable when a timeout was given: run() always sets
            # `finished` once the function returns or raises.
            raise TimeoutException("function %s timed out after %f seconds" % (str(self.function), timeout))
        if exception is not None:
            # Propagate the real error instead of letting it die silently
            # in the worker thread and surface later as a bogus timeout.
            raise exception
        return rval

    def run(self):
        """Invoke the function and publish its outcome to wait()."""
        import sys
        rval = None
        exception = None
        try:
            rval = self.function(*self.args, **self.kwargs)
        except Exception:
            # sys.exc_info() instead of `except ... as e` for Python 2.5.
            exception = sys.exc_info()[1]

        self.cv.acquire()
        try:
            self.finished = True
            self.rval = rval
            self.exception = exception
            self.cv.notify()
        finally:
            self.cv.release()
+