You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/07/20 16:41:15 UTC

svn commit: r795906 - in /incubator/tashi/trunk: etc/ src/tashi/ src/tashi/agents/ src/tashi/client/ src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/nodemanager/ src/tashi/nodemanager/vmcontrol/ src/tashi/rpycservices/

Author: mryan3
Date: Mon Jul 20 16:41:15 2009
New Revision: 795906

URL: http://svn.apache.org/viewvc?rev=795906&view=rev
Log:
Michael Wang's rpyc patch from tashi-dev@i.a.o


Added:
    incubator/tashi/trunk/src/tashi/rpycservices/
    incubator/tashi/trunk/src/tashi/rpycservices/__init__.py
    incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
    incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py
Modified:
    incubator/tashi/trunk/etc/TashiDefaults.cfg
    incubator/tashi/trunk/src/tashi/agents/dhcpdns.py
    incubator/tashi/trunk/src/tashi/agents/instancehook.py
    incubator/tashi/trunk/src/tashi/agents/primitive.py
    incubator/tashi/trunk/src/tashi/client/tashi-client.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
    incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
    incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
    incubator/tashi/trunk/src/tashi/connectionmanager.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
    incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
    incubator/tashi/trunk/src/tashi/util.py

Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Mon Jul 20 16:41:15 2009
@@ -15,6 +15,27 @@
 # specific language governing permissions and limitations
 # under the License.    
 
+[Security]
+authAndEncrypt = False
+
+[AccessClusterManager]
+#If username and password are left empty, user will be prompted for username and password on the command line.
+username = nodemanager
+password = nodemanager
+
+[AccessNodeManager]
+#If username and password are left empty, user will be prompted for username and password on the command line.
+username = clustermanager
+password = clustermanager
+
+[AllowedUsers]
+nodeManagerUser = nodemanager
+nodeManagerPassword = nodemanager
+agentUser = agent
+agentPassword = agent
+clusterManagerUser = clustermanager
+clusterManagerPassword = clustermanager
+
 # ClusterManager portion
 [ClusterManager]
 service = tashi.clustermanager.ClusterManagerService

Modified: incubator/tashi/trunk/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/dhcpdns.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/dhcpdns.py Mon Jul 20 16:41:15 2009
@@ -23,7 +23,7 @@
 from tashi import boolean
 
 class DhcpDns(InstanceHook):
-	def __init__(self, config, client, transport, post=False):
+	def __init__(self, config, client, post=False):
 		InstanceHook.__init__(self, config, client, post)
 		self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
 		self.dnsServer = self.config.get('DhcpDns', 'dnsServer')

Modified: incubator/tashi/trunk/src/tashi/agents/instancehook.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/instancehook.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/instancehook.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/instancehook.py Mon Jul 20 16:41:15 2009
@@ -18,12 +18,11 @@
 # under the License.    
 
 class InstanceHook(object):
-	def __init__(self, config, client, transport, post=False):
+	def __init__(self, config, client, post=False):
 		if (self.__class__ is InstanceHook):
 			raise NotImplementedError
 		self.config = config
 		self.client = client
-		self.transport = transport
 		self.post = post
 	
 	def preCreate(self, instance):

Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Mon Jul 20 16:41:15 2009
@@ -25,15 +25,14 @@
 import time
 import logging.config
 
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.util import getConfig, createClient, instantiateImplementation, boolean
 import tashi
 
 class Primitive(object):
-	def __init__(self, config, client, transport):
+	def __init__(self, config, client):
 		self.config = config
 		self.client = client
-		self.transport = transport
 		self.hooks = []
 		self.log = logging.getLogger(__file__)
 		self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -45,7 +44,7 @@
 			name = name.lower()
 			if (name.startswith("hook")):
 				try:
-					self.hooks.append(instantiateImplementation(value, config, client, transport, False))
+					self.hooks.append(instantiateImplementation(value, config, client, False))
 				except:
 					self.log.exception("Failed to load hook %s" % (value))
 	
@@ -53,9 +52,6 @@
 		oldInstances = {}
 		while True:
 			try:
