You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/12 21:15:25 UTC
svn commit: r909591 - in /qpid/trunk/qpid/python/qmf2: common.py console.py
tests/__init__.py tests/async_query.py
Author: kgiusti
Date: Fri Feb 12 20:15:21 2010
New Revision: 909591
URL: http://svn.apache.org/viewvc?rev=909591&view=rev
Log:
QPID-2261: add async query and schema prefetch
Added:
qpid/trunk/qpid/python/qmf2/tests/async_query.py
Modified:
qpid/trunk/qpid/python/qmf2/common.py
qpid/trunk/qpid/python/qmf2/console.py
qpid/trunk/qpid/python/qmf2/tests/__init__.py
Modified: qpid/trunk/qpid/python/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/common.py?rev=909591&r1=909590&r2=909591&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/common.py (original)
+++ qpid/trunk/qpid/python/qmf2/common.py Fri Feb 12 20:15:21 2010
@@ -131,6 +131,7 @@
OBJECT_UPDATE=5
EVENT_RECEIVED=7
AGENT_HEARTBEAT=8
+ QUERY_COMPLETE=9
# Enumeration of the types of WorkItems produced on the Agent
METHOD_CALL=1000
QUERY=1001
Modified: qpid/trunk/qpid/python/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/console.py?rev=909591&r1=909590&r2=909591&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/console.py (original)
+++ qpid/trunk/qpid/python/qmf2/console.py Fri Feb 12 20:15:21 2010
@@ -56,21 +56,47 @@
"""
Virtual base class for all Mailbox-like objects.
"""
+ def __init__(self, console):
+ self.console = console
+ self.cid = 0
+ self.console._add_mailbox(self)
+
+ def get_address(self):
+ return self.cid
+
def deliver(self, data):
+ """
+ Invoked by Console Management thread when a message arrives for
+ this mailbox.
+ """
raise Exception("_Mailbox deliver() method must be provided")
+ def destroy(self):
+ """
+ Release the mailbox. Once called, the mailbox should no longer be
+ referenced.
+ """
+ self.console._remove_mailbox(self.cid)
+
-class _WaitableMailbox(_Mailbox):
+class _SyncMailbox(_Mailbox):
"""
A simple mailbox that allows a consumer to wait for delivery of data.
"""
- def __init__(self):
- self._data = []
+ def __init__(self, console):
+ """
+ Invoked by application thread.
+ """
+ super(_SyncMailbox, self).__init__(console)
self._cv = Condition()
+ self._data = []
self._waiting = False
def deliver(self, data):
- """ Drop data into the mailbox, waking any waiters if necessary. """
+ """
+ Drop data into the mailbox, waking any waiters if necessary.
+ Invoked by Console Management thread only.
+ """
self._cv.acquire()
try:
self._data.append(data)
@@ -81,7 +107,10 @@
self._cv.release()
def fetch(self, timeout=None):
- """ Get one data item from a mailbox, with timeout. """
+ """
+ Get one data item from a mailbox, with timeout.
+ Invoked by application thread.
+ """
self._cv.acquire()
try:
if len(self._data) == 0:
@@ -93,6 +122,189 @@
self._cv.release()
+class _AsyncMailbox(_Mailbox):
+ """
+ A Mailbox for asynchronous delivery, with a timeout value.
+ """
+ def __init__(self, console,
+ agent_name,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_AsyncMailbox, self).__init__(console)
+
+ self.agent_name = agent_name
+ self.console = console
+
+ if _timeout is None:
+ _timeout = console._reply_timeout
+ self.expiration_date = (datetime.datetime.utcnow() +
+ datetime.timedelta(seconds=_timeout))
+ console._lock.acquire()
+ try:
+ console._async_mboxes[self.cid] = self
+ finally:
+ console._lock.release()
+
+ # now that an async mbox has been created, wake the
+ # console mgmt thread so it will know about the mbox expiration
+ # date (and adjust its idle sleep period correctly)
+
+ console._wake_thread()
+
+ def deliver(self, msg):
+ """
+ """
+ raise Exception("deliver() method must be provided")
+
+ def expire(self):
+ raise Exception("expire() method must be provided")
+
+
+ def destroy(self):
+ self.console._lock.acquire()
+ try:
+ if self.cid in self.console._async_mboxes:
+ del self.console._async_mboxes[self.cid]
+ finally:
+ self.console._lock.release()
+ super(_AsyncMailbox, self).destroy()
+
+
+
+class _QueryMailbox(_AsyncMailbox):
+ """
+ A mailbox used for asynchronous query requests.
+ """
+ def __init__(self, console,
+ agent_name,
+ context,
+ target, msgkey,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_QueryMailbox, self).__init__(console,
+ agent_name,
+ _timeout)
+ self.target = target
+ self.msgkey = msgkey
+ self.context = context
+ self.result = []
+
+ def deliver(self, reply):
+ """
+ Process query response messages delivered to this mailbox.
+ Invoked by Console Management thread only.
+ """
+ done = False
+ objects = reply.content.get(self.msgkey)
+ if not objects:
+ done = True
+ else:
+ # convert from map to native types if needed
+ if self.target == QmfQuery.TARGET_SCHEMA_ID:
+ for sid_map in objects:
+ self.result.append(SchemaClassId.from_map(sid_map))
+
+ elif self.target == QmfQuery.TARGET_SCHEMA:
+ for schema_map in objects:
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ self.console._add_schema(schema) # add to schema cache
+ self.result.append(schema)
+
+ elif self.target == QmfQuery.TARGET_OBJECT:
+ for obj_map in objects:
+ # @todo: need the agent name - ideally from the
+ # reply message iself.
+ agent = self.console.get_agent(self.agent_name)
+ if agent:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self.console._prefetch_schema(sid, agent)
+ self.result.append(obj)
+
+
+ else:
+ # no conversion needed.
+ self.result += objects
+
+ if done:
+ # create workitem
+ # logging.error("QUERY COMPLETE for %s" % str(self.context))
+ wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ self.destroy()
+
+
+ def expire(self):
+ logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
+ datetime.datetime.utcnow())
+ # send along whatever (possibly none) has been received so far
+ wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ self.destroy()
+
+
+
+class _SchemaPrefetchMailbox(_AsyncMailbox):
+ """
+ Handles responses to schema fetches made by the console.
+ """
+ def __init__(self, console,
+ agent_name,
+ schema_id,
+ _timeout=None):
+ """
+ Invoked by application thread.
+ """
+ super(_SchemaPrefetchMailbox, self).__init__(console,
+ agent_name,
+ _timeout)
+
+ self.schema_id = schema_id
+
+
+ def deliver(self, reply):
+ """
+ Process schema response messages.
+ """
+ done = False
+ schemas = reply.content.get(MsgKey.schema)
+ if schemas:
+ for schema_map in schemas:
+ # extract schema id, convert based on schema type
+ sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
+ if sid_map:
+ sid = SchemaClassId.from_map(sid_map)
+ if sid:
+ if sid.get_type() == SchemaClassId.TYPE_DATA:
+ schema = SchemaObjectClass.from_map(schema_map)
+ else:
+ schema = SchemaEventClass.from_map(schema_map)
+ self.console._add_schema(schema) # add to schema cache
+ self.destroy()
+
+
+ def expire(self):
+ self.destroy()
+
+
##==============================================================================
## DATA MODEL
@@ -185,8 +397,8 @@
if _timeout is None:
_timeout = self._agent._console._reply_timeout
- mbox = _WaitableMailbox()
- cid = self._agent._console._add_mailbox(mbox)
+ mbox = _SyncMailbox(self._agent._console)
+ cid = mbox.get_address()
_map = {self.KEY_OBJECT_ID:str(oid),
SchemaMethod.KEY_NAME:name}
@@ -202,7 +414,7 @@
self._agent._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._agent._console._remove_mailbox(cid)
+ mbox.destroy()
return None
# @todo async method calls!!!
@@ -211,7 +423,7 @@
logging.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
- self._agent._console._remove_mailbox(cid)
+ mbox.destroy()
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
@@ -258,7 +470,7 @@
"""
def __init__(self, name, console):
"""
- @type name: AgentId
+ @type name: string
@param name: uniquely identifies this agent in the AMQP domain.
"""
@@ -358,8 +570,8 @@
if _in_args:
_in_args = _in_args.copy()
- mbox = _WaitableMailbox()
- cid = self._console._add_mailbox(mbox)
+ mbox = _SyncMailbox(self._console)
+ cid = mbox.get_address()
_map = {SchemaMethod.KEY_NAME:name}
if _in_args:
@@ -370,7 +582,7 @@
self._send_method_req(_map, cid)
except SendError, e:
logging.error(str(e))
- self._console._remove_mailbox(cid)
+ mbox.destroy()
return None
# @todo async method calls!!!
@@ -379,7 +591,7 @@
logging.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
- self._console._remove_mailbox(cid)
+ mbox.destroy()
if not replyMsg:
logging.debug("Agent method req wait timed-out.")
@@ -497,19 +709,20 @@
self._announce_recvr = None
self._locate_sender = None
self._schema_cache = {}
+ self._pending_schema_req = []
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
self._next_agent_expire = None
- # lock out run() thread
- self._cv = Condition()
+ self._next_mbox_expire = None
# for passing WorkItems to the application
self._work_q = Queue.Queue()
self._work_q_put = False
# Correlation ID and mailbox storage
self._correlation_id = long(time.time()) # pseudo-randomize
self._post_office = {} # indexed by cid
-
+ self._async_mboxes = {} # indexed by cid, used to expire them
+
## Old stuff below???
#self._broker_list = []
#self.impl = qmfengine.Console()
@@ -612,15 +825,7 @@
self._operational = False
if self.isAlive():
# kick my thread to wake it up
- logging.debug("Sending noop to wake up [%s]" % self._address)
- try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
- subject=self._name,
- content={"noop":"noop"})
- self._direct_sender.send( msg, sync=True )
- except SendError, e:
- logging.error(str(e))
+ self._wake_thread()
logging.debug("waiting for console receiver thread to exit")
self.join(timeout)
if self.isAlive():
@@ -651,8 +856,8 @@
self._lock.acquire()
try:
- if agent._id in self._agent_map:
- del self._agent_map[agent._id]
+ if agent._name in self._agent_map:
+ del self._agent_map[agent._name]
finally:
self._lock.release()
@@ -672,8 +877,8 @@
# agent not present yet - ping it with an agent_locate
- mbox = _WaitableMailbox()
- cid = self._add_mailbox(mbox)
+ mbox = _SyncMailbox(self)
+ cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
msg = Message(subject="console.ind.locate." + name,
@@ -690,7 +895,7 @@
self._topic_sender.send(msg)
except SendError, e:
logging.error(str(e))
- self._remove_mailbox(cid)
+ mbox.destroy()
return None
if timeout is None:
@@ -699,7 +904,7 @@
new_agent = None
logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
mbox.fetch(timeout)
- self._remove_mailbox(cid)
+ mbox.destroy()
logging.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
@@ -749,15 +954,15 @@
if not msgkey:
raise Exception("Invalid target for query: %s" % str(query))
- mbox = _WaitableMailbox()
- cid = self._add_mailbox(mbox)
+ mbox = _SyncMailbox(self)
+ cid = mbox.get_address()
try:
logging.debug("Sending Query to Agent (%s)" % time.time())
agent._send_query(query, cid)
except SendError, e:
logging.error(str(e))
- self._remove_mailbox(cid)
+ mbox.destroy()
return None
if not timeout:
@@ -802,33 +1007,74 @@
elif target == QmfQuery.TARGET_OBJECT:
for obj_map in objects:
obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self._prefetch_schema(sid, agent)
response.append(obj)
- # @todo prefetch unknown schema
- # sid_map = obj_map.get(QmfData.KEY_SCHEMA_ID)
- # if sid_map:
- # sid = SchemaClassId.from_map(sid_map)
- # # if the object references a schema, fetch it
- # # schema = self._fetch_schema(sid, _agent=agent,
- # # _timeout=timeout)
- # # if not schema:
- # # logging.warning("Unknown schema, id=%s" % sid)
- # # continue
- # obj = QmfConsoleData(map_=obj_map, agent=agent,
- # _schema=schema)
- # else:
- # # no schema needed
else:
# no conversion needed.
response += objects
now = datetime.datetime.utcnow()
- self._remove_mailbox(cid)
+ mbox.destroy()
return response
+ def do_async_query(self, agent, query, app_handle, _timeout=None ):
+ """
+ """
+ query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
+ QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
+ QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
+ QmfQuery.TARGET_SCHEMA: MsgKey.schema,
+ QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
+ QmfQuery.TARGET_AGENT: MsgKey.agent_info}
+
+ target = query.get_target()
+ msgkey = query_keymap.get(target)
+ if not msgkey:
+ raise Exception("Invalid target for query: %s" % str(query))
+
+ mbox = _QueryMailbox(self,
+ agent.get_name(),
+ app_handle,
+ target, msgkey,
+ _timeout)
+ cid = mbox.get_address()
+
+ try:
+ logging.debug("Sending Query to Agent (%s)" % time.time())
+ agent._send_query(query, cid)
+ except SendError, e:
+ logging.error(str(e))
+ mbox.destroy()
+ return False
+ return True
+
+
+ def _wake_thread(self):
+ """
+ Make the console management thread loop wakeup from its next_receiver
+ sleep.
+ """
+ logging.debug("Sending noop to wake up [%s]" % self._address)
+ msg = Message(properties={"method":"request",
+ "qmf.subject":make_subject(OpCode.noop)},
+ subject=self._name,
+ content={"noop":"noop"})
+ try:
+ self._direct_sender.send( msg, sync=True )
+ except SendError, e:
+ logging.error(str(e))
+
def run(self):
+ """
+ Console Management Thread main loop.
+ Handles inbound messages, agent discovery, async mailbox timeouts.
+ """
global _callback_thread
self._ready.set()
@@ -858,6 +1104,7 @@
self._dispatch(msg, _direct=True)
self._expire_agents() # check for expired agents
+ self._expire_mboxes() # check for expired async mailbox requests
#if qLen == 0 and self._work_q.qsize() and self._notifier:
if self._work_q_put and self._notifier:
@@ -869,11 +1116,15 @@
_callback_thread = None
if self._operational:
- # wait for a message to arrive or an agent
- # to expire
+ # wait for a message to arrive, or an agent
+ # to expire, or a mailbox requrest to time out
now = datetime.datetime.utcnow()
- if self._next_agent_expire > now:
- timeout = timedelta_to_secs(self._next_agent_expire - now)
+ next_expire = self._next_agent_expire
+ if (self._next_mbox_expire and
+ self._next_mbox_expire < next_expire):
+ next_expire = self._next_mbox_expire
+ if next_expire > now:
+ timeout = timedelta_to_secs(next_expire - now)
try:
logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
xxx = self._session.next_receiver(timeout = timeout)
@@ -1089,7 +1340,8 @@
return
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ logging.debug("waking waiters for correlation id %s" %
+ msg.correlation_id)
mbox.deliver(msg)
@@ -1151,6 +1403,36 @@
self._work_q_put = True
+ def _expire_mboxes(self):
+ """
+ Check all async mailboxes for outstanding requests that have expired.
+ """
+ now = datetime.datetime.utcnow()
+ if self._next_mbox_expire and now < self._next_mbox_expire:
+ return
+ expired_mboxes = []
+ self._next_mbox_expire = None
+ self._lock.acquire()
+ try:
+ for mbox in self._async_mboxes.itervalues():
+ if now >= mbox.expiration_date:
+ expired_mboxes.append(mbox)
+ else:
+ if (self._next_mbox_expire is None or
+ mbox.expiration_date < self._next_mbox_expire):
+ self._next_mbox_expire = mbox.expiration_date
+
+ for mbox in expired_mboxes:
+ del self._async_mboxes[mbox.cid]
+ finally:
+ self._lock.release()
+
+ for mbox in expired_mboxes:
+ # note: expire() may deallocate the mbox, so don't touch
+ # it further.
+ mbox.expire()
+
+
def _expire_agents(self):
"""
Check for expired agents and issue notifications when they expire.
@@ -1288,9 +1570,43 @@
sid = schema.get_class_id()
if not self._schema_cache.has_key(sid):
self._schema_cache[sid] = schema
+ if sid in self._pending_schema_req:
+ self._pending_schema_req.remove(sid)
+ finally:
+ self._lock.release()
+
+ def _prefetch_schema(self, schema_id, agent):
+ """
+ Send an async request for the schema identified by schema_id if the
+ schema is not available in the cache.
+ """
+ need_fetch = False
+ self._lock.acquire()
+ try:
+ if ((not self._schema_cache.has_key(schema_id)) and
+ schema_id not in self._pending_schema_req):
+ self._pending_schema_req.append(schema_id)
+ need_fetch = True
finally:
self._lock.release()
+ if need_fetch:
+ mbox = _SchemaPrefetchMailbox(self, agent.get_name(),
+ schema_id)
+ query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
+ logging.debug("Sending Schema Query to Agent (%s)" % time.time())
+ try:
+ agent._send_query(query, mbox.get_address())
+ except SendError, e:
+ logging.error(str(e))
+ mbox.destroy()
+ self._lock.acquire()
+ try:
+ self._pending_schema_req.remove(schema_id)
+ finally:
+ self._lock.release()
+
+
def _fetch_schema(self, schema_id, _agent=None, _timeout=None):
"""
Find the schema identified by schema_id. If not in the cache, ask the
@@ -1320,16 +1636,16 @@
return None
def _add_mailbox(self, mbox):
- """ Add a mailbox to the post office, return a unique identifier """
- cid = 0
+ """
+ Add a mailbox to the post office, and assign it a unique address.
+ """
self._lock.acquire()
try:
- cid = self._correlation_id
+ mbox.cid = self._correlation_id
self._correlation_id += 1
- self._post_office[cid] = mbox
+ self._post_office[mbox.cid] = mbox
finally:
self._lock.release()
- return cid
def _get_mailbox(self, mid):
try:
@@ -1355,7 +1671,8 @@
self._lock.acquire()
try:
- del self._post_office[mid]
+ if mid in self._post_office:
+ del self._post_office[mid]
finally:
self._lock.release()
Modified: qpid/trunk/qpid/python/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/__init__.py?rev=909591&r1=909590&r2=909591&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/python/qmf2/tests/__init__.py Fri Feb 12 20:15:21 2010
@@ -25,3 +25,4 @@
import obj_gets
import events
import multi_response
+import async_query
Added: qpid/trunk/qpid/python/qmf2/tests/async_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf2/tests/async_query.py?rev=909591&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qmf2/tests/async_query.py (added)
+++ qpid/trunk/qpid/python/qmf2/tests/async_query.py Fri Feb 12 20:15:21 2010
@@ -0,0 +1,460 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf2.common import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, WorkItem)
+import qmf2.console
+from qmf2.agent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.notifier = _testNotifier()
+ self.broker_url = broker_url
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Dynamically construct a management database
+
+ _schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"),
+ _desc="A test data schema",
+ _object_id_names=["index1", "index2"] )
+ # add properties
+ _schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # these two properties are statistics
+ _schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+ _schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # These two properties can be set via the method call
+ _schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR))
+ _schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # add method
+ _meth = SchemaMethod( _desc="Method to set string and int in object." )
+ _meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) )
+ _meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) )
+ _schema.add_method( "set_meth", _meth )
+
+ # Add schema to Agent
+
+ self.agent.register_object_class(_schema)
+
+ # instantiate managed data objects matching the schema
+
+ _obj1 = QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":100, "index2":"a name"})
+ _obj1.set_value("set_string", "UNSET")
+ _obj1.set_value("set_int", 0)
+ _obj1.set_value("query_count", 0)
+ _obj1.set_value("method_call_count", 0)
+ self.agent.add_object( _obj1 )
+
+ self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":99,
+ "index2": "another name",
+ "set_string": "UNSET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+ self.agent.add_object( QmfAgentData( self.agent, _schema=_schema,
+ _values={"index1":50,
+ "index2": "my name",
+ "set_string": "SET",
+ "set_int": 0,
+ "query_count": 0,
+ "method_call_count": 0} ))
+
+
+ # add an "unstructured" object to the Agent
+ _obj2 = QmfAgentData(self.agent, _object_id="01545")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 2)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 50)
+ self.agent.add_object(_obj2)
+
+ _obj2 = QmfAgentData(self.agent, _object_id="01546")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 3)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 51)
+ self.agent.add_object(_obj2)
+
+ _obj2 = QmfAgentData(self.agent, _object_id="01544")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 4)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 49)
+ self.agent.add_object(_obj2)
+
+ _obj2 = QmfAgentData(self.agent, _object_id="01543")
+ _obj2.set_value("field1", "a value")
+ _obj2.set_value("field2", 4)
+ _obj2.set_value("field3", {"a":1, "map":2, "value":3})
+ _obj2.set_value("field4", ["a", "list", "value"])
+ _obj2.set_value("index1", 48)
+ self.agent.add_object(_obj2)
+
+ self.running = False
+ self.ready = Event()
+
+ def start_app(self):
+ self.running = True
+ self.start()
+ self.ready.wait(10)
+ if not self.ready.is_set():
+ raise Exception("Agent failed to connect to broker.")
+
+ def stop_app(self):
+ self.running = False
+ # wake main thread
+ self.notifier.indication() # hmmm... collide with daemon???
+ self.join(10)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ self.conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ self.conn.connect()
+ self.agent.set_connection(self.conn)
+ self.ready.set()
+
+ while self.running:
+ self.notifier.wait_for_work(None)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+
+ if self.conn:
+ self.agent.remove_connection(10)
+ self.agent.destroy(10)
+
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent_heartbeat = 1
+ self.agent1 = _agentApp("agent1", self.broker, self.agent_heartbeat)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, self.agent_heartbeat)
+ self.agent2.start_app()
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.stop_app()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.stop_app()
+ self.agent2 = None
+
+ def test_all_schema_ids(self):
+ # create console
+ # find agents
+ # asynchronous query for all schema ids
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # send queries
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA_ID)
+ rc = self.console.do_async_query(agent, query, aname)
+ self.assertTrue(rc)
+
+ # done. Now wait for async responses
+
+ count = 0
+ while self.notifier.wait_for_work(3):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ count += 1
+ self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+ self.assertTrue(wi.get_handle() == "agent1" or
+ wi.get_handle() == "agent2")
+ reply = wi.get_params()
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], SchemaClassId))
+ self.assertTrue(reply[0].get_package_name() == "MyPackage")
+ self.assertTrue(reply[0].get_class_name() == "MyClass")
+ self.console.release_workitem(wi)
+ wi = self.console.get_next_workitem(timeout=0)
+
+ self.assertTrue(count == 2)
+ self.console.destroy(10)
+
+
+
+ def test_undescribed_objs(self):
+ # create console
+ # find agents
+ # asynchronous query for all non-schema objects
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # send queries
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT)
+ rc = self.console.do_async_query(agent, query, aname)
+ self.assertTrue(rc)
+
+ # done. Now wait for async responses
+
+ count = 0
+ while self.notifier.wait_for_work(3):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ count += 1
+ self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+ self.assertTrue(wi.get_handle() == "agent1" or
+ wi.get_handle() == "agent2")
+ reply = wi.get_params()
+ self.assertTrue(len(reply) == 4)
+ self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData))
+ self.assertFalse(reply[0].is_described()) # no schema
+ self.console.release_workitem(wi)
+ wi = self.console.get_next_workitem(timeout=0)
+
+ self.assertTrue(count == 2)
+ self.console.destroy(10)
+
+
+
+ def test_described_objs(self):
+ # create console
+ # find agents
+ # asynchronous query for all schema-based objects
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ #
+ t_params = {QmfData.KEY_SCHEMA_ID: SchemaClassId("MyPackage", "MyClass")}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT, t_params)
+ #
+ rc = self.console.do_async_query(agent, query, aname)
+ self.assertTrue(rc)
+
+ # done. Now wait for async responses
+
+ count = 0
+ while self.notifier.wait_for_work(3):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ count += 1
+ self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+ self.assertTrue(wi.get_handle() == "agent1" or
+ wi.get_handle() == "agent2")
+ reply = wi.get_params()
+ self.assertTrue(len(reply) == 3)
+ self.assertTrue(isinstance(reply[0], qmf2.console.QmfConsoleData))
+ self.assertTrue(reply[0].is_described()) # has schema
+ self.console.release_workitem(wi)
+ wi = self.console.get_next_workitem(timeout=0)
+
+ self.assertTrue(count == 2)
+ # @todo test if the console has learned the corresponding schemas....
+ self.console.destroy(10)
+
+
+
+ def test_all_schemas(self):
+ # create console
+ # find agents
+ # asynchronous query for all schemas
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # test internal state using non-api calls:
+ # no schemas present yet
+ self.assertTrue(len(self.console._schema_cache) == 0)
+ # end test
+
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # send queries
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
+ rc = self.console.do_async_query(agent, query, aname)
+ self.assertTrue(rc)
+
+ # done. Now wait for async responses
+
+ count = 0
+ while self.notifier.wait_for_work(3):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ count += 1
+ self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+ self.assertTrue(wi.get_handle() == "agent1" or
+ wi.get_handle() == "agent2")
+ reply = wi.get_params()
+ self.assertTrue(len(reply) == 1)
+ self.assertTrue(isinstance(reply[0], qmf2.common.SchemaObjectClass))
+ self.assertTrue(reply[0].get_class_id().get_package_name() == "MyPackage")
+ self.assertTrue(reply[0].get_class_id().get_class_name() == "MyClass")
+ self.console.release_workitem(wi)
+ wi = self.console.get_next_workitem(timeout=0)
+
+ self.assertTrue(count == 2)
+
+ # test internal state using non-api calls:
+ # schema has been learned
+ self.assertTrue(len(self.console._schema_cache) == 1)
+ # end test
+
+ self.console.destroy(10)
+
+
+
+ def test_query_expiration(self):
+ # create console
+ # find agents
+ # kill the agents
+ # send async query
+ # wait for & verify expiration
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=30)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ # find the agents
+ agents = []
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+ agents.append(agent)
+
+ # now nuke the agents from orbit. It's the only way to be sure.
+
+ self.agent1.stop_app()
+ self.agent1 = None
+ self.agent2.stop_app()
+ self.agent2 = None
+
+ # now send queries to agents that no longer exist
+ for agent in agents:
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_SCHEMA)
+ rc = self.console.do_async_query(agent, query, agent.get_name(),
+ _timeout=2)
+ self.assertTrue(rc)
+
+ # done. Now wait for async responses due to timeouts
+
+ count = 0
+ while self.notifier.wait_for_work(3):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ count += 1
+ self.assertTrue(wi.get_type() == WorkItem.QUERY_COMPLETE)
+ self.assertTrue(wi.get_handle() == "agent1" or
+ wi.get_handle() == "agent2")
+ reply = wi.get_params()
+ self.assertTrue(len(reply) == 0) # empty
+
+ self.console.release_workitem(wi)
+ wi = self.console.get_next_workitem(timeout=0)
+
+ self.assertTrue(count == 2)
+ self.console.destroy(10)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org