You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/04 20:38:55 UTC
svn commit: r906615 - in /qpid/trunk/qpid/python/qmf2: agent.py common.py
console.py tests/__init__.py tests/multi_response.py
Author: kgiusti
Date: Thu Feb 4 19:38:55 2010
New Revision: 906615
URL: http://svn.apache.org/viewvc?rev=906615&view=rev
Log:
QPID-2261: add multi-msg query response support. Fix mailbox code to allow multiple messages per correlation id.
Added:
qpid/trunk/qpid/python/qmf2/tests/multi_response.py
Modified:
qpid/trunk/qpid/python/qmf2/agent.py
qpid/trunk/qpid/python/qmf2/common.py
qpid/trunk/qpid/python/qmf2/console.py
qpid/trunk/qpid/python/qmf2/tests/__init__.py
Modified: qpid/trunk/qpid/python/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/agent.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/agent.py (original)
+++ qpid/trunk/qpid/python/qmf2/agent.py Thu Feb 4 19:38:55 2010
@@ -95,6 +95,8 @@
self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
self._heartbeat_interval = _heartbeat_interval
+ # @todo: currently this is the max # of objects in a single reply message;
+ # it would be better if it were the max byte size of per-msg content...
self._max_msg_size = _max_msg_size
self._capacity = _capacity
@@ -456,6 +458,38 @@
except SendError, e:
logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
+ def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+ """
+ Send a response to a query, breaking the result into multiple
+ messages based on the agent's _max_msg_size config parameter
+ """
+
+ total = len(objects)
+ if self._max_msg_size:
+ max_count = self._max_msg_size
+ else:
+ max_count = total
+
+ start = 0
+ end = min(total, max_count)
+ while end <= total:
+ m = Message(properties={"qmf.subject":subject,
+ "method":"response"},
+ correlation_id = cid,
+ content={msgkey:objects[start:end]})
+ self._send_reply(m, reply_to)
+ if end == total:
+ break;
+ start = end
+ end = min(total, end + max_count)
+
+ # response terminator - last message has empty object array
+ if total:
+ m = Message(properties={"qmf.subject":subject,
+ "method":"response"},
+ correlation_id = cid,
+ content={msgkey: []} )
+ self._send_reply(m, reply_to)
def _dispatch(self, msg, _direct=False):
"""
@@ -615,12 +649,11 @@
finally:
self._lock.release()
- m = Message(properties={"qmf.subject":make_subject(OpCode.data_ind),
- "method":"response"},
- content={MsgKey.package_info: pnames} )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
- self._send_reply(m, msg.reply_to)
+ self._send_query_response(make_subject(OpCode.data_ind),
+ MsgKey.package_info,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
def _querySchema( self, msg, query, _idOnly=False ):
"""
@@ -652,17 +685,15 @@
self._lock.release()
if _idOnly:
- content = {MsgKey.schema_id: schemas}
+ msgkey = MsgKey.schema_id
else:
- content = {MsgKey.schema:schemas}
-
- m = Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.data_ind)},
- content=content )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
+ msgkey = MsgKey.schema
- self._send_reply(m, msg.reply_to)
+ self._send_query_response(make_subject(OpCode.data_ind),
+ msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
def _queryData( self, msg, query, _idOnly=False ):
@@ -736,17 +767,16 @@
self._lock.release()
if _idOnly:
- content = {MsgKey.object_id:data_objs}
+ msgkey = MsgKey.object_id
else:
- content = {MsgKey.data_obj:data_objs}
+ msgkey = MsgKey.data_obj
- m = Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.data_ind)},
- content=content )
- if msg.correlation_id != None:
- m.correlation_id = msg.correlation_id
+ self._send_query_response(make_subject(OpCode.data_ind),
+ msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ data_objs)
- self._send_reply(m, msg.reply_to)
##==============================================================================
Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Thu Feb 4 19:38:55 2010
@@ -969,6 +969,28 @@
return cls(_target=target, _target_params=_target_params)
create_wildcard = classmethod(_create_wildcard)
+ def _create_wildcard_object_id(cls, schema_id):
+ """
+ Create a wildcard to match all object_ids for a given schema.
+ """
+ if not isinstance(schema_id, SchemaClassId):
+ raise TypeError("class SchemaClassId expected")
+ params = {QmfData.KEY_SCHEMA_ID: schema_id}
+ return cls(_target=QmfQuery.TARGET_OBJECT_ID,
+ _target_params=params)
+ create_wildcard_object_id = classmethod(_create_wildcard_object_id)
+
+ def _create_wildcard_object(cls, schema_id):
+ """
+ Create a wildcard to match all objects for a given schema.
+ """
+ if not isinstance(schema_id, SchemaClassId):
+ raise TypeError("class SchemaClassId expected")
+ params = {QmfData.KEY_SCHEMA_ID: schema_id}
+ return cls(_target=QmfQuery.TARGET_OBJECT,
+ _target_params=params)
+ create_wildcard_object = classmethod(_create_wildcard_object)
+
def _create_predicate(cls, target, predicate, _target_params=None):
return cls(_target=target, _target_params=_target_params,
_predicate=predicate)
Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Thu Feb 4 19:38:55 2010
@@ -44,146 +44,56 @@
##==============================================================================
-## Sequence Manager
+## Console Transaction Management
+##
+## At any given time, a console application may have multiple outstanding
+## message transactions with agents. The following objects allow the console
+## to track these outstanding transactions.
##==============================================================================
+
class _Mailbox(object):
"""
- Virtual base class for all Mailbox-like objects
+ Virtual base class for all Mailbox-like objects.
+ """
+ def deliver(self, data):
+ raise Exception("_Mailbox deliver() method must be provided")
+
+
+class _WaitableMailbox(_Mailbox):
+ """
+ A simple mailbox that allows a consumer to wait for delivery of data.
"""
def __init__(self):
- self._msgs = []
+ self._data = []
self._cv = Condition()
self._waiting = False
- def deliver(self, obj):
+ def deliver(self, data):
+ """ Drop data into the mailbox, waking any waiters if necessary. """
self._cv.acquire()
try:
- self._msgs.append(obj)
+ self._data.append(data)
# if was empty, notify waiters
- if len(self._msgs) == 1:
+ if len(self._data) == 1:
self._cv.notify()
finally:
self._cv.release()
def fetch(self, timeout=None):
+ """ Get one data item from a mailbox, with timeout. """
self._cv.acquire()
try:
- if len(self._msgs) == 0:
+ if len(self._data) == 0:
self._cv.wait(timeout)
- if len(self._msgs):
- return self._msgs.pop()
+ if len(self._data):
+ return self._data.pop(0)
return None
finally:
self._cv.release()
-class SequencedWaiter(object):
- """
- Manage sequence numbers for asynchronous method calls.
- Allows the caller to associate a generic piece of data with a unique sequence
- number."""
-
- def __init__(self):
- self.lock = Lock()
- self.sequence = long(time.time()) # pseudo-randomize seq start
- self.pending = {}
-
-
- def allocate(self):
- """
- Reserve a sequence number.
-
- @rtype: long
- @return: a unique nonzero sequence number.
- """
- self.lock.acquire()
- try:
- seq = self.sequence
- self.sequence = self.sequence + 1
- self.pending[seq] = _Mailbox()
- finally:
- self.lock.release()
- logging.debug( "sequence %d allocated" % seq)
- return seq
-
-
- def put_data(self, seq, new_data):
- seq = long(seq)
- logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) )
- self.lock.acquire()
- try:
- if seq in self.pending:
- # logging.error("Putting seq %d @ %s" % (seq,time.time()))
- self.pending[seq].deliver(new_data)
- else:
- logging.error( "seq %d not found!" % seq )
- finally:
- self.lock.release()
-
-
-
- def get_data(self, seq, timeout=None):
- """
- Release a sequence number reserved using the reserve method. This must
- be called when the sequence is no longer needed.
-
- @type seq: int
- @param seq: a sequence previously allocated by calling reserve().
- @rtype: any
- @return: the data originally associated with the reserved sequence number.
- """
- seq = long(seq)
- logging.debug( "getting data for seq=%d" % seq)
- mbox = None
- self.lock.acquire()
- try:
- if seq in self.pending:
- mbox = self.pending[seq]
- finally:
- self.lock.release()
-
- # Note well: pending list is unlocked, so we can wait.
- # we reference mbox locally, so it will not be released
- # until we are done.
-
- if mbox:
- d = mbox.fetch(timeout)
- logging.debug( "seq %d fetched %r!" % (seq, d) )
- return d
-
- logging.debug( "seq %d not found!" % seq )
- return None
-
-
- def release(self, seq):
- """
- Release the sequence, and its mailbox
- """
- seq = long(seq)
- logging.debug( "releasing seq %d" % seq )
- self.lock.acquire()
- try:
- if seq in self.pending:
- del self.pending[seq]
- finally:
- self.lock.release()
-
-
- def is_valid(self, seq):
- """
- True if seq is in use, else False (seq is unknown)
- """
- seq = long(seq)
- self.lock.acquire()
- try:
- return seq in self.pending
- finally:
- self.lock.release()
- return False
-
-
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -275,9 +185,8 @@
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- handle = self._agent._console._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ mbox = _WaitableMailbox()
+ cid = self._agent._console._add_mailbox(mbox)
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
@@ -290,10 +199,10 @@
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
- self._agent._send_method_req(_map, handle)
+ self._agent._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._agent._console._req_correlation.release(handle)
+ self._agent._console._remove_mailbox(cid)
return None
# @todo async method calls!!!
@@ -301,8 +210,9 @@
print("ASYNC TBD")
logging.debug("Waiting for response to method req (%s)" % _timeout)
- replyMsg = self._agent._console._req_correlation.get_data(handle, _timeout)
- self._agent._console._req_correlation.release(handle)
+ replyMsg = mbox.fetch(_timeout)
+ self._agent._console._remove_mailbox(cid)
+
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
return None
@@ -376,10 +286,6 @@
Low-level routine to asynchronously send a message to this agent.
"""
msg.reply_to = str(self._console._address)
- # handle = self._console._req_correlation.allocate()
- # if handle == 0:
- # raise Exception("Can not allocate a correlation id!")
- # msg.correlation_id = str(handle)
if correlation_id:
msg.correlation_id = str(correlation_id)
# TRACE
@@ -452,9 +358,8 @@
if _in_args:
_in_args = _in_args.copy()
- handle = self._console._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ mbox = _WaitableMailbox()
+ cid = self._console._add_mailbox(mbox)
_map = {SchemaMethod.KEY_NAME:name}
if _in_args:
@@ -462,10 +367,10 @@
logging.debug("Sending method req to Agent (%s)" % time.time())
try:
- self._send_method_req(_map, handle)
+ self._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._console._req_correlation.release(handle)
+ self._console._remove_mailbox(cid)
return None
# @todo async method calls!!!
@@ -473,8 +378,9 @@
print("ASYNC TBD")
logging.debug("Waiting for response to method req (%s)" % _timeout)
- replyMsg = self._console._req_correlation.get_data(handle, _timeout)
- self._console._req_correlation.release(handle)
+ replyMsg = mbox.fetch(_timeout)
+ self._console._remove_mailbox(cid)
+
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
return None
@@ -591,7 +497,6 @@
self._announce_recvr = None
self._locate_sender = None
self._schema_cache = {}
- self._req_correlation = SequencedWaiter()
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
@@ -601,6 +506,10 @@
# for passing WorkItems to the application
self._work_q = Queue.Queue()
self._work_q_put = False
+ # Correlation ID and mailbox storage
+ self._correlation_id = long(time.time()) # pseudo-randomize
+ self._post_office = {} # indexed by cid
+
## Old stuff below???
#self._broker_list = []
#self.impl = qmfengine.Console()
@@ -763,9 +672,8 @@
# agent not present yet - ping it with an agent_locate
- handle = self._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ mbox = _WaitableMailbox()
+ cid = self._add_mailbox(mbox)
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(subject="console.ind.locate." + name,
@@ -773,7 +681,7 @@
"qmf.subject":make_subject(OpCode.agent_locate)},
content={MsgKey.query: query.map_encode()})
msg.reply_to = str(self._address)
- msg.correlation_id = str(handle)
+ msg.correlation_id = str(cid)
logging.debug("Sending Agent Locate (%s)" % time.time())
# TRACE
#logging.error("!!! Console %s sending agent locate (%s)" %
@@ -782,7 +690,7 @@
self._topic_sender.send(msg)
except SendError, e:
logging.error(str(e))
- self._req_correlation.release(handle)
+ self._remove_mailbox(cid)
return None
if timeout is None:
@@ -790,14 +698,15 @@
new_agent = None
logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
- self._req_correlation.get_data( handle, timeout )
- self._req_correlation.release(handle)
+ mbox.fetch(timeout)
+ self._remove_mailbox(cid)
logging.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
new_agent = self._agent_map.get(name)
finally:
self._lock.release()
+
return new_agent
@@ -828,82 +737,96 @@
def do_query(self, agent, query, timeout=None ):
"""
"""
+ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
+ QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
+ QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
+ QmfQuery.TARGET_SCHEMA: MsgKey.schema,
+ QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
+ QmfQuery.TARGET_AGENT: MsgKey.agent_info}
target = query.get_target()
- handle = self._req_correlation.allocate()
- if handle == 0:
- raise Exception("Can not allocate a correlation id!")
+ msgkey = query_keymap.get(target)
+ if not msgkey:
+ raise Exception("Invalid target for query: %s" % str(query))
+
+ mbox = _WaitableMailbox()
+ cid = self._add_mailbox(mbox)
+
try:
logging.debug("Sending Query to Agent (%s)" % time.time())
- agent._send_query(query, handle)
+ agent._send_query(query, cid)
except SendError, e:
logging.error(str(e))
- self._req_correlation.release(handle)
+ self._remove_mailbox(cid)
return None
if not timeout:
timeout = self._reply_timeout
logging.debug("Waiting for response to Query (%s)" % timeout)
- reply = self._req_correlation.get_data(handle, timeout)
- self._req_correlation.release(handle)
- if not reply:
- logging.debug("Agent Query wait timed-out.")
- return None
+ now = datetime.datetime.utcnow()
+ expire = now + datetime.timedelta(seconds=timeout)
+
+ response = []
+ while (expire > now):
+ timeout = timedelta_to_secs(expire - now)
+ reply = mbox.fetch(timeout)
+ if not reply:
+ logging.debug("Query wait timed-out.")
+ break
+
+ objects = reply.content.get(msgkey)
+ if not objects:
+ # last response is empty
+ break
+
+ # convert from map to native types if needed
+ if target == QmfQuery.TARGET_SCHEMA_ID:
+ for sid_map in objects:
+ response.append(SchemaClassId.from_map(sid_map))
+
+ elif target == QmfQuery.TARGET_SCHEMA:
+ for schema_map in objects:
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ self._add_schema(schema) # add to schema cache
+ response.append(schema)
+
+ elif target == QmfQuery.TARGET_OBJECT:
+ for obj_map in objects:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ response.append(obj)
+ # @todo prefetch unknown schema
+ # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
+ # if sid_map:
+ # sid = SchemaClassId.from_map(sid_map)
+ # # if the object references a schema, fetch it
+ # # schema = self._fetch_schema(sid, _agent=agent,
+ # # _timeout=timeout)
+ # # if not schema:
+ # # logging.warning("Unknown schema, id=%s" % sid)
+ # # continue
+ # obj = QmfConsoleData(map_=obj_map, agent=agent,
+ # _schema=schema)
+ # else:
+ # # no schema needed
+ else:
+ # no conversion needed.
+ response += objects
+
+ now = datetime.datetime.utcnow()
+
+ self._remove_mailbox(cid)
+ return response
+
- if target == QmfQuery.TARGET_PACKAGES:
- # simply pass back the list of package names
- logging.debug("Response to Packet Query received")
- return reply.content.get(MsgKey.package_info)
- elif target == QmfQuery.TARGET_OBJECT_ID:
- # simply pass back the list of object_id's
- logging.debug("Response to Object Id Query received")
- return reply.content.get(MsgKey.object_id)
- elif target == QmfQuery.TARGET_SCHEMA_ID:
- logging.debug("Response to Schema Id Query received")
- id_list = []
- for sid_map in reply.content.get(MsgKey.schema_id):
- id_list.append(SchemaClassId.from_map(sid_map))
- return id_list
- elif target == QmfQuery.TARGET_SCHEMA:
- logging.debug("Response to Schema Query received")
- schema_list = []
- for schema_map in reply.content.get(MsgKey.schema):
- # extract schema id, convert based on schema type
- sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
- if sid_map:
- sid = SchemaClassId.from_map(sid_map)
- if sid:
- if sid.get_type() == SchemaClassId.TYPE_DATA:
- schema = SchemaObjectClass.from_map(schema_map)
- else:
- schema = SchemaEventClass.from_map(schema_map)
- schema_list.append(schema)
- self._add_schema(schema)
- return schema_list
- elif target == QmfQuery.TARGET_OBJECT:
- logging.debug("Response to Object Query received")
- obj_list = []
- for obj_map in reply.content.get(MsgKey.data_obj):
- obj = QmfConsoleData(map_=obj_map, agent=agent)
- obj_list.append(obj)
- # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
- # if sid_map:
- # sid = SchemaClassId.from_map(sid_map)
- # # if the object references a schema, fetch it
- # # schema = self._fetch_schema(sid, _agent=agent,
- # # _timeout=timeout)
- # # if not schema:
- # # logging.warning("Unknown schema, id=%s" % sid)
- # # continue
- # obj = QmfConsoleData(map_=obj_map, agent=agent,
- # _schema=schema)
- # else:
- # # no schema needed
- return obj_list
- else:
- logging.warning("Unexpected Target for a Query: '%s'" % target)
- return None
def run(self):
global _callback_thread
@@ -1115,7 +1038,8 @@
correlated = False
if msg.correlation_id:
- correlated = self._req_correlation.is_valid(msg.correlation_id)
+ mbox = self._get_mailbox(msg.correlation_id)
+ correlated = mbox is not None
agent = None
self._lock.acquire()
@@ -1150,7 +1074,7 @@
if correlated:
# wake up all waiters
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
+ mbox.deliver(msg)
def _handle_data_ind_msg(self, msg, cmap, version, direct):
"""
@@ -1158,14 +1082,15 @@
"""
logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
- if not self._req_correlation.is_valid(msg.correlation_id):
+ mbox = self._get_mailbox(msg.correlation_id)
+ if not mbox:
logging.debug("Data indicate received with unknown correlation_id"
" msg='%s'" % str(msg))
return
# wake up all waiters
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
+ mbox.deliver(msg)
def _handle_response_msg(self, msg, cmap, version, direct):
@@ -1175,14 +1100,15 @@
# @todo code replication - clean me.
logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
- if not self._req_correlation.is_valid(msg.correlation_id):
+ mbox = self._get_mailbox(msg.correlation_id)
+ if not mbox:
logging.debug("Response msg received with unknown correlation_id"
" msg='%s'" % str(msg))
return
# wake up all waiters
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
- self._req_correlation.put_data(msg.correlation_id, msg)
+ mbox.deliver(msg)
def _handle_event_ind_msg(self, msg, cmap, version, _direct):
ei_map = cmap.get(MsgKey.event)
@@ -1393,6 +1319,46 @@
else:
return None
+ def _add_mailbox(self, mbox):
+ """ Add a mailbox to the post office, return a unique identifier """
+ cid = 0
+ self._lock.acquire()
+ try:
+ cid = self._correlation_id
+ self._correlation_id += 1
+ self._post_office[cid] = mbox
+ finally:
+ self._lock.release()
+ return cid
+
+ def _get_mailbox(self, mid):
+ try:
+ mid = long(mid)
+ except TypeError:
+ logging.error("Invalid mailbox id: %s" % str(mid))
+ return None
+
+ self._lock.acquire()
+ try:
+ return self._post_office.get(mid)
+ finally:
+ self._lock.release()
+
+
+ def _remove_mailbox(self, mid):
+ """ Remove a mailbox and its address from the post office """
+ try:
+ mid = long(mid)
+ except TypeError:
+ logging.error("Invalid mailbox id: %s" % str(mid))
+ return None
+
+ self._lock.acquire()
+ try:
+ del self._post_office[mid]
+ finally:
+ self._lock.release()
+
def __repr__(self):
return str(self._address)
Modified: qpid/trunk/qpid/python/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/__init__.py?rev=906615&r1=906614&r2=906615&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/__init__.py Thu Feb 4 19:38:55 2010
@@ -24,3 +24,4 @@
import basic_method
import obj_gets
import events
+import multi_response
Added: qpid/trunk/qpid/python/qmf2/tests/multi_response.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/multi_response.py?rev=906615&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/multi_response.py (added)
+++ qpid/trunk/qpid/python/qmf2/tests/multi_response.py Thu Feb 4 19:38:55 2010
@@ -0,0 +1,295 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+# note: the object and schema counts per agent must each be > max objs per msg
+_SCHEMAS_PER_AGENT=7
+_OBJS_PER_AGENT=19
+_MAX_OBJS_PER_MSG=3
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.schema_count = _SCHEMAS_PER_AGENT
+ self.obj_count = _OBJS_PER_AGENT
+ self.notifier = _testNotifier()
+ self.broker_url = broker_url
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat,
+ _max_msg_size=_MAX_OBJS_PER_MSG)
+
+ # Dynamically construct a management database
+ for i in range(self.schema_count):
+ _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage",
+ "MyClass-" + str(i)),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
+ # add properties
+ _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # These two properties can be set via the method call
+ _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod( _desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ self.agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ for j in range(self.obj_count):
+
+ self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":j,
+ "index2": "name-" + str(j),
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
+ self.running = True
+ self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
+
+ def stop_app(self):
+ self.running = False
+ # wake main thread
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
+
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.agent_count = 2
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent_heartbeat = 1
+ self.agents = []
+ for a in range(self.agent_count):
+ agent = _agentApp("agent-" + str(a),
+ self.broker,
+ self.agent_heartbeat)
+ agent.start_app()
+ self.agents.append(agent)
+
+ def tearDown(self):
+ for agent in self.agents:
+ if agent is not None:
+ agent.stop_app()
+
+ def test_all_schema_id(self):
+ # create console
+ # find agents
+ # synchronous query for all schema_ids
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for agent_app in self.agents:
+ agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+ self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+ # get a list of all schema_ids
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
+ for sid in sid_list:
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "MyPackage")
+ self.assertTrue(sid.get_class_name().split('-')[0] == "MyClass")
+
+ self.console.destroy(10)
+
+
+ def test_all_schema(self):
+ # create console
+ # find agents
+ # synchronous query for all schemas
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for agent_app in self.agents:
+ agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+ self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+ # get a list of all schemas
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
+ schema_list = self.console.do_query(agent, query)
+ self.assertTrue(schema_list and
+ len(schema_list) == _SCHEMAS_PER_AGENT)
+ for schema in schema_list:
+ self.assertTrue(isinstance(schema, SchemaObjectClass))
+
+ self.console.destroy(10)
+
+
+ def test_all_object_id(self):
+ # create console
+ # find agents
+ # synchronous query for all object_ids by schema_id
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for agent_app in self.agents:
+ agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+ self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+ # get a list of all schema_ids
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
+ for sid in sid_list:
+ query = QmfQuery.create_wildcard_object_id(sid)
+ oid_list = self.console.do_query(agent, query)
+ self.assertTrue(oid_list and
+ len(oid_list) == _OBJS_PER_AGENT)
+ for oid in oid_list:
+ self.assertTrue(isinstance(oid, basestring))
+
+ self.console.destroy(10)
+
+
+ def test_all_objects(self):
+ # create console
+ # find agents
+ # synchronous query for all objects by schema_id
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for agent_app in self.agents:
+ agent = self.console.find_agent(agent_app.agent.get_name(), timeout=3)
+ self.assertTrue(agent and agent.get_name() == agent_app.agent.get_name())
+
+ # get a list of all schema_ids
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+ sid_list = self.console.do_query(agent, query)
+ self.assertTrue(sid_list and len(sid_list) == _SCHEMAS_PER_AGENT)
+ for sid in sid_list:
+ query = QmfQuery.create_wildcard_object(sid)
+ obj_list = self.console.do_query(agent, query)
+ self.assertTrue(obj_list and
+ len(obj_list) == _OBJS_PER_AGENT)
+ for obj in obj_list:
+ self.assertTrue(isinstance(obj,
+ qmf2.console.QmfConsoleData))
+
+ self.console.destroy(10)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org