You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/24 20:40:31 UTC

svn commit: r915946 [1/2] - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: ./ tests/

Author: kgiusti
Date: Wed Feb 24 19:40:31 2010
New Revision: 915946

URL: http://svn.apache.org/viewvc?rev=915946&view=rev
Log:
QPID-2261: sync with msg formats defined on wiki, start subscription impl.

Added:
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
    qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Wed Feb 24 19:40:31 2010
@@ -21,18 +21,19 @@
 import datetime
 import time
 import Queue
-from threading import Thread, Lock, currentThread, Event
+from threading import Thread, RLock, currentThread, Event
 from qpid.messaging import Connection, Message, Empty, SendError
 from uuid import uuid4
-from common import (make_subject, parse_subject, OpCode, QmfQuery,
-                    SchemaObjectClass, MsgKey, QmfData, QmfAddress,
-                    SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
-                    timedelta_to_secs)
+from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass,
+                    QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem,
+                    SchemaMethod, timedelta_to_secs, QMF_APP_ID)
 
 # global flag that indicates which thread (if any) is
 # running the agent notifier callback
 _callback_thread=None
 
+
+
   ##==============================================================================
   ## METHOD CALL
   ##==============================================================================
@@ -78,14 +79,73 @@
         return self._user_id
 
 
+  ##==============================================================================
+  ## SUBSCRIPTIONS
+  ##==============================================================================
+
+class _ConsoleHandle(object):
+    """
+    """
+    def __init__(self, handle, reply_to):
+        self.console_handle = handle
+        self.reply_to = reply_to
+
+class SubscriptionParams(object):
+    """
+    """
+    def __init__(self, console_handle, query, interval, duration, user_id):
+        self._console_handle = console_handle
+        self._query = query
+        self._interval = interval
+        self._duration = duration
+        self._user_id = user_id
+
+    def get_console_handle(self):
+        return self._console_handle
+
+    def get_query(self):
+        return self._query
+
+    def get_interval(self):
+        return self._interval
+
+    def get_duration(self):
+        return self._duration
+
+    def get_user_id(self):
+        return self._user_id
+
+class _SubscriptionState(object):
+    """
+    An internally-managed subscription.
+    """
+    def __init__(self, reply_to, cid, query, interval, duration):
+        self.reply_to = reply_to
+        self.correlation_id = cid
+        self.query = query
+        self.interval = interval
+        self.duration = duration
+        now = datetime.datetime.utcnow()
+        self.next_update = now  # do an immediate update
+        self.expiration = now + datetime.timedelta(seconds=duration)
+        self.id = 0
+
+    def resubscribe(self, now, _duration=None):
+        if _duration is not None:
+            self.duration = _duration
+        self.expiration = now + datetime.timedelta(seconds=self.duration)
+
+    def reset_interval(self, now):
+        self.next_update = now + datetime.timedelta(seconds=self.interval)
+
+
 
   ##==============================================================================
   ## AGENT
   ##==============================================================================
 
 class Agent(Thread):
-    def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30, 
-                 _max_msg_size=0, _capacity=10):
+    def __init__(self, name, _domain=None, _notifier=None, **options):
         Thread.__init__(self)
         self._running = False
         self._ready = Event()
@@ -94,11 +154,20 @@
         self._domain = _domain
         self._address = QmfAddress.direct(self.name, self._domain)
         self._notifier = _notifier
-        self._heartbeat_interval = _heartbeat_interval
+
+        # configurable parameters
+        #
+        self._heartbeat_interval = options.get("heartbeat_interval", 30)
+        self._capacity = options.get("capacity", 10)
+        self._default_duration = options.get("default_duration", 300)
+        self._max_duration = options.get("max_duration", 3600)
+        self._min_duration = options.get("min_duration", 10)
+        self._default_interval = options.get("default_interval", 30)
+        self._min_interval = options.get("min_interval", 5)
+
         # @todo: currently, max # of objects in a single reply message, would
         # be better if it were max bytesize of per-msg content...
-        self._max_msg_size = _max_msg_size
-        self._capacity = _capacity
+        self._max_msg_size = options.get("max_msg_size", 0)
 
         self._conn = None
         self._session = None
@@ -107,7 +176,7 @@
         self._direct_sender = None
         self._topic_sender = None
 
-        self._lock = Lock()
+        self._lock = RLock()
         self._packages = {}
         self._schema_timestamp = long(0)
         self._schema = {}
@@ -119,6 +188,10 @@
         self._undescribed_data = {}
         self._work_q = Queue.Queue()
         self._work_q_put = False
+        # subscriptions
+        self._subscription_id = long(time.time())
+        self._subscriptions = {}
+        self._next_subscribe_event = None
 
 
     def destroy(self, timeout=None):
@@ -192,10 +265,11 @@
         if self.isAlive():
             # kick my thread to wake it up
             try:
-                msg = Message(properties={"method":"request",
-                                          "qmf.subject":make_subject(OpCode.noop)},
+                msg = Message(id=QMF_APP_ID,
                               subject=self.name,
-                              content={"noop":"noop"})
+                              properties={ "method":"request",
+                                          "qmf.opcode":OpCode.noop},
+                              content={})
 
                 # TRACE
                 #logging.error("!!! sending wakeup to myself: %s" % msg)
@@ -258,13 +332,14 @@
             raise Exception("No connection available")
 
         # @todo: should we validate against the schema?
-        _map = {"_name": self.get_name(),
-                "_event": qmfEvent.map_encode()}
-        msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+        msg = Message(id=QMF_APP_ID,
+                      subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
                       qmfEvent.get_severity() + "." + self.name,
-                      properties={"method":"response",
-                                  "qmf.subject":make_subject(OpCode.event_ind)},
-                      content={MsgKey.event:_map})
+                      properties={"method":"indication",
+                                  "qmf.opcode":OpCode.data_ind,
+                                  "qmf.content": ContentType.event,
+                                  "qmf.agent":self.name},
+                      content=[qmfEvent.map_encode()])
         # TRACE
         # logging.error("!!! Agent %s sending Event (%s)" % 
         # (self.name, str(msg)))
@@ -330,9 +405,10 @@
                 raise TypeError("Invalid type for error - must be QmfData")
             _map[SchemaMethod.KEY_ERROR] = _error.map_encode()
 
-        msg = Message( properties={"method":"response",
-                                   "qmf.subject":make_subject(OpCode.response)},
-                       content={MsgKey.method:_map})
+        msg = Message(id=QMF_APP_ID,
+                      properties={"method":"response",
+                                  "qmf.opcode":OpCode.method_rsp},
+                      content=_map)
         msg.correlation_id = handle.correlation_id
 
         self._send_reply(msg, handle.reply_to)
@@ -370,25 +446,10 @@
 
         while self._running:
 
-            now = datetime.datetime.utcnow()
-            # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
-            if  now >= next_heartbeat:
-                ind = self._makeAgentIndMsg()
-                ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
-                # TRACE
-                #logging.error("!!! Agent %s sending Heartbeat (%s)" % 
-                # (self.name, str(ind)))
-                self._topic_sender.send(ind)
-                logging.debug("Agent Indication Sent")
-                next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
-            timeout = timedelta_to_secs(next_heartbeat - now)
-            # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
-            try:
-                self._session.next_receiver(timeout=timeout)
-            except Empty:
-                continue
-
+            #
+            # Process inbound messages
+            #
+            logging.debug("%s processing inbound messages..." % self.name)
             for i in range(batch_limit):
                 try:
                     msg = self._topic_receiver.fetch(timeout=0)
