You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/01 22:14:40 UTC
svn commit: r905415 - in /qpid/trunk/qpid/python/qmf2: agent.py common.py
console.py tests/basic_method.py tests/basic_query.py tests/events.py
tests/obj_gets.py
Author: kgiusti
Date: Mon Feb 1 21:14:39 2010
New Revision: 905415
URL: http://svn.apache.org/viewvc?rev=905415&view=rev
Log:
QPID-2261: add schema_id to object query target params. Add more tests
Modified:
qpid/trunk/qpid/python/qmf2/agent.py
qpid/trunk/qpid/python/qmf2/common.py
qpid/trunk/qpid/python/qmf2/console.py
qpid/trunk/qpid/python/qmf2/tests/basic_method.py
qpid/trunk/qpid/python/qmf2/tests/basic_query.py
qpid/trunk/qpid/python/qmf2/tests/events.py
qpid/trunk/qpid/python/qmf2/tests/obj_gets.py
Modified: qpid/trunk/qpid/python/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/agent.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/agent.py (original)
+++ qpid/trunk/qpid/python/qmf2/agent.py Mon Feb 1 21:14:39 2010
@@ -26,7 +26,8 @@
from uuid import uuid4
from common import (make_subject, parse_subject, OpCode, QmfQuery,
SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod)
+ SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
+ timedelta_to_secs)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
@@ -42,18 +43,22 @@
application. Given to the app in a WorkItem, provided to the agent when
method_response() is invoked.
"""
- def __init__(self, correlation_id, reply_to, meth_name, _oid=None):
+ def __init__(self, correlation_id, reply_to, meth_name, _oid=None,
+ _schema_id=None):
self.correlation_id = correlation_id
self.reply_to = reply_to
self.meth_name = meth_name
self.oid = _oid
+ self.schema_id = _schema_id
class MethodCallParams(object):
"""
"""
- def __init__(self, name, _oid=None, _in_args=None, _user_id=None):
+ def __init__(self, name, _oid=None, _schema_id=None, _in_args=None,
+ _user_id=None):
self._meth_name = name
self._oid = _oid
+ self._schema_id = _schema_id
self._in_args = _in_args
self._user_id = _user_id
@@ -63,6 +68,9 @@
def get_object_id(self):
return self._oid
+ def get_schema_id(self):
+ return self._schema_id
+
def get_args(self):
return self._in_args
@@ -100,7 +108,12 @@
self._packages = {}
self._schema_timestamp = long(0)
self._schema = {}
- self._agent_data = {}
+ # _described_data holds QmfData objects that are associated with schema
+ # it is index by schema_id, object_id
+ self._described_data = {}
+ # _undescribed_data holds unstructured QmfData objects - these objects
+ # have no schema. it is indexed by object_id only.
+ self._undescribed_data = {}
self._work_q = Queue.Queue()
self._work_q_put = False
@@ -247,7 +260,7 @@
# (self.name, str(msg)))
self._topic_sender.send(msg)
- def add_object(self, data ):
+ def add_object(self, data):
"""
Register an instance of a QmfAgentData object.
"""
@@ -256,20 +269,34 @@
if not isinstance(data, QmfAgentData):
raise TypeError("QmfAgentData instance expected")
- id_ = data.get_object_id()
- if not id_:
+ oid = data.get_object_id()
+ if not oid:
raise TypeError("No identifier assigned to QmfAgentData!")
+ sid = data.get_schema_class_id()
+
self._lock.acquire()
try:
- self._agent_data[id_] = data
+ if sid:
+ if sid not in self._described_data:
+ self._described_data[sid] = {oid: data}
+ else:
+ self._described_data[sid][oid] = data
+ else:
+ self._undescribed_data[oid] = data
finally:
self._lock.release()
- def get_object(self, id):
+ def get_object(self, oid, schema_id):
+ data = None
self._lock.acquire()
try:
- data = self._agent_data.get(id)
+ if schema_id:
+ data = self._described_data.get(schema_id)
+ if data:
+ data = data.get(oid)
+ else:
+ data = self._undescribed_data.get(oid)
finally:
self._lock.release()
return data
@@ -284,6 +311,8 @@
_map = {SchemaMethod.KEY_NAME:handle.meth_name}
if handle.oid is not None:
_map[QmfData.KEY_OBJECT_ID] = handle.oid
+ if handle.schema_id is not None:
+ _map[QmfData.KEY_SCHEMA_ID] = handle.schema_id.map_encode()
if _out_args is not None:
_map[SchemaMethod.KEY_ARGUMENTS] = _out_args.copy()
if _error is not None:
@@ -340,7 +369,7 @@
logging.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
- timeout = ((next_heartbeat - now) + datetime.timedelta(microseconds=999999)).seconds
+ timeout = timedelta_to_secs(next_heartbeat - now)
# print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
try:
self._session.next_receiver(timeout=timeout)
@@ -519,11 +548,14 @@
in_args = cmap.get(SchemaMethod.KEY_ARGUMENTS)
oid = cmap.get(QmfData.KEY_OBJECT_ID)
+ schema_id = cmap.get(QmfData.KEY_SCHEMA_ID)
+ if schema_id:
+ schema_id = SchemaClassId.from_map(schema_id)
handle = _MethodCallHandle(msg.correlation_id,
msg.reply_to,
mname,
- oid)
- param = MethodCallParams( mname, oid, in_args, msg.user_id)
+ oid, schema_id)
+ param = MethodCallParams( mname, oid, schema_id, in_args, msg.user_id)
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
@@ -594,12 +626,24 @@
"""
"""
data_objs = []
+ # extract optional schema_id from target params
+ sid = None
+ t_params = query.get_target_param()
+ if t_params:
+ sid = t_params.get(QmfData.KEY_SCHEMA_ID)
+
# if querying for a specific object, do a direct lookup
if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
found = None
self._lock.acquire()
try:
- found = self._agent_data.get(query.get_id())
+ if sid:
+ found = self._described_data.get(sid)
+ if found:
+ found = found.get(oid)
+ else:
+ found = self._undescribed_data.get(oid)
finally:
self._lock.release()
if found:
@@ -610,12 +654,18 @@
else: # otherwise, evaluate all data
self._lock.acquire()
try:
- for oid,val in self._agent_data.iteritems():
- if query.evaluate(val):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(val.map_encode())
+ if sid:
+ db = self._described_data.get(sid)
+ else:
+ db = self._undescribed_data
+
+ if db:
+ for oid,val in db.iteritems():
+ if query.evaluate(val):
+ if _idOnly:
+ data_objs.append(oid)
+ else:
+ data_objs.append(val.map_encode())
finally:
self._lock.release()
Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Mon Feb 1 21:14:39 2010
@@ -90,6 +90,12 @@
return _sub[3:].split('.', 1)
+def timedelta_to_secs(td):
+ """
+ Convert a time delta to a time interval in seconds (float)
+ """
+ return td.days * 86400 + td.seconds + td.microseconds/1000000.0
+
##==============================================================================
## Async Event Model
@@ -906,8 +912,10 @@
TARGET_SCHEMA_ID="schema_id"
TARGET_SCHEMA="schema"
- # allowed predicate key(s):
+ # allowed id: value:
+ # SchemaClassId
#
+ # allowed predicate key(s):
# SchemaClassId.KEY_PACKAGE
# SchemaClassId.KEY_CLASS
# SchemaClassId.KEY_TYPE
@@ -917,19 +925,22 @@
# name of method (exist test only)
TARGET_AGENT="agent"
+ # allowed id: value:
+ # string name of agent
# allowed predicate keys(s):
- #
+ #
KEY_AGENT_NAME="_name"
TARGET_OBJECT_ID="object_id"
TARGET_OBJECT="object"
+ # If object is described by a schema, the value of the target map must
+ # include a "_schema_id": {map encoded schema id} value.
+ #
+ # allowed id: value:
+ # object_id string
+ #
# allowed predicate keys(s):
#
- # SchemaClassId.KEY_PACKAGE
- # SchemaClassId.KEY_CLASS
- # SchemaClassId.KEY_TYPE
- # SchemaClassId.KEY_HASH
- # QmfData.KEY_SCHEMA_ID
# QmfData.KEY_OBJECT_ID
# QmfData.KEY_UPDATE_TS
# QmfData.KEY_CREATE_TS
@@ -977,8 +988,23 @@
if key in self._valid_targets:
_target = key
break
+ if _target is None:
+ raise TypeError("Invalid QmfQuery target: '%s'" %
+ str(target_map))
+ # convert target params from map format
_target_params = target_map.get(_target)
+ if _target_params:
+ if not isinstance(_target_params, type({})):
+ raise TypeError("target params must be a map: '%s'" %
+ str(_target_params))
+ t_params = {}
+ for name,value in _target_params.iteritems():
+ if name == QmfData.KEY_SCHEMA_ID:
+ t_params[name] = SchemaClassId.from_map(value)
+ else:
+ t_params[name] = value
+ _target_params = t_params
_id = _map.get(self.KEY_ID)
if _id is not None:
@@ -1009,9 +1035,40 @@
return cls(_target=target, _target_params=_target_params, _id=ident)
create_id = classmethod(_create_id)
+ def _create_id_object(cls, object_id, _schema_id=None):
+ """
+ Create a ID Query for an object (schema optional).
+ """
+ if _schema_id is not None:
+ if not isinstance(_schema_id, SchemaClassId):
+ raise TypeError("class SchemaClassId expected")
+ params = {QmfData.KEY_SCHEMA_ID: _schema_id}
+ else:
+ params = None
+ return cls(_target=QmfQuery.TARGET_OBJECT,
+ _id=object_id,
+ _target_params=params)
+ create_id_object = classmethod(_create_id_object)
+
+ def _create_id_object_id(cls, object_id, _schema_id=None):
+ """
+ Create a ID Query for object_ids (schema optional).
+ """
+ if _schema_id is not None:
+ if not isinstance(_schema_id, SchemaClassId):
+ raise TypeError("class SchemaClassId expected")
+ params = {QmfData.KEY_SCHEMA_ID: _schema_id}
+ else:
+ params = None
+ return cls(_target=QmfQuery.TARGET_OBJECT_ID,
+ _id=object_id,
+ _target_params=params)
+ create_id_object_id = classmethod(_create_id_object_id)
+
def _from_map(cls, map_):
return cls(_map=map_)
from_map = classmethod(_from_map)
+ # end constructors
def get_target(self):
return self._target
@@ -1055,7 +1112,18 @@
return True
def map_encode(self):
- _map = {self.KEY_TARGET: {self._target: self._target_params}}
+ t_params = {}
+ if self._target_params:
+ for name,value in self._target_params.iteritems():
+ if isinstance(value, _mapEncoder):
+ t_params[name] = value.map_encode()
+ else:
+ t_params[name] = value
+ if t_params:
+ _map = {self.KEY_TARGET: {self._target: t_params}}
+ else:
+ _map = {self.KEY_TARGET: {self._target: None}}
+
if self._id is not None:
if isinstance(self._id, _mapEncoder):
_map[self.KEY_ID] = self._id.map_encode()
Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Mon Feb 1 21:14:39 2010
@@ -31,9 +31,9 @@
from qpid.messaging import Connection, Message, Empty, SendError
from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, SchemaEventClass,
- SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
+ MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+ SchemaEventClass, SchemaObjectClass, WorkItem,
+ SchemaMethod, QmfEvent, timedelta_to_secs)
# global flag that indicates which thread (if any) is
@@ -249,11 +249,9 @@
if _timeout is None:
_timeout = self._agent._console._reply_timeout
-
# create query to agent using this objects ID
- oid = self.get_object_id()
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT,
- self.get_object_id())
+ query = QmfQuery.create_id_object(self.get_object_id(),
+ self.get_schema_class_id())
obj_list = self._agent._console.do_query(self._agent, query,
timeout=_timeout)
if obj_list is None or len(obj_list) != 1:
@@ -309,6 +307,10 @@
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
+
+ sid = self.get_schema_class_id()
+ if sid:
+ _map[self.KEY_SCHEMA_ID] = sid.map_encode()
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args
@@ -969,7 +971,7 @@
# to expire
now = datetime.datetime.utcnow()
if self._next_agent_expire > now:
- timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
+ timeout = timedelta_to_secs(self._next_agent_expire - now)
try:
logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
xxx = self._session.next_receiver(timeout = timeout)
@@ -980,34 +982,20 @@
logging.debug("Shutting down Console thread")
def get_objects(self,
+ _object_id=None,
_schema_id=None,
_pname=None, _cname=None,
- _object_id=None,
_agents=None,
_timeout=None):
"""
- @todo
- """
- if _object_id is not None:
- # query by object id
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, _object_id)
- elif _schema_id is not None:
- pred = [QmfQuery.EQ, QmfData.KEY_SCHEMA_ID, _schema_id.map_encode()]
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
- elif _pname is not None:
- # query by package name (and maybe class name)
- if _cname is not None:
- pred = [QmfQuery.AND, [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, _pname]],
- [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
- [QmfQuery.QUOTE, _cname]]]
- else:
- pred = [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE,
- [QmfQuery.QUOTE, _pname]]
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT, pred)
- else:
- raise Exception("invalid arguments")
+ Retrieve objects by id or schema.
+ By object_id: must specify schema_id or pname & cname if object defined
+ by a schema. Undescribed objects: only object_id needed.
+
+ By schema: must specify schema_id or pname & cname - all instances of
+ objects defined by that schema are returned.
+ """
if _agents is None:
# use copy of current agent list
self._lock.acquire()
@@ -1021,12 +1009,12 @@
agent_list = _agents
# @todo validate this list!
- # @todo: fix when async do_query done - query all agents at once, then
- # wait for replies, instead of per-agent querying....
-
if _timeout is None:
_timeout = self._reply_timeout
+ # @todo: fix when async do_query done - query all agents at once, then
+ # wait for replies, instead of per-agent querying....
+
obj_list = []
expired = datetime.datetime.utcnow() + datetime.timedelta(seconds=_timeout)
for agent in agent_list:
@@ -1035,11 +1023,54 @@
now = datetime.datetime.utcnow()
if now >= expired:
break
- timeout = ((expired - now) + datetime.timedelta(microseconds=999999)).seconds
- reply = self.do_query(agent, query, timeout)
- if reply:
- obj_list = obj_list + reply
+ if _pname is None:
+ if _object_id:
+ query = QmfQuery.create_id_object(_object_id,
+ _schema_id)
+ else:
+ if _schema_id is not None:
+ t_params = {QmfData.KEY_SCHEMA_ID: _schema_id}
+ else:
+ t_params = None
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ t_params)
+ timeout = timedelta_to_secs(expired - now)
+ reply = self.do_query(agent, query, timeout)
+ if reply:
+ obj_list = obj_list + reply
+ else:
+ # looking up by package name (and maybe class name), need to
+ # find all schema_ids in that package, then lookup object by
+ # schema_id
+ if _cname is not None:
+ pred = [QmfQuery.AND,
+ [QmfQuery.EQ,
+ SchemaClassId.KEY_PACKAGE,
+ [QmfQuery.QUOTE, _pname]],
+ [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
+ [QmfQuery.QUOTE, _cname]]]
+ else:
+ pred = [QmfQuery.EQ,
+ SchemaClassId.KEY_PACKAGE,
+ [QmfQuery.QUOTE, _pname]]
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID, pred)
+ timeout = timedelta_to_secs(expired - now)
+ sid_list = self.do_query(agent, query, timeout)
+ if sid_list:
+ for sid in sid_list:
+ now = datetime.datetime.utcnow()
+ if now >= expired:
+ break
+ if _object_id is not None:
+ query = QmfQuery.create_id_object(_object_id, sid)
+ else:
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
+ timeout = timedelta_to_secs(expired - now)
+ reply = self.do_query(agent, query, timeout)
+ if reply:
+ obj_list = obj_list + reply
if obj_list:
return obj_list
return None
@@ -1103,55 +1134,44 @@
" '%s'" % msg)
return
- ignore = True
- matched = False
correlated = False
- agent_query = self._agent_discovery_filter
-
if msg.correlation_id:
correlated = self._req_correlation.is_valid(msg.correlation_id)
- if direct and correlated:
- ignore = False
- elif agent_query:
- matched = agent_query.evaluate(QmfData.create(values=ai_map))
- ignore = not matched
+ agent = None
+ self._lock.acquire()
+ try:
+ agent = self._agent_map.get(name)
+ if agent:
+ # agent already known, just update timestamp
+ agent._announce_timestamp = datetime.datetime.utcnow()
+ finally:
+ self._lock.release()
- if not ignore:
- agent = None
- self._lock.acquire()
- try:
- agent = self._agent_map.get(name)
- finally:
- self._lock.release()
+ if not agent:
+ # need to create and add a new agent?
+ matched = False
+ if self._agent_discovery_filter:
+ tmp = QmfData.create(values=ai_map)
+ matched = self._agent_discovery_filter.evaluate(tmp)
- if not agent:
- # need to create and add a new agent
+ if (correlated or matched):
agent = self._create_agent(name)
if not agent:
return # failed to add agent
-
- # lock out expiration scanning code
- self._lock.acquire()
- try:
- old_timestamp = agent._announce_timestamp
agent._announce_timestamp = datetime.datetime.utcnow()
- finally:
- self._lock.release()
-
- if old_timestamp == None and matched:
- logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
- wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
- self._work_q.put(wi)
- self._work_q_put = True
-
- if correlated:
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
-
-
+ if matched:
+ # unsolicited, but newly discovered
+ logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
+ self._work_q.put(wi)
+ self._work_q_put = True
+
+ if correlated:
+ # wake up all waiters
+ logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ self._req_correlation.put_data(msg.correlation_id, msg)
def _handle_data_ind_msg(self, msg, cmap, version, direct):
"""
Modified: qpid/trunk/qpid/python/qmf2/tests/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/basic_method.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/basic_method.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/basic_method.py Mon Feb 1 21:14:39 2010
@@ -148,7 +148,8 @@
raise Exception("Unexpected method call parameters")
if mc.get_name() == "set_meth":
- obj = self.agent.get_object(mc.get_object_id())
+ obj = self.agent.get_object(mc.get_object_id(),
+ mc.get_schema_id())
if obj is None:
error_info = QmfData.create({"code": -2,
"description":
@@ -164,7 +165,8 @@
self.agent.method_response(wi.get_handle(),
{"code" : 0})
elif mc.get_name() == "a_method":
- obj = self.agent.get_object(mc.get_object_id())
+ obj = self.agent.get_object(mc.get_object_id(),
+ mc.get_schema_id())
if obj is None:
error_info = QmfData.create({"code": -3,
"description":
@@ -246,38 +248,82 @@
agent = self.console.find_agent(aname, timeout=3)
self.assertTrue(agent and agent.get_name() == aname)
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.AND,
- [QmfQuery.EXISTS, [QmfQuery.QUOTE, SchemaClassId.KEY_PACKAGE]],
- [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE, [QmfQuery.QUOTE, "MyPackage"]]])
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
- obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
- for obj in obj_list:
- mr = obj.invoke_method( "set_meth", {"arg_int": -99,
- "arg_str": "Now set!"},
- _timeout=3)
- self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
- self.assertTrue(mr.succeeded())
- self.assertTrue(mr.get_argument("code") == 0)
-
- self.assertTrue(obj.get_value("method_call_count") == 0)
- self.assertTrue(obj.get_value("set_string") == "UNSET")
- self.assertTrue(obj.get_value("set_int") == 0)
-
- obj.refresh()
-
- self.assertTrue(obj.get_value("method_call_count") == 1)
- self.assertTrue(obj.get_value("set_string") == "Now set!")
- self.assertTrue(obj.get_value("set_int") == -99)
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == 1)
+ for sid in sid_list:
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ obj_list = self.console.do_query(agent, query)
+ self.assertTrue(len(obj_list) == 2)
+ for obj in obj_list:
+ mr = obj.invoke_method( "set_meth", {"arg_int": -99,
+ "arg_str": "Now set!"},
+ _timeout=3)
+ self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
+ self.assertTrue(mr.succeeded())
+ self.assertTrue(mr.get_argument("code") == 0)
+
+ self.assertTrue(obj.get_value("method_call_count") == 0)
+ self.assertTrue(obj.get_value("set_string") == "UNSET")
+ self.assertTrue(obj.get_value("set_int") == 0)
+
+ obj.refresh()
+
+ self.assertTrue(obj.get_value("method_call_count") == 1)
+ self.assertTrue(obj.get_value("set_string") == "Now set!")
+ self.assertTrue(obj.get_value("set_int") == -99)
self.console.destroy(10)
- def test_bad_method(self):
+ def test_bad_method_schema(self):
# create console
# find agents
- # synchronous query for all objects in schema
+ # synchronous query for all objects with schema
+ # invalid method call on each object
+ # - should throw a ValueError
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == 1)
+ for sid in sid_list:
+
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
+ [QmfQuery.TRUE],
+ _target_params=t_params)
+
+ obj_list = self.console.do_query(agent, query)
+ self.assertTrue(len(obj_list) == 2)
+ for obj in obj_list:
+ self.failUnlessRaises(ValueError,
+ obj.invoke_method,
+ "unknown_meth",
+ {"arg1": -99, "arg2": "Now set!"},
+ _timeout=3)
+ self.console.destroy(10)
+
+ def test_bad_method_no_schema(self):
+ # create console
+ # find agents
+ # synchronous query for all objects with no schema
# invalid method call on each object
# - should throw a ValueError
self.notifier = _testNotifier()
@@ -294,21 +340,20 @@
agent = self.console.find_agent(aname, timeout=3)
self.assertTrue(agent and agent.get_name() == aname)
- query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
- [QmfQuery.AND,
- [QmfQuery.EXISTS, [QmfQuery.QUOTE, SchemaClassId.KEY_PACKAGE]],
- [QmfQuery.EQ, [QmfQuery.UNQUOTE, SchemaClassId.KEY_PACKAGE], [QmfQuery.QUOTE, "MyPackage"]]])
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT)
obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
+ self.assertTrue(len(obj_list) == 1)
for obj in obj_list:
- self.failUnlessRaises(ValueError,
- obj.invoke_method,
- "unknown_meth",
- {"arg1": -99, "arg2": "Now set!"},
- _timeout=3)
- self.console.destroy(10)
+ self.assertTrue(obj.get_schema_class_id() == None)
+ mr = obj.invoke_method("unknown_meth",
+ {"arg1": -99, "arg2": "Now set!"},
+ _timeout=3)
+ self.assertTrue(isinstance(mr, qmf2.console.MethodResult))
+ self.assertFalse(mr.succeeded())
+ self.assertTrue(isinstance(mr.get_exception(), QmfData))
+ self.console.destroy(10)
def test_managed_obj(self):
# create console
Modified: qpid/trunk/qpid/python/qmf2/tests/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/basic_query.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/basic_query.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/basic_query.py Mon Feb 1 21:14:39 2010
@@ -94,13 +94,22 @@
self.agent.add_object( _obj1 )
self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
- _values={"index1":99,
+ _values={"index1":99,
"index2": "another name",
"set_string": "UNSET",
"set_int": 0,
"query_count": 0,
"method_call_count": 0} ))
+ self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":50,
+ "index2": "my name",
+ "set_string": "SET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+
# add an "unstructured" object to the Agent
_obj2 = QmfAgentData(self.agent, _object_id="01545")
_obj2.set_value("field1", "a value")
@@ -110,6 +119,30 @@
_obj2.set_value("index1", 50)
self.agent.add_object(_obj2)
+ _obj2 = QmfAgentData(self.agent, _object_id="01546")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 3)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 51)
+ self.agent.add_object(_obj2)
+
+ _obj2 = QmfAgentData(self.agent, _object_id="01544")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 4)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 49)
+ self.agent.add_object(_obj2)
+
+ _obj2 = QmfAgentData(self.agent, _object_id="01543")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 4)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 48)
+ self.agent.add_object(_obj2)
+
self.running = False
self.ready = Event()
@@ -178,7 +211,8 @@
def test_all_oids(self):
# create console
# find agents
- # synchronous query for all objects by id
+ # synchronous query for all schemas
+ # synchronous query for all objects per schema
# verify known object ids are returned
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
@@ -194,15 +228,39 @@
agent = self.console.find_agent(aname, timeout=3)
self.assertTrue(agent and agent.get_name() == aname)
+ # first, find objects per schema
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == 1)
+ for sid in sid_list:
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID,
+ _target_params=t_params)
+
+ oid_list = self.console.do_query(agent, query)
+
+ self.assertTrue(isinstance(oid_list, type([])),
+ "Unexpected return type")
+ self.assertTrue(len(oid_list) == 3, "Wrong count")
+ self.assertTrue('100a name' in oid_list)
+ self.assertTrue('99another name' in oid_list)
+ self.assertTrue('50my name' in oid_list)
+ self.assertTrue('01545' not in oid_list)
+
+
+ # now, find all unmanaged objects (no schema)
query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT_ID)
oid_list = self.console.do_query(agent, query)
self.assertTrue(isinstance(oid_list, type([])),
"Unexpected return type")
- self.assertTrue(len(oid_list) == 3, "Wrong count")
- self.assertTrue('100a name' in oid_list)
- self.assertTrue('99another name' in oid_list)
+ self.assertTrue(len(oid_list) == 4, "Wrong count")
+ self.assertTrue('100a name' not in oid_list)
+ self.assertTrue('99another name' not in oid_list)
self.assertTrue('01545' in oid_list)
+ self.assertTrue('01544' in oid_list)
+ self.assertTrue('01543' in oid_list)
+ self.assertTrue('01546' in oid_list)
self.console.destroy(10)
@@ -226,8 +284,13 @@
agent = self.console.find_agent(aname, timeout=3)
self.assertTrue(agent and agent.get_name() == aname)
- for oid in ['100a name', '99another name', '01545']:
- query = QmfQuery.create_id(QmfQuery.TARGET_OBJECT, oid)
+ # first, find objects per schema
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == 1)
+
+ for oid in ['100a name', '99another name']:
+ query = QmfQuery.create_id_object(oid, sid_list[0])
obj_list = self.console.do_query(agent, query)
self.assertTrue(isinstance(obj_list, type([])),
@@ -236,15 +299,23 @@
obj = obj_list[0]
self.assertTrue(isinstance(obj, QmfData))
self.assertTrue(obj.get_object_id() == oid)
+ self.assertTrue(obj.get_schema_class_id() == sid_list[0])
+ schema_id = obj.get_schema_class_id()
+ self.assertTrue(isinstance(schema_id, SchemaClassId))
+ self.assertTrue(obj.is_described())
+
+ # now find schema-less objects
+ for oid in ['01545']:
+ query = QmfQuery.create_id_object(oid)
+ obj_list = self.console.do_query(agent, query)
- if obj.is_described():
- self.assertTrue(oid in ['100a name', '99another name'])
- schema_id = obj.get_schema_class_id()
- self.assertTrue(isinstance(schema_id, SchemaClassId))
- else:
- self.assertTrue(oid == "01545")
-
-
+ self.assertTrue(isinstance(obj_list, type([])),
+ "Unexpected return type")
+ self.assertTrue(len(obj_list) == 1)
+ obj = obj_list[0]
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == oid)
+ self.assertFalse(obj.is_described())
self.console.destroy(10)
@@ -360,11 +431,21 @@
agent = self.console.find_agent(aname, timeout=3)
self.assertTrue(agent and agent.get_name() == aname)
+ # get the schema id for MyPackage:MyClass schema
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_SCHEMA_ID,
+ [QmfQuery.AND,
+ [QmfQuery.EQ, SchemaClassId.KEY_PACKAGE,
+ [QmfQuery.QUOTE, "MyPackage"]],
+ [QmfQuery.EQ, SchemaClassId.KEY_CLASS,
+ [QmfQuery.QUOTE, "MyClass"]]])
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(len(sid_list) == 1)
+
query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
[QmfQuery.AND,
[QmfQuery.EXISTS, [QmfQuery.QUOTE, "set_string"]],
- [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]])
-
+ [QmfQuery.EQ, "set_string", [QmfQuery.QUOTE, "UNSET"]]],
+ _target_params={QmfData.KEY_SCHEMA_ID: sid_list[0]})
obj_list = self.console.do_query(agent, query)
self.assertTrue(len(obj_list) == 2)
for obj in obj_list:
@@ -394,41 +475,43 @@
agent = self.console.find_agent(aname, timeout=3)
self.assertTrue(agent and agent.get_name() == aname)
- # == 99
+ # Query the unmanaged (no schema) objects
+
+ # == 50
query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
[QmfQuery.AND,
[QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]],
- [QmfQuery.EQ, "index1", 99]])
+ [QmfQuery.EQ, "index1", 50]])
obj_list = self.console.do_query(agent, query)
self.assertTrue(len(obj_list) == 1)
self.assertTrue(obj_list[0].has_value("index1"))
- self.assertTrue(obj_list[0].get_value("index1") == 99)
+ self.assertTrue(obj_list[0].get_value("index1") == 50)
- # <= 99
+ # <= 50
query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
[QmfQuery.AND,
[QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]],
- [QmfQuery.LE, "index1", 99]])
+ [QmfQuery.LE, "index1", 50]])
obj_list = self.console.do_query(agent, query)
- self.assertTrue(len(obj_list) == 2)
+ self.assertTrue(len(obj_list) == 3)
for obj in obj_list:
self.assertTrue(obj.has_value("index1"))
- self.assertTrue(obj.get_value("index1") <= 99)
+ self.assertTrue(obj.get_value("index1") <= 50)
- # > 99
+ # > 50
query = QmfQuery.create_predicate(QmfQuery.TARGET_OBJECT,
[QmfQuery.AND,
[QmfQuery.EXISTS, [QmfQuery.QUOTE, "index1"]],
- [QmfQuery.GT, "index1", 99]])
+ [QmfQuery.GT, "index1", 50]])
obj_list = self.console.do_query(agent, query)
self.assertTrue(len(obj_list) == 1)
for obj in obj_list:
self.assertTrue(obj.has_value("index1"))
- self.assertTrue(obj.get_value("index1") > 99)
+ self.assertTrue(obj.get_value("index1") > 50)
self.console.destroy(10)
Modified: qpid/trunk/qpid/python/qmf2/tests/events.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/events.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/events.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/events.py Mon Feb 1 21:14:39 2010
@@ -84,7 +84,6 @@
if not self.ready.is_set():
raise Exception("Agent failed to connect to broker.")
# time.sleep(1)
- print("!!! agent=%s setup complete (%s)" % (self.agent, time.time()))
def stop_app(self):
self.running = False
@@ -106,7 +105,6 @@
raise Skipped(e)
self.agent.set_connection(conn)
- print("!!! agent=%s connection done (%s)" % (self.agent, time.time()))
self.ready.set()
counter = 1
Modified: qpid/trunk/qpid/python/qmf2/tests/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/obj_gets.py?rev=905415&r1=905414&r2=905415&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/obj_gets.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/obj_gets.py Mon Feb 1 21:14:39 2010
@@ -17,6 +17,7 @@
#
import unittest
import logging
+import datetime
from threading import Thread, Event
import qpid.messaging
@@ -193,8 +194,10 @@
agent = _agentApp("agent-" + str(i), self.broker, 1)
agent.start_app()
self.agents.append(agent)
+ #print("!!!! STARTING TEST: %s" % datetime.datetime.utcnow())
def tearDown(self):
+ #print("!!!! STOPPING TEST: %s" % datetime.datetime.utcnow())
for agent in self.agents:
if agent is not None:
agent.stop_app()
@@ -221,25 +224,34 @@
self.assertTrue(agent and agent.get_name() == aname)
# console has discovered all agents, now query all undesc-2 objects
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
objs = self.console.get_objects(_object_id="undesc-2", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
self.assertTrue(len(objs) == self.agent_count)
for obj in objs:
self.assertTrue(obj.get_object_id() == "undesc-2")
# now query all objects from schema "package1"
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
objs = self.console.get_objects(_pname="package1", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
self.assertTrue(len(objs) == (self.agent_count * 3))
for obj in objs:
self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
# now query all objects from schema "package2"
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
objs = self.console.get_objects(_pname="package2", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
self.assertTrue(len(objs) == (self.agent_count * 2))
for obj in objs:
self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
# now query all objects from schema "package1/class2"
- objs = self.console.get_objects(_pname="package1", _cname="class2", _timeout=5)
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_pname="package1", _cname="class2",
+ _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
self.assertTrue(len(objs) == self.agent_count)
for obj in objs:
self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
@@ -247,7 +259,9 @@
# given the schema identifier from the last query, repeat using the
# specific schema id
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
schema_id = objs[0].get_schema_class_id()
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
objs = self.console.get_objects(_schema_id=schema_id, _timeout=5)
self.assertTrue(len(objs) == self.agent_count)
for obj in objs:
@@ -352,7 +366,7 @@
self.assertTrue(agent and agent.get_name() == aname)
agent_list.append(agent)
- # Only use one agetn
+ # Only use one agent
agent = agent_list[0]
# console has discovered all agents, now query all undesc-2 objects
@@ -400,3 +414,104 @@
self.console.destroy(10)
+
+
+ def test_all_objs_by_oid(self):
+ # create console
+ # find all agents
+ # synchronous query for all described objects by:
+ # oid & schema_id
+ # oid & package name
+ # oid & package and class name
+ # verify known object ids are returned
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for agent_app in self.agents:
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now query all objects from schema "package1"
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_pname="package1",
+ _object_id="p1c1_key1", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+ self.assertTrue(obj.get_object_id() == "p1c1_key1")
+ # mooch the schema for a later test
+ schema_id_p1c1 = objs[0].get_schema_class_id()
+
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_pname="package1",
+ _object_id="p1c2_name1", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class2")
+ self.assertTrue(obj.get_object_id() == "p1c2_name1")
+
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_pname="package2", _cname="class1",
+ _object_id="p2c1_key1", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package2")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+ self.assertTrue(obj.get_object_id() == "p2c1_key1")
+
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_schema_id=schema_id_p1c1,
+ _object_id="p1c1_key2", _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(len(objs) == self.agent_count)
+ for obj in objs:
+ self.assertTrue(obj.get_schema_class_id().get_package_name() == "package1")
+ self.assertTrue(obj.get_schema_class_id().get_class_name() == "class1")
+ self.assertTrue(obj.get_object_id() == "p1c1_key2")
+
+ # this should return all "undescribed" objects
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(len(objs) == (self.agent_count * 2))
+ for obj in objs:
+ self.assertTrue(obj.get_object_id() == "undesc-1" or
+ obj.get_object_id() == "undesc-2")
+
+ # these should fail
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_schema_id=schema_id_p1c1,
+ _object_id="does not exist",
+ _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(objs == None)
+
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_pname="package2",
+ _object_id="does not exist",
+ _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(objs == None)
+
+ #print("!!!! STARTING GET: %s" % datetime.datetime.utcnow())
+ objs = self.console.get_objects(_pname="package3",
+ _object_id="does not exist",
+ _timeout=5)
+ #print("!!!! STOPPING GET: %s" % datetime.datetime.utcnow())
+ self.assertTrue(objs == None)
+
+ self.console.destroy(10)
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org