You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/03 16:44:26 UTC
svn commit: r906093 - in /qpid/trunk/qpid/python/qmf2: agent.py common.py
console.py tests/basic_method.py tests/basic_query.py tests/events.py
tests/obj_gets.py
Author: kgiusti
Date: Wed Feb 3 15:44:26 2010
New Revision: 906093
URL: http://svn.apache.org/viewvc?rev=906093&view=rev
Log:
QPID-2261: 1) Remove the direct reference to the schema from QmfData (use the schema id instead). 2) Support wildcard schema_id matching in queries. 3) Prevent the set-connection calls from returning until after the management threads have started.
Modified:
qpid/trunk/qpid/python/qmf2/agent.py
qpid/trunk/qpid/python/qmf2/common.py
qpid/trunk/qpid/python/qmf2/console.py
qpid/trunk/qpid/python/qmf2/tests/basic_method.py
qpid/trunk/qpid/python/qmf2/tests/basic_query.py
qpid/trunk/qpid/python/qmf2/tests/events.py
qpid/trunk/qpid/python/qmf2/tests/obj_gets.py
Modified: qpid/trunk/qpid/python/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/agent.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/agent.py (original)
+++ qpid/trunk/qpid/python/qmf2/agent.py Wed Feb 3 15:44:26 2010
@@ -21,7 +21,7 @@
import datetime
import time
import Queue
-from threading import Thread, Lock, currentThread
+from threading import Thread, Lock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
from common import (make_subject, parse_subject, OpCode, QmfQuery,
@@ -88,6 +88,7 @@
_max_msg_size=0, _capacity=10):
Thread.__init__(self)
self._running = False
+ self._ready = Event()
self.name = str(name)
self._domain = _domain
@@ -179,6 +180,9 @@
self._running = True
self.start()
+ self._ready.wait(10)
+ if not self._ready.isSet():
+ raise Exception("Agent managment thread failed to start.")
def remove_connection(self, timeout=None):
# tell connection thread to shutdown
@@ -222,11 +226,15 @@
if not isinstance(schema, SchemaClass):
raise TypeError("SchemaClass instance expected")
+ classId = schema.get_class_id()
+ pname = classId.get_package_name()
+ cname = classId.get_class_name()
+ hstr = classId.get_hash_string()
+ if not hstr:
+ raise Exception("Schema hash is not set.")
+
self._lock.acquire()
try:
- classId = schema.get_class_id()
- pname = classId.get_package_name()
- cname = classId.get_class_name()
if pname not in self._packages:
self._packages[pname] = [cname]
else:
@@ -355,6 +363,9 @@
global _callback_thread
next_heartbeat = datetime.datetime.utcnow()
batch_limit = 10 # a guess
+
+ self._ready.set()
+
while self._running:
now = datetime.datetime.utcnow()
@@ -496,7 +507,9 @@
query = cmap.get(MsgKey.query)
if query is not None:
# fake a QmfData containing my identifier for the query compare
- tmpData = QmfData(_values={QmfQuery.KEY_AGENT_NAME: self.get_name()})
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
reply = QmfQuery.from_map(query).evaluate(tmpData)
if reply:
@@ -555,7 +568,35 @@
msg.reply_to,
mname,
oid, schema_id)
- param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id)
+ param = MethodCallParams( mname, oid, schema_id, in_args,
+ msg.user_id)
+
+ # @todo: validate the method against the schema:
+ # if self._schema:
+ # # validate
+ # _in_args = _in_args.copy()
+ # ms = self._schema.get_method(name)
+ # if ms is None:
+ # raise ValueError("Method '%s' is undefined." % name)
+
+ # for aname,prop in ms.get_arguments().iteritems():
+ # if aname not in _in_args:
+ # if prop.get_default():
+ # _in_args[aname] = prop.get_default()
+ # elif not prop.is_optional():
+ # raise ValueError("Method '%s' requires argument '%s'"
+ # % (name, aname))
+ # for aname in _in_args.iterkeys():
+ # prop = ms.get_argument(aname)
+ # if prop is None:
+ # raise ValueError("Method '%s' does not define argument"
+ # " '%s'" % (name, aname))
+ # if "I" not in prop.get_direction():
+ # raise ValueError("Method '%s' argument '%s' is not an"
+ # " input." % (name, aname))
+
+ # # @todo check if value is correct (type, range, etc)
+
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
@@ -567,7 +608,9 @@
self._lock.acquire()
try:
for name in self._packages.iterkeys():
- if query.evaluate(QmfData.create({SchemaClassId.KEY_PACKAGE:name})):
+ qmfData = QmfData.create({SchemaClassId.KEY_PACKAGE:name},
+ _object_id="_package")
+ if query.evaluate(qmfData):
pnames.append(name)
finally:
self._lock.release()
@@ -631,41 +674,64 @@
t_params = query.get_target_param()
if t_params:
sid = t_params.get(QmfData.KEY_SCHEMA_ID)
-
# if querying for a specific object, do a direct lookup
if query.get_selector() == QmfQuery.ID:
oid = query.get_id()
found = None
self._lock.acquire()
try:
- if sid:
- found = self._described_data.get(sid)
- if found:
- found = found.get(oid)
+ if sid and not sid.get_hash_string():
+ # wildcard schema_id match, check each schema
+ for name,db in self._described_data.iteritems():
+ if (name.get_class_name() == sid.get_class_name()
+ and name.get_package_name() == sid.get_package_name()):
+ found = db.get(oid)
+ if found:
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(found.map_encode())
else:
- found = self._undescribed_data.get(oid)
+ if sid:
+ db = self._described_data.get(sid)
+ if db:
+ found = db.get(oid)
+ else:
+ found = self._undescribed_data.get(oid)
+ if found:
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(found.map_encode())
finally:
self._lock.release()
- if found:
- if _idOnly:
- data_objs.append(query.get_id())
- else:
- data_objs.append(found.map_encode())
else: # otherwise, evaluate all data
self._lock.acquire()
try:
- if sid:
- db = self._described_data.get(sid)
+ if sid and not sid.get_hash_string():
+ # wildcard schema_id match, check each schema
+ for name,db in self._described_data.iteritems():
+ if (name.get_class_name() == sid.get_class_name()
+ and name.get_package_name() == sid.get_package_name()):
+ for oid,data in db.iteritems():
+ if query.evaluate(data):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(data.map_encode())
else:
- db = self._undescribed_data
-
- if db:
- for oid,val in db.iteritems():
- if query.evaluate(val):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(val.map_encode())
+ if sid:
+ db = self._described_data.get(sid)
+ else:
+ db = self._undescribed_data
+
+ if db:
+ for oid,data in db.iteritems():
+ if query.evaluate(data):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(data.map_encode())
finally:
self._lock.release()
@@ -693,15 +759,40 @@
A managed data object that is owned by an agent.
"""
- def __init__(self, agent, _values={}, _subtypes={}, _tag=None, _object_id=None,
- _schema=None):
+ def __init__(self, agent, _values={}, _subtypes={}, _tag=None,
+ _object_id=None, _schema=None):
+ schema_id = None
+ if _schema:
+ schema_id = _schema.get_class_id()
+
+ if _object_id is None:
+ if not isinstance(_schema, SchemaObjectClass):
+ raise Exception("An object_id must be provided if the object"
+ "doesn't have an associated schema.")
+ ids = _schema.get_id_names()
+ if not ids:
+ raise Exception("Object must have an Id or a schema that"
+ " provides an Id")
+ _object_id = u""
+ for key in ids:
+ value = _values.get(key)
+ if value is None:
+ raise Exception("Object must have a value for key"
+ " attribute '%s'" % str(key))
+ try:
+ _object_id += unicode(value)
+ except:
+ raise Exception("Cannot create object_id from key"
+ " value '%s'" % str(value))
+
# timestamp in millisec since epoch UTC
ctime = long(time.time() * 1000)
super(QmfAgentData, self).__init__(_values=_values, _subtypes=_subtypes,
_tag=_tag, _ctime=ctime,
_utime=ctime, _object_id=_object_id,
- _schema=_schema, _const=False)
+ _schema_id=schema_id, _const=False)
self._agent = agent
+ self._validated = False
def destroy(self):
self._dtime = long(time.time() * 1000)
@@ -729,6 +820,25 @@
# @todo: need to take write-lock
logging.error(" TBD!!!")
+ def validate(self):
+ """
+ Compares this object's data against the associated schema. Throws an
+ exception if the data does not conform to the schema.
+ """
+ props = self._schema.get_properties()
+ for name,val in props.iteritems():
+ # @todo validate: type compatible with amqp_type?
+ # @todo validate: primary keys have values
+ if name not in self._values:
+ if val._isOptional:
+ # ok not to be present, put in dummy value
+ # to simplify access
+ self._values[name] = None
+ else:
+ raise Exception("Required property '%s' not present." % name)
+ self._validated = True
+
+
################################################################################
################################################################################
Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Wed Feb 3 15:44:26 2010
@@ -338,8 +338,7 @@
class QmfData(_mapEncoder):
"""
- Base data class representing arbitrarily structure data. No schema or
- managing agent is associated with data of this class.
+ Base class representing management data.
Map format:
map["_values"] = map of unordered "name"=<value> pairs (optional)
@@ -356,10 +355,10 @@
KEY_DELETE_TS = "_delete_ts"
def __init__(self,
- _values={}, _subtypes={}, _tag=None, _object_id=None,
+ _values={}, _subtypes={}, _tag=None,
+ _object_id=None, _schema_id=None,
_ctime = 0, _utime = 0, _dtime = 0,
- _map=None,
- _schema=None, _const=False):
+ _map=None, _const=False):
"""
@type _values: dict
@param _values: dictionary of initial name=value pairs for object's
@@ -372,7 +371,6 @@
@type _const: boolean
@param _const: if true, this object cannot be modified
"""
- self._schema_id = None
if _map is not None:
# construct from map
_tag = _map.get(self.KEY_TAG, _tag)
@@ -381,11 +379,14 @@
_object_id = _map.get(self.KEY_OBJECT_ID, _object_id)
sid = _map.get(self.KEY_SCHEMA_ID)
if sid:
- self._schema_id = SchemaClassId(_map=sid)
+ _schema_id = SchemaClassId.from_map(sid)
_ctime = long(_map.get(self.KEY_CREATE_TS, _ctime))
_utime = long(_map.get(self.KEY_UPDATE_TS, _utime))
_dtime = long(_map.get(self.KEY_DELETE_TS, _dtime))
+ if _object_id is None:
+ raise Exception("An object_id must be provided.")
+
self._values = _values.copy()
self._subtypes = _subtypes.copy()
self._tag = _tag
@@ -393,30 +394,21 @@
self._utime = _utime
self._dtime = _dtime
self._const = _const
+ self._schema_id = _schema_id
+ self._object_id = str(_object_id)
- if _object_id is not None:
- self._object_id = str(_object_id)
- else:
- self._object_id = None
-
- if _schema is not None:
- self._set_schema(_schema)
- else:
- # careful: map constructor may have already set self._schema_id, do
- # not override it!
- self._schema = None
def __create(cls, values, _subtypes={}, _tag=None, _object_id=None,
- _schema=None, _const=False):
+ _schema_id=None, _const=False):
# timestamp in millisec since epoch UTC
ctime = long(time.time() * 1000)
return cls(_values=values, _subtypes=_subtypes, _tag=_tag,
_ctime=ctime, _utime=ctime,
- _object_id=_object_id, _schema=_schema, _const=_const)
+ _object_id=_object_id, _schema_id=_schema_id, _const=_const)
create = classmethod(__create)
- def __from_map(cls, map_, _schema=None, _const=False):
- return cls(_map=map_, _schema=_schema, _const=_const)
+ def __from_map(cls, map_, _const=False):
+ return cls(_map=map_, _const=_const)
from_map = classmethod(__from_map)
def is_managed(self):
@@ -507,30 +499,7 @@
@rtype: str
@returns: the identification string, or None if not assigned and id.
"""
- if self._object_id:
- return self._object_id
-
- # if object id not assigned, see if schema defines a set of field
- # values to use as an id
- if not self._schema:
- return None
-
- ids = self._schema.get_id_names()
- if not ids:
- return None
-
- if not self._validated:
- self._validate()
-
- result = u""
- for key in ids:
- try:
- result += unicode(self._values[key])
- except:
- log.error("get_object_id(): cannot convert value '%s'." % key)
- return None
- self._object_id = result
- return result
+ return self._object_id
def map_encode(self):
_map = {}
@@ -555,34 +524,6 @@
_map[self.KEY_SCHEMA_ID] = self._schema_id.map_encode()
return _map
- def _set_schema(self, schema):
- self._validated = False
- self._schema = schema
- if schema:
- self._schema_id = schema.get_class_id()
- if self._const:
- self._validate()
- else:
- self._schema_id = None
-
- def _validate(self):
- """
- Compares this object's data against the associated schema. Throws an
- exception if the data does not conform to the schema.
- """
- props = self._schema.get_properties()
- for name,val in props.iteritems():
- # @todo validate: type compatible with amqp_type?
- # @todo validate: primary keys have values
- if name not in self._values:
- if val._isOptional:
- # ok not to be present, put in dummy value
- # to simplify access
- self._values[name] = None
- else:
- raise Exception("Required property '%s' not present." % name)
- self._validated = True
-
def __repr__(self):
return "QmfData=<<" + str(self.map_encode()) + ">>"
@@ -629,7 +570,7 @@
def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={},
_subtypes={}, _tag=None,
_map=None,
- _schema=None, _const=True):
+ _schema_id=None, _const=True):
"""
@type _map: dict
@param _map: if not None, construct instance from map representation.
@@ -646,14 +587,16 @@
if _map is not None:
# construct from map
- super(QmfEvent, self).__init__(_map=_map, _schema=_schema,
- _const=_const)
+ super(QmfEvent, self).__init__(_map=_map, _const=_const,
+ _object_id="_event")
_timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp)
_sev = _map.get(self.KEY_SEVERITY, _sev)
else:
- super(QmfEvent, self).__init__(_values=_values,
+ super(QmfEvent, self).__init__(_object_id="_event",
+ _values=_values,
_subtypes=_subtypes, _tag=_tag,
- _schema=_schema, _const=_const)
+ _schema_id=_schema_id,
+ _const=_const)
if _timestamp is None:
raise TypeError("QmfEvent: a valid timestamp is required.")
@@ -665,13 +608,13 @@
self._severity = _sev
def _create(cls, timestamp, severity, values,
- _subtypes={}, _tag=None, _schema=None, _const=False):
+ _subtypes={}, _tag=None, _schema_id=None, _const=False):
return cls(_timestamp=timestamp, _sev=severity, _values=values,
- _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const)
+ _subtypes=_subtypes, _tag=_tag, _schema_id=_schema_id, _const=_const)
create = classmethod(_create)
- def _from_map(cls, map_, _schema=None, _const=False):
- return cls(_map=map_, _schema=_schema, _const=_const)
+ def _from_map(cls, map_, _const=False):
+ return cls(_map=map_, _const=_const)
from_map = classmethod(_from_map)
def get_timestamp(self):
@@ -1761,7 +1704,9 @@
self._object_id_names = _map.get(self.KEY_PRIMARY_KEY_NAMES,[])
_desc = _map.get(self.KEY_DESC)
else:
- super(SchemaClass, self).__init__()
+ if _classId is None:
+ raise Exception("A class identifier must be supplied.")
+ super(SchemaClass, self).__init__(_object_id=str(_classId))
self._object_id_names = []
self._classId = _classId
@@ -1876,8 +1821,8 @@
Map format:
map(SchemaClass)
"""
- def __init__(self, _classId=None, _desc=None,
- _props={}, _methods={}, _object_id_names=None,
+ def __init__(self, _classId=None, _desc=None,
+ _props={}, _methods={}, _object_id_names=[],
_map=None):
"""
@type pname: str
Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Wed Feb 3 15:44:26 2010
@@ -23,7 +23,7 @@
import time
import datetime
import Queue
-from threading import Thread
+from threading import Thread, Event
from threading import Lock
from threading import currentThread
from threading import Condition
@@ -193,9 +193,8 @@
"""
Console's representation of an managed QmfData instance.
"""
- def __init__(self, map_, agent, _schema=None):
+ def __init__(self, map_, agent):
super(QmfConsoleData, self).__init__(_map=map_,
- _schema=_schema,
_const=True)
self._agent = agent
@@ -276,31 +275,6 @@
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- if self._schema:
- # validate
- _in_args = _in_args.copy()
- ms = self._schema.get_method(name)
- if ms is None:
- raise ValueError("Method '%s' is undefined." % name)
-
- for aname,prop in ms.get_arguments().iteritems():
- if aname not in _in_args:
- if prop.get_default():
- _in_args[aname] = prop.get_default()
- elif not prop.is_optional():
- raise ValueError("Method '%s' requires argument '%s'"
- % (name, aname))
- for aname in _in_args.iterkeys():
- prop = ms.get_argument(aname)
- if prop is None:
- raise ValueError("Method '%s' does not define argument"
- " '%s'" % (name, aname))
- if "I" not in prop.get_direction():
- raise ValueError("Method '%s' argument '%s' is not an"
- " input." % (name, aname))
-
- # @todo check if value is correct (type, range, etc)
-
handle = self._agent._console._req_correlation.allocate()
if handle == 0:
raise Exception("Can not allocate a correlation id!")
@@ -349,7 +323,7 @@
_tag=newer._tag, _object_id=newer._object_id,
_ctime=newer._ctime, _utime=newer._utime,
_dtime=newer._dtime,
- _schema=newer._schema, _const=True)
+ _schema_id=newer._schema_id, _const=True)
class QmfLocalData(QmfData):
"""
@@ -396,7 +370,7 @@
def is_active(self):
return self._announce_timestamp != None
-
+
def _send_msg(self, msg, correlation_id=None):
"""
Low-level routine to asynchronously send a message to this agent.
@@ -598,6 +572,9 @@
@param kwargs: ??? Unused
"""
Thread.__init__(self)
+ self._operational = False
+ self._ready = Event()
+
if not name:
self._name = "qmfc-%s.%d" % (platform.node(), os.getpid())
else:
@@ -615,7 +592,6 @@
self._locate_sender = None
self._schema_cache = {}
self._req_correlation = SequencedWaiter()
- self._operational = False
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
@@ -706,6 +682,9 @@
#
self._operational = True
self.start()
+ self._ready.wait(10)
+ if not self._ready.isSet():
+ raise Exception("Console managment thread failed to start.")
@@ -906,21 +885,21 @@
logging.debug("Response to Object Query received")
obj_list = []
for obj_map in reply.content.get(MsgKey.data_obj):
- # if the object references a schema, fetch it
- sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- schema = self._fetch_schema(sid, _agent=agent,
- _timeout=timeout)
- if not schema:
- logging.warning("Unknown schema, id=%s" % sid)
- continue
- obj = QmfConsoleData(map_=obj_map, agent=agent,
- _schema=schema)
- else:
- # no schema needed
- obj = QmfConsoleData(map_=obj_map, agent=agent)
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
obj_list.append(obj)
+ # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+ # if sid_map:
+ # sid = SchemaClassId.from_map(sid_map)
+ # # if the object references a schema, fetch it
+ # # schema = self._fetch_schema(sid, _agent=agent,
+ # # _timeout=timeout)
+ # # if not schema:
+ # # logging.warning("Unknown schema, id=%s" % sid)
+ # # continue
+ # obj = QmfConsoleData(map_=obj_map, agent=agent,
+ # _schema=schema)
+ # else:
+ # # no schema needed
return obj_list
else:
logging.warning("Unexpected Target for a Query: '%s'" % target)
@@ -928,9 +907,9 @@
def run(self):
global _callback_thread
- #
- # @todo KAG Rewrite when api supports waiting on multiple receivers
- #
+
+ self._ready.set()
+
while self._operational:
# qLen = self._work_q.qsize()
@@ -1152,7 +1131,7 @@
# need to create and add a new agent?
matched = False
if self._agent_discovery_filter:
- tmp = QmfData.create(values=ai_map)
+ tmp = QmfData.create(values=ai_map, _object_id="agent-filter")
matched = self._agent_discovery_filter.evaluate(tmp)
if (correlated or matched):
Modified: qpid/trunk/qpid/python/qmf2/tests/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/basic_method.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/basic_method.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/basic_method.py Wed Feb 3 15:44:26 2010
@@ -84,9 +84,8 @@
# instantiate managed data objects matching the schema
- _obj1 = QmfAgentData( self.agent, _schema=_schema )
- _obj1.set_value("index1", 100)
- _obj1.set_value("index2", "a name" )
+ _obj1 = QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":100, "index2":"a name"})
_obj1.set_value("set_string", "UNSET")
_obj1.set_value("set_int", 0)
_obj1.set_value("query_count", 0)
@@ -153,7 +152,8 @@
if obj is None:
error_info = QmfData.create({"code": -2,
"description":
- "Bad Object Id."})
+ "Bad Object Id."},
+ _object_id="_error")
self.agent.method_response(wi.get_handle(),
_error=error_info)
else:
@@ -170,13 +170,15 @@
if obj is None:
error_info = QmfData.create({"code": -3,
"description":
- "Unknown object id."})
+ "Unknown object id."},
+ _object_id="_error")
self.agent.method_response(wi.get_handle(),
_error=error_info)
elif obj.get_object_id() != "01545":
- error_info = QmfData.create({"code": -4,
- "description":
- "Unexpected id."})
+ error_info = QmfData.create( {"code": -4,
+ "description":
+ "Unexpected id."},
+ _object_id="_error")
self.agent.method_response(wi.get_handle(),
_error=error_info)
else:
@@ -187,15 +189,18 @@
self.agent.method_response(wi.get_handle(),
{"code" : 0})
else:
- error_info = QmfData.create({"code": -5,
- "description":
- "Bad Args."})
+ error_info = QmfData.create(
+ {"code": -5,
+ "description":
+ "Bad Args."},
+ _object_id="_error")
self.agent.method_response(wi.get_handle(),
_error=error_info)
else:
- error_info = QmfData.create({"code": -1,
+ error_info = QmfData.create( {"code": -1,
"description":
- "Unknown method call."})
+ "Unknown method call."},
+ _object_id="_error")
self.agent.method_response(wi.get_handle(), _error=error_info)
self.agent.release_workitem(wi)
@@ -284,7 +289,7 @@
# find agents
# synchronous query for all objects with schema
# invalid method call on each object
- # - should throw a ValueError
+ # - should throw a ValueError - NOT YET.
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -313,11 +318,18 @@
obj_list = self.console.do_query(agent, query)
self.assertTrue(len(obj_list) == 2)
for obj in obj_list:
- self.failUnlessRaises(ValueError,
- obj.invoke_method,
- "unknown_meth",
- {"arg1": -99, "arg2": "Now set!"},
- _timeout=3)
+ mr = obj.invoke_method("unknown_method",
+ {"arg1": -99, "arg2": "Now set!"},
+ _timeout=3)
+ # self.failUnlessRaises(ValueError,
+ # obj.invoke_method,
+ # "unknown_meth",
+ # {"arg1": -99, "arg2": "Now set!"},
+ # _timeout=3)
+ self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
+ self.assertFalse(mr.succeeded())
+ self.assertTrue(isinstance(mr.get_exception(), QmfData))
+
self.console.destroy(10)
def test_bad_method_no_schema(self):
Modified: qpid/trunk/qpid/python/qmf2/tests/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/basic_query.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/basic_query.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/basic_query.py Wed Feb 3 15:44:26 2010
@@ -84,9 +84,8 @@
# instantiate managed data objects matching the schema
- _obj1 = QmfAgentData( self.agent, _schema=_schema )
- _obj1.set_value("index1", 100)
- _obj1.set_value("index2", "a name" )
+ _obj1 = QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":100, "index2":"a name"})
_obj1.set_value("set_string", "UNSET")
_obj1.set_value("set_int", 0)
_obj1.set_value("query_count", 0)
Modified: qpid/trunk/qpid/python/qmf2/tests/events.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/events.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/events.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/events.py Wed Feb 3 15:44:26 2010
@@ -114,7 +114,7 @@
QmfEvent.SEV_WARNING,
{"prop-1": counter,
"prop-2": str(datetime.datetime.utcnow())},
- _schema=self.schema)
+ _schema_id=self.schema.get_class_id())
counter += 1
self.agent.raise_event(event)
wi = self.agent.get_next_workitem(timeout=0)
Modified: qpid/trunk/qpid/python/qmf2/tests/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/obj_gets.py?rev=906093&r1=906092&r2=906093&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/obj_gets.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/obj_gets.py Wed Feb 3 15:44:26 2010
@@ -74,14 +74,16 @@
self.agent.register_object_class(_schema)
- _obj = QmfAgentData( self.agent, _schema=_schema )
- _obj.set_value("key", "p1c1_key1")
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p1c1_key1"},
+ _schema=_schema)
_obj.set_value("count1", 0)
_obj.set_value("count2", 0)
self.agent.add_object( _obj )
- _obj = QmfAgentData( self.agent, _schema=_schema )
- _obj.set_value("key", "p1c1_key2")
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p1c1_key2"},
+ _schema=_schema )
_obj.set_value("count1", 9)
_obj.set_value("count2", 10)
self.agent.add_object( _obj )
@@ -97,8 +99,9 @@
self.agent.register_object_class(_schema)
- _obj = QmfAgentData( self.agent, _schema=_schema )
- _obj.set_value("name", "p1c2_name1")
+ _obj = QmfAgentData( self.agent,
+ _values={"name":"p1c2_name1"},
+ _schema=_schema )
_obj.set_value("string1", "a data string")
self.agent.add_object( _obj )
@@ -114,13 +117,15 @@
self.agent.register_object_class(_schema)
- _obj = QmfAgentData( self.agent, _schema=_schema )
- _obj.set_value("key", "p2c1_key1")
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p2c1_key1"},
+ _schema=_schema )
_obj.set_value("counter", 0)
self.agent.add_object( _obj )
- _obj = QmfAgentData( self.agent, _schema=_schema )
- _obj.set_value("key", "p2c1_key2")
+ _obj = QmfAgentData( self.agent,
+ _values={"key":"p2c1_key2"},
+ _schema=_schema )
_obj.set_value("counter", 2112)
self.agent.add_object( _obj )
@@ -515,3 +520,80 @@
self.console.destroy(10)
+
+ def test_wildcard_schema_id(self):
+ # create console
+ # find all agents
+ # synchronous query for all described objects by:
+ # oid & wildcard schema_id
+ # wildcard schema_id
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ wild_schema_id = SchemaClassId("package1", "class1")
+ objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5)
+ self.assertTrue(len(objs) == (self.agent_count * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+
+ wild_schema_id = SchemaClassId("package1", "class2")
+ objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+ self.assertTrue(obj.get_object_id() == "p1c2_name1")
+
+ wild_schema_id = SchemaClassId("package2", "class1")
+ objs = self.console.get_objects(_schema_id=wild_schema_id, _timeout=5)
+ self.assertTrue(len(objs) == (self.agent_count * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+
+ wild_schema_id = SchemaClassId("package1", "class1")
+ objs = self.console.get_objects(_schema_id=wild_schema_id,
+ _object_id="p1c1_key2", _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+ self.assertTrue(obj.get_object_id() == "p1c1_key2")
+
+ # should fail
+ objs = self.console.get_objects(_schema_id=wild_schema_id,
+ _object_id="does not exist",
+ _timeout=5)
+ self.assertTrue(objs == None)
+
+ wild_schema_id = SchemaClassId("package2", "class1")
+ objs = self.console.get_objects(_schema_id=wild_schema_id,
+ _object_id="p2c1_key2", _timeout=5)
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+ self.assertTrue(obj.get_object_id() == "p2c1_key2")
+
+ # should fail
+ wild_schema_id = SchemaClassId("package1", "bad-class")
+ objs = self.console.get_objects(_schema_id=wild_schema_id,
+ _object_id="p1c1_key2", _timeout=5)
+ self.assertTrue(objs == None)
+
+ self.console.destroy(10)
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org