@@ -409,7 +470,71 @@
                 # (self.name, self._direct_receiver.source, msg))
                 self._dispatch(msg, _direct=True)
 
+            #
+            # Send Heartbeat Notification
+            #
+            now = datetime.datetime.utcnow()
+            if now >= next_heartbeat:
+                logging.debug("%s sending heartbeat..." % self.name)
+                ind = Message(id=QMF_APP_ID,
+                              subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
+                              properties={"method":"indication",
+                                          "qmf.opcode":OpCode.agent_heartbeat_ind,
+                                          "qmf.agent":self.name},
+                              content=self._makeAgentInfoBody())
+                # TRACE
+                #logging.error("!!! Agent %s sending Heartbeat (%s)" % 
+                # (self.name, str(ind)))
+                self._topic_sender.send(ind)
+                logging.debug("Agent Indication Sent")
+                next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+
+
+            #
+            # Monitor Subscriptions
+            #
+            if (self._next_subscribe_event is None or
+                now >= self._next_subscribe_event):
+
+                logging.debug("%s polling subscriptions..." % self.name)
+                self._next_subscribe_event = now + datetime.timedelta(seconds=
+                                                                      self._max_duration)
+                self._lock.acquire()
+                try:
+                    dead_ss = []
+                    for sid,ss in self._subscriptions.iteritems():
+                        if now >= ss.expiration:
+                            dead_ss.append(sid)
+                            continue
+                        if now >= ss.next_update:
+                            response = []
+                            objs = self._queryData(ss.query)
+                            if objs:
+                                for obj in objs:
+                                    response.append(obj.map_encode())
+                                logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
+                                self._send_query_response( ContentType.data,
+                                                           ss.correlation_id,
+                                                           ss.reply_to,
+                                                           response)
+                            ss.reset_interval(now)
+
+                        next_timeout = min(ss.expiration, ss.next_update)
+                        if next_timeout < self._next_subscribe_event:
+                            self._next_subscribe_event = next_timeout
+
+                    for sid in dead_ss:
+                        del self._subscriptions[sid]
+                finally:
+                    self._lock.release()
+
+            #
+            # notify application of pending WorkItems
+            #
+
             if self._work_q_put and self._notifier:
+                logging.debug("%s notifying application..." % self.name)
                 # new stuff on work queue, kick the the application...
                 self._work_q_put = False
                 _callback_thread = currentThread()
@@ -417,19 +542,33 @@
                 self._notifier.indication()
                 _callback_thread = None
 
+            #
+            # Sleep until messages arrive or something times out
+            #
+            next_timeout = min(next_heartbeat, self._next_subscribe_event)
+            timeout = timedelta_to_secs(next_timeout -
+                                        datetime.datetime.utcnow())
+            if timeout > 0.0:
+                logging.debug("%s sleeping %s seconds..." % (self.name,
+                                                             timeout))
+                try:
+                    self._session.next_receiver(timeout=timeout)
+                except Empty:
+                    pass
+
+
+
+
     #
     # Private:
     #
 
-    def _makeAgentIndMsg(self):
+    def _makeAgentInfoBody(self):
         """
-        Create an agent indication message identifying this agent
+        Create an agent indication message body identifying this agent
         """