-				# Make sure transport is open
-				if (not self.transport.isOpen()):
-					self.transport.open()
 				# Generate a list of VMs/host
 				hosts = {}
 				load = {}
@@ -120,26 +116,18 @@
 				time.sleep(self.scheduleDelay)
 			except TashiException, e:
 				self.log.exception("Tashi exception")
-				try:
-					self.transport.close()
-				except Exception, e:
-					self.log.exception("Failed to close the transport")
 				time.sleep(self.scheduleDelay)
 			except Exception, e:
 				self.log.exception("General exception")
-				try:
-					self.transport.close()
-				except Exception, e:
-					self.log.exception("Failed to close the transport")
 				time.sleep(self.scheduleDelay)
 
 def main():
 	(config, configFiles) = getConfig(["Agent"])
 	publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
 	tashi.publisher = publisher
-	(client, transport) = createClient(config)
+	client = createClient(config)
 	logging.config.fileConfig(configFiles)
-	agent = Primitive(config, client, transport)
+	agent = Primitive(config, client)
 	agent.start()
 
 if __name__ == "__main__":

Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Mon Jul 20 16:41:15 2009
@@ -21,12 +21,7 @@
 import random
 import sys
 import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import *
 from tashi import vmStates, hostStates, boolean, getConfig, stringPartition, createClient
 
 users = {}
@@ -46,7 +41,10 @@
 
 def getUser():
 	fetchUsers()
-	userStr = os.getenv("USER", "unknown")
+	if client.username != None:
+		userStr = client.username
+	else:
+		userStr = os.getenv("USER", "unknown")
 	for user in users:
 		if (users[user].name == userStr):
 			return users[user].id
@@ -66,7 +64,7 @@
 		raise ValueError("Unknown instance %s" % (str(instance)))
 	for instance in instances:
 		if (instance.id == instanceId):
-			if (instance.userId != userId and instance.userId != None):
+			if (instance.userId != userId and instance.userId != None and userId != 0):
 				raise ValueError("You don't own that VM")
 	return instanceId
 
@@ -461,7 +459,7 @@
 			usage(function)
 	try:
 		vals = {}
-		(client, transport) = createClient(config)
+		client = createClient(config)
 		for parg in possibleArgs:
 			(parg, conv, default, required) = parg
 			val = None
@@ -498,8 +496,6 @@
 		print "TashiException:"
 		print e.msg
 		exitCode = e.errno
-	except TTransportException, e:
-		print e
 	except Exception, e:
 		print e
 		usage(function)

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py Mon Jul 20 16:41:15 2009
@@ -24,27 +24,41 @@
 import logging.config
 from getopt import getopt, GetoptError
 from ConfigParser import ConfigParser
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
 
-from tashi.services import clustermanagerservice
 from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
 import tashi
 
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
+
 def startClusterManager(config):
 	global service, data
 	
 	dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
 	data = instantiateImplementation(config.get("ClusterManager", "data"), config)
 	service = instantiateImplementation(config.get("ClusterManager", "service"), config, data, dfs)
-	processor = clustermanagerservice.Processor(service)
-	transport = TServerSocket(int(config.get('ClusterManagerService', 'port')))
-	server = TThreadedServer(processor, transport)
-	
+
+	if boolean(config.get("Security", "authAndEncrypt")):
+		users = {}
+		userDatabase = data.getUsers()
+		for user in userDatabase.values():
+			if user.passwd != None:
+				users[user.name] = user.passwd
+		users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
+		users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
+		authenticator = VdbAuthenticator.from_dict(users)
+		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
+	else:
+		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
+	t.logger.quiet = True
+	t.service.service = service
+	t.service._type = 'ClusterManagerService'
+
 	debugConsole(globals())
 	
 	try:
-		server.serve()
+		t.start()
 	except KeyboardInterrupt:
 		handleSIGTERM(signal.SIGTERM, None)
 

Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Mon Jul 20 16:41:15 2009
@@ -18,28 +18,28 @@
 from datetime import datetime
 from random import randint
 from socket import gethostname
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
 import logging
 import threading
 import time
 
