You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/05/17 03:06:59 UTC

svn commit: r1339460 - in /incubator/tashi/trunk: etc/ src/tashi/ src/tashi/clustermanager/data/ src/tashi/nodemanager/ src/tashi/nodemanager/vmcontrol/ src/tashi/utils/

Author: stroucki
Date: Thu May 17 03:06:58 2012
New Revision: 1339460

URL: http://svn.apache.org/viewvc?rev=1339460&view=rev
Log:
revise default configuration
replace getConfig with Config object supporting defaults
log restored communication with the CM in the nodemanager
allow configurable reserved memory
use Alexey Tumanov's timeout monitor (Thanks Alexey!)
open new utils directory for Tashi utility functions and objects

Squashed commit of the following:

commit 2b45b80f6d586321a1d270637253594857b169bf
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Wed May 16 22:59:38 2012 -0400

    timeout: comment change

commit 5673bc72b159c595fb6004102ba1356ee3cb4c9a
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Wed May 16 22:46:16 2012 -0400

    util: minor fix
    timeout: correct indentation. Oh god, python!
    timeout: short-circuit if the monitored thread has already finished

commit 2d7f27a78b65beff61b405f1e6f23807e9107844
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Wed May 16 22:45:14 2012 -0400

    nodemanager: log the fact that we are in successful communication with CM

commit c00fc6e918f6748abe12d324c53aacb11c7a3d89
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Wed May 16 17:46:32 2012 -0400

    timeout, util: use Alexey's thread safe timeout thread wrapper

commit 1648e93088312d326c13e04e4349c05798b1ce4e
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Wed May 16 17:04:58 2012 -0400

    config: fix syntax

commit a8453f46cd54a702fded60498ee9adee32141668
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Wed May 16 17:04:26 2012 -0400

    util: Move imports to top

commit 0a7882c034cd88df8d7b10adb865cd533469f735
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Tue May 15 15:53:02 2012 -0400

    util: see if we can trigger an exception where it will be obvious

commit b8bbfb3f0f00ed31502d7d6daaa4e7329c880d98
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Tue May 15 15:39:43 2012 -0400

    nodemanager: use tashi config handler
    qemu: allow reserved memory for host to be configured
    qemu: use tashi config handler, remove its own
    config: handle getint
    TashiDefaults: remove infoDir option

commit 27f772389ac20e2d81c6dbc7d9a2185fc9a8aa67
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Tue May 15 11:45:48 2012 -0400

    util, fromconfig: notes on python 2.5 compatibility

commit cabbd45ae4b6c6d32dc468b6eab69ad79e60021f
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Tue May 15 09:24:19 2012 -0400

    config: forgot colon

commit b5f969a5020da3843d69e144565e107dc5285c94
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Mon May 14 19:34:37 2012 -0400

    util: Timeout network communications at 10s

commit 4fb565ef20370124f3e063f9b63f6f1d60f2b08d
Author: Michael Stroucken <mx...@cmu.edu>
Date:   Mon May 14 19:09:37 2012 -0400

    utils/config: provide a wrapper for python's config handler that allows defaults
    utils/timeout: provide Luis Pedro Coelho's timeout class for use with Tashi

Added:
    incubator/tashi/trunk/src/tashi/utils/
    incubator/tashi/trunk/src/tashi/utils/__init__.py
    incubator/tashi/trunk/src/tashi/utils/config.py
    incubator/tashi/trunk/src/tashi/utils/timeout.py
Modified:
    incubator/tashi/trunk/etc/TashiDefaults.cfg
    incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/trunk/src/tashi/util.py

Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Thu May 17 03:06:58 2012
@@ -113,7 +113,6 @@ statsInterval = 0.0
 
 [Qemu]
 qemuBin = /usr/bin/kvm
-infoDir = /var/tmp/VmControlQemu/
 pollDelay = 1.0
 migrationRetries = 10
 monitorTimeout = 60.0
@@ -123,6 +122,9 @@ useMigrateArgument = False
 statsInterval = 0.0
 scratchDir = /tmp
 scratchVg = vgscratch
+suspendHandler = gzip
+resumeHandler = zcat
+reservedMem = 512
 
 [XenPV]
 vmNamePrefix = tashi

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Thu May 17 03:06:58 2012
@@ -15,7 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+#XXXstroucki: for compatibility with python 2.5
 from __future__ import with_statement
+
 import logging
 import threading
 import os

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Thu May 17 03:06:58 2012
@@ -23,20 +23,22 @@ import sys
 import os
 import time
 
-from tashi.util import instantiateImplementation, getConfig, debugConsole
+from tashi.util import instantiateImplementation, debugConsole
 import tashi
 from tashi import boolean
 
 from tashi.rpycservices import rpycservices