-        _map = {"_name": self.get_name(),
+        return {"_name": self.get_name(),
                 "_schema_timestamp": self._schema_timestamp}
-        return Message(properties={"method":"response",
-                                   "qmf.subject":make_subject(OpCode.agent_ind)},
-                       content={MsgKey.agent_info: _map})
 
     def _send_reply(self, msg, reply_to):
         """
@@ -458,7 +597,7 @@
         except SendError, e:
             logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
 
-    def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+    def _send_query_response(self, content_type, cid, reply_to, objects):
         """
         Send a response to a query, breaking the result into multiple
         messages based on the agent's _max_msg_size config parameter
@@ -472,24 +611,28 @@
 
         start = 0
         end = min(total, max_count)
-        while end <= total:
-            m = Message(properties={"qmf.subject":subject,
-                                    "method":"response"},
+        # send partial response if too many objects present
+        while end < total:
+            m = Message(id=QMF_APP_ID,
+                        properties={"method":"response",
+                                    "partial":None,
+                                    "qmf.opcode":OpCode.data_ind,
+                                    "qmf.content":content_type,
+                                    "qmf.agent":self.name},
                         correlation_id = cid,
-                        content={msgkey:objects[start:end]})
+                        content=objects[start:end])
             self._send_reply(m, reply_to)
-            if end == total:
-                break;
             start = end
             end = min(total, end + max_count)
 
-        # response terminator - last message has empty object array
-        if total:
-            m = Message(properties={"qmf.subject":subject,
-                                    "method":"response"},
-                        correlation_id = cid,
-                        content={msgkey: []} )
-            self._send_reply(m, reply_to)
+        m = Message(id=QMF_APP_ID,
+                    properties={"method":"response",
+                                "qmf.opcode":OpCode.data_ind,
+                                "qmf.content":content_type,
+                                "qmf.agent":self.name},
+                    correlation_id = cid,
+                    content=objects[start:end])
+        self._send_reply(m, reply_to)
 
     def _dispatch(self, msg, _direct=False):
         """
@@ -497,33 +640,32 @@
 
         @param _direct: True if msg directly addressed to this agent.
         """
-        logging.debug( "Message received from Console! [%s]" % msg )
-        try:
-            version,opcode = parse_subject(msg.properties.get("qmf.subject"))
-        except:
-            logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
-            return
+        # logging.debug( "Message received from Console! [%s]" % msg )
+        # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) )
 
+        opcode = msg.properties.get("qmf.opcode")
+        if not opcode:
+            logging.warning("Ignoring unrecognized message '%s'" % msg)
+            return
+        version = 2  # @todo: fix me
         cmap = {}; props={}
         if msg.content_type == "amqp/map":
             cmap = msg.content
         if msg.properties:
             props = msg.properties
 
-        if opcode == OpCode.agent_locate:
+        if opcode == OpCode.agent_locate_req:
             self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
-        elif opcode == OpCode.get_query:
+        elif opcode == OpCode.query_req:
             self._handleQueryMsg( msg, cmap, props, version, _direct )
         elif opcode == OpCode.method_req:
             self._handleMethodReqMsg(msg, cmap, props, version, _direct)
-        elif opcode == OpCode.cancel_subscription:
-            logging.warning("!!! CANCEL_SUB TBD !!!")
-        elif opcode == OpCode.create_subscription:
-            logging.warning("!!! CREATE_SUB TBD !!!")
-        elif opcode == OpCode.renew_subscription:
-            logging.warning("!!! RENEW_SUB TBD !!!")
-        elif opcode == OpCode.schema_query:
-            logging.warning("!!! SCHEMA_QUERY TBD !!!")
+        elif opcode == OpCode.subscribe_req:
+            self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
+        elif opcode == OpCode.subscribe_refresh_req:
+            self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
+        elif opcode == OpCode.subscribe_cancel_ind:
+            self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
         elif opcode == OpCode.noop:
             logging.debug("No-op msg received.")
         else:
@@ -536,18 +678,28 @@
         """
         logging.debug("_handleAgentLocateMsg")
 
-        reply = True
-        if "method" in props and props["method"] == "request":
-            query = cmap.get(MsgKey.query)
-            if query is not None:
-                # fake a QmfData containing my identifier for the query compare
-                tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
-                                              self.get_name()},
-                                         _object_id="my-name")
-                reply = QmfQuery.from_map(query).evaluate(tmpData)
+        reply = False
+        if props.get("method") == "request":
+            # if the message is addressed to me or wildcard, process it
+            if (msg.subject == "console.ind" or
+                msg.subject == "console.ind.locate" or
+                msg.subject == "console.ind.locate." + self.name):
+                pred = msg.content
+                if not pred:
+                    reply = True
+                elif isinstance(pred, type([])):
+                    # fake a QmfData containing my identifier for the query compare
+                    query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred)
+                    tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+                                                  self.get_name()},
+                                             _object_id="my-name")
+                    reply = query.evaluate(tmpData)
 
         if reply:
-            m = self._makeAgentIndMsg()
+            m = Message(id=QMF_APP_ID,
+                        properties={"method":"response",
+                                    "qmf.opcode":OpCode.agent_locate_rsp},
+                        content=self._makeAgentInfoBody())
             m.correlation_id = msg.correlation_id
             self._send_reply(m, msg.reply_to)
         else:
@@ -561,22 +713,25 @@
         logging.debug("_handleQueryMsg")
 
         if "method" in props and props["method"] == "request":
-            qmap = cmap.get(MsgKey.query)
-            if qmap:
-                query = QmfQuery.from_map(qmap)
+            if cmap:
+                try:
+                    query = QmfQuery.from_map(cmap)
+                except TypeError:
+                    logging.error("Invalid Query format: '%s'" % str(cmap))
+                    return
                 target = query.get_target()
                 if target == QmfQuery.TARGET_PACKAGES:
-                    self._queryPackages( msg, query )
+                    self._queryPackagesReply( msg, query )
                 elif target == QmfQuery.TARGET_SCHEMA_ID:
-                    self._querySchema( msg, query, _idOnly=True )
+                    self._querySchemaReply( msg, query, _idOnly=True )
                 elif target == QmfQuery.TARGET_SCHEMA:
-                    self._querySchema( msg, query)
+                    self._querySchemaReply( msg, query)
                 elif target == QmfQuery.TARGET_AGENT:
                     logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
                 elif target == QmfQuery.TARGET_OBJECT_ID:
-                    self._queryData(msg, query, _idOnly=True)
+                    self._queryDataReply(msg, query, _idOnly=True)
                 elif target == QmfQuery.TARGET_OBJECT:
-                    self._queryData(msg, query)
+                    self._queryDataReply(msg, query)
                 else:
                     logging.warning("Unrecognized query target: '%s'" % str(target))
 
@@ -634,7 +789,159 @@
             self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
             self._work_q_put = True
 
-    def _queryPackages(self, msg, query):
+    def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct):
+        """
+        Process received Subscription Request
+        """
+        if "method" in props and props["method"] == "request":
+            query_map = cmap.get("_query")
+            interval = cmap.get("_interval")
+            duration = cmap.get("_duration")
+
+            try:
+                query = QmfQuery.from_map(query_map)
+            except TypeError:
+                logging.warning("Invalid query for subscription: %s" %
+                                str(query_map))
+                return
+
+            if isinstance(self, AgentExternal):
+                # param = SubscriptionParams(_ConsoleHandle(console_handle,
+                #                                           msg.reply_to),
+                #                            query,
+                #                            interval,
+                #                            duration,
+                #                            msg.user_id)
+                # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
+                #                           msg.correlation_id, param))
+                # self._work_q_put = True
+                logging.error("External Subscription TBD")
+                return
+
+            # validate the query - only specific objects, or
+            # objects wildcard, are currently supported.
+            if (query.get_target() != QmfQuery.TARGET_OBJECT or
+                (query.get_selector() == QmfQuery.PREDICATE and
+                 query.get_predicate())):
+                logging.error("Subscriptions only support (wildcard) Object"
+                              " Queries.")
+                err = QmfData.create(
+                    {"reason": "Unsupported Query type for subscription.",
+                     "query": str(query.map_encode())})
+                m = Message(id=QMF_APP_ID,
+                            properties={"method":"response",
+                                        "qmf.opcode":OpCode.subscribe_rsp},
+                            correlation_id = msg.correlation_id,
+                            content={"_error": err.map_encode()})
+                self._send_reply(m, msg.reply_to)
+                return
+
+            if duration is None:
+                duration = self._default_duration
+            else:
+                try:
+                    duration = float(duration)
+                    if duration > self._max_duration:
+                        duration = self._max_duration
+                    elif duration < self._min_duration:
+                        duration = self._min_duration
+                except:
+                    logging.warning("Bad duration value: %s" % str(msg))
+                    duration = self._default_duration
+
+            if interval is None:
+                interval = self._default_interval
+            else:
+                try:
+                    interval = float(interval)
+                    if interval < self._min_interval:
+                        interval = self._min_interval
+                except:
+                    logging.warning("Bad interval value: %s" % str(msg))
+                    interval = self._default_interval
+
+            ss = _SubscriptionState(msg.reply_to,
+                                    msg.correlation_id,
+                                    query,
+                                    interval,
+                                    duration)
+            self._lock.acquire()
+            try:
+                sid = self._subscription_id
+                self._subscription_id += 1
+                ss.id = sid
+                self._subscriptions[sid] = ss
+                self._next_subscribe_event = None
+            finally:
+                self._lock.release()
+
+            sr_map = {"_subscription_id": sid,
+                      "_interval": interval,
+                      "_duration": duration}
+            m = Message(id=QMF_APP_ID,
+                        properties={"method":"response",
+                                   "qmf.opcode":OpCode.subscribe_rsp},
+                        correlation_id = msg.correlation_id,
+                        content=sr_map)
+            self._send_reply(m, msg.reply_to)
+
+
+
+    def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct):
+        """
+        Process received Renew Subscription Request
+        """
+        if props.get("method") == "request":
+            sid = cmap.get("_subscription_id")
+            if not sid:
+                logging.debug("Invalid subscription refresh msg: %s" %
+                              str(msg))
+                return
+
+            self._lock.acquire()
+            try:
+                ss = self._subscriptions.get(sid)
+                if not ss:
+                    logging.debug("Ignoring unknown subscription: %s" %
+                                  str(sid))
+                    return
+                duration = cmap.get("_duration")
+                if duration is not None:
+                    try:
+                        duration = float(duration)
+                        if duration > self._max_duration:
+                            duration = self._max_duration
+                        elif duration < self._min_duration:
+                            duration = self._min_duration
+                    except:
+                        logging.debug("Bad duration value: %s" % str(msg))
+                        duration = None  # use existing duration
+
+                ss.resubscribe(datetime.datetime.utcnow(), duration)
+
+            finally:
+                self._lock.release()
+
+
+    def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
+        """
+        Process received Cancel Subscription Request
+        """
+        if props.get("method") == "request":
+            sid = cmap.get("_subscription_id")
+            if not sid:
+                logging.warning("No subscription id supplied: %s" % msg)
+                return
+
+            self._lock.acquire()
+            try:
+                if sid in self._subscriptions:
+                    del self._subscriptions[sid]
+            finally:
+                self._lock.release()
+
+
+    def _queryPackagesReply(self, msg, query):
         """
         Run a query against the list of known packages
         """
@@ -646,58 +953,83 @@
                                          _object_id="_package")
                 if query.evaluate(qmfData):
                     pnames.append(name)
+
+            self._send_query_response(ContentType.schema_package,
+                                      msg.correlation_id,
+                                      msg.reply_to,
+                                      pnames)
         finally:
             self._lock.release()
 
-        self._send_query_response(make_subject(OpCode.data_ind),
-                                  MsgKey.package_info,
-                                  msg.correlation_id,
-                                  msg.reply_to,
-                                  pnames)
 
-    def _querySchema( self, msg, query, _idOnly=False ):
+    def _querySchemaReply( self, msg, query, _idOnly=False ):
         """
         """
         schemas = []
-        # if querying for a specific schema, do a direct lookup
-        if query.get_selector() == QmfQuery.ID:
-            found = None
-            self._lock.acquire()
-            try:
+
+        self._lock.acquire()
+        try:
+            # if querying for a specific schema, do a direct lookup
+            if query.get_selector() == QmfQuery.ID:
                 found = self._schema.get(query.get_id())
-            finally:
-                self._lock.release()
-            if found:
-                if _idOnly:
-                    schemas.append(query.get_id().map_encode())
-                else:
-                    schemas.append(found.map_encode())
-        else: # otherwise, evaluate all schema
-            self._lock.acquire()
-            try:
+                if found:
+                    if _idOnly:
+                        schemas.append(query.get_id().map_encode())
+                    else:
+                        schemas.append(found.map_encode())
+            else: # otherwise, evaluate all schema
                 for sid,val in self._schema.iteritems():
                     if query.evaluate(val):
                         if _idOnly:
                             schemas.append(sid.map_encode())
                         else:
                             schemas.append(val.map_encode())
-            finally:
-                self._lock.release()
+            if _idOnly:
+                msgkey = ContentType.schema_id
+            else:
+                msgkey = ContentType.schema_class
 
-        if _idOnly:
-            msgkey = MsgKey.schema_id
-        else:
-            msgkey = MsgKey.schema
+            self._send_query_response(msgkey,
+                                      msg.correlation_id,
+                                      msg.reply_to,
+                                      schemas)
+        finally:
+            self._lock.release()
 
-        self._send_query_response(make_subject(OpCode.data_ind),
-                                  msgkey,
-                                  msg.correlation_id,
-                                  msg.reply_to,
-                                  schemas)
 
+    def _queryDataReply( self, msg, query, _idOnly=False ):
+        """
+        """
+        # hold the (recursive) lock for the duration so the Agent
+        # won't send data that is currently being modified by the
+        # app.
+        self._lock.acquire()
+        try:
+            response = []
+            data_objs = self._queryData(query)
+            if _idOnly:
+                for obj in data_objs:
+                    response.append(obj.get_object_id())
+            else:
+                for obj in data_objs:
+                    response.append(obj.map_encode())
+
+            if _idOnly:
+                msgkey = ContentType.object_id
+            else:
+                msgkey = ContentType.data
 
-    def _queryData( self, msg, query, _idOnly=False ):
+            self._send_query_response(msgkey,
+                                      msg.correlation_id,
+                                      msg.reply_to,
+                                      response)
+        finally:
+            self._lock.release()
+
+
+    def _queryData(self, query):
         """
+        Return a list of QmfData objects that match a given query
         """
         data_objs = []
         # extract optional schema_id from target params
@@ -705,12 +1037,12 @@
         t_params = query.get_target_param()
         if t_params:
             sid = t_params.get(QmfData.KEY_SCHEMA_ID)
-        # if querying for a specific object, do a direct lookup
-        if query.get_selector() == QmfQuery.ID:
-            oid = query.get_id()
-            found = None
-            self._lock.acquire()
-            try:
+
+        self._lock.acquire()
+        try:
+            # if querying for a specific object, do a direct lookup
+            if query.get_selector() == QmfQuery.ID:
+                oid = query.get_id()
                 if sid and not sid.get_hash_string():
                     # wildcard schema_id match, check each schema
                     for name,db in self._described_data.iteritems():
@@ -718,11 +1050,9 @@
                             and name.get_package_name() == sid.get_package_name()):
                             found = db.get(oid)
                             if found:
-                                if _idOnly:
-                                    data_objs.append(oid)
-                                else:
-                                    data_objs.append(found.map_encode())
+                                data_objs.append(found)
                 else:
+                    found = None
                     if sid:
                         db = self._described_data.get(sid)
                         if db:
@@ -730,15 +1060,9 @@
                     else:
                         found = self._undescribed_data.get(oid)
                     if found:
-                        if _idOnly:
-                            data_objs.append(oid)
-                        else:
-                            data_objs.append(found.map_encode())
-            finally:
-                self._lock.release()
-        else: # otherwise, evaluate all data
-            self._lock.acquire()
-            try:
+                        data_objs.append(found)
+
+            else: # otherwise, evaluate all data
                 if sid and not sid.get_hash_string():
                     # wildcard schema_id match, check each schema
                     for name,db in self._described_data.iteritems():
@@ -746,10 +1070,7 @@
                             and name.get_package_name() == sid.get_package_name()):
                             for oid,data in db.iteritems():
                                 if query.evaluate(data):
-                                    if _idOnly:
-                                        data_objs.append(oid)
-                                    else:
-                                        data_objs.append(data.map_encode())
+                                    data_objs.append(data)
                 else:
                     if sid:
                         db = self._described_data.get(sid)
@@ -759,23 +1080,28 @@
                     if db:
                         for oid,data in db.iteritems():
                             if query.evaluate(data):
-                                if _idOnly:
-                                    data_objs.append(oid)
-                                else:
-                                    data_objs.append(data.map_encode())
-            finally:
-                self._lock.release()
+                                data_objs.append(data)
+        finally:
+            self._lock.release()
 
-        if _idOnly:
-            msgkey = MsgKey.object_id
-        else:
-            msgkey = MsgKey.data_obj
+        return data_objs
 
-        self._send_query_response(make_subject(OpCode.data_ind),
-                                  msgkey,
-                                  msg.correlation_id,
-                                  msg.reply_to,
-                                  data_objs)
+
+
+  ##==============================================================================
+  ## EXTERNAL DATABASE AGENT
+  ##==============================================================================
+
+class AgentExternal(Agent):
+    """
+    An Agent which uses an external management database.
+    """
+    def __init__(self, name, _domain=None, _notifier=None,
+                 _heartbeat_interval=30, _max_msg_size=0, _capacity=10):
+        super(AgentExternal, self).__init__(name, _domain, _notifier,
+                                            _heartbeat_interval,
+                                            _max_msg_size, _capacity)
+        logging.error("AgentExternal TBD")
 
 
 
@@ -823,9 +1149,11 @@
                                            _schema_id=schema_id, _const=False)
         self._agent = agent
         self._validated = False
+        self._modified = True
 
     def destroy(self): 
         self._dtime = long(time.time() * 1000)
+        self._touch()
         # @todo: publish change
 
     def is_deleted(self): 
@@ -833,6 +1161,7 @@
 
     def set_value(self, _name, _value, _subType=None):
         super(QmfAgentData, self).set_value(_name, _value, _subType)
+        self._touch()
         # @todo: publish change
 
     def inc_value(self, name, delta=1):
@@ -849,6 +1178,7 @@
         """ subtract the delta from the property """
         # @todo: need to take write-lock
         logging.error(" TBD!!!")
+        self._touch()
 
     def validate(self):
         """
@@ -868,6 +1198,13 @@
                     raise Exception("Required property '%s' not present." % name)
         self._validated = True
 
+    def _touch(self):
+        """
+        Mark this object as modified.  Used to force a publish of this object
+        if on subscription.
+        """
+        self._modified = True
+
 
 
 ################################################################################

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py Wed Feb 24 19:40:31 2010
@@ -34,61 +34,44 @@
 ## Constants
 ##
 
-AMQP_QMF_SUBJECT = "qmf"
-AMQP_QMF_VERSION = 4
-AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
-
-class MsgKey(object):
-    agent_info = "agent_info"
-    query = "query"
-    package_info = "package_info"
-    schema_id = "schema_id"
-    schema = "schema"
-    object_id="object_id"
-    data_obj="object"
-    method="method"
-    event="event"
+QMF_APP_ID="qmf2"
 
 
-class OpCode(object):
-    noop = "noop"
-
-    # codes sent by a console and processed by the agent
-    agent_locate = "agent-locate"
-    cancel_subscription = "cancel-subscription"
-    create_subscription = "create-subscription"
-    get_query = "get-query"
-    method_req = "method"
-    renew_subscription = "renew-subscription"
-    schema_query = "schema-query"  # @todo: deprecate
-
-    # codes sent by the agent to a console
-    agent_ind = "agent"
-    data_ind = "data"
-    event_ind = "event"
-    managed_object = "managed-object"
-    object_ind = "object"
-    response = "response"
-    schema_ind="schema"   # @todo: deprecate
+class ContentType(object):
+    """ Values for the 'qmf.content' message header
+    """
+    schema_package = "_schema_package"
+    schema_id = "_schema_id"
+    schema_class = "_schema_class"
+    object_id = "_object_id"
+    data = "_data"
+    event = "_event"
 
 
+class OpCode(object):
+    """ Values for the 'qmf.opcode' message header.
+    """
+    noop = "_noop"
 
+    # codes sent by a console and processed by the agent
+    agent_locate_req = "_agent_locate_request"
+    subscribe_req = "_subscribe_request"
+    subscribe_cancel_ind = "_subscribe_cancel_indication"
+    subscribe_refresh_req = "_subscribe_refresh_indication"
+    query_req = "_query_request"
+    method_req = "_method_request"
 
-def make_subject(_code): 
-    """
-    Create a message subject field value.
-    """
-    return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
 
+    # codes sent by the agent to a console
+    agent_locate_rsp = "_agent_locate_response"
+    agent_heartbeat_ind = "_agent_heartbeat_indication"
+    query_rsp = "_query_response"
+    subscribe_rsp = "_subscribe_response"
+    subscribe_refresh_rsp = "_subscribe_refresh_response"
+    data_ind = "_data_indication"
+    method_rsp = "_method_response"
 
-def parse_subject(_sub):
-    """
-    Deconstruct a subject field, return version,opcode values
-    """
-    if _sub[:3] != "qmf":
-        raise Exception("Non-QMF message received")
 
-    return _sub[3:].split('.', 1)
 
 def timedelta_to_secs(td):
     """
@@ -133,11 +116,15 @@
     AGENT_HEARTBEAT=8
     QUERY_COMPLETE=9
     METHOD_RESPONSE=10
+    SUBSCRIBE_RESPONSE=11
+    SUBSCRIBE_INDICATION=12
+    RESUBSCRIBE_RESPONSE=13
     # Enumeration of the types of WorkItems produced on the Agent
     METHOD_CALL=1000
     QUERY=1001
-    SUBSCRIBE=1002
-    UNSUBSCRIBE=1003
+    SUBSCRIBE_REQUEST=1002
+    RESUBSCRIBE_REQUEST=1003
+    UNSUBSCRIBE_REQUEST=1004
 
     def __init__(self, kind, handle, _params=None):
         """

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Wed Feb 24 19:40:31 2010
@@ -24,14 +24,14 @@
 import datetime
 import Queue
 from threading import Thread, Event
-from threading import Lock
+from threading import RLock
 from threading import currentThread
 from threading import Condition
 
 from qpid.messaging import Connection, Message, Empty, SendError
 
-from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
-                    MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
+                    QmfData, QmfAddress, SchemaClass, SchemaClassId,
                     SchemaEventClass, SchemaObjectClass, WorkItem,
                     SchemaMethod, QmfEvent, timedelta_to_secs)
 