-from tashi.services.ttypes import Errors, InstanceState, HostState, TashiException
-from tashi.services import nodemanagerservice
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
 from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
 
-def RPC(oldFunc):
-	return convertExceptions(oldFunc)
-
 class ClusterManagerService(object):
 	"""RPC service for the ClusterManager"""
 	
 	def __init__(self, config, data, dfs):
 		self.config = config
 		self.data = data
+		self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+		if self.authAndEncrypt:
+			self.username = config.get('AccessNodeManager', 'username')
+			self.password = config.get('AccessNodeManager', 'password')
+		else:
+			self.username = None
+			self.password = None
+		self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')))
 		self.dfs = dfs
-		self.proxy = ConnectionManager(nodemanagerservice.Client, int(self.config.get('ClusterManager', 'nodeManagerPort')))
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
 		self.lastContacted = {}
@@ -181,7 +181,6 @@
 				del instance.hints[hint]
 		return instance
 	
-	@RPC
 	def createVm(self, instance):
 		"""Function to add a VM to the list of pending VMs"""
 		instance = self.normalize(instance)
@@ -189,7 +188,6 @@
 		self.data.releaseInstance(instance)
 		return instance
 	
-	@RPC
 	def shutdownVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
@@ -202,7 +200,6 @@
 			raise
 		return
 	
-	@RPC
 	def destroyVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
@@ -221,7 +218,6 @@
 				raise
 		return
 	
-	@RPC
 	def suspendVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
@@ -235,7 +231,6 @@
 			raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
 		return
 	
-	@RPC
 	def resumeVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
@@ -244,7 +239,6 @@
 		self.data.releaseInstance(instance)
 		return instance
 	
-	@RPC
 	def migrateVm(self, instanceId, targetHostId):
 		instance = self.data.acquireInstance(instanceId)
 		try:
@@ -285,7 +279,6 @@
 			raise
 		return
 	
-	@RPC
 	def pauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
@@ -301,7 +294,6 @@
 		self.data.releaseInstance(instance)
 		return
 
-	@RPC
 	def unpauseVm(self, instanceId):
 		instance = self.data.acquireInstance(instanceId)
 		self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
@@ -317,23 +309,18 @@
 		self.data.releaseInstance(instance)
 		return
 	
-	@RPC
 	def getHosts(self):
 		return self.data.getHosts().values()
 	
-	@RPC
 	def getNetworks(self):
 		return self.data.getNetworks().values()
 	
-	@RPC
 	def getUsers(self):
 		return self.data.getUsers().values()
 	
-	@RPC
 	def getInstances(self):
 		return self.data.getInstances().values()
 	
-	@RPC
 	def vmmSpecificCall(self, instanceId, arg):
 		instance = self.data.getInstance(instanceId)
 		hostname = self.data.getHost(instance.hostId).name
@@ -345,7 +332,6 @@
 		return res
 	
 #	@timed
-	@RPC
 	def registerNodeManager(self, host, instances):
 		"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
 		if (host.id == None):
@@ -408,7 +394,6 @@
 			self.data.releaseHost(oldHost)
 		return host.id
 	
-	@RPC
 	def vmUpdate(self, instanceId, instance, oldState):
 		try:
 			oldInstance = self.data.acquireInstance(instanceId)
@@ -451,7 +436,6 @@
 			self.data.releaseInstance(oldInstance)
 		return
 	
-	@RPC
 	def activateVm(self, instanceId, host):
 		dataHost = self.data.acquireHost(host.id)
 		if (dataHost.name != host.name):

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Mon Jul 20 16:41:15 2009
@@ -17,7 +17,7 @@
 
 import threading
 
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data import DataInterface
 
 class FromConfig(DataInterface):
@@ -128,7 +128,7 @@
 			self.releaseLock(host._lock)
 	
 	def getHosts(self):