+from tashi.utils.config import Config
+
 from rpyc.utils.server import ThreadedServer
 from rpyc.utils.authenticators import TlsliteVdbAuthenticator
 
 def main():
 	global config, log
 	
-	(config, configFiles) = getConfig(["NodeManager"])
-	publisher = instantiateImplementation(config.get("NodeManager", "publisher"), config)
-	tashi.publisher = publisher
+	config = Config(["NodeManager"])
+	configFiles = config.getFiles()
+
 	logging.config.fileConfig(configFiles)
 	log = logging.getLogger(__name__)
 	log.info('Using configuration file(s) %s' % configFiles)

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Thu May 17 03:06:58 2012
@@ -33,35 +33,32 @@ class NodeManagerService(object):
 	   VmControlInterface and do all dfs operations here?"""
 
 	def __init__(self, config, vmm):
+		# XXXstroucki: vmm will wait for this constructor to complete
 		self.config = config
 		self.vmm = vmm
-		self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
-		self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
-		self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+		self.cmHost = self.config.get("NodeManagerService", "clusterManagerHost")
+		self.cmPort = int(self.config.get("NodeManagerService", "clusterManagerPort"))
+		self.authAndEncrypt = boolean(self.config.get('Security', 'authAndEncrypt'))
 		if self.authAndEncrypt:
-			self.username = config.get('AccessClusterManager', 'username')
-			self.password = config.get('AccessClusterManager', 'password')
+			self.username = self.config.get('AccessClusterManager', 'username')
+			self.password = self.config.get('AccessClusterManager', 'password')
 		else:
 			self.username = None
 			self.password = None
 		self.log = logging.getLogger(__file__)
-		self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
-		self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
-		self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval'))
-		self.registerHost = boolean(config.get('NodeManagerService', 'registerHost'))
+		self.convertExceptions = boolean(self.config.get('NodeManagerService', 'convertExceptions'))
+		self.registerFrequency = float(self.config.get('NodeManagerService', 'registerFrequency'))
+		self.statsInterval = float(self.config.get('NodeManagerService', 'statsInterval', default = 0))
+		self.registerHost = boolean(self.config.get('NodeManagerService', 'registerHost'))
 		try:
 			self.cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		except:
 			self.log.exception("Could not connect to CM")
+			# XXXstroucki: raise?
 			return
 
-		self.accountingHost = None
-		self.accountingPort = None
-		try:
-			self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
-			self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
-		except:
-			pass
+		self.accountingHost = self.config.get('NodeManagerService', 'accountingHost')
+		self.accountingPort = self.config.getint('NodeManagerService', 'accountingPort')
 
 		self.notifyCM = []
 
@@ -77,11 +74,9 @@ class NodeManagerService(object):
 		self.__registerHost()
 
 		# XXXstroucki: should make an effort to retry
-		# otherwise vmm will wait forever
+		# This can time out now with an exception
 		self.id = self.cm.registerNodeManager(self.host, self.instances.values())
 
-		# XXXstroucki cut cross check for NM/VMM state
-
 		# start service threads
 		threading.Thread(target=self.__registerWithClusterManager).start()
 		threading.Thread(target=self.__statsThread).start()
@@ -180,14 +175,20 @@ class NodeManagerService(object):
 
 	# service thread function
 	def __registerWithClusterManager(self):
+		happy = False
 		while True:
 			#self.__ACCOUNT("TESTING")
 			start = time.time()
 			try:
 				instances = self.instances.values()
 				self.id = self.cm.registerNodeManager(self.host, instances)
+				if not happy:
+					happy = True
+					self.log.info("Registered with the CM")
+
 			except Exception:
 				self.log.exception('Failed to register with the CM')
+				happy = False
 
 			toSleep = start - time.time() + self.registerFrequency
 			if (toSleep > 0):

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Thu May 17 03:06:58 2012
@@ -87,17 +87,19 @@ class Qemu(VmControlInterface):
 	
 	def __init__(self, config, dfs, nm):
 		VmControlInterface.__init__(self, config, dfs, nm)
-		self.QEMU_BIN = self.config.get("Qemu", "qemuBin")
-		self.INFO_DIR = self.config.get("Qemu", "infoDir")
-		self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay"))
-		self.migrationRetries = int(self.config.get("Qemu", "migrationRetries"))
-		self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout"))
-		self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout"))
-		self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument"))
-		self.statsInterval = float(self.config.get("Qemu", "statsInterval"))
-		# XXXstroucki amount of reserved memory could be configurable
-		self.reservedMem = 512
-		# XXXstroucki perhaps make this configurable
+		self.QEMU_BIN = self.config.get("Qemu", "qemuBin", default = "/usr/bin/kvm")
+		self.INFO_DIR = self.config.get("Qemu", "infoDir", default = "/var/tmp/VmControlQemu/")
+		self.POLL_DELAY = float(self.config.get("Qemu", "pollDelay", default = 1))
+		self.migrationRetries = int(self.config.get("Qemu", "migrationRetries", default = 10))
+		self.monitorTimeout = float(self.config.get("Qemu", "monitorTimeout", default = 60))
+		self.migrateTimeout = float(self.config.get("Qemu", "migrateTimeout", default = 300))
+		self.useMigrateArgument = boolean(self.config.get("Qemu", "useMigrateArgument", default = False))
+		self.statsInterval = float(self.config.get("Qemu", "statsInterval", default = 0))
+		reservedMem = self.config.get("Qemu", "reservedMem", default = 512)
+		reservedMem = int(reservedMem)
+
+		self.reservedMem = reservedMem
+
 		self.ifPrefix = "tashi"
 		self.controlledVMs = {}
 		self.usedPorts = []
@@ -106,15 +108,20 @@ class Qemu(VmControlInterface):
 		self.vncPortLock = threading.Lock()
 		self.consolePort = 10000
 		self.consolePortLock = threading.Lock()
-		self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
+		maxParallelMigrations = self.config.get("Qemu", "maxParallelMigrations")
+		maxParallelMigrations = int(maxParallelMigrations)
+		if maxParallelMigrations < 1:
+			maxParallelMigrations = 1
+
+		self.migrationSemaphore = threading.Semaphore(maxParallelMigrations)
 		self.stats = {}
 
-		self.suspendHandler = self.__config("Qemu", "suspendHandler", default = "gzip")
-		self.resumeHandler = self.__config("Qemu", "resumeHandler", default = "zcat")
+		self.suspendHandler = self.config.get("Qemu", "suspendHandler", default = "gzip")
+		self.resumeHandler = self.config.get("Qemu", "resumeHandler", default = "zcat")
 
-		self.scratchVg = self.__config("Qemu", "scratchVg")
+		self.scratchVg = self.config.get("Qemu", "scratchVg")
 
-		self.scratchDir = self.__config("Qemu", "scratchDir", default = "/tmp")
+		self.scratchDir = self.config.get("Qemu", "scratchDir", default = "/tmp")
 
 		try:
 			os.mkdir(self.INFO_DIR)
@@ -131,19 +138,6 @@ class Qemu(VmControlInterface):
 		def __init__(self, **attrs):
 			self.__dict__.update(attrs)
 
-	def __config(self, section, option, default = None):
-		# soft version of self.config.get. returns default value
-		# if not found in configuration
-		import ConfigParser
-
-		value = default
-		try:
-			value = self.config.get(section, option)
-		except ConfigParser.NoOptionError:
-			pass
-
-		return value
-
 	def __dereferenceLink(self, spec):
 		newspec = os.path.realpath(spec)
 		return newspec

Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=1339460&r1=1339459&r2=1339460&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Thu May 17 03:06:58 2012
@@ -15,6 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+#XXXstroucki: for compatibility with python 2.5
+from __future__ import with_statement
+
 import ConfigParser
 #import cPickle
 import os
@@ -31,6 +34,7 @@ import functools
 
 from tashi.rpycservices import rpycservices
 from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
+from tashi.utils.timeout import *
 
 def broken(oldFunc):
 	"""Decorator that is used to mark a function as temporarily broken"""
@@ -163,6 +167,13 @@ def boolean(value):
 		return value
 	if (type(value) == types.IntType):
 		return (value != 0)
+
+	# See if it can be expressed as a string
+	try:
+		value = str(value)
+	except:
+		raise ValueError
+
 	lowercaseValue = value.lower()
 	if lowercaseValue in ['yes', 'true', '1']:
 		return True
@@ -270,6 +281,7 @@ def scrubString(s, allowed="ABCDEFGHIJKL
 	return ns
 
 class Connection:
+
 	def __init__(self, host, port, authAndEncrypt=False, credentials=None):
 		self.host = host
 		self.port = port
@@ -312,11 +324,23 @@ class Connection:
 		if self.connection is None:
 			self.__connect()
 
-		remotefn = getattr(self.connection, name, None)
+		# XXXstroucki: Use 10 second timeout, ok?
+		# XXXstroucki: does this fn touch the network?
+		t = TimeoutThread(getattr, (self.connection, name, None))
+		threading.Thread(target=t.run).start()
+
+		try:
+			remotefn = t.wait(timeout=10)
+		except TimeoutException:
+			self.connection = None
+			raise
 
 		try:
 			if callable(remotefn):
-				returns = remotefn(*args, **kwargs)
+				# XXXstroucki: Use 10 second timeout, ok?
+				t = TimeoutThread(remotefn, args, kwargs)
+				threading.Thread(target=t.run).start()
+				returns = t.wait(timeout=10.0)
 
 			else:
 				raise TashiException({'msg':'%s not callable' % name})

Added: incubator/tashi/trunk/src/tashi/utils/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/utils/__init__.py?rev=1339460&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/utils/__init__.py (added)
+++ incubator/tashi/trunk/src/tashi/utils/__init__.py Thu May 17 03:06:58 2012
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+

Added: incubator/tashi/trunk/src/tashi/utils/config.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/utils/config.py?rev=1339460&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/utils/config.py (added)
+++ incubator/tashi/trunk/src/tashi/utils/config.py Thu May 17 03:06:58 2012
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Wrapper class for python configuration
+
+import ConfigParser
+
+class Config:
+	"""Wrapper around the ConfigParser object obtained from
+	tashi.util.getConfig whose get/getint accept a default value
+	that is returned when the section or option is absent."""
+
+	def __init__(self, additionalNames=None, additionalFiles=None):
+		# getConfig is imported lazily, as before.
+		# NOTE(review): presumably to avoid an import cycle with tashi.util
+		from tashi.util import getConfig
+
+		# avoid mutable default arguments shared between calls
+		if additionalNames is None:
+			additionalNames = []
+		if additionalFiles is None:
+			additionalFiles = []
+
+		(config, files) = getConfig(additionalNames = additionalNames, additionalFiles = additionalFiles)
+		self.config = config
+		self.files = files
+
+	def getFiles(self):
+		# Return the configuration file list reported by getConfig.
+		return self.files
+
+	def get(self, section, option, default = None):
+		# soft version of self.config.get. Returns the configured
+		# value, or default (None unless specified) when either the
+		# section or the option is missing.
+		try:
+			return self.config.get(section, option)
+		except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+			return default
+
+	def getint(self, section, option, default = None):
+		# soft version of self.config.getint. Returns the configured
+		# value forced to int, or default (returned unconverted; None
+		# unless specified) when the section or option is missing.
+		# A configured value that is not an integer raises ValueError.
+		try:
+			value = self.config.get(section, option)
+		except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
+			return default
+		return int(value)

Added: incubator/tashi/trunk/src/tashi/utils/timeout.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/utils/timeout.py?rev=1339460&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/utils/timeout.py (added)
+++ incubator/tashi/trunk/src/tashi/utils/timeout.py Thu May 17 03:06:58 2012
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# module to provide a thread timeout monitor
+# by Alexey Tumanov and Michael Stroucken
+
+import sys
+import threading
+import time
+
+class TimeoutException(Exception):
+	def __init__(self, string):
+		Exception.__init__(self,'Timeout: %s' % string)
+
+class TimeoutThread:
+	"""Run a function via run() (typically as the target of a
+	separate thread) and let another thread wait() for its result,
+	optionally with a timeout. If the function raises, wait()
+	re-raises that exception in the waiting thread instead of
+	reporting a timeout."""
+
+	def __init__(self, function, args = (), kwargs = None):
+		self.cv = threading.Condition()
+		self.function = function
+		self.args = args
+		if kwargs is None:
+			kwargs = {}
+		self.kwargs = kwargs
+		self.finished = False
+		self.rval = None
+		# set by run() if the function raised
+		self.failed = False
+		self.error = None
+
+	def wait(self, timeout=None):
+		"""Return the function's result once run() has completed.
+
+		With timeout=None, wait indefinitely. Otherwise wait at most
+		timeout seconds and raise TimeoutException if run() has not
+		finished; any exception raised by the function is re-raised."""
+		self.cv.acquire()
+		try:
+			if timeout is None:
+				while not self.finished:
+					self.cv.wait()
+			else:
+				# loop against a deadline so an early wakeup does
+				# not shorten or extend the overall timeout
+				deadline = time.time() + timeout
+				while not self.finished:
+					remaining = deadline - time.time()
+					if remaining <= 0:
+						break
+					self.cv.wait(remaining)
+			finished = self.finished
+			rval = self.rval
+			failed = self.failed
+			error = self.error
+		finally:
+			self.cv.release()
+
+		if not finished:
+			raise TimeoutException("function %s timed out after %f seconds" % (str(self.function), timeout))
+		if failed:
+			raise error
+		return rval
+
+	def run(self):
+		"""Call the function and publish its result (or exception)
+		to waiters. Meant to be the target of a threading.Thread."""
+		rval = None
+		failed = False
+		error = None
+		try:
+			rval = self.function(*self.args, **self.kwargs)
+		except:
+			# hand the exception to wait() rather than losing it in
+			# this thread, which would otherwise look like a timeout
+			failed = True
+			error = sys.exc_info()[1]
+
+		self.cv.acquire()
+		try:
+			self.finished = True
+			self.rval = rval
+			self.failed = failed
+			self.error = error
+			self.cv.notifyAll()
+		finally:
+			self.cv.release()
+