@@ -141,6 +141,7 @@
         console._lock.acquire()
         try:
             console._async_mboxes[self.cid] = self
+            console._next_mbox_expire = None
         finally:
             console._lock.release()
 
@@ -177,7 +178,7 @@
     def __init__(self, console, 
                  agent_name,
                  context,
-                 target, msgkey,
+                 target,
                  _timeout=None):
         """
         Invoked by application thread.
@@ -186,7 +187,6 @@
                                             _timeout)
         self.agent_name = agent_name
         self.target = target
-        self.msgkey = msgkey
         self.context = context
         self.result = []
 
@@ -195,11 +195,8 @@
         Process query response messages delivered to this mailbox.
         Invoked by Console Management thread only.
         """
-        done = False
-        objects = reply.content.get(self.msgkey)
-        if not objects:
-            done = True
-        else:
+        objects = reply.content
+        if isinstance(objects, type([])):
             # convert from map to native types if needed
             if self.target == QmfQuery.TARGET_SCHEMA_ID:
                 for sid_map in objects:
@@ -237,8 +234,7 @@
                 # no conversion needed.
                 self.result += objects
 
-        if done:
-            # create workitem
+        if not "partial" in reply.properties:
             # logging.error("QUERY COMPLETE for %s" % str(self.context))
             wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
             self.console._work_q.put(wi)
