You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/10/20 17:22:19 UTC
svn commit: r827686 - in /qpid/trunk/qpid/cpp: bindings/qmf/python/qmf.py
bindings/qmf/tests/python_agent.py src/qmf/engine/ConnectionSettingsImpl.cpp
Author: tross
Date: Tue Oct 20 15:22:19 2009
New Revision: 827686
URL: http://svn.apache.org/viewvc?rev=827686&view=rev
Log:
QPID-2126 - Sync the python QMF bindings to the current Ruby QMF bindings implementation
Applied patch from Ken Giusti
Modified:
qpid/trunk/qpid/cpp/bindings/qmf/python/qmf.py
qpid/trunk/qpid/cpp/bindings/qmf/tests/python_agent.py
qpid/trunk/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
Modified: qpid/trunk/qpid/cpp/bindings/qmf/python/qmf.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/python/qmf.py?rev=827686&r1=827685&r2=827686&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/python/qmf.py (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/python/qmf.py Tue Oct 20 15:22:19 2009
@@ -21,6 +21,7 @@
import os
from threading import Thread
from threading import RLock
+from threading import Condition
import qmfengine
from qmfengine import (ACCESS_READ_CREATE, ACCESS_READ_ONLY, ACCESS_READ_WRITE)
from qmfengine import (CLASS_EVENT, CLASS_OBJECT)
@@ -38,35 +39,60 @@
## CONNECTION
##==============================================================================
-class ConnectionSettings:
+class ConnectionSettings(object):
#attr_reader :impl
def __init__(self, url=None):
if url:
self.impl = qmfengine.ConnectionSettings(url)
else:
self.impl = qmfengine.ConnectionSettings()
-
-
+
+
def set_attr(self, key, val):
if type(val) == str:
_v = qmfengine.Value(TYPE_LSTR)
_v.setString(val)
- elif type(val) == bool:
- _v = qmfengine.Value(TYPE_BOOL)
- _v.setBool(val)
elif type(val) == int:
_v = qmfengine.Value(TYPE_UINT32)
_v.setUint(val)
+ elif type(val) == bool:
+ _v = qmfengine.Value(TYPE_BOOL)
+ _v.setBool(val)
else:
- raise ArgumentError("Value for attribute '%s' has unsupported type: %s" % ( key, type(val)))
-
- self.impl.setAttr(key, _v)
+ raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, type(val)))
+
+ good = self.impl.setAttr(key, _v)
+ if not good:
+ raise Exception("Argument error: unsupported attribute '%s'" % key )
+
+
+ def get_attr(self, key):
+ _v = self.impl.getAttr(key)
+ if _v.isString():
+ return _v.asString()
+ elif _v.isUint():
+ return _v.asUint()
+ elif _v.isBool():
+ return _v.asBool()
+ else:
+ raise Exception("Argument error: value for attribute '%s' has unsupported type: %s" % ( key, str(_v.getType())))
+
+
+ def __getattr__(self, name):
+ return self.get_attr(name)
+
+
+ def __setattr__(self, name, value):
+ if name == "impl":
+ return super.__setattr__(self, name, value)
+ return self.set_attr(name, value)
class ConnectionHandler:
def conn_event_connected(self): None
def conn_event_disconnected(self, error): None
+ def conn_event_visit(self): None
def sess_event_session_closed(self, context, error): None
def sess_event_recv(self, context, message): None
@@ -80,23 +106,43 @@
self._sockEngine, self._sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
self.impl.setNotifyFd(self._sockEngine.fileno())
self._new_conn_handlers = []
+ self._conn_handlers_to_delete = []
self._conn_handlers = []
+ self._connected = False
self.start()
-
+
+ def connected(self):
+ return self._connected
+
+
+ def kick(self):
+ self._sockEngine.send(".")
+ # self._sockEngine.flush() Not available with python?
+
+
def add_conn_handler(self, handler):
self._lock.acquire()
try:
self._new_conn_handlers.append(handler)
finally:
self._lock.release()
- self._sockEngine.send("x")
+ self.kick()
+ def del_conn_handler(self, handler):
+ self._lock.acquire()
+ try:
+ self._conn_handlers_to_delete.append(handler)
+ finally:
+ self._lock.release()
+ self.kick()
+
+
def run(self):
eventImpl = qmfengine.ResilientConnectionEvent()
- connected = False
new_handlers = []
+ del_handlers = []
bt_count = 0
while True:
@@ -106,27 +152,33 @@
self._lock.acquire()
try:
new_handlers = self._new_conn_handlers
+ del_handlers = self._conn_handlers_to_delete
self._new_conn_handlers = []
+ self._conn_handlers_to_delete = []
finally:
self._lock.release()
for nh in new_handlers:
self._conn_handlers.append(nh)
- if connected:
+ if self._connected:
nh.conn_event_connected()
-
new_handlers = []
+
+ for dh in del_handlers:
+ if dh in self._conn_handlers:
+ self._conn_handlers.remove(dh)
+ del_handlers = []
valid = self.impl.getEvent(eventImpl)
while valid:
try:
if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED:
- connected = True
+ self._connected = True
for h in self._conn_handlers:
h.conn_event_connected()
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED:
- connected = False
+ self._connected = False
for h in self._conn_handlers:
h.conn_event_disconnected(eventImpl.errorText)
@@ -146,6 +198,9 @@
self.impl.popEvent()
valid = self.impl.getEvent(eventImpl)
+
+ for h in self._conn_handlers:
+ h.conn_event_visit()
@@ -158,7 +213,7 @@
result = self._conn.impl.createSession(label, self, self.handle)
- def __del__(self):
+ def destroy(self):
self._conn.impl.destroySession(self.handle)
@@ -167,14 +222,30 @@
## OBJECTS
##==============================================================================
-class QmfObject:
+class QmfObject(object):
# attr_reader :impl, :object_class
- def __init__(self, cls):
- self.object_class = cls
- self.impl = qmfengine.Object(self.object_class.impl)
+ def __init__(self, cls, kwargs={}):
+ self._cv = Condition()
+ self._sync_count = 0
+ self._sync_result = None
+ self._allow_sets = False
+ if kwargs.has_key("broker"):
+ self._broker = kwargs["broker"]
+ else:
+ self._broker = None
+ if cls:
+ self.object_class = cls
+ self.impl = qmfengine.Object(self.object_class.impl)
+ elif kwargs.has_key("impl"):
+ self.impl = qmfengine.Object(kwargs["impl"])
+ self.object_class = SchemaObjectClass(None,
+ None,
+ {"impl":self.impl.getClass()})
+ else:
+ raise Exception("Argument error: required parameter ('impl') not supplied")
- def __del__(self):
+ def destroy(self):
self.impl.destroy()
@@ -184,6 +255,20 @@
def set_object_id(self, oid):
self.impl.setObjectId(oid.impl)
+
+
+ def properties(self):
+ list = []
+ for prop in self.object_class.properties:
+ list.append([prop, self.get_attr(prop.name())])
+ return list
+
+
+ def statistics(self):
+ list = []
+ for stat in self.object_class.statistics:
+ list.append([stat, self.get_attr(stat.name())])
+ return list
def get_attr(self, name):
@@ -197,7 +282,7 @@
elif vType == TYPE_LSTR: return val.asString()
elif vType == TYPE_ABSTIME: return val.asInt64()
elif vType == TYPE_DELTATIME: return val.asUint64()
- elif vType == TYPE_REF: return val.asObjectId()
+ elif vType == TYPE_REF: return ObjectId(val.asObjectId())
elif vType == TYPE_BOOL: return val.asBool()
elif vType == TYPE_FLOAT: return val.asFloat()
elif vType == TYPE_DOUBLE: return val.asDouble()
@@ -264,26 +349,172 @@
self.set_attr(name, self.get_attr(name) - by)
+ def __setattr__(self, name, value):
+ #
+ # Ignore the internal attributes, set them normally...
+ #
+ if (name[0] == '_' or
+ name == 'impl' or
+ name == 'object_class'):
+ return super.__setattr__(self, name, value)
+
+ if not self._allow_sets:
+ raise Exception("'Set' operations not permitted on this object")
+ #
+ # If the name matches a property name, set the value of the property.
+ #
+ # print "set name=%s" % str(name)
+ for prop in self.object_class.properties:
+ if prop.name() == name:
+ return self.set_attr(name, value)
+ #
+ # otherwise, check for a statistic set...
+ #
+ for stat in self.object_class.statistics:
+ if stat.name() == name:
+ return self.set_attr(name, value)
+
+ # unrecognized name? should I raise an exception?
+ super.__setattr__(self, name, value)
+
+
+ def __getattr__(self, name, *args):
+ #
+ # If the name matches a property name, return the value of the property.
+ #
+ for prop in self.object_class.properties:
+ if prop.name() == name:
+ return self.get_attr(name)
+ #
+ # Do the same for statistics
+ #
+ for stat in self.object_class.statistics:
+ if stat.name() == name:
+ return self.get_attr(name)
+ #
+ # If we still haven't found a match for the name, check to see if
+ # it matches a method name. If so, marshall up the arguments into
+ # a map, and invoke the method.
+ #
+ for method in self.object_class.methods:
+ if method.name() == name:
+ argMap = self._marshall(method, args)
+ return lambda name, argMap : self._invokeMethod(name, argMap)
+
+ #
+ # This name means nothing to us, pass it up the line to the parent
+ # class's handler.
+ #
+ # print "__getattr__=%s" % str(name)
+ super.__getattr__(self, name)
+
+
+ def _invokeMethod(self, name, argMap):
+ """
+ Private: Helper function that invokes an object's method, and waits for the result.
+ """
+ self._cv.acquire()
+ try:
+ timeout = 30
+ self._sync_count = 1
+ self.impl.invokeMethod(name, argMap, self)
+ if self._broker:
+ self._broker.conn.kick()
+ self._cv.wait(timeout)
+ if self._sync_count == 1:
+ raise Exception("Timed out: waiting for response to method call.")
+ finally:
+ self._cv.release()
+
+ return self._sync_result
+
+
+ def _method_result(self, result):
+ """
+ Called to return the result of a method call on an object
+ """
+ self._cv.acquire();
+ try:
+ self._sync_result = result
+ self._sync_count -= 1
+ self._cv.notify()
+ finally:
+ self._cv.release()
+
+
+ def _marshall(schema, args):
+ '''
+ Private: Convert a list of arguments (positional) into a Value object of type "map".
+ Used to create the argument parameter for an object's method invokation.
+ '''
+ # Build a map of the method's arguments
+ map = qmfengine.Value(TYPE_MAP)
+ for arg in schema.arguments:
+ if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
+ map.insert(arg.name, qmfengine.Value(arg.typecode))
+
+ # install each argument's value into the map
+ marshalled = Arguments(map)
+ idx = 0
+ for arg in schema.arguments:
+ if arg.direction == DIR_IN or arg.direction == DIR_IN_OUT:
+ if args[idx]:
+ marshalled[arg.name] = args[idx]
+ idx += 1
+
+ return marshalled.map
+
+
def _value(self, name):
val = self.impl.getValue(name)
if not val:
- raise ArgumentError("Attribute '%s' not defined for class %s" % (name, self.object_class.impl.getName()))
+ raise Exception("Argument error: attribute named '%s' not defined for package %s, class %s" %
+ (name,
+ self.object_class.impl.getClassKey().getPackageName(),
+ self.object_class.impl.getClassKey().getClassName()))
return val
+class AgentObject(QmfObject):
+ def __init__(self, cls, kwargs={}):
+ QmfObject.__init__(self, cls, kwargs)
+ self._allow_sets = True
+
+
+ def destroy(self):
+ self.impl.destroy()
+
+
+ def set_object_id(self, oid):
+ self.impl.setObjectId(oid.impl)
+
+
+
class ConsoleObject(QmfObject):
# attr_reader :current_time, :create_time, :delete_time
- def __init__(self, cls):
- QmfObject.__init__(self, cls)
+ def __init__(self, cls, kwargs={}):
+ QmfObject.__init__(self, cls, kwargs)
- def update(self): pass
- def mergeUpdate(self, newObject): pass
+ def update(self):
+ if not self._broker:
+ raise Exception("No linkage to broker")
+ newer = self._broker.console.objects(Query({"object_id":object_id}))
+ if newer.size != 1:
+ raise Exception("Expected exactly one update for this object, %d present" % int(newer.size))
+ self.merge_update(newer[0])
+
+
+ def merge_update(self, newObject):
+ self.impl.merge(new_object.impl)
+
+
def is_deleted(self):
- return self.delete_time > 0
+ return self.impl.isDeleted()
+
+
def index(self): pass
- def method_missing(self, name, *args): pass
@@ -303,8 +534,16 @@
return self.impl.getObjectNumLo()
+ def broker_bank(self):
+ return self.impl.getBrokerBank()
+
+
+ def agent_bank(self):
+ return self.impl.getAgentBank()
+
+
def __eq__(self, other):
- if self.__class__ != other.__class__: return False
+ if not isinstance(other, self.__class__): return False
return (self.impl.getObjectNumHi() == other.impl.getObjectNumHi() and
self.impl.getObjectNumLo() == other.impl.getObjectNumLo())
@@ -312,9 +551,12 @@
def __ne__(self, other):
return not self.__eq__(other)
+ def __repr__(self):
+ return self.impl.str()
-class Arguments:
+
+class Arguments(object):
def __init__(self, map):
self.map = map
self._by_hash = {}
@@ -335,9 +577,30 @@
def __iter__(self):
- return _by_hash.__iter__
-
-
+ return self._by_hash.__iter__
+
+
+ def __getattr__(self, name):
+ if name in self._by_hash:
+ return self._by_hash[name]
+ return super.__getattr__(self, name)
+
+
+ def __setattr__(self, name, value):
+ #
+ # ignore local data members
+ #
+ if (name[0] == '_' or
+ name == 'map'):
+ return super.__setattr__(self, name, value)
+
+ if name in self._by_hash:
+ self._by_hash[name] = value
+ return self.set(name, value)
+
+ return super.__setattr__(self, name, value)
+
+
def by_key(self, key):
val = self.map.byKey(key)
vType = val.getType()
@@ -349,7 +612,7 @@
elif vType == TYPE_LSTR: return val.asString()
elif vType == TYPE_ABSTIME: return val.asInt64()
elif vType == TYPE_DELTATIME: return val.asUint64()
- elif vType == TYPE_REF: return val.asObjectId()
+ elif vType == TYPE_REF: return ObjectId(val.asObjectId())
elif vType == TYPE_BOOL: return val.asBool()
elif vType == TYPE_FLOAT: return val.asFloat()
elif vType == TYPE_DOUBLE: return val.asDouble()
@@ -405,18 +668,65 @@
+class MethodResponse(object):
+ def __init__(self, impl):
+ self.impl = qmfengine.MethodResponse(impl)
+
+
+ def status(self):
+ return self.impl.getStatus()
+
+
+ def exception(self):
+ return self.impl.getException()
+
+
+ def text(self):
+ return exception().asString()
+
+
+ def args(self):
+ return Arguments(self.impl.getArgs())
+
+
+ def __getattr__(self, name):
+ myArgs = self.args()
+ return myArgs.__getattr__(name)
+
+
+ def __setattr__(self, name, value):
+ if name == 'impl':
+ return super.__setattr__(self, name, value)
+
+ myArgs = self.args()
+ return myArgs.__setattr__(name, value)
+
+
+
+ ##==============================================================================
+ ## QUERY
+ ##==============================================================================
+
+
class Query:
- def __init__(self, i=None, package="", cls=None, oid=None):
- if i:
- self.impl = i
- else:
- if cls:
- self.impl = qmfengine.Query(cls, package)
- elif oid:
- self.impl = qmfengine.Query(oid)
+ def __init__(self, kwargs={}):
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+ else:
+ package = ''
+ if "key" in kwargs:
+ # construct using SchemaClassKey:
+ self.impl = qmfengine.Query(kwargs["key"])
+ elif "object_id" in kwargs:
+ self.impl = qmfengine.Query(kwargs["object_id"].impl)
else:
- raise "Argument error"
-
+ if "package" in kwargs:
+ package = kwargs["package"]
+ if "class" in kwargs:
+ self.impl = qmfengine.Query(kwargs["class"], package)
+ else:
+ raise Exception("Argument error: invalid arguments, use 'key', 'object_id' or 'class'[,'package']")
+
def package_name(self): return self.impl.getPackage()
def class_name(self): return self.impl.getClass()
@@ -437,48 +747,95 @@
class SchemaArgument:
#attr_reader :impl
def __init__(self, name, typecode, kwargs={}):
- self.impl = qmfengine.SchemaArgument(name, typecode)
- if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"])
- if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+ else:
+ self.impl = qmfengine.SchemaArgument(name, typecode)
+ if kwargs.has_key("dir"): self.impl.setDirection(kwargs["dir"])
+ if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
+ if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
+
+
+ def name(self):
+ return self.impl.getName()
+
+
+ def direction(self):
+ return self.impl.getDirection()
+
+
+ def typecode(self):
+ return self.impl.getType()
+
+
+ def __repr__(self):
+ return self.name()
class SchemaMethod:
- # attr_reader :impl
+ # attr_reader :impl, arguments
def __init__(self, name, kwargs={}):
- self.impl = qmfengine.SchemaMethod(name)
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
- self._arguments = []
+ self.arguments = []
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+ for i in range(self.impl.getArgumentCount()):
+ self.arguments.append(SchemaArgument(None,None,{"impl":self.impl.getArgument(i)}))
+ else:
+ self.impl = qmfengine.SchemaMethod(name)
+ if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
def add_argument(self, arg):
- self._arguments.append(arg)
+ self.arguments.append(arg)
self.impl.addArgument(arg.impl)
+ def name(self):
+ return self.impl.getName()
+
+ def __repr__(self):
+ return self.name()
+
+
class SchemaProperty:
#attr_reader :impl
def __init__(self, name, typecode, kwargs={}):
- self.impl = qmfengine.SchemaProperty(name, typecode)
- if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"])
- if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"])
- if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"])
- if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+ else:
+ self.impl = qmfengine.SchemaProperty(name, typecode)
+ if kwargs.has_key("access"): self.impl.setAccess(kwargs["access"])
+ if kwargs.has_key("index"): self.impl.setIndex(kwargs["index"])
+ if kwargs.has_key("optional"): self.impl.setOptional(kwargs["optional"])
+ if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
+ if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
def name(self):
return self.impl.getName()
+ def __repr__(self):
+ return self.name()
+
class SchemaStatistic:
# attr_reader :impl
def __init__(self, name, typecode, kwargs={}):
- self.impl = qmfengine.SchemaStatistic(name, typecode)
- if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+ else:
+ self.impl = qmfengine.SchemaStatistic(name, typecode)
+ if kwargs.has_key("unit"): self.impl.setUnit(kwargs["unit"])
+ if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
+
+
+ def name(self):
+ return self.impl.getName()
+
+ def __repr__(self):
+ return self.name()
@@ -488,60 +845,88 @@
self.impl = i
- def get_package(self):
- self.impl.getPackageName()
+ def package_name(self):
+ return self.impl.getPackageName()
- def get_class(self):
- self.impl.getClassName()
+ def class_name(self):
+ return self.impl.getClassName()
+
+ def __repr__(self):
+ return self.impl.asString()
class SchemaObjectClass:
- # attr_reader :impl
+ # attr_reader :impl, :properties, :statistics, :methods
def __init__(self, package, name, kwargs={}):
- self.impl = qmfengine.SchemaObjectClass(package, name)
- self._properties = []
- self._statistics = []
- self._methods = []
+ self.properties = []
+ self.statistics = []
+ self.methods = []
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+
+ for i in range(self.impl.getPropertyCount()):
+ self.properties.append(SchemaProperty(None, None, {"impl":self.impl.getProperty(i)}))
+
+ for i in range(self.impl.getStatisticCount()):
+ self.statistics.append(SchemaStatistic(None, None, {"impl":self.impl.getStatistic(i)}))
+
+ for i in range(self.impl.getMethodCount()):
+ self.methods.append(SchemaMethod(None, {"impl":self.impl.getMethod(i)}))
+ else:
+ self.impl = qmfengine.SchemaObjectClass(package, name)
def add_property(self, prop):
- self._properties.append(prop)
+ self.properties.append(prop)
self.impl.addProperty(prop.impl)
def add_statistic(self, stat):
- self._statistics.append(stat)
+ self.statistics.append(stat)
self.impl.addStatistic(stat.impl)
def add_method(self, meth):
- self._methods.append(meth)
+ self.methods.append(meth)
self.impl.addMethod(meth.impl)
- def name(self):
- return self.impl.getName()
-
-
- def properties(self):
- return self._properties
+ def class_key(self):
+ return SchemaClassKey(self.impl.getClassKey())
+
+
+ def package_name(self):
+ return self.impl.getClassKey().getPackageName()
+
+
+ def class_name(self):
+ return self.impl.getClassKey().getClassName()
+
class SchemaEventClass:
- # attr_reader :impl
+ # attr_reader :impl :arguments
def __init__(self, package, name, kwargs={}):
- self.impl = qmfengine.SchemaEventClass(package, name)
- if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
- self._arguments = []
+ self.arguments = []
+ if "impl" in kwargs:
+ self.impl = kwargs["impl"]
+ for i in range(self.impl.getArgumentCount()):
+ self.arguments.append(SchemaArgument(nil, nil, {"impl":self.impl.getArgument(i)}))
+ else:
+ self.impl = qmfengine.SchemaEventClass(package, name)
+ if kwargs.has_key("desc"): self.impl.setDesc(kwargs["desc"])
def add_argument(self, arg):
- self._arguments.append(arg)
+ self.arguments.append(arg)
self.impl.addArgument(arg.impl)
+ def name(self):
+ return self.impl.getClassKey().getClassName()
+
##==============================================================================
## CONSOLE
@@ -562,45 +947,178 @@
-class Console:
+class Console(Thread):
# attr_reader :impl
- def initialize(handler=None, kwargs={}):
- self._handler = handler
- self.impl = qmfengine.Console()
- self._event = qmfengine.ConsoleEvent()
- self._broker_list = []
-
+ def __init__(self, handler=None, kwargs={}):
+ Thread.__init__(self)
+ self._handler = handler
+ self.impl = qmfengine.Console()
+ self._event = qmfengine.ConsoleEvent()
+ self._broker_list = []
+ self._cv = Condition()
+ self._sync_count = 0
+ self._sync_result = None
+ self._select = {}
+ self._cb_cond = Condition()
+ self.start()
+
def add_connection(self, conn):
broker = Broker(self, conn)
- self._broker_list.append(broker)
+ self._cv.acquire()
+ try:
+ self._broker_list.append(broker)
+ finally:
+ self._cv.release()
return broker
- def del_connection(self, broker): pass
-
-
- def get_packages(self): pass
-
-
- def get_classes(self, package): pass
-
-
- def get_schema(self, class_key): pass
-
-
- def bind_package(self, package): pass
+ def del_connection(self, broker):
+ broker.shutdown()
+ self._cv.acquire()
+ try:
+ self._broker_list.remove(broker)
+ finally:
+ self._cv.release()
- def bind_class(self, kwargs = {}): pass
+ def packages(self):
+ plist = []
+ for i in range(self.impl.packageCount()):
+ plist.append(self.impl.getPackageName(i))
+ return plist
+
+
+ def classes(self, package, kind=CLASS_OBJECT):
+ clist = []
+ for i in range(self.impl.classCount(package)):
+ key = self.impl.getClass(package, i)
+ class_kind = self.impl.getClassKind(key)
+ if class_kind == kind:
+ if kind == CLASS_OBJECT:
+ clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)}))
+ elif kind == CLASS_EVENT:
+ clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)}))
+ return clist
+
+
+ def bind_package(self, package):
+ return self.impl.bindPackage(package)
+
+
+ def bind_class(self, kwargs = {}):
+ if "key" in kwargs:
+ self.impl.bindClass(kwargs["key"])
+ elif "package" in kwargs:
+ package = kwargs["package"]
+ if "class" in kwargs:
+ self.impl.bindClass(package, kwargs["class"])
+ else:
+ self.impl.bindClass(package)
+ else:
+ raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
- def get_agents(self, broker=None): pass
+ def agents(self, broker=None):
+ blist = []
+ if broker:
+ blist.append(broker)
+ else:
+ self._cv.acquire()
+ try:
+ # copy while holding lock
+ blist = self._broker_list[:]
+ finally:
+ self._cv.release()
+
+ agents = []
+ for b in blist:
+ for idx in range(b.impl.agentCount()):
+ agents.append(AgentProxy(b.impl.getAgent(idx), b))
+
+ return agents
- def get_objects(self, query, kwargs = {}): pass
+ def objects(self, query, kwargs = {}):
+ timeout = 30
+ temp_args = kwargs.copy()
+ if type(query) == type({}):
+ temp_args.update(query)
+
+ if "timeout" in temp_args:
+ timeout = temp_args["timeout"]
+ temp_args.pop("timeout")
+
+ if type(query) == type({}):
+ query = Query(temp_args)
+
+ self._select = {}
+ for k in temp_args.iterkeys():
+ if type(k) == str:
+ self._select[k] = temp_args[k]
+
+ self._cv.acquire()
+ try:
+ self._sync_count = 1
+ self._sync_result = []
+ broker = self._broker_list[0]
+ broker.send_query(query.impl, None)
+ self._cv.wait(timeout)
+ if self._sync_count == 1:
+ raise Exception("Timed out: waiting for query response")
+ finally:
+ self._cv.release()
+
+ return self._sync_result
+ def object(self, query, kwargs = {}):
+ '''
+ Return one and only one object or None.
+ '''
+ objs = objects(query, kwargs)
+ if len(objs) == 1:
+ return objs[0]
+ else:
+ return None
+
+
+ def first_object(self, query, kwargs = {}):
+ '''
+ Return the first of potentially many objects.
+ '''
+ objs = objects(query, kwargs)
+ if objs:
+ return objs[0]
+ else:
+ return None
+
+
+ # Check the object against select to check for a match
+ def _select_match(self, object):
+ schema_props = object.properties()
+ for key in self._select.iterkeys():
+ for prop in schema_props:
+ if key == p[0].name() and self._select[key] != p[1]:
+ return False
+ return True
+
+
+ def _get_result(self, list, context):
+ '''
+ Called by Broker proxy to return the result of a query.
+ '''
+ self._cv.acquire()
+ try:
+ for item in list:
+ if self._select_match(item):
+ self._sync_result.append(item)
+ self._sync_count -= 1
+ self._cv.notify()
+ finally:
+ self._cv.release()
+
+
def start_sync(self, query): pass
@@ -610,26 +1128,56 @@
def end_sync(self, sync): pass
+ def run(self):
+ while True:
+ self._cb_cond.acquire()
+ try:
+ self._cb_cond.wait(1)
+ while self.do_console_events():
+ pass
+ finally:
+ self._cb_cond.release()
+
+
+ def start_console_events(self):
+ self._cb_cond.acquire()
+ try:
+ self._cb_cond.notify()
+ finally:
+ self._cb_cond.release()
+
+
def do_console_events(self):
+ '''
+ Called by Broker proxy to poll for Console events. Passes the events
+ onto the ConsoleHandler associated with this Console.
+ '''
count = 0
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- print "Console Event:", self._event.kind
+ # print "Console Event:", self._event.kind
if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
- pass
+ if self._handler:
+ self._handler.agent_added(AgentProxy(self._event.agent, None))
elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
- pass
+ if self._handler:
+ self._handler.agent_deleted(AgentProxy(self._event.agent, None))
elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
- pass
+ if self._handler:
+ self._handler.new_package(self._event.name)
elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
- pass
+ if self._handler:
+ self._handler.new_class(SchemaClassKey(self._event.classKey))
elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
- pass
+ if self._handler:
+ self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
+ self._event.hasProps, self._event.hasStats)
elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
pass
elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
- pass
+ if self._handler:
+ self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
pass
@@ -639,37 +1187,104 @@
+class AgentProxy:
+ # attr_reader :broker
+ def __init__(self, impl, broker):
+ self.impl = impl
+ self.broker = broker
+
+
+ def label(self):
+ return self.impl.getLabel()
+
+
+ def broker_bank(self):
+ return self.impl.getBrokerBank()
+
+
+ def agent_bank(self):
+ return self.impl.getAgentBank()
+
+
+
class Broker(ConnectionHandler):
- # attr_reader :impl
+ # attr_reader :impl :conn, :console, :broker_bank
def __init__(self, console, conn):
- self._console = console
- self._conn = conn
+ ConnectionHandler.__init__(self)
+ self.broker_bank = 1
+ self.console = console
+ self.conn = conn
self._session = None
+ self._cv = Condition()
+ self._stable = None
self._event = qmfengine.BrokerEvent()
self._xmtMessage = qmfengine.Message()
- self.impl = qmfengine.BrokerProxy(self._console.impl)
- self._console.impl.addConnection(self.impl, self)
- self._conn.add_conn_handler(self)
+ self.impl = qmfengine.BrokerProxy(self.console.impl)
+ self.console.impl.addConnection(self.impl, self)
+ self.conn.add_conn_handler(self)
+ self._operational = True
+ def shutdown(self):
+ self.console.impl.delConnection(self.impl)
+ self.conn.del_conn_handler(self)
+ self._operational = False
+
+
+ def wait_for_stable(self, timeout = None):
+ self._cv.acquire()
+ try:
+ if self._stable:
+ return
+ if timeout:
+ self._cv.wait(timeout)
+ if not self._stable:
+ raise Exception("Timed out: waiting for broker connection to become stable")
+ else:
+ while not self._stable:
+ self._cv.wait()
+ finally:
+ self._cv.release()
+
+
+ def send_query(self, query, ctx):
+ self.impl.sendQuery(query, ctx)
+ self.conn.kick()
+
+
def do_broker_events(self):
count = 0
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- print "Broker Event: ", self._event.kind
+ # print "Broker Event: ", self._event.kind
if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
pass
elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
- self._conn.impl.declareQueue(self._session.handle, self._event.name)
+ self.conn.impl.declareQueue(self._session.handle, self._event.name)
elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
- self._conn.impl.deleteQueue(self._session.handle, self._event.name)
+ self.conn.impl.deleteQueue(self._session.handle, self._event.name)
elif self._event.kind == qmfengine.BrokerEvent.BIND:
- self._conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+ self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
- self._conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
+ self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
self.impl.startProtocol()
+ elif self._event.kind == qmfengine.BrokerEvent.STABLE:
+ self_.cv.acquire()
+ try:
+ self._stable = True
+ self._cv.notify()
+ finally:
+ self._cv.release()
+ elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE:
+ result = []
+ for idx in range(self._event.queryResponse.getObjectCount()):
+ result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self}))
+ self.console._get_result(result, self._event.context)
+ elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE:
+ obj = self._event.context
+ obj._method_result(MethodResponse(self._event.methodResponse()))
self.impl.popEvent()
valid = self.impl.getEvent(self._event)
@@ -682,7 +1297,7 @@
valid = self.impl.getXmtMessage(self._xmtMessage)
while valid:
count += 1
- self._conn.impl.sendMessage(self._session.handle, self._xmtMessage)
+ self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
self.impl.popXmt()
valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -691,16 +1306,16 @@
def do_events(self):
while True:
- ccnt = self._console.do_console_events()
+ self.console.start_console_events()
bcnt = do_broker_events()
mcnt = do_broker_messages()
- if ccnt == 0 and bcnt == 0 and mcnt == 0:
+ if bcnt == 0 and mcnt == 0:
break;
def conn_event_connected(self):
print "Console Connection Established..."
- self._session = Session(self._conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
+ self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
self.impl.sessionOpened(self._session.handle)
self.do_events()
@@ -710,12 +1325,18 @@
pass
+ def conn_event_visit(self):
+ self.do_events()
+
+
def sess_event_session_closed(self, context, error):
print "Console Session Lost"
self.impl.sessionClosed()
def sess_event_recv(self, context, message):
+ if not self._operational:
+ print "Unexpected RECV Event"
self.impl.handleRcvMessage(message)
self.do_events()
@@ -778,7 +1399,7 @@
count += 1
if self._event.kind == qmfengine.AgentEvent.GET_QUERY:
self._handler.get_query(self._event.sequence,
- Query(self._event.query),
+ Query({"impl":self._event.query}),
self._event.authUserId)
elif self._event.kind == qmfengine.AgentEvent.START_SYNC:
@@ -846,6 +1467,10 @@
pass
+ def conn_event_visit(self):
+ self.do_events()
+
+
def sess_event_session_closed(self, context, error):
print "Agent Session Lost"
pass
Modified: qpid/trunk/qpid/cpp/bindings/qmf/tests/python_agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qmf/tests/python_agent.py?rev=827686&r1=827685&r2=827686&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qmf/tests/python_agent.py (original)
+++ qpid/trunk/qpid/cpp/bindings/qmf/tests/python_agent.py Tue Oct 20 15:22:19 2009
@@ -30,40 +30,40 @@
self.parent_class = qmf.SchemaObjectClass("org.apache.qpid.qmf", "parent")
self.parent_class.add_property(qmf.SchemaProperty("name", qmf.TYPE_SSTR, {"index":True}))
self.parent_class.add_property(qmf.SchemaProperty("state", qmf.TYPE_SSTR))
-
+
self.parent_class.add_property(qmf.SchemaProperty("uint64val", qmf.TYPE_UINT64))
self.parent_class.add_property(qmf.SchemaProperty("uint32val", qmf.TYPE_UINT32))
self.parent_class.add_property(qmf.SchemaProperty("uint16val", qmf.TYPE_UINT16))
self.parent_class.add_property(qmf.SchemaProperty("uint8val", qmf.TYPE_UINT8))
-
+
self.parent_class.add_property(qmf.SchemaProperty("int64val", qmf.TYPE_INT64))
self.parent_class.add_property(qmf.SchemaProperty("int32val", qmf.TYPE_INT32))
self.parent_class.add_property(qmf.SchemaProperty("int16val", qmf.TYPE_INT16))
self.parent_class.add_property(qmf.SchemaProperty("int8val", qmf.TYPE_INT8))
-
+
self.parent_class.add_statistic(qmf.SchemaStatistic("queryCount", qmf.TYPE_UINT32, {"unit":"query", "desc":"Query count"}))
-
+
_method = qmf.SchemaMethod("echo", {"desc":"Check responsiveness of the agent object"})
_method.add_argument(qmf.SchemaArgument("sequence", qmf.TYPE_UINT32, {"dir":qmf.DIR_IN_OUT}))
self.parent_class.add_method(_method)
-
+
_method = qmf.SchemaMethod("set_numerics", {"desc":"Set the numeric values in the object"})
_method.add_argument(qmf.SchemaArgument("test", qmf.TYPE_SSTR, {"dir":qmf.DIR_IN}))
self.parent_class.add_method(_method)
-
+
_method = qmf.SchemaMethod("create_child", {"desc":"Create a new child object"})
_method.add_argument(qmf.SchemaArgument("child_name", qmf.TYPE_LSTR, {"dir":qmf.DIR_IN}))
_method.add_argument(qmf.SchemaArgument("child_ref", qmf.TYPE_REF, {"dir":qmf.DIR_OUT}))
self.parent_class.add_method(_method)
-
+
_method = qmf.SchemaMethod("probe_userid", {"desc":"Return the user-id for this method call"})
_method.add_argument(qmf.SchemaArgument("userid", qmf.TYPE_SSTR, {"dir":qmf.DIR_OUT}))
self.parent_class.add_method(_method)
self.child_class = qmf.SchemaObjectClass("org.apache.qpid.qmf", "child")
self.child_class.add_property(qmf.SchemaProperty("name", qmf.TYPE_SSTR, {"index":True}))
-
-
+
+
def register(self, agent):
agent.register_class(self.parent_class)
agent.register_class(self.child_class)
@@ -71,7 +71,13 @@
class App(qmf.AgentHandler):
+ '''
+ Object that handles events received by the Agent.
+ '''
def get_query(self, context, query, userId):
+ '''
+ Respond to a Query request from a console.
+ '''
#print "Query: user=%s context=%d class=%s" % (userId, context, query.class_name())
#if query.object_id():
# print query.object_id().object_num_low()
@@ -84,108 +90,139 @@
def method_call(self, context, name, object_id, args, userId):
- # puts "Method: user=#{userId} context=#{context} method=#{name} object_num=#{object_id.object_num_low if object_id} args=#{args}"
- # oid = self._agent.alloc_object_id(2)
- # args['child_ref'] = oid
- # self._child = qmf.QmfObject(self._model.child_class)
- # self._child.set_attr("name", args.by_key("child_name"))
- # self._child.set_object_id(oid)
- # self._agent.method_response(context, 0, "OK", args)
+ '''
+ Invoke a method call requested by the console.
+ '''
+ #print "Method: name=%s user=%s context=%d object_id=%s args=%s" % (name, userId, context, object_id, args)
if name == "echo":
self._agent.method_response(context, 0, "OK", args)
-
+
elif name == "set_numerics":
_retCode = 0
_retText = "OK"
-
+
if args['test'] == "big":
+ #
+ # note the alternate forms for setting object attributes:
+ #
self._parent.set_attr("uint64val", 0x9494949449494949)
- self._parent.set_attr("uint32val", 0xa5a55a5a)
+ self._parent.uint32val = 0xa5a55a5a
self._parent.set_attr("uint16val", 0xb66b)
- self._parent.set_attr("uint8val", 0xc7)
-
- self._parent.set_attr("int64val", 1000000000000000000)
+ self._parent["uint8val"] = 0xc7
+
+ self._parent.int64val = 1000000000000000000
self._parent.set_attr("int32val", 1000000000)
- self._parent.set_attr("int16val", 10000)
+ self._parent["int16val"] = 10000
self._parent.set_attr("int8val", 100)
-
- elif args['test'] == "small":
+
+ ## Test the __getattr__ implementation:
+ ## @todo: remove once python_client implements this
+ ## form of property access
+ assert self._parent["uint8val"] == 0xc7
+ assert self._parent.uint64val == 0x9494949449494949
+ assert self._parent.queryCount >= 0
+
+ # note the alternative argument access syntax:
+ elif args.test == "small":
self._parent.set_attr("uint64val", 4)
self._parent.set_attr("uint32val", 5)
self._parent.set_attr("uint16val", 6)
self._parent.set_attr("uint8val", 7)
-
+
self._parent.set_attr("int64val", 8)
self._parent.set_attr("int32val", 9)
self._parent.set_attr("int16val", 10)
self._parent.set_attr("int8val", 11)
-
+
elif args['test'] == "negative":
self._parent.set_attr("uint64val", 0)
self._parent.set_attr("uint32val", 0)
self._parent.set_attr("uint16val", 0)
self._parent.set_attr("uint8val", 0)
-
+
self._parent.set_attr("int64val", -10000000000)
self._parent.set_attr("int32val", -100000)
self._parent.set_attr("int16val", -1000)
self._parent.set_attr("int8val", -100)
-
+
else:
_retCode = 1
_retText = "Invalid argument value for test"
-
+
self._agent.method_response(context, _retCode, _retText, args)
-
+
elif name == "create_child":
+ #
+ # Instantiate an object based on the Child Schema Class
+ #
_oid = self._agent.alloc_object_id(2)
args['child_ref'] = _oid
- self._child = qmf.QmfObject(self._model.child_class)
+ self._child = qmf.AgentObject(self._model.child_class)
self._child.set_attr("name", args["child_name"])
self._child.set_object_id(_oid)
self._agent.method_response(context, 0, "OK", args)
-
+
elif name == "probe_userid":
args['userid'] = userId
self._agent.method_response(context, 0, "OK", args)
-
+
else:
self._agent.method_response(context, 1, "Unimplemented Method: %s" % name, args)
-
-
+
+
def main(self):
+ '''
+ Agent application's main processing loop.
+ '''
+ # Connect to the broker
self._settings = qmf.ConnectionSettings()
+ self._settings.sendUserId = True
if len(sys.argv) > 1:
- self._settings.set_attr("host", sys.argv[1])
+ self._settings.host = str(sys.argv[1])
if len(sys.argv) > 2:
- self._settings.set_attr("port", int(sys.argv[2]))
+ self._settings.port = int(sys.argv[2])
self._connection = qmf.Connection(self._settings)
+
+ # Instantiate an Agent to serve me queries and method calls
self._agent = qmf.Agent(self)
-
+
+ # Dynamically define the parent and child schemas, then
+ # register them with the agent
self._model = Model()
self._model.register(self._agent)
-
+
+ # Tell the agent about our connection to the broker
self._agent.set_connection(self._connection)
-
- self._parent = qmf.QmfObject(self._model.parent_class)
+
+ # Instantiate and populate an instance of the Parent
+ # Schema Object
+ self._parent = qmf.AgentObject(self._model.parent_class)
+
+ ## @todo how do we force a test failure?
+ # verify the properties() and statistics() object methods:
+ assert len(self._parent.properties()) == 10
+ assert len(self._parent.statistics()) == 1
+
self._parent.set_attr("name", "Parent One")
self._parent.set_attr("state", "OPERATIONAL")
-
+
self._parent.set_attr("uint64val", 0)
self._parent.set_attr("uint32val", 0)
self._parent.set_attr("uint16val", 0)
self._parent.set_attr("uint8val", 0)
-
+
self._parent.set_attr("int64val", 0)
self._parent.set_attr("int32val", 0)
self._parent.set_attr("int16val", 0)
self._parent.set_attr("int8val", 0)
-
+
self._parent_oid = self._agent.alloc_object_id(1)
self._parent.set_object_id(self._parent_oid)
-
- while True: # there may be a better way, but
- time.sleep(1000) # I'm a python noob...
+
+ # Now wait for events arriving on the connection
+ # to the broker...
+ while True:
+ time.sleep(1000)
Modified: qpid/trunk/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp?rev=827686&r1=827685&r2=827686&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp Tue Oct 20 15:22:19 2009
@@ -185,6 +185,11 @@
return intval;
}
+ if (key == attrSendUserId) {
+ boolval.setBool(sendUserId);
+ return boolval;
+ }
+
return strval;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org