-		return self.hosts
+		return self.cleanHosts()
 	
 	def getHost(self, id):
 		host = self.hosts.get(id, None)

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py Mon Jul 20 16:41:15 2009
@@ -17,7 +17,7 @@
 
 import subprocess
 import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
 from tashi.clustermanager.data import DataInterface
 from tashi.util import instantiateImplementation
 

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py Mon Jul 20 16:41:15 2009
@@ -17,7 +17,7 @@
 
 import subprocess
 import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
 from tashi.clustermanager.data import DataInterface
 from tashi.util import instantiateImplementation
 

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py Mon Jul 20 16:41:15 2009
@@ -18,7 +18,7 @@
 import cPickle
 import os
 import threading
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data import FromConfig, DataInterface
 
 class Pickled(FromConfig):

Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Mon Jul 20 16:41:15 2009
@@ -19,7 +19,7 @@
 import threading
 import time
 import types
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data.datainterface import DataInterface
 from tashi.util import stringPartition, boolean
 
@@ -74,7 +74,7 @@
 		self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL)")
 		self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256))")
 		self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id int(11) NOT NULL, name varchar(256) NOT NULL)")
-		self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL)")
+		self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
 	
 	def sanitizeForSql(self, s):
 		if (s == '"True"'):
@@ -247,12 +247,12 @@
 		res = cur.fetchall()
 		users = {}
 		for r in res:
-			user = User(d={'id':r[0], 'name':r[1]})
+			user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
 			users[user.id] = user
 		return users
 	
 	def getUser(self, id):
 		cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (id))
 		r = cur.fetchone()
-		user = User(d={'id':r[0], 'name':r[1]})
+		user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
 		return user

Modified: incubator/tashi/trunk/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/connectionmanager.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/trunk/src/tashi/connectionmanager.py Mon Jul 20 16:41:15 2009
@@ -15,49 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.    
 
-from thrift.transport.TSocket import TSocket, socket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import *
 
 class ConnectionManager(object):
-	def __init__(self, clientClass, port, timeout=10000.0):
-		self.clientClass = clientClass
+	def __init__(self, username, password, port, timeout=10000.0):
+		self.username = username
+		self.password = password
 		self.timeout = timeout
 		self.port = port
 	
-	class anonClass(object):
-		def __init__(self, clientObject):
-			self.co = clientObject
-		
-		def __getattr__(self, name):
-			if (name.startswith("_")):
-				return self.__dict__[name]
-			def connectWrap(*args, **kw):
-				if (not self.co._iprot.trans.isOpen()):
-					self.co._iprot.trans.open()
-				try:
-					res = getattr(self.co, name)(*args, **kw)
-				except socket.error, e:
-					# Force a close for the case of a "Broken pipe"
-#					print "Forced a socket close"
-					self.co._iprot.trans.close()
-					self.co._iprot.trans.open()
-					res = getattr(self.co, name)(*args, **kw)
-					self.co._iprot.trans.close()
-					raise
-				self.co._iprot.trans.close()
-				return res
-			return connectWrap
-	
 	def __getitem__(self, hostname):
 		port = self.port
 		if len(hostname) == 2:
 			port = hostname[1]
 			hostname = hostname[0]
-		socket = TSocket(hostname, port)
-		socket.setTimeout(self.timeout)
-		transport = TBufferedTransport(socket)
-		protocol = TBinaryProtocol(transport)
-		client = self.clientClass(protocol)
-		client.__transport__ = transport
-		return self.anonClass(client)
+
+		return rpycservices.client(hostname, port, username=self.username, password=self.password)

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Mon Jul 20 16:41:15 2009
@@ -20,13 +20,15 @@
 import logging.config
 import signal
 import sys
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
 
 from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
-from tashi.services import nodemanagerservice, clustermanagerservice
 from tashi import ConnectionManager
 import tashi
+from tashi import boolean
+
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
 
 @signalHandler(signal.SIGTERM)
 def handleSIGTERM(signalNumber, stackFrame):
@@ -45,13 +47,22 @@
 	vmm = instantiateImplementation(config.get("NodeManager", "vmm"), config, dfs, None)
 	service = instantiateImplementation(config.get("NodeManager", "service"), config, vmm)
 	vmm.nm = service