@@ -278,8 +274,8 @@
         Process schema response messages.
         """
         done = False
-        schemas = reply.content.get(MsgKey.schema)
-        if schemas:
+        schemas = reply.content
+        if schemas and isinstance(schemas, type([])):
             for schema_map in schemas:
                 # extract schema id, convert based on schema type
                 sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
@@ -319,8 +315,8 @@
         Invoked by Console Management thread only.
         """
 
-        _map = reply.content.get(MsgKey.method)
-        if not _map:
+        _map = reply.content
+        if not _map or not isinstance(_map, type({})):
             logging.error("Invalid method call reply message")
             result = None
         else:
@@ -355,6 +351,128 @@
 
 
 
+class _SubscriptionMailbox(_AsyncMailbox):
+    """
+    A Mailbox for a single subscription.
+    """
+    def __init__(self, console, lifetime, context, agent):
+        """
+        Invoked by application thread.
+        """
+        super(_SubscriptionMailbox, self).__init__(console, lifetime)
+        self.cv = Condition()
+        self.data = []
+        self.result = []
+        self.context = context
+        self.agent_name = agent.get_name()
+        self.agent_subscription_id = None          # from agent
+
+    def deliver(self, msg):
+        """
+        """
+        opcode = msg.properties.get("qmf.opcode")
+        if (opcode == OpCode.subscribe_rsp or
+            opcode == OpCode.subscribe_refresh_rsp):
+            #
+            # sync only - just deliver the msg
+            #
+            self.cv.acquire()
+            try:
+                self.data.append(msg)
+                # if was empty, notify waiters
+                if len(self.data) == 1:
+                    self.cv.notify()
+            finally:
+                self.cv.release()
+            return
+
+            # sid = msg.content.get("_subscription_id")
+            # lifetime = msg.content.get("_duration")
+            # error = msg.content.get("_error")
+            # sp = SubscribeParams(sid,
+            #                      msg.content.get("_interval"),
+            #                      lifetime, error)
+            # if sid and self.subscription_id is None:
+            #     self.subscription_id = sid
+            # if lifetime:
+            #     self.console._lock.acquire()
+            #     try:
+            #         self.expiration_date = (datetime.datetime.utcnow() +
+            #                                 datetime.timedelta(seconds=lifetime))
+            #     finally:
+            #         self.console._lock.release()
+
+            # if self.waiting:
+            #     self.cv.acquire()
+            #     try:
+            #         self.data.append(sp)
+            #         # if was empty, notify waiters
+            #         if len(self._data) == 1:
+            #             self._cv.notify()
+            #     finally:
+            #         self._cv.release()
+            # else:
+            #     if opcode == OpCode.subscribe_rsp:
+            #         wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+            #                       self.context, sp)
+            #     else:
+            #         wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+            #                       self.context, sp)
+            #     self.console._work_q.put(wi)
+            #     self.console._work_q_put = True
+            #     if error:
+            #         self.destroy()
+
+        agent_name = msg.properties.get("qmf.agent")
+        if not agent_name:
+            logging.warning("Ignoring data_ind - no agent name given: %s" %
+                            msg)
+            return
+        agent = self.console.get_agent(agent_name)
+        if not agent:
+            logging.warning("Ignoring data_ind - unknown agent '%s'" %
+                            agent_name)
+            return
+
+        objects = msg.content
+        for obj_map in objects:
+            obj = QmfConsoleData(map_=obj_map, agent=agent)
+            # start fetch of schema if not known
+            sid = obj.get_schema_class_id()
+            if sid:
+                self.console._prefetch_schema(sid, agent)
+            self.result.append(obj)
+
+        if not "partial" in msg.properties:
+            wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result)
+            self.result = []
+            self.console._work_q.put(wi)
+            self.console._work_q_put = True
+
+    def fetch(self, timeout=None):
+        """
+        Get one data item from a mailbox, with timeout.
+        Invoked by application thread.
+        """
+        self.cv.acquire()
+        try:
+            if len(self.data) == 0:
+                self.cv.wait(timeout)
+            if len(self.data):
+                return self.data.pop(0)
+            return None
+        finally:
+            self.cv.release()
+
+    def expire(self):
+        """ The subscription expired.
+        """
+        self.destroy()
+
+
+
+
+
 ##==============================================================================
 ## DATA MODEL
 ##==============================================================================
