You are viewing a plain text version of this content. The canonical link for it is here.
Posted to tashi-commits@incubator.apache.org by mr...@apache.org on 2009/07/20 16:41:15 UTC
svn commit: r795906 - in /incubator/tashi/trunk: etc/ src/tashi/
src/tashi/agents/ src/tashi/client/ src/tashi/clustermanager/
src/tashi/clustermanager/data/ src/tashi/nodemanager/
src/tashi/nodemanager/vmcontrol/ src/tashi/rpycservices/
Author: mryan3
Date: Mon Jul 20 16:41:15 2009
New Revision: 795906
URL: http://svn.apache.org/viewvc?rev=795906&view=rev
Log:
Michael Wang's rpyc patch from tashi-dev@i.a.o
Added:
incubator/tashi/trunk/src/tashi/rpycservices/
incubator/tashi/trunk/src/tashi/rpycservices/__init__.py
incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py
Modified:
incubator/tashi/trunk/etc/TashiDefaults.cfg
incubator/tashi/trunk/src/tashi/agents/dhcpdns.py
incubator/tashi/trunk/src/tashi/agents/instancehook.py
incubator/tashi/trunk/src/tashi/agents/primitive.py
incubator/tashi/trunk/src/tashi/client/tashi-client.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
incubator/tashi/trunk/src/tashi/connectionmanager.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
incubator/tashi/trunk/src/tashi/util.py
Modified: incubator/tashi/trunk/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/etc/TashiDefaults.cfg?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/trunk/etc/TashiDefaults.cfg Mon Jul 20 16:41:15 2009
@@ -15,6 +15,27 @@
# specific language governing permissions and limitations
# under the License.
+[Security]
+authAndEncrypt = False
+
+[AccessClusterManager]
+#If username and password are left empty, user will be prompted for username and password on the command line.
+username = nodemanager
+password = nodemanager
+
+[AccessNodeManager]
+#If username and password are left empty, user will be prompted for username and password on the command line.
+username = clustermanager
+password = clustermanager
+
+[AllowedUsers]
+nodeManagerUser = nodemanager
+nodeManagerPassword = nodemanager
+agentUser = agent
+agentPassword = agent
+clusterManagerUser = clustermanager
+clusterManagerPassword = clustermanager
+
# ClusterManager portion
[ClusterManager]
service = tashi.clustermanager.ClusterManagerService
Modified: incubator/tashi/trunk/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/dhcpdns.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/dhcpdns.py Mon Jul 20 16:41:15 2009
@@ -23,7 +23,7 @@
from tashi import boolean
class DhcpDns(InstanceHook):
- def __init__(self, config, client, transport, post=False):
+ def __init__(self, config, client, post=False):
InstanceHook.__init__(self, config, client, post)
self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
Modified: incubator/tashi/trunk/src/tashi/agents/instancehook.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/instancehook.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/instancehook.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/instancehook.py Mon Jul 20 16:41:15 2009
@@ -18,12 +18,11 @@
# under the License.
class InstanceHook(object):
- def __init__(self, config, client, transport, post=False):
+ def __init__(self, config, client, post=False):
if (self.__class__ is InstanceHook):
raise NotImplementedError
self.config = config
self.client = client
- self.transport = transport
self.post = post
def preCreate(self, instance):
Modified: incubator/tashi/trunk/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/agents/primitive.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/trunk/src/tashi/agents/primitive.py Mon Jul 20 16:41:15 2009
@@ -25,15 +25,14 @@
import time
import logging.config
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.util import getConfig, createClient, instantiateImplementation, boolean
import tashi
class Primitive(object):
- def __init__(self, config, client, transport):
+ def __init__(self, config, client):
self.config = config
self.client = client
- self.transport = transport
self.hooks = []
self.log = logging.getLogger(__file__)
self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -45,7 +44,7 @@
name = name.lower()
if (name.startswith("hook")):
try:
- self.hooks.append(instantiateImplementation(value, config, client, transport, False))
+ self.hooks.append(instantiateImplementation(value, config, client, False))
except:
self.log.exception("Failed to load hook %s" % (value))
@@ -53,9 +52,6 @@
oldInstances = {}
while True:
try:
- # Make sure transport is open
- if (not self.transport.isOpen()):
- self.transport.open()
# Generate a list of VMs/host
hosts = {}
load = {}
@@ -120,26 +116,18 @@
time.sleep(self.scheduleDelay)
except TashiException, e:
self.log.exception("Tashi exception")
- try:
- self.transport.close()
- except Exception, e:
- self.log.exception("Failed to close the transport")
time.sleep(self.scheduleDelay)
except Exception, e:
self.log.exception("General exception")
- try:
- self.transport.close()
- except Exception, e:
- self.log.exception("Failed to close the transport")
time.sleep(self.scheduleDelay)
def main():
(config, configFiles) = getConfig(["Agent"])
publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
tashi.publisher = publisher
- (client, transport) = createClient(config)
+ client = createClient(config)
logging.config.fileConfig(configFiles)
- agent = Primitive(config, client, transport)
+ agent = Primitive(config, client)
agent.start()
if __name__ == "__main__":
Modified: incubator/tashi/trunk/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/client/tashi-client.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/trunk/src/tashi/client/tashi-client.py Mon Jul 20 16:41:15 2009
@@ -21,12 +21,7 @@
import random
import sys
import types
-from tashi.services.ttypes import *
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.transport.TSocket import TSocket
-
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import *
from tashi import vmStates, hostStates, boolean, getConfig, stringPartition, createClient
users = {}
@@ -46,7 +41,10 @@
def getUser():
fetchUsers()
- userStr = os.getenv("USER", "unknown")
+ if client.username != None:
+ userStr = client.username
+ else:
+ userStr = os.getenv("USER", "unknown")
for user in users:
if (users[user].name == userStr):
return users[user].id
@@ -66,7 +64,7 @@
raise ValueError("Unknown instance %s" % (str(instance)))
for instance in instances:
if (instance.id == instanceId):
- if (instance.userId != userId and instance.userId != None):
+ if (instance.userId != userId and instance.userId != None and userId != 0):
raise ValueError("You don't own that VM")
return instanceId
@@ -461,7 +459,7 @@
usage(function)
try:
vals = {}
- (client, transport) = createClient(config)
+ client = createClient(config)
for parg in possibleArgs:
(parg, conv, default, required) = parg
val = None
@@ -498,8 +496,6 @@
print "TashiException:"
print e.msg
exitCode = e.errno
- except TTransportException, e:
- print e
except Exception, e:
print e
usage(function)
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanager.py Mon Jul 20 16:41:15 2009
@@ -24,27 +24,41 @@
import logging.config
from getopt import getopt, GetoptError
from ConfigParser import ConfigParser
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
-from tashi.services import clustermanagerservice
from tashi.util import signalHandler, boolean, instantiateImplementation, getConfig, debugConsole
import tashi
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
+
def startClusterManager(config):
global service, data
dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
data = instantiateImplementation(config.get("ClusterManager", "data"), config)
service = instantiateImplementation(config.get("ClusterManager", "service"), config, data, dfs)
- processor = clustermanagerservice.Processor(service)
- transport = TServerSocket(int(config.get('ClusterManagerService', 'port')))
- server = TThreadedServer(processor, transport)
-
+
+ if boolean(config.get("Security", "authAndEncrypt")):
+ users = {}
+ userDatabase = data.getUsers()
+ for user in userDatabase.values():
+ if user.passwd != None:
+ users[user.name] = user.passwd
+ users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
+ users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
+ authenticator = VdbAuthenticator.from_dict(users)
+ t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
+ else:
+ t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
+ t.logger.quiet = True
+ t.service.service = service
+ t.service._type = 'ClusterManagerService'
+
debugConsole(globals())
try:
- server.serve()
+ t.start()
except KeyboardInterrupt:
handleSIGTERM(signal.SIGTERM, None)
Modified: incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/clustermanagerservice.py Mon Jul 20 16:41:15 2009
@@ -18,28 +18,28 @@
from datetime import datetime
from random import randint
from socket import gethostname
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
import logging
import threading
import time
-from tashi.services.ttypes import Errors, InstanceState, HostState, TashiException
-from tashi.services import nodemanagerservice
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, HostState, TashiException
from tashi import boolean, convertExceptions, ConnectionManager, vmStates, timed, version, scrubString
-def RPC(oldFunc):
- return convertExceptions(oldFunc)
-
class ClusterManagerService(object):
"""RPC service for the ClusterManager"""
def __init__(self, config, data, dfs):
self.config = config
self.data = data
+ self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+ if self.authAndEncrypt:
+ self.username = config.get('AccessNodeManager', 'username')
+ self.password = config.get('AccessNodeManager', 'password')
+ else:
+ self.username = None
+ self.password = None
+ self.proxy = ConnectionManager(self.username, self.password, int(self.config.get('ClusterManager', 'nodeManagerPort')))
self.dfs = dfs
- self.proxy = ConnectionManager(nodemanagerservice.Client, int(self.config.get('ClusterManager', 'nodeManagerPort')))
self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
self.log = logging.getLogger(__name__)
self.lastContacted = {}
@@ -181,7 +181,6 @@
del instance.hints[hint]
return instance
- @RPC
def createVm(self, instance):
"""Function to add a VM to the list of pending VMs"""
instance = self.normalize(instance)
@@ -189,7 +188,6 @@
self.data.releaseInstance(instance)
return instance
- @RPC
def shutdownVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Running, InstanceState.ShuttingDown)
@@ -202,7 +200,6 @@
raise
return
- @RPC
def destroyVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
if (instance.state is InstanceState.Pending or instance.state is InstanceState.Held):
@@ -221,7 +218,6 @@
raise
return
- @RPC
def suspendVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Running, InstanceState.Suspending)
@@ -235,7 +231,6 @@
raise TashiException(d={'errno':Errors.UnableToSuspend, 'msg':'Failed to suspend %s' % (instance.name)})
return
- @RPC
def resumeVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Suspended, InstanceState.Pending)
@@ -244,7 +239,6 @@
self.data.releaseInstance(instance)
return instance
- @RPC
def migrateVm(self, instanceId, targetHostId):
instance = self.data.acquireInstance(instanceId)
try:
@@ -285,7 +279,6 @@
raise
return
- @RPC
def pauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Running, InstanceState.Pausing)
@@ -301,7 +294,6 @@
self.data.releaseInstance(instance)
return
- @RPC
def unpauseVm(self, instanceId):
instance = self.data.acquireInstance(instanceId)
self.stateTransition(instance, InstanceState.Paused, InstanceState.Unpausing)
@@ -317,23 +309,18 @@
self.data.releaseInstance(instance)
return
- @RPC
def getHosts(self):
return self.data.getHosts().values()
- @RPC
def getNetworks(self):
return self.data.getNetworks().values()
- @RPC
def getUsers(self):
return self.data.getUsers().values()
- @RPC
def getInstances(self):
return self.data.getInstances().values()
- @RPC
def vmmSpecificCall(self, instanceId, arg):
instance = self.data.getInstance(instanceId)
hostname = self.data.getHost(instance.hostId).name
@@ -345,7 +332,6 @@
return res
# @timed
- @RPC
def registerNodeManager(self, host, instances):
"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
if (host.id == None):
@@ -408,7 +394,6 @@
self.data.releaseHost(oldHost)
return host.id
- @RPC
def vmUpdate(self, instanceId, instance, oldState):
try:
oldInstance = self.data.acquireInstance(instanceId)
@@ -451,7 +436,6 @@
self.data.releaseInstance(oldInstance)
return
- @RPC
def activateVm(self, instanceId, host):
dataHost = self.data.acquireHost(host.id)
if (dataHost.name != host.name):
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/fromconfig.py Mon Jul 20 16:41:15 2009
@@ -17,7 +17,7 @@
import threading
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data import DataInterface
class FromConfig(DataInterface):
@@ -128,7 +128,7 @@
self.releaseLock(host._lock)
def getHosts(self):
- return self.hosts
+ return self.cleanHosts()
def getHost(self, id):
host = self.hosts.get(id, None)
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/getentoverride.py Mon Jul 20 16:41:15 2009
@@ -17,7 +17,7 @@
import subprocess
import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/ldapoverride.py Mon Jul 20 16:41:15 2009
@@ -17,7 +17,7 @@
import subprocess
import time
-from tashi.services.ttypes import User
+from tashi.rpycservices.rpyctypes import User
from tashi.clustermanager.data import DataInterface
from tashi.util import instantiateImplementation
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/pickled.py Mon Jul 20 16:41:15 2009
@@ -18,7 +18,7 @@
import cPickle
import os
import threading
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data import FromConfig, DataInterface
class Pickled(FromConfig):
Modified: incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/trunk/src/tashi/clustermanager/data/sql.py Mon Jul 20 16:41:15 2009
@@ -19,7 +19,7 @@
import threading
import time
import types
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.clustermanager.data.datainterface import DataInterface
from tashi.util import stringPartition, boolean
@@ -74,7 +74,7 @@
self.executeStatement("CREATE TABLE IF NOT EXISTS instances (id int(11) NOT NULL, vmId int(11), hostId int(11), decayed tinyint(1) NOT NULL, state int(11) NOT NULL, userId int(11), name varchar(256), cores int(11) NOT NULL, memory int(11) NOT NULL, disks varchar(1024) NOT NULL, nics varchar(1024) NOT NULL, hints varchar(1024) NOT NULL)")
self.executeStatement("CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY, name varchar(256) NOT NULL, up tinyint(1) DEFAULT 0, decayed tinyint(1) DEFAULT 0, state int(11) DEFAULT 1, memory int(11), cores int(11), version varchar(256))")
self.executeStatement("CREATE TABLE IF NOT EXISTS networks (id int(11) NOT NULL, name varchar(256) NOT NULL)")
- self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL)")
+ self.executeStatement("CREATE TABLE IF NOT EXISTS users (id int(11) NOT NULL, name varchar(256) NOT NULL, passwd varchar(256))")
def sanitizeForSql(self, s):
if (s == '"True"'):
@@ -247,12 +247,12 @@
res = cur.fetchall()
users = {}
for r in res:
- user = User(d={'id':r[0], 'name':r[1]})
+ user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
users[user.id] = user
return users
def getUser(self, id):
cur = self.executeStatement("SELECT * FROM users WHERE id = %d" % (id))
r = cur.fetchone()
- user = User(d={'id':r[0], 'name':r[1]})
+ user = User(d={'id':r[0], 'name':r[1], 'passwd':r[2]})
return user
Modified: incubator/tashi/trunk/src/tashi/connectionmanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/connectionmanager.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/connectionmanager.py (original)
+++ incubator/tashi/trunk/src/tashi/connectionmanager.py Mon Jul 20 16:41:15 2009
@@ -15,49 +15,21 @@
# specific language governing permissions and limitations
# under the License.
-from thrift.transport.TSocket import TSocket, socket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import *
class ConnectionManager(object):
- def __init__(self, clientClass, port, timeout=10000.0):
- self.clientClass = clientClass
+ def __init__(self, username, password, port, timeout=10000.0):
+ self.username = username
+ self.password = password
self.timeout = timeout
self.port = port
- class anonClass(object):
- def __init__(self, clientObject):
- self.co = clientObject
-
- def __getattr__(self, name):
- if (name.startswith("_")):
- return self.__dict__[name]
- def connectWrap(*args, **kw):
- if (not self.co._iprot.trans.isOpen()):
- self.co._iprot.trans.open()
- try:
- res = getattr(self.co, name)(*args, **kw)
- except socket.error, e:
- # Force a close for the case of a "Broken pipe"
-# print "Forced a socket close"
- self.co._iprot.trans.close()
- self.co._iprot.trans.open()
- res = getattr(self.co, name)(*args, **kw)
- self.co._iprot.trans.close()
- raise
- self.co._iprot.trans.close()
- return res
- return connectWrap
-
def __getitem__(self, hostname):
port = self.port
if len(hostname) == 2:
port = hostname[1]
hostname = hostname[0]
- socket = TSocket(hostname, port)
- socket.setTimeout(self.timeout)
- transport = TBufferedTransport(socket)
- protocol = TBinaryProtocol(transport)
- client = self.clientClass(protocol)
- client.__transport__ = transport
- return self.anonClass(client)
+
+ return rpycservices.client(hostname, port, username=self.username, password=self.password)
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanager.py Mon Jul 20 16:41:15 2009
@@ -20,13 +20,15 @@
import logging.config
import signal
import sys
-from thrift.transport.TSocket import TServerSocket
-from thrift.server.TServer import TThreadedServer
from tashi.util import instantiateImplementation, getConfig, debugConsole, signalHandler
-from tashi.services import nodemanagerservice, clustermanagerservice
from tashi import ConnectionManager
import tashi
+from tashi import boolean
+
+from tashi.rpycservices import rpycservices
+from rpyc.utils.server import ThreadedServer
+from rpyc.utils.authenticators import VdbAuthenticator
@signalHandler(signal.SIGTERM)
def handleSIGTERM(signalNumber, stackFrame):
@@ -45,13 +47,22 @@
vmm = instantiateImplementation(config.get("NodeManager", "vmm"), config, dfs, None)
service = instantiateImplementation(config.get("NodeManager", "service"), config, vmm)
vmm.nm = service
- processor = nodemanagerservice.Processor(service)
- transport = TServerSocket(int(config.get('NodeManagerService', 'port')))
- server = TThreadedServer(processor, transport)
+
+ if boolean(config.get("Security", "authAndEncrypt")):
+ users = {}
+ users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
+ authenticator = VdbAuthenticator.from_dict(users)
+ t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
+ else:
+ t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)
+ t.logger.quiet = True
+ t.service.service = service
+ t.service._type = 'NodeManagerService'
+
debugConsole(globals())
try:
- server.serve()
+ t.start()
except KeyboardInterrupt:
handleSIGTERM(signal.SIGTERM, None)
except Exception, e:
Modified: incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/nodemanagerservice.py Mon Jul 20 16:41:15 2009
@@ -22,16 +22,13 @@
import sys
import threading
import time
-from thrift.transport.TSocket import TSocket
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from tashi.services.ttypes import Host, HostState, InstanceState, TashiException, Errors, Instance
-from tashi.services import clustermanagerservice
+from tashi.rpycservices.rpyctypes import Host, HostState, InstanceState, TashiException, Errors, Instance
from tashi.nodemanager import RPC
from tashi import boolean, vmStates, logged, ConnectionManager, timed
import tashi
+
class NodeManagerService(object):
"""RPC handler for the NodeManager
@@ -43,6 +40,13 @@
self.vmm = vmm
self.cmHost = config.get("NodeManagerService", "clusterManagerHost")
self.cmPort = int(config.get("NodeManagerService", "clusterManagerPort"))
+ self.authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+ if self.authAndEncrypt:
+ self.username = config.get('AccessClusterManager', 'username')
+ self.password = config.get('AccessClusterManager', 'password')
+ else:
+ self.username = None
+ self.password = None
self.log = logging.getLogger(__file__)
self.convertExceptions = boolean(config.get('NodeManagerService', 'convertExceptions'))
self.registerFrequency = float(config.get('NodeManagerService', 'registerFrequency'))
@@ -84,7 +88,7 @@
self.log.exception('Failed to save VM info to %s' % (self.infoFile))
def vmStateChange(self, vmId, old, cur):
- cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
instance = self.getInstance(vmId)
if (old and instance.state != old):
self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
@@ -103,7 +107,7 @@
return True
def backupVmInfoAndFlushNotifyCM(self):
- cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
while True:
start = time.time()
try:
@@ -135,7 +139,7 @@
time.sleep(toSleep)
def registerWithClusterManager(self):
- cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
while True:
start = time.time()
try:
@@ -154,7 +158,6 @@
raise TashiException(d={'errno':Errors.NoSuchVmId,'msg':"There is no vmId %d on this host" % (vmId)})
return instance
- @RPC
def instantiateVm(self, instance):
vmId = self.vmm.instantiateVm(instance)
instance.vmId = vmId
@@ -162,7 +165,6 @@
self.instances[vmId] = instance
return vmId
- @RPC
def suspendVm(self, vmId, destination):
instance = self.getInstance(vmId)
instance.state = InstanceState.Suspending
@@ -173,7 +175,7 @@
instance.state = InstanceState.Running
newInstance = Instance(d={'id':instance.id,'state':instance.state})
success = lambda: None
- cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
try:
cm.vmUpdate(newInstance.id, newInstance, InstanceState.Resuming)
except Exception, e:
@@ -182,7 +184,6 @@
else:
success()
- @RPC
def resumeVm(self, instance, name):
instance.state = InstanceState.Resuming
instance.hostId = self.id
@@ -195,7 +196,6 @@
raise TashiException(d={'errno':Errors.UnableToResume,'msg':"resumeVm failed on the node manager"})
return instance.vmId
- @RPC
def prepReceiveVm(self, instance, source):
instance.state = InstanceState.MigratePrep
instance.vmId = -1
@@ -206,7 +206,6 @@
self.vmm.migrateVm(instance.vmId, target.name, transportCookie)
del self.instances[instance.vmId]
- @RPC
def migrateVm(self, vmId, target, transportCookie):
instance = self.getInstance(vmId)
instance.state = InstanceState.MigrateTrans
@@ -214,7 +213,7 @@
return
def receiveVmHelper(self, instance, transportCookie):
- cm = ConnectionManager(clustermanagerservice.Client, self.cmPort)[self.cmHost]
+ cm = ConnectionManager(self.username, self.password, self.cmPort)[self.cmHost]
vmId = self.vmm.receiveVm(transportCookie)
instance.state = InstanceState.Running
instance.hostId = self.id
@@ -230,48 +229,40 @@
else:
success()
- @RPC
def receiveVm(self, instance, transportCookie):
instance.state = InstanceState.MigrateTrans
threading.Thread(target=self.receiveVmHelper, args=(instance, transportCookie)).start()
return
- @RPC
def pauseVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.Pausing
self.vmm.pauseVm(vmId)
instance.state = InstanceState.Paused
- @RPC
def unpauseVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.Unpausing
self.vmm.unpauseVm(vmId)
instance.state = InstanceState.Running
- @RPC
def shutdownVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.ShuttingDown
self.vmm.shutdownVm(vmId)
- @RPC
def destroyVm(self, vmId):
instance = self.getInstance(vmId)
instance.state = InstanceState.Destroying
self.vmm.destroyVm(vmId)
- @RPC
def getVmInfo(self, vmId):
instance = self.getInstance(vmId)
return instance
- @RPC
def vmmSpecificCall(self, vmId, arg):
return self.vmm.vmmSpecificCall(vmId, arg)
- @RPC
def listVms(self):
return self.instances.keys()
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/qemu.py Mon Jul 20 16:41:15 2009
@@ -27,7 +27,7 @@
import sys
import time
-from tashi.services.ttypes import *
+from tashi.rpycservices.rpyctypes import *
from tashi.util import broken, logged, scrubString, boolean
from tashi import version, stringPartition
from vmcontrolinterface import VmControlInterface
Modified: incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/trunk/src/tashi/nodemanager/vmcontrol/xenpv.py Mon Jul 20 16:41:15 2009
@@ -25,8 +25,8 @@
import socket
from vmcontrolinterface import VmControlInterface
-from tashi.services.ttypes import Errors, InstanceState, TashiException
-from tashi.services.ttypes import Instance, Host
+from tashi.rpycservices.rpyctypes import Errors, InstanceState, TashiException
+from tashi.rpycservices.rpyctypes import Instance, Host
from tashi import boolean, convertExceptions, ConnectionManager, version
from tashi.util import isolatedRPC, broken
Added: incubator/tashi/trunk/src/tashi/rpycservices/__init__.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/__init__.py?rev=795906&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/__init__.py (added)
+++ incubator/tashi/trunk/src/tashi/rpycservices/__init__.py Mon Jul 20 16:41:15 2009
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import rpyc
Added: incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py?rev=795906&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py (added)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpycservices.py Mon Jul 20 16:41:15 2009
@@ -0,0 +1,100 @@
+import rpyc
+from tashi.rpycservices.rpyctypes import *
+import cPickle
+
+clusterManagerRPCs = ['createVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'migrateVm', 'pauseVm', 'unpauseVm', 'getHosts', 'getNetworks', 'getUsers', 'getInstances', 'vmmSpecificCall', 'registerNodeManager', 'vmUpdate', 'activateVm']
+nodeManagerRPCs = ['instantiateVm', 'shutdownVm', 'destroyVm', 'suspendVm', 'resumeVm', 'prepReceiveVm', 'migrateVm', 'receiveVm', 'pauseVm', 'unpauseVm', 'getVmInfo', 'listVms', 'vmmSpecificCall', 'getHostInfo']
+
+def clean(args):
+ """Cleans the object so cPickle can be used."""
+ if isinstance(args, list) or isinstance(args, tuple):
+ cleanArgs = []
+ for arg in args:
+ cleanArgs.append(clean(arg))
+ if isinstance(args, tuple):
+ cleanArgs = tuple(cleanArgs)
+ return cleanArgs
+ if isinstance(args, Instance):
+ return Instance(args.__dict__)
+ if isinstance(args, Host):
+ return Host(args.__dict__)
+ if isinstance(args, User):
+ user = User(args.__dict__)
+ user.passwd = None
+ return user
+ return args
+
+class client():
+ def __init__(self, host, port, username=None, password=None):
+ """Client for ManagerService. If username and password are provided, rpyc.tls_connect will be used to connect, else rpyc.connect will be used."""
+ self.host = host
+ self.port = int(port)
+ self.username = username
+ self.password = password
+ self.conn = self.createConn()
+
+ def createConn(self):
+ """Creates a rpyc connection."""
+ if self.username != None and self.password != None:
+ return rpyc.tls_connect(host=self.host, port=self.port, username=self.username, password=self.password)
+ else:
+ return rpyc.connect(host=self.host, port=self.port)
+
+ def __getattr__(self, name):
+ """Returns a function that makes the RPC call. No keyword arguments allowed when calling this function."""
+ if self.conn.closed == True:
+ self.conn = self.createConn()
+ if name not in clusterManagerRPCs and name not in nodeManagerRPCs:
+ return None
+ def connectWrap(*args):
+ args = cPickle.dumps(clean(args))
+ try:
+ res = getattr(self.conn.root, name)(args)
+ except Exception, e:
+ self.conn.close()
+ raise e
+ res = cPickle.loads(res)
+ if isinstance(res, Exception):
+ raise res
+ return res
+ return connectWrap
+
+class ManagerService(rpyc.Service):
+ """Wrapper for rpyc service"""
+ # Note: self.service and self._type are set before rpyc.utils.server.ThreadedServer is started.
+ def checkValidUser(self, functionName, clientUsername, args):
+ """Checks whether the operation requested by the user is valid based on clientUsername. An exception will be thrown if not valid."""
+ if self._type == 'NodeManagerService':
+ return
+ if clientUsername in ['nodeManager', 'agent', 'root']:
+ return
+ if functionName in ['destroyVm', 'shutdownVm', 'pauseVm', 'vmmSpecificCall', 'suspendVm', 'unpauseVm', 'migrateVm', 'resumeVm']:
+ instanceId = args[0]
+ instance = self.service.data.getInstance(instanceId)
+ instanceUsername = self.service.data.getUser(instance.userId).name
+ if clientUsername != instanceUsername:
+ raise Exception('Permission Denied: %s cannot perform %s on VM owned by %s' % (clientUsername, functionName, instanceUsername))
+ return
+
+ def _rpyc_getattr(self, name):
+ """Returns the RPC corresponding to the function call"""
+ def makeCall(args):
+ args = cPickle.loads(args)
+ if self._conn._config['credentials'] != None:
+ try:
+ self.checkValidUser(makeCall._name, self._conn._config['credentials'], args)
+ except Exception, e:
+ e = cPickle.dumps(clean(e))
+ return e
+ try:
+ res = getattr(self.service, makeCall._name)(*args)
+ except Exception, e:
+ res = e
+ res = cPickle.dumps(clean(res))
+ return res
+ makeCall._name = name
+ if self._type == 'ClusterManagerService' and name in clusterManagerRPCs:
+ return makeCall
+ if self._type == 'NodeManagerService' and name in nodeManagerRPCs:
+ return makeCall
+ raise AttributeError('RPC does not exist')
Added: incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py?rev=795906&view=auto
==============================================================================
--- incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py (added)
+++ incubator/tashi/trunk/src/tashi/rpycservices/rpyctypes.py Mon Jul 20 16:41:15 2009
@@ -0,0 +1,247 @@
+class Errors(object):
+ ConvertedException = 1
+ NoSuchInstanceId = 2
+ NoSuchVmId = 3
+ IncorrectVmState = 4
+ NoSuchHost = 5
+ NoSuchHostId = 6
+ InstanceIdAlreadyExists = 7
+ HostNameMismatch = 8
+ HostNotUp = 9
+ HostStateError = 10
+ InvalidInstance = 11
+ UnableToResume = 12
+ UnableToSuspend = 13
+
+class InstanceState(object):
+ Pending = 1
+ Activating = 2
+ Running = 3
+ Pausing = 4
+ Paused = 5
+ Unpausing = 6
+ Suspending = 7
+ Resuming = 8
+ MigratePrep = 9
+ MigrateTrans = 10
+ ShuttingDown = 11
+ Destroying = 12
+ Orphaned = 13
+ Held = 14
+ Exited = 15
+ Suspended = 16
+
+class HostState(object):
+ Normal = 1
+ Drained = 2
+ VersionMismatch = 3
+
+class TashiException(Exception):
+ def __init__(self, d=None):
+ self.errno = None
+ self.msg = None
+ if isinstance(d, dict):
+ if 'errno' in d:
+ self.errno = d['errno']
+ if 'msg' in d:
+ self.msg = d['msg']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Host(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.name = None
+ self.up = None
+ self.decayed = None
+ self.state = None
+ self.memory = None
+ self.cores = None
+ self.version = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'name' in d:
+ self.name = d['name']
+ if 'up' in d:
+ self.up = d['up']
+ if 'decayed' in d:
+ self.decayed = d['decayed']
+ if 'state' in d:
+ self.state = d['state']
+ if 'memory' in d:
+ self.memory = d['memory']
+ if 'cores' in d:
+ self.cores = d['cores']
+ if 'version' in d:
+ self.version = d['version']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Network(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.name = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'name' in d:
+ self.name = d['name']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class User(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.name = None
+ self.passwd = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'name' in d:
+ self.name = d['name']
+ if 'passwd' in d:
+ self.passwd = d['passwd']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class DiskConfiguration(object):
+ def __init__(self, d=None):
+ self.uri = None
+ self.persistent = None
+ if isinstance(d, dict):
+ if 'uri' in d:
+ self.uri = d['uri']
+ if 'persistent' in d:
+ self.persistent = d['persistent']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class NetworkConfiguration(object):
+ def __init__(self, d=None):
+ self.network = None
+ self.mac = None
+ self.ip = None
+ if isinstance(d, dict):
+ if 'network' in d:
+ self.network = d['network']
+ if 'mac' in d:
+ self.mac = d['mac']
+ if 'ip' in d:
+ self.ip = d['ip']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Instance(object):
+ def __init__(self, d=None):
+ self.id = None
+ self.vmId = None
+ self.hostId = None
+ self.decayed = None
+ self.state = None
+ self.userId = None
+ self.name = None
+ self.cores = None
+ self.memory = None
+ self.disks = None
+ #Quick fix so self.nics is not None
+ self.nics = []
+ self.hints = None
+ if isinstance(d, dict):
+ if 'id' in d:
+ self.id = d['id']
+ if 'vmId' in d:
+ self.vmId = d['vmId']
+ if 'hostId' in d:
+ self.hostId = d['hostId']
+ if 'decayed' in d:
+ self.decayed = d['decayed']
+ if 'state' in d:
+ self.state = d['state']
+ if 'userId' in d:
+ self.userId = d['userId']
+ if 'name' in d:
+ self.name = d['name']
+ if 'cores' in d:
+ self.cores = d['cores']
+ if 'memory' in d:
+ self.memory = d['memory']
+ if 'disks' in d:
+ self.disks = d['disks']
+ if 'nics' in d:
+ self.nics = d['nics']
+ if 'hints' in d:
+ self.hints = d['hints']
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return repr(self.__dict__)
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
Modified: incubator/tashi/trunk/src/tashi/util.py
URL: http://svn.apache.org/viewvc/incubator/tashi/trunk/src/tashi/util.py?rev=795906&r1=795905&r2=795906&view=diff
==============================================================================
--- incubator/tashi/trunk/src/tashi/util.py (original)
+++ incubator/tashi/trunk/src/tashi/util.py Mon Jul 20 16:41:15 2009
@@ -26,13 +26,11 @@
import time
import traceback
import types
+import getpass
-from thrift.transport.TSocket import TServerSocket, TSocket
-from thrift.server.TServer import TThreadedServer
-from thrift.protocol.TBinaryProtocol import TBinaryProtocol
-from thrift.transport.TTransport import TBufferedTransport
-from tashi.services import clustermanagerservice
-from tashi.services.ttypes import TashiException, Errors, InstanceState, HostState
+import rpyc
+from tashi.rpycservices import rpycservices
+from tashi.rpycservices.rpyctypes import TashiException, Errors, InstanceState, HostState
def broken(oldFunc):
"""Decorator that is used to mark a function as temporarily broken"""
@@ -269,14 +267,19 @@
host = os.getenv('TASHI_CM_HOST', cfgHost)
port = os.getenv('TASHI_CM_PORT', cfgPort)
timeout = float(os.getenv('TASHI_CM_TIMEOUT', cfgTimeout)) * 1000.0
- socket = TSocket(host, int(port))
- socket.setTimeout(timeout)
- transport = TBufferedTransport(socket)
- protocol = TBinaryProtocol(transport)
- client = clustermanagerservice.Client(protocol)
- transport.open()
- client._transport = transport
- return (client, transport)
+
+ authAndEncrypt = boolean(config.get('Security', 'authAndEncrypt'))
+ if authAndEncrypt:
+ username = config.get('AccessClusterManager', 'username')
+ if username == '':
+ username = raw_input('Enter Username:')
+ password = config.get('AccessClusterManager', 'password')
+ if password == '':
+ password = getpass.getpass('Enter Password:')
+ client = rpycservices.client(host, port, username=username, password=password)
+ else:
+ client = rpycservices.client(host, port)
+ return client
def enumToStringDict(cls):
d = {}