-	processor = nodemanagerservice.Processor(service)
-	transport = TServerSocket(int(config.get('NodeManagerService', 'port')))
-	server = TThreadedServer(processor, transport)
+
+	if boolean(config.get("Security", "authAndEncrypt")):
+		users = {}
+		users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
+		authenticator = VdbAuthenticator.from_dict(users)
+		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
+	else:
+		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)
+	t.logger.quiet = True
+	t.service.service = service
+	t.service._type = 'NodeManagerService'
+
 	debugConsole(globals())
 	
 	try:
-		server.serve()
+		t.start()
 	except KeyboardInterrupt:
 		handleSIGTERM(signal.SIGTERM, None)
 	except Exception, e:

Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Mon Jul 20 16:41:15 2009
@@ -22,16 +22,13 @@
 import sys
 import threading
 import time
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
 
-from tashi.services.ttypes import Host, HostState, InstanceState, TashiException, Errors, Instance
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, TashiException, Errors, Instance
 from tashi.nodemanager import RPC
 from tashi import boolean, vmStates, logged, ConnectionManager, timed
 import tashi
 
+
 class NodeManagerService(object):
 	"""RPC handler for the NodeManager
 	   
@@ -43,6 +40,13 @@
 		self.vmm = vmm
 		self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
 		self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
+		self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+		if self.authAndEncrypt:
+			self.username = config.get('AccessClusterManager', 'username')
+			self.password = config.get('AccessClusterManager', 'password')
+		else:
+			self.username = None
+			self.password = None
 		self.log = logging.getLogger(__file__)
 		self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
 		self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
@@ -84,7 +88,7 @@
 			self.log.exception('Failed to save VM info to %s' % (self.infoFile))
 	
 	def vmStateChange(self, vmId, old, cur):
-		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		instance = self.getInstance(vmId)
 		if (old and instance.state != old):
 			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
@@ -103,7 +107,7 @@
 		return True
 	
 	def backupVmInfoAndFlushNotifyCM(self):
-		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		while True:
 			start = time.time()
 			try:
@@ -135,7 +139,7 @@
 				time.sleep(toSleep)
 	
 	def registerWithClusterManager(self):
-		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		while True:
 			start = time.time()
 			try:
@@ -154,7 +158,6 @@
 			raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
 		return instance
 	
-	@RPC
 	def instantiateVm(self, instance):
 		vmId = self.vmm.instantiateVm(instance)
 		instance.vmId = vmId
@@ -162,7 +165,6 @@
 		self.instances[vmId] = instance
 		return vmId
 	
-	@RPC
 	def suspendVm(self, vmId, destination):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Suspending
@@ -173,7 +175,7 @@
 		instance.state = InstanceState.Running
 		newInstance = Instance(d={'id':instance.id,'state':instance.state})
 		success = lambda: None
-		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		try:
 			cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
 		except Exception, e:
@@ -182,7 +184,6 @@
 		else:
 			success()
 	
-	@RPC
 	def resumeVm(self, instance, name):
 		instance.state = InstanceState.Resuming
 		instance.hostId = self.id
@@ -195,7 +196,6 @@
 			raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
 		return instance.vmId
 	
-	@RPC
 	def prepReceiveVm(self, instance, source):
 		instance.state = InstanceState.MigratePrep
 		instance.vmId = -1
@@ -206,7 +206,6 @@
 		self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
 		del self.instances[instance.vmId]
 		
-	@RPC
 	def migrateVm(self, vmId, target, transportCookie):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.MigrateTrans
@@ -214,7 +213,7 @@
 		return
 	
 	def receiveVmHelper(self, instance, transportCookie):
-		cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+		cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
 		vmId = self.vmm.receiveVm(transportCookie)
 		instance.state = InstanceState.Running
 		instance.hostId = self.id
@@ -230,48 +229,40 @@
 		else:
 			success()
 	
-	@RPC
 	def receiveVm(self, instance, transportCookie):
 		instance.state = InstanceState.MigrateTrans
 		threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
 		return
 	
-	@RPC
 	def pauseVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Pausing
 		self.vmm.pauseVm(vmId)
 		instance.state = InstanceState.Paused
 	
-	@RPC
 	def unpauseVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Unpausing
 		self.vmm.unpauseVm(vmId)
 		instance.state = InstanceState.Running
 	
-	@RPC
 	def shutdownVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.ShuttingDown
 		self.vmm.shutdownVm(vmId)
 	
-	@RPC
 	def destroyVm(self, vmId):
 		instance = self.getInstance(vmId)
 		instance.state = InstanceState.Destroying
 		self.vmm.destroyVm(vmId)
 	
-	@RPC
 	def getVmInfo(self, vmId):
 		instance = self.getInstance(vmId)
 		return instance
 	
-	@RPC
 	def vmmSpecificCall(self, vmId, arg):
 		return self.vmm.vmmSpecificCall(vmId, arg)
 	
-	@RPC
 	def listVms(self):
 		return self.instances.keys()
 	

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Mon Jul 20 16:41:15 2009
@@ -27,7 +27,7 @@
 import sys
 import time
 
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
 from tashi.util import broken, logged, scrubString, boolean
 from tashi import version, stringPartition
 from vmcontrolinterface import VmControlInterface

Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py Mon Jul 20 16:41:15 2009
@@ -25,8 +25,8 @@
 import socket
 
 from vmcontrolinterface import VmControlInterface
-from tashi.services.ttypes import Errors, InstanceState, TashiException
-from tashi.services.ttypes import Instance, Host
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
+from tashi.rpycservices.rpyctypes import Instance, Host
 from tashi import boolean, convertExceptions, ConnectionManager, version
 from tashi.util import isolatedRPC, broken
 

Added: incubator/tashi/trunk/src/tashi/rpycservices/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/__init__.py?rev=795906&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/__init__.py (added)
+++ incubator/tashi/trunk/src/tashi/rpycservices/__init__.py Mon Jul 20 16:41:15 2009
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.    
+
+import rpyc

Added: incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py?rev=795906&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py (added)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py Mon Jul 20 16:41:15 2009
@@ -0,0 +1,100 @@
+import rpyc
+from tashi.rpycservices.rpyctypes import *
+import cPickle
+
+clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm']
+nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo']
+
+def clean(args):
+	"""Cleans the object so cPickle can be used."""
+	if isinstance(args, list) or isinstance(args, tuple):
+		cleanArgs = []
+		for arg in args:
+			cleanArgs.append(clean(arg))
+		if isinstance(args, tuple):
+			cleanArgs = tuple(cleanArgs)
+		return cleanArgs
+	if isinstance(args, Instance):
+		return Instance(args.__dict__)
+	if isinstance(args, Host):
+		return Host(args.__dict__)
+	if isinstance(args, User):
+		user = User(args.__dict__)
+		user.passwd = None
+		return user
+	return args
+
+class client():
+	def __init__(self, host, port, username=None, password=None):
+		"""Client for ManagerService. If username and password are provided, rpyc.tls_connect will be used to connect, else rpyc.connect will be used."""
+		self.host = host
+		self.port = int(port)
+		self.username = username
+		self.password = password
+		self.conn = self.createConn()
+	
+	def createConn(self):
+		"""Creates a rpyc connection."""
+		if self.username != None and self.password != None:
+			return rpyc.tls_connect(host=self.host, port=self.port, username=self.username, password=self.password)
+		else:
+			return rpyc.connect(host=self.host, port=self.port)
+
+	def __getattr__(self, name):
+		"""Returns a function that makes the RPC call. No keyword arguments allowed when calling this function."""
+		if self.conn.closed == True:
+			self.conn = self.createConn()
+		if name not in clusterManagerRPCs and name not in nodeManagerRPCs:
+			return None
+		def connectWrap(*args):
+			args = cPickle.dumps(clean(args))
+			try:
+				res = getattr(self.conn.root, name)(args)
+			except Exception, e:
+				self.conn.close()
+				raise e
+			res = cPickle.loads(res)
+			if isinstance(res, Exception):
+				raise res
+			return res
+		return connectWrap
+
+class ManagerService(rpyc.Service):
+	"""Wrapper for rpyc service"""
+	# Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
+	def checkValidUser(self, functionName, clientUsername, args):
+		"""Checks whether the operation requested by the user is valid based on clientUsername. An exception will be thrown if not valid."""
+		if self._type == 'NodeManagerService':
+			return
+		if clientUsername in ['nodeManager', 'agent', 'root']:
+			return
+		if functionName in ['destroyVm', 'shutdownVm', 'pauseVm', 'vmmSpecificCall', 'suspendVm', 'unpauseVm', 'migrateVm', 'resumeVm']:
+			instanceId = args[0]
+			instance = self.service.data.getInstance(instanceId)
+			instanceUsername = self.service.data.getUser(instance.userId).name
+			if clientUsername != instanceUsername:
+				raise Exception('Permission Denied: %s cannot perform %s on VM owned by %s' % (clientUsername, functionName, instanceUsername))
+		return
+		
+	def _rpyc_getattr(self, name):
+		"""Returns the RPC corresponding to the function call"""
+		def makeCall(args):
+			args = cPickle.loads(args)
+			if self._conn._config['credentials'] != None:
+				try:
+					self.checkValidUser(makeCall._name, self._conn._config['credentials'], args)
+				except Exception, e:
+					e = cPickle.dumps(clean(e))
+					return e
+			try:
+				res = getattr(self.service, makeCall._name)(*args)
+			except Exception, e:
+				res = e
+			res = cPickle.dumps(clean(res))
+			return res
+		makeCall._name = name
+		if self._type == 'ClusterManagerService' and name in clusterManagerRPCs:
+			return makeCall
+		if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
+			return makeCall
+		raise AttributeError('RPC does not exist')

Added: incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py?rev=795906&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py (added)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py Mon Jul 20 16:41:15 2009
@@ -0,0 +1,247 @@
+class Errors(object):
+	ConvertedException = 1
+	NoSuchInstanceId = 2
+	NoSuchVmId = 3
+	IncorrectVmState = 4
+	NoSuchHost = 5
+	NoSuchHostId = 6
+	InstanceIdAlreadyExists = 7
+	HostNameMismatch = 8
+	HostNotUp = 9
+	HostStateError = 10
+	InvalidInstance = 11
+	UnableToResume = 12
+	UnableToSuspend = 13
+
+class InstanceState(object):
+	Pending = 1
+	Activating = 2
+	Running = 3
+	Pausing = 4
+	Paused = 5
+	Unpausing = 6
+	Suspending = 7
+	Resuming = 8
+	MigratePrep = 9
+	MigrateTrans = 10
+	ShuttingDown = 11
+	Destroying = 12
+	Orphaned = 13
+	Held = 14
+	Exited = 15
+	Suspended = 16
+
+class HostState(object):
+	Normal = 1
+	Drained = 2
+	VersionMismatch = 3
+
+class TashiException(Exception):
+	def __init__(self, d=None):
+		self.errno = None
+		self.msg = None
+		if isinstance(d, dict):
+			if 'errno' in d:
+				self.errno = d['errno']
+			if 'msg' in d:
+				self.msg = d['msg']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+
+class Host(object):
+	def __init__(self, d=None):
+		self.id = None
+		self.name = None
+		self.up = None
+		self.decayed = None
+		self.state = None
+		self.memory = None
+		self.cores = None
+		self.version = None
+		if isinstance(d, dict):
+			if 'id' in d:
+				self.id = d['id']
+			if 'name' in d:
+				self.name = d['name']
+			if 'up' in d:
+				self.up = d['up']
+			if 'decayed' in d:
+				self.decayed = d['decayed']
+			if 'state' in d:
+				self.state = d['state']
+			if 'memory' in d:
+				self.memory = d['memory']
+			if 'cores' in d:
+				self.cores = d['cores']
+			if 'version' in d:
+				self.version = d['version']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+
+class Network(object):
+	def __init__(self, d=None):
+		self.id = None
+		self.name = None
+		if isinstance(d, dict):
+			if 'id' in d:
+				self.id = d['id']
+			if 'name' in d:
+				self.name = d['name']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+
+class User(object):
+	def __init__(self, d=None):
+		self.id = None
+		self.name = None
+		self.passwd = None
+		if isinstance(d, dict):
+			if 'id' in d:
+				self.id = d['id']
+			if 'name' in d:
+				self.name = d['name']
+			if 'passwd' in d:
+				self.passwd = d['passwd']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+
+class DiskConfiguration(object):
+	def __init__(self, d=None):
+		self.uri = None
+		self.persistent = None
+		if isinstance(d, dict):
+			if 'uri' in d:
+				self.uri = d['uri']
+			if 'persistent' in d:
+				self.persistent = d['persistent']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+
+class NetworkConfiguration(object):
+	def __init__(self, d=None):
+		self.network = None
+		self.mac = None
+		self.ip = None
+		if isinstance(d, dict):
+			if 'network' in d:
+				self.network = d['network']
+			if 'mac' in d:
+				self.mac = d['mac']
+			if 'ip' in d:
+				self.ip = d['ip']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+
+class Instance(object):
+	def __init__(self, d=None):
+		self.id = None
+		self.vmId = None
+		self.hostId = None
+		self.decayed = None
+		self.state = None
+		self.userId = None
+		self.name = None
+		self.cores = None
+		self.memory = None
+		self.disks = None
+		#Quick fix so self.nics is not None
+		self.nics = []
+		self.hints = None
+		if isinstance(d, dict):
+			if 'id' in d:
+				self.id = d['id']
+			if 'vmId' in d:
+				self.vmId = d['vmId']
+			if 'hostId' in d:
+				self.hostId = d['hostId']
+			if 'decayed' in d:
+				self.decayed = d['decayed']
+			if 'state' in d:
+				self.state = d['state']
+			if 'userId' in d:
+				self.userId = d['userId']
+			if 'name' in d:
+				self.name = d['name']
+			if 'cores' in d:
+				self.cores = d['cores']
+			if 'memory' in d:
+				self.memory = d['memory']
+			if 'disks' in d:
+				self.disks = d['disks']
+			if 'nics' in d:
+				self.nics = d['nics']
+			if 'hints' in d:
+				self.hints = d['hints']
+
+	def __str__(self): 
+		return str(self.__dict__)
+
+	def __repr__(self): 
+		return repr(self.__dict__)
+
+	def __eq__(self, other):
+		return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+	def __ne__(self, other):
+		return not (self == other)
+

Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Mon Jul 20 16:41:15 2009
@@ -26,13 +26,11 @@
 import time
 import traceback
 import types
+import getpass
 
-from thrift.transport.TSocket import TServerSocket, TSocket
-from thrift.server.TServer import TThreadedServer
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from tashi.services import clustermanagerservice
-from tashi.services.ttypes import TashiException, Errors, InstanceState, HostState
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
 
 def broken(oldFunc):
 	"""Decorator that is used to mark a function as temporarily broken"""
@@ -269,14 +267,19 @@
 	host = os.getenv('TASHI_CM_HOST', cfgHost)
 	port = os.getenv('TASHI_CM_PORT', cfgPort)
 	timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
-	socket = TSocket(host, int(port))
-	socket.setTimeout(timeout)
-	transport = TBufferedTransport(socket)
-	protocol = TBinaryProtocol(transport)
-	client = clustermanagerservice.Client(protocol)
-	transport.open()
-	client._transport = transport
-	return (client, transport)
+
+	authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+	if authAndEncrypt:
+		username = config.get('AccessClusterManager', 'username')
+		if username == '':
+			username = raw_input('Enter Username:')
+		password = config.get('AccessClusterManager', 'password')
+		if password == '':
+			password = getpass.getpass('Enter Password:')
+		client = rpycservices.client(host, port, username=username, password=password)
+	else:
+		client = rpycservices.client(host, port)
+	return client
 
 def enumToStringDict(cls):
 	d = {}