@@ -481,8 +599,8 @@
             logging.debug("Agent method req wait timed-out.")
             return None
 
-        _map = replyMsg.content.get(MsgKey.method)
-        if not _map:
+        _map = replyMsg.content
+        if not _map or not isinstance(_map, type({})):
             logging.error("Invalid method call reply message")
             return None
 
@@ -650,8 +768,8 @@
             logging.debug("Agent method req wait timed-out.")
             return None
 
-        _map = replyMsg.content.get(MsgKey.method)
-        if not _map:
+        _map = replyMsg.content
+        if not _map or not isinstance(_map, type({})):
             logging.error("Invalid method call reply message")
             return None
 
@@ -676,20 +794,66 @@
     def _send_query(self, query, correlation_id=None):
         """
         """
-        msg = Message(properties={"method":"request",
-                                   "qmf.subject":make_subject(OpCode.get_query)},
-                      content={MsgKey.query: query.map_encode()})
+        msg = Message(id=QMF_APP_ID,
+                      properties={"method":"request",
+                                  "qmf.opcode":OpCode.query_req},
+                      content=query.map_encode())
         self._send_msg( msg, correlation_id )
 
 
     def _send_method_req(self, mr_map, correlation_id=None):
         """
         """
-        msg = Message(properties={"method":"request",
-                                  "qmf.subject":make_subject(OpCode.method_req)},
+        msg = Message(id=QMF_APP_ID,
+                      properties={"method":"request",
+                                  "qmf.opcode":OpCode.method_req},
                       content=mr_map)
         self._send_msg( msg, correlation_id )
 
+    def _send_subscribe_req(self, query, correlation_id, _interval=None,
+                            _lifetime=None):
+        """
+        """
+        sr_map = {"_query":query.map_encode()}
+        if _interval is not None:
+            sr_map["_interval"] = _interval
+        if _lifetime is not None:
+            sr_map["_duration"] = _lifetime
+
+        msg = Message(id=QMF_APP_ID,
+                      properties={"method":"request",
+                                  "qmf.opcode":OpCode.subscribe_req},
+                      content=sr_map)
+        self._send_msg(msg, correlation_id)
+
+
+    def _send_resubscribe_req(self, correlation_id,
+                              subscription_id,
+                              _lifetime=None):
+        """
+        """
+        sr_map = {"_subscription_id":subscription_id}
+        if _lifetime is not None:
+            sr_map["_duration"] = _lifetime
+
+        msg = Message(id=QMF_APP_ID,
+                      properties={"method":"request",
+                                  "qmf.opcode":OpCode.subscribe_refresh_req},
+                      content=sr_map)
+        self._send_msg(msg, correlation_id)
+
+
+    def _send_unsubscribe_ind(self, correlation_id, subscription_id):
+        """
+        """
+        sr_map = {"_subscription_id":subscription_id}
+
+        msg = Message(id=QMF_APP_ID,
+                      properties={"method":"request",
+                                  "qmf.opcode":OpCode.subscribe_cancel_ind},
+                      content=sr_map)
+        self._send_msg(msg, correlation_id)
+
 
   ##==============================================================================
   ## METHOD CALL
@@ -716,6 +880,36 @@
         return arg
 
 
+
+  ##==============================================================================
+  ## SUBSCRIPTION
+  ##==============================================================================
+
+class SubscribeParams(object):
+    """ Represents a standing subscription for this console.
+    """
+    def __init__(self, sid, interval, duration, _error=None):
+        self._sid = sid
+        self._interval = interval
+        self._duration = duration
+        self._error = _error
+
+    def succeeded(self):
+        return self._error is None
+
+    def get_error(self):
+        return self._error
+
+    def get_subscription_id(self):
+        return self._sid
+
+    def get_publish_interval(self):
+        return self._interval
+
+    def get_duration(self):
+        return self._duration
+
+
   ##==============================================================================
   ## CONSOLE
   ##==============================================================================
@@ -753,7 +947,7 @@
         self._domain = _domain
         self._address = QmfAddress.direct(self._name, self._domain)
         self._notifier = notifier
-        self._lock = Lock()
+        self._lock = RLock()
         self._conn = None
         self._session = None
         # dict of "agent-direct-address":class Agent entries
@@ -766,6 +960,7 @@
         self._agent_discovery_filter = None
         self._reply_timeout = reply_timeout
         self._agent_timeout = agent_timeout
+        self._subscribe_timeout = 300  # @todo: parameterize
         self._next_agent_expire = None
         self._next_mbox_expire = None
         # for passing WorkItems to the application
@@ -776,18 +971,6 @@
         self._post_office = {} # indexed by cid
         self._async_mboxes = {} # indexed by cid, used to expire them
 
-        ## Old stuff below???
-        #self._broker_list = []
-        #self.impl = qmfengine.Console()
-        #self._event = qmfengine.ConsoleEvent()
-        ##self._cv = Condition()
-        ##self._sync_count = 0
-        ##self._sync_result = None
-        ##self._select = {}
-        ##self._cb_cond = Condition()
-
-
-
     def destroy(self, timeout=None):
         """
         Must be called before the Console is deleted.  
@@ -801,8 +984,6 @@
             self.remove_connection(self._conn, timeout)
         logging.debug("Console Destroyed")
 
-
-
     def add_connection(self, conn):
         """
         Add a AMQP connection to the console.  The console will setup a session over the
@@ -934,10 +1115,11 @@
         cid = mbox.get_address()
 
         query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
-        msg = Message(subject="console.ind.locate." + name,
+        msg = Message(id=QMF_APP_ID,
+                      subject="console.ind.locate." + name,
                       properties={"method":"request",
-                                  "qmf.subject":make_subject(OpCode.agent_locate)},
-                      content={MsgKey.query: query.map_encode()})
+                                  "qmf.opcode":OpCode.agent_locate_req},
+                      content=query._predicate)
         msg.reply_to = str(self._address)
         msg.correlation_id = str(cid)
         logging.debug("Sending Agent Locate (%s)" % time.time())
@@ -995,23 +1177,13 @@
     def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
         """
         """
-        query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
-                      QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
-                      QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
-                      QmfQuery.TARGET_SCHEMA: MsgKey.schema,
-                      QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
-                      QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
         target = query.get_target()
-        msgkey = query_keymap.get(target)
-        if not msgkey:
-            raise Exception("Invalid target for query: %s" % str(query))
 
         if _reply_handle is not None:
             mbox = _QueryMailbox(self,
                                  agent.get_name(),
                                  _reply_handle,
-                                 target, msgkey,
+                                 target,
                                  _timeout)
         else:
             mbox = _SyncMailbox(self)
@@ -1045,9 +1217,8 @@
                 logging.debug("Query wait timed-out.")
                 break
 
-            objects = reply.content.get(msgkey)
-            if not objects:
-                # last response is empty
+            objects = reply.content
+            if not objects or not isinstance(objects, type([])):
                 break
 
             # convert from map to native types if needed
@@ -1081,21 +1252,154 @@
                 # no conversion needed.
                 response += objects
 
+            if not "partial" in reply.properties:
+                # reply not broken up over multiple msgs
+                break
+
             now = datetime.datetime.utcnow()
 
         mbox.destroy()
         return response
 
+
+    def create_subscription(self, agent, query, console_handle,
+                            _interval=None, _duration=None,
+                            _reply_handle=None, _timeout=None):
+        if not _duration:
+            _duration = self._subscribe_timeout
+
+        if _reply_handle is not None:
+            assert(False)  # async TBD
+        else:
+            mbox = _SubscriptionMailbox(self, _duration, console_handle, agent)
+
+        cid = mbox.get_address()
+
+        try:
+            logging.debug("Sending Subscribe to Agent (%s)" % time.time())
+            agent._send_subscribe_req(query, cid, _interval, _duration)
+        except SendError, e:
+            logging.error(str(e))
+            mbox.destroy()
+            return None
+
+        if _reply_handle is not None:
+            return True
+
+        # wait for reply
+        if _timeout is None:
+            _timeout = self._reply_timeout
+
+        logging.debug("Waiting for response to subscription (%s)" % _timeout)
+        # @todo: what if mbox expires here?
+        replyMsg = mbox.fetch(_timeout)
+
+        if not replyMsg:
+            logging.debug("Subscription request wait timed-out.")
+            mbox.destroy()
+            return None
+
+        error = replyMsg.content.get("_error")
+        if error:
+            mbox.destroy()
+            try:
+                e_map = QmfData.from_map(error)
+            except TypeError:
+                e_map = QmfData.create({"error":"Unknown error"})
+            return SubscribeParams(None, None, None, e_map)
+
+        mbox.agent_subscription_id = replyMsg.content.get("_subscription_id")
+        return SubscribeParams(mbox.get_address(),
+                               replyMsg.content.get("_interval"),
+                               replyMsg.content.get("_duration"),
+                               None)
+
+    def refresh_subscription(self, subscription_id,
+                             _duration=None,
+                             _reply_handle=None, _timeout=None):
+        if _reply_handle is not None:
+            assert(False)  # async TBD
+
+        mbox = self._get_mailbox(subscription_id)
+        if not mbox:
+            logging.warning("Subscription %s not found." % subscription_id)
+            return None
+
+        agent = self.get_agent(mbox.agent_name)
+        if not agent:
+            logging.warning("Subscription %s agent %s not found." %
+                            (mbox.agent_name, subscription_id))
+            return None
+
+        try:
+            logging.debug("Sending Subscribe to Agent (%s)" % time.time())
+            agent._send_resubscribe_req(subscription_id,
+                                        mbox.agent_subscription_id,
+                                        _duration)
+        except SendError, e:
+            logging.error(str(e))
+            # @todo ???? mbox.destroy()
+            return None
+
+        if _reply_handle is not None:
+            return True
+
+        # wait for reply
+        if _timeout is None:
+            _timeout = self._reply_timeout
+
+        logging.debug("Waiting for response to subscription (%s)" % _timeout)
+        replyMsg = mbox.fetch(_timeout)
+
+        if not replyMsg:
+            logging.debug("Subscription request wait timed-out.")
+            # @todo???? mbox.destroy()
+            return None
+
+        error = replyMsg.content.get("_error")
+        if error:
+            # @todo mbox.destroy()
+            try:
+                e_map = QmfData.from_map(error)
+            except TypeError:
+                e_map = QmfData.create({"error":"Unknown error"})
+            return SubscribeParams(None, None, None, e_map)
+
+        return SubscribeParams(mbox.get_address(),
+                               replyMsg.content.get("_interval"),
+                               replyMsg.content.get("_duration"),
+                               None)
+
+    def cancel_subscription(self, subscription_id):
+        """
+        """
+        mbox = self._get_mailbox(subscription_id)
+        if not mbox:
+            return None
+
+        agent = self.get_agent(mbox.agent_name)
+        if agent:
+            try:
+                logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+                agent._send_unsubscribe_ind(subscription_id,
+                                            mbox.agent_subscription_id)
+            except SendError, e:
+                logging.error(str(e))
+
+        mbox.destroy()
+
+
     def _wake_thread(self):
         """
         Make the console management thread loop wakeup from its next_receiver
         sleep.
         """
         logging.debug("Sending noop to wake up [%s]" % self._address)
-        msg = Message(properties={"method":"request",
-                                  "qmf.subject":make_subject(OpCode.noop)},
+        msg = Message(id=QMF_APP_ID,
                       subject=self._name,
-                      content={"noop":"noop"})
+                      properties={"method":"request",
+                                  "qmf.opcode":OpCode.noop},
+                      content={})
         try:
             self._direct_sender.send( msg, sync=True )
         except SendError, e:
@@ -1152,9 +1456,17 @@
                 # to expire, or a mailbox requrest to time out
                 now = datetime.datetime.utcnow()
                 next_expire = self._next_agent_expire
-                if (self._next_mbox_expire and
-                    self._next_mbox_expire < next_expire):
-                    next_expire = self._next_mbox_expire
+
+                # the mailbox expire flag may be cleared by the
+                # app thread(s)
+                self._lock.acquire()
+                try:
+                    if (self._next_mbox_expire and
+                        self._next_mbox_expire < next_expire):
+                        next_expire = self._next_mbox_expire
+                finally:
+                    self._lock.release()
+
                 if next_expire > now:
                     timeout = timedelta_to_secs(next_expire - now)
                     try:
@@ -1268,13 +1580,14 @@
         """
         PRIVATE: Process a message received from an Agent
         """
-        logging.debug( "Message received from Agent! [%s]" % msg )
-        try:
-            version,opcode = parse_subject(msg.properties.get("qmf.subject"))
-            # @todo: deal with version mismatch!!!
-        except:
+        #logging.debug( "Message received from Agent! [%s]" % msg )
+        #logging.error( "Message received from Agent! [%s]" % msg )
+
+        opcode = msg.properties.get("qmf.opcode")
+        if not opcode:
             logging.error("Ignoring unrecognized message '%s'" % msg)
             return
+        version = 2 # @todo: fix me
 
         cmap = {}; props = {}
         if msg.content_type == "amqp/map":
@@ -1282,20 +1595,21 @@
         if msg.properties:
             props = msg.properties
 
-        if opcode == OpCode.agent_ind:
+        if opcode == OpCode.agent_heartbeat_ind:
             self._handle_agent_ind_msg( msg, cmap, version, _direct )
-        elif opcode == OpCode.data_ind:
-            self._handle_data_ind_msg(msg, cmap, version, _direct)
-        elif opcode == OpCode.event_ind:
-            self._handle_event_ind_msg(msg, cmap, version, _direct)
-        elif opcode == OpCode.managed_object:
-            logging.warning("!!! managed_object TBD !!!")
-        elif opcode == OpCode.object_ind:
-            logging.warning("!!! object_ind TBD !!!")
-        elif opcode == OpCode.response:
+        elif opcode == OpCode.agent_locate_rsp:
+            self._handle_agent_ind_msg( msg, cmap, version, _direct )
+        elif opcode == OpCode.query_rsp:
             self._handle_response_msg(msg, cmap, version, _direct)
-        elif opcode == OpCode.schema_ind:
-            logging.warning("!!! schema_ind TBD !!!")
+        elif opcode == OpCode.subscribe_rsp:
+            self._handle_response_msg(msg, cmap, version, _direct)
+        elif opcode == OpCode.method_rsp:
+            self._handle_response_msg(msg, cmap, version, _direct)
+        elif opcode == OpCode.data_ind:
+            if msg.correlation_id:
+                self._handle_response_msg(msg, cmap, version, _direct)
+            else:
+                self._handle_indication_msg(msg, cmap, version, _direct)
         elif opcode == OpCode.noop:
              logging.debug("No-op msg received.")
         else:
@@ -1309,7 +1623,7 @@
         """
         logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
 
-        ai_map = cmap.get(MsgKey.agent_info)
+        ai_map = msg.content
         if not ai_map or not isinstance(ai_map, type({})):
             logging.warning("Bad agent-ind message received: '%s'" % msg)
             return
@@ -1359,29 +1673,10 @@
             logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
             mbox.deliver(msg)
 
-    def _handle_data_ind_msg(self, msg, cmap, version, direct):
-        """
-        Process a received data-ind message.
-        """
-        logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
-
-        mbox = self._get_mailbox(msg.correlation_id)
-        if not mbox:
-            logging.debug("Data indicate received with unknown correlation_id"
-                          " msg='%s'" % str(msg)) 
-            return
-
-        # wake up all waiters
-        logging.debug("waking waiters for correlation id %s" %
-                      msg.correlation_id)
-        mbox.deliver(msg)
-
-
     def _handle_response_msg(self, msg, cmap, version, direct):
         """
         Process a received data-ind message.
         """
-        # @todo code replication - clean me.
         logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
 
         mbox = self._get_mailbox(msg.correlation_id)
@@ -1394,19 +1689,22 @@
         logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
         mbox.deliver(msg)
 
-    def _handle_event_ind_msg(self, msg, cmap, version, _direct):
-        ei_map = cmap.get(MsgKey.event)
-        if not ei_map or not isinstance(ei_map, type({})):
-            logging.warning("Bad event indication message received: '%s'" % msg)
-            return
+    def _handle_indication_msg(self, msg, cmap, version, _direct):
 
-        aname = ei_map.get("_name")
-        emap = ei_map.get("_event")
+        aname = msg.properties.get("qmf.agent")
         if not aname:
-            logging.debug("No '_name' field in event indication message.")
+            logging.debug("No agent name field in indication message.")
             return
-        if not emap:
-            logging.debug("No '_event' field in event indication message.")
+
+        content_type = msg.properties.get("qmf.content")
+        if (content_type != ContentType.event or
+            not isinstance(msg.content, type([]))):
+            logging.warning("Bad event indication message received: '%s'" % msg)
+            return
+
+        emap = msg.content[0]
+        if not isinstance(emap, type({})):
+            logging.debug("Invalid event body in indication message: '%s'" % msg)
             return
 
         agent = None
@@ -1439,13 +1737,13 @@
         """
         Check all async mailboxes for outstanding requests that have expired.
         """
-        now = datetime.datetime.utcnow()
-        if self._next_mbox_expire and now < self._next_mbox_expire:
-            return
-        expired_mboxes = []
-        self._next_mbox_expire = None
         self._lock.acquire()
         try:
+            now = datetime.datetime.utcnow()
+            if self._next_mbox_expire and now < self._next_mbox_expire:
+                return
+            expired_mboxes = []
+            self._next_mbox_expire = None
             for mbox in self._async_mboxes.itervalues():
                 if now >= mbox.expiration_date:
                     expired_mboxes.append(mbox)

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py Wed Feb 24 19:40:31 2010
@@ -27,3 +27,4 @@
 import multi_response
 import async_query
 import async_method
+import subscriptions

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py Wed Feb 24 19:40:31 2010
@@ -52,8 +52,8 @@
         self.broker_url = broker_url
         self.notifier = _testNotifier()
         self.agent = qmf2.agent.Agent(name,
-                           _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                                      _notifier=self.notifier,
+                                      heartbeat_interval=heartbeat)
         # No database needed for this test
         self.running = False
         self.ready = Event()

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
         self.broker_url = broker_url
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                           heartbeat_interval=heartbeat)
 
         # Dynamically construct a management database
 

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
         self.broker_url = broker_url
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                           heartbeat_interval=heartbeat)
 
         # Dynamically construct a management database
 

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
         self.broker_url = broker_url
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                           heartbeat_interval=heartbeat)
 
         # Dynamically construct a management database
 

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
         self.broker_url = broker_url
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                           heartbeat_interval=heartbeat)
 
         # Dynamically construct a management database
 

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py Wed Feb 24 19:40:31 2010
@@ -58,7 +58,7 @@
         self.notifier = _testNotifier()
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                           heartbeat_interval=heartbeat)
 
         # Dynamically construct a management database
 

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py Wed Feb 24 19:40:31 2010
@@ -60,8 +60,8 @@
         self.broker_url = broker_url
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat,
-                           _max_msg_size=_MAX_OBJS_PER_MSG)
+                           heartbeat_interval=heartbeat,
+                           max_msg_size=_MAX_OBJS_PER_MSG)
 
         # Dynamically construct a management database
         for i in range(self.schema_count):

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py Wed Feb 24 19:40:31 2010
@@ -54,7 +54,7 @@
         self.broker_url = broker_url
         self.agent = Agent(name,
                            _notifier=self.notifier,
-                           _heartbeat_interval=heartbeat)
+                           heartbeat_interval=heartbeat)
 
         # Management Database 
         # - two different schema packages, 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org