You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/02/24 20:40:31 UTC
svn commit: r915946 [1/2] - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: ./ tests/
Author: kgiusti
Date: Wed Feb 24 19:40:31 2010
New Revision: 915946
URL: http://svn.apache.org/viewvc?rev=915946&view=rev
Log:
QPID-2261: sync with msg formats defined on wiki, start subscription impl.
Added:
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Wed Feb 24 19:40:31 2010
@@ -21,18 +21,19 @@
import datetime
import time
import Queue
-from threading import Thread, Lock, currentThread, Event
+from threading import Thread, RLock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from common import (make_subject, parse_subject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
- timedelta_to_secs)
+from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem,
+ SchemaMethod, timedelta_to_secs, QMF_APP_ID)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
_callback_thread=None
+
+
##==============================================================================
## METHOD CALL
##==============================================================================
@@ -78,14 +79,73 @@
return self._user_id
+ ##==============================================================================
+ ## SUBSCRIPTIONS
+ ##==============================================================================
+
+class _ConsoleHandle(object):
+ """
+ """
+ def __init__(self, handle, reply_to):
+ self.console_handle = handle
+ self.reply_to = reply_to
+
+class SubscriptionParams(object):
+ """
+ """
+ def __init__(self, console_handle, query, interval, duration, user_id):
+ self._console_handle = console_handle
+ self._query = query
+ self._interval = interval
+ self._duration = duration
+ self._user_id = user_id
+
+ def get_console_handle(self):
+ return self._console_handle
+
+ def get_query(self):
+ return self._query
+
+ def get_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+ def get_user_id(self):
+ return self._user_id
+
+class _SubscriptionState(object):
+ """
+ An internally-managed subscription.
+ """
+ def __init__(self, reply_to, cid, query, interval, duration):
+ self.reply_to = reply_to
+ self.correlation_id = cid
+ self.query = query
+ self.interval = interval
+ self.duration = duration
+ now = datetime.datetime.utcnow()
+ self.next_update = now # do an immediate update
+ self.expiration = now + datetime.timedelta(seconds=duration)
+ self.id = 0
+
+ def resubscribe(self, now, _duration=None):
+ if _duration is not None:
+ self.duration = _duration
+ self.expiration = now + datetime.timedelta(seconds=self.duration)
+
+ def reset_interval(self, now):
+ self.next_update = now + datetime.timedelta(seconds=self.interval)
+
+
##==============================================================================
## AGENT
##==============================================================================
class Agent(Thread):
- def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
- _max_msg_size=0, _capacity=10):
+ def __init__(self, name, _domain=None, _notifier=None, **options):
Thread.__init__(self)
self._running = False
self._ready = Event()
@@ -94,11 +154,20 @@
self._domain = _domain
self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
- self._heartbeat_interval = _heartbeat_interval
+
+ # configurable parameters
+ #
+ self._heartbeat_interval = options.get("heartbeat_interval", 30)
+ self._capacity = options.get("capacity", 10)
+ self._default_duration = options.get("default_duration", 300)
+ self._max_duration = options.get("max_duration", 3600)
+ self._min_duration = options.get("min_duration", 10)
+ self._default_interval = options.get("default_interval", 30)
+ self._min_interval = options.get("min_interval", 5)
+
# @todo: currently, max # of objects in a single reply message, would
# be better if it were max bytesize of per-msg content...
- self._max_msg_size = _max_msg_size
- self._capacity = _capacity
+ self._max_msg_size = options.get("max_msg_size", 0)
self._conn = None
self._session = None
@@ -107,7 +176,7 @@
self._direct_sender = None
self._topic_sender = None
- self._lock = Lock()
+ self._lock = RLock()
self._packages = {}
self._schema_timestamp = long(0)
self._schema = {}
@@ -119,6 +188,10 @@
self._undescribed_data = {}
self._work_q = Queue.Queue()
self._work_q_put = False
+ # subscriptions
+ self._subscription_id = long(time.time())
+ self._subscriptions = {}
+ self._next_subscribe_event = None
def destroy(self, timeout=None):
@@ -192,10 +265,11 @@
if self.isAlive():
# kick my thread to wake it up
try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self.name,
- content={"noop":"noop"})
+ properties={ "method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
# TRACE
#logging.error("!!! sending wakeup to myself: %s" % msg)
@@ -258,13 +332,14 @@
raise Exception("No connection available")
# @todo: should we validate against the schema?
- _map = {"_name": self.get_name(),
- "_event": qmfEvent.map_encode()}
- msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ msg = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
qmfEvent.get_severity() + "." + self.name,
- properties={"method":"response",
- "qmf.subject":make_subject(OpCode.event_ind)},
- content={MsgKey.event:_map})
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content": ContentType.event,
+ "qmf.agent":self.name},
+ content=[qmfEvent.map_encode()])
# TRACE
# logging.error("!!! Agent %s sending Event (%s)" %
# (self.name, str(msg)))
@@ -330,9 +405,10 @@
raise TypeError("Invalid type for error - must be QmfData")
_map[SchemaMethod.KEY_ERROR] = _error.map_encode()
- msg = Message( properties={"method":"response",
- "qmf.subject":make_subject(OpCode.response)},
- content={MsgKey.method:_map})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.method_rsp},
+ content=_map)
msg.correlation_id = handle.correlation_id
self._send_reply(msg, handle.reply_to)
@@ -370,25 +446,10 @@
while self._running:
- now = datetime.datetime.utcnow()
- # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
- if now >= next_heartbeat:
- ind = self._makeAgentIndMsg()
- ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
- # TRACE
- #logging.error("!!! Agent %s sending Heartbeat (%s)" %
- # (self.name, str(ind)))
- self._topic_sender.send(ind)
- logging.debug("Agent Indication Sent")
- next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
- timeout = timedelta_to_secs(next_heartbeat - now)
- # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
- try:
- self._session.next_receiver(timeout=timeout)
- except Empty:
- continue
-
+ #
+ # Process inbound messages
+ #
+ logging.debug("%s processing inbound messages..." % self.name)
for i in range(batch_limit):
try:
msg = self._topic_receiver.fetch(timeout=0)
@@ -409,7 +470,71 @@
# (self.name, self._direct_receiver.source, msg))
self._dispatch(msg, _direct=True)
+ #
+ # Send Heartbeat Notification
+ #
+ now = datetime.datetime.utcnow()
+ if now >= next_heartbeat:
+ logging.debug("%s sending heartbeat..." % self.name)
+ ind = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.agent_heartbeat_ind,
+ "qmf.agent":self.name},
+ content=self._makeAgentInfoBody())
+ # TRACE
+ #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
+ logging.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+
+
+ #
+ # Monitor Subscriptions
+ #
+ if (self._next_subscribe_event is None or
+ now >= self._next_subscribe_event):
+
+ logging.debug("%s polling subscriptions..." % self.name)
+ self._next_subscribe_event = now + datetime.timedelta(seconds=
+ self._max_duration)
+ self._lock.acquire()
+ try:
+ dead_ss = []
+ for sid,ss in self._subscriptions.iteritems():
+ if now >= ss.expiration:
+ dead_ss.append(sid)
+ continue
+ if now >= ss.next_update:
+ response = []
+ objs = self._queryData(ss.query)
+ if objs:
+ for obj in objs:
+ response.append(obj.map_encode())
+ logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
+ self._send_query_response( ContentType.data,
+ ss.correlation_id,
+ ss.reply_to,
+ response)
+ ss.reset_interval(now)
+
+ next_timeout = min(ss.expiration, ss.next_update)
+ if next_timeout < self._next_subscribe_event:
+ self._next_subscribe_event = next_timeout
+
+ for sid in dead_ss:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+ #
+ # notify application of pending WorkItems
+ #
+
if self._work_q_put and self._notifier:
+ logging.debug("%s notifying application..." % self.name)
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
@@ -417,19 +542,33 @@
self._notifier.indication()
_callback_thread = None
+ #
+ # Sleep until messages arrive or something times out
+ #
+ next_timeout = min(next_heartbeat, self._next_subscribe_event)
+ timeout = timedelta_to_secs(next_timeout -
+ datetime.datetime.utcnow())
+ if timeout > 0.0:
+ logging.debug("%s sleeping %s seconds..." % (self.name,
+ timeout))
+ try:
+ self._session.next_receiver(timeout=timeout)
+ except Empty:
+ pass
+
+
+
+
#
# Private:
#
- def _makeAgentIndMsg(self):
+ def _makeAgentInfoBody(self):
"""
- Create an agent indication message identifying this agent
+ Create an agent indication message body identifying this agent
"""
- _map = {"_name": self.get_name(),
+ return {"_name": self.get_name(),
"_schema_timestamp": self._schema_timestamp}
- return Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.agent_ind)},
- content={MsgKey.agent_info: _map})
def _send_reply(self, msg, reply_to):
"""
@@ -458,7 +597,7 @@
except SendError, e:
logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
- def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+ def _send_query_response(self, content_type, cid, reply_to, objects):
"""
Send a response to a query, breaking the result into multiple
messages based on the agent's _max_msg_size config parameter
@@ -472,24 +611,28 @@
start = 0
end = min(total, max_count)
- while end <= total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
+ # send partial response if too many objects present
+ while end < total:
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "partial":None,
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
correlation_id = cid,
- content={msgkey:objects[start:end]})
+ content=objects[start:end])
self._send_reply(m, reply_to)
- if end == total:
- break;
start = end
end = min(total, end + max_count)
- # response terminator - last message has empty object array
- if total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
- correlation_id = cid,
- content={msgkey: []} )
- self._send_reply(m, reply_to)
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
+ correlation_id = cid,
+ content=objects[start:end])
+ self._send_reply(m, reply_to)
def _dispatch(self, msg, _direct=False):
"""
@@ -497,33 +640,32 @@
@param _direct: True if msg directly addressed to this agent.
"""
- logging.debug( "Message received from Console! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- except:
- logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
- return
+ # logging.debug( "Message received from Console! [%s]" % msg )
+ # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) )
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
+ logging.warning("Ignoring unrecognized message '%s'" % msg)
+ return
+ version = 2 # @todo: fix me
cmap = {}; props={}
if msg.content_type == "amqp/map":
cmap = msg.content
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_locate:
+ if opcode == OpCode.agent_locate_req:
self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
- elif opcode == OpCode.get_query:
+ elif opcode == OpCode.query_req:
self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
self._handleMethodReqMsg(msg, cmap, props, version, _direct)
- elif opcode == OpCode.cancel_subscription:
- logging.warning("!!! CANCEL_SUB TBD !!!")
- elif opcode == OpCode.create_subscription:
- logging.warning("!!! CREATE_SUB TBD !!!")
- elif opcode == OpCode.renew_subscription:
- logging.warning("!!! RENEW_SUB TBD !!!")
- elif opcode == OpCode.schema_query:
- logging.warning("!!! SCHEMA_QUERY TBD !!!")
+ elif opcode == OpCode.subscribe_req:
+ self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_refresh_req:
+ self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_cancel_ind:
+ self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -536,18 +678,28 @@
"""
logging.debug("_handleAgentLocateMsg")
- reply = True
- if "method" in props and props["method"] == "request":
- query = cmap.get(MsgKey.query)
- if query is not None:
- # fake a QmfData containing my identifier for the query compare
- tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
- self.get_name()},
- _object_id="my-name")
- reply = QmfQuery.from_map(query).evaluate(tmpData)
+ reply = False
+ if props.get("method") == "request":
+ # if the message is addressed to me or wildcard, process it
+ if (msg.subject == "console.ind" or
+ msg.subject == "console.ind.locate" or
+ msg.subject == "console.ind.locate." + self.name):
+ pred = msg.content
+ if not pred:
+ reply = True
+ elif isinstance(pred, type([])):
+ # fake a QmfData containing my identifier for the query compare
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred)
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
+ reply = query.evaluate(tmpData)
if reply:
- m = self._makeAgentIndMsg()
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.agent_locate_rsp},
+ content=self._makeAgentInfoBody())
m.correlation_id = msg.correlation_id
self._send_reply(m, msg.reply_to)
else:
@@ -561,22 +713,25 @@
logging.debug("_handleQueryMsg")
if "method" in props and props["method"] == "request":
- qmap = cmap.get(MsgKey.query)
- if qmap:
- query = QmfQuery.from_map(qmap)
+ if cmap:
+ try:
+ query = QmfQuery.from_map(cmap)
+ except TypeError:
+ logging.error("Invalid Query format: '%s'" % str(cmap))
+ return
target = query.get_target()
if target == QmfQuery.TARGET_PACKAGES:
- self._queryPackages( msg, query )
+ self._queryPackagesReply( msg, query )
elif target == QmfQuery.TARGET_SCHEMA_ID:
- self._querySchema( msg, query, _idOnly=True )
+ self._querySchemaReply( msg, query, _idOnly=True )
elif target == QmfQuery.TARGET_SCHEMA:
- self._querySchema( msg, query)
+ self._querySchemaReply( msg, query)
elif target == QmfQuery.TARGET_AGENT:
logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
elif target == QmfQuery.TARGET_OBJECT_ID:
- self._queryData(msg, query, _idOnly=True)
+ self._queryDataReply(msg, query, _idOnly=True)
elif target == QmfQuery.TARGET_OBJECT:
- self._queryData(msg, query)
+ self._queryDataReply(msg, query)
else:
logging.warning("Unrecognized query target: '%s'" % str(target))
@@ -634,7 +789,159 @@
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
- def _queryPackages(self, msg, query):
+ def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Subscription Request
+ """
+ if "method" in props and props["method"] == "request":
+ query_map = cmap.get("_query")
+ interval = cmap.get("_interval")
+ duration = cmap.get("_duration")
+
+ try:
+ query = QmfQuery.from_map(query_map)
+ except TypeError:
+ logging.warning("Invalid query for subscription: %s" %
+ str(query_map))
+ return
+
+ if isinstance(self, AgentExternal):
+ # param = SubscriptionParams(_ConsoleHandle(console_handle,
+ # msg.reply_to),
+ # query,
+ # interval,
+ # duration,
+ # msg.user_id)
+ # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
+ # msg.correlation_id, param))
+ # self._work_q_put = True
+ logging.error("External Subscription TBD")
+ return
+
+ # validate the query - only specific objects, or
+ # objects wildcard, are currently supported.
+ if (query.get_target() != QmfQuery.TARGET_OBJECT or
+ (query.get_selector() == QmfQuery.PREDICATE and
+ query.get_predicate())):
+ logging.error("Subscriptions only support (wildcard) Object"
+ " Queries.")
+ err = QmfData.create(
+ {"reason": "Unsupported Query type for subscription.",
+ "query": str(query.map_encode())})
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content={"_error": err.map_encode()})
+ self._send_reply(m, msg.reply_to)
+ return
+
+ if duration is None:
+ duration = self._default_duration
+ else:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.warning("Bad duration value: %s" % str(msg))
+ duration = self._default_duration
+
+ if interval is None:
+ interval = self._default_interval
+ else:
+ try:
+ interval = float(interval)
+ if interval < self._min_interval:
+ interval = self._min_interval
+ except:
+ logging.warning("Bad interval value: %s" % str(msg))
+ interval = self._default_interval
+
+ ss = _SubscriptionState(msg.reply_to,
+ msg.correlation_id,
+ query,
+ interval,
+ duration)
+ self._lock.acquire()
+ try:
+ sid = self._subscription_id
+ self._subscription_id += 1
+ ss.id = sid
+ self._subscriptions[sid] = ss
+ self._next_subscribe_event = None
+ finally:
+ self._lock.release()
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": interval,
+ "_duration": duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+
+ def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Renew Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.debug("Invalid subscription refresh msg: %s" %
+ str(msg))
+ return
+
+ self._lock.acquire()
+ try:
+ ss = self._subscriptions.get(sid)
+ if not ss:
+ logging.debug("Ignoring unknown subscription: %s" %
+ str(sid))
+ return
+ duration = cmap.get("_duration")
+ if duration is not None:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.debug("Bad duration value: %s" % str(msg))
+ duration = None # use existing duration
+
+ ss.resubscribe(datetime.datetime.utcnow(), duration)
+
+ finally:
+ self._lock.release()
+
+
+ def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Cancel Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.warning("No subscription id supplied: %s" % msg)
+ return
+
+ self._lock.acquire()
+ try:
+ if sid in self._subscriptions:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+
+ def _queryPackagesReply(self, msg, query):
"""
Run a query against the list of known packages
"""
@@ -646,58 +953,83 @@
_object_id="_package")
if query.evaluate(qmfData):
pnames.append(name)
+
+ self._send_query_response(ContentType.schema_package,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
finally:
self._lock.release()
- self._send_query_response(make_subject(OpCode.data_ind),
- MsgKey.package_info,
- msg.correlation_id,
- msg.reply_to,
- pnames)
- def _querySchema( self, msg, query, _idOnly=False ):
+ def _querySchemaReply( self, msg, query, _idOnly=False ):
"""
"""
schemas = []
- # if querying for a specific schema, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
found = self._schema.get(query.get_id())
- finally:
- self._lock.release()
- if found:
- if _idOnly:
- schemas.append(query.get_id().map_encode())
- else:
- schemas.append(found.map_encode())
- else: # otherwise, evaluate all schema
- self._lock.acquire()
- try:
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
for sid,val in self._schema.iteritems():
if query.evaluate(val):
if _idOnly:
schemas.append(sid.map_encode())
else:
schemas.append(val.map_encode())
- finally:
- self._lock.release()
+ if _idOnly:
+ msgkey = ContentType.schema_id
+ else:
+ msgkey = ContentType.schema_class
- if _idOnly:
- msgkey = MsgKey.schema_id
- else:
- msgkey = MsgKey.schema
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
+ finally:
+ self._lock.release()
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- schemas)
+ def _queryDataReply( self, msg, query, _idOnly=False ):
+ """
+ """
+ # hold the (recursive) lock for the duration so the Agent
+ # won't send data that is currently being modified by the
+ # app.
+ self._lock.acquire()
+ try:
+ response = []
+ data_objs = self._queryData(query)
+ if _idOnly:
+ for obj in data_objs:
+ response.append(obj.get_object_id())
+ else:
+ for obj in data_objs:
+ response.append(obj.map_encode())
+
+ if _idOnly:
+ msgkey = ContentType.object_id
+ else:
+ msgkey = ContentType.data
- def _queryData( self, msg, query, _idOnly=False ):
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ response)
+ finally:
+ self._lock.release()
+
+
+ def _queryData(self, query):
"""
+ Return a list of QmfData objects that match a given query
"""
data_objs = []
# extract optional schema_id from target params
@@ -705,12 +1037,12 @@
t_params = query.get_target_param()
if t_params:
sid = t_params.get(QmfData.KEY_SCHEMA_ID)
- # if querying for a specific object, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- oid = query.get_id()
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -718,11 +1050,9 @@
and name.get_package_name() == sid.get_package_name()):
found = db.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
+ data_objs.append(found)
else:
+ found = None
if sid:
db = self._described_data.get(sid)
if db:
@@ -730,15 +1060,9 @@
else:
found = self._undescribed_data.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
- finally:
- self._lock.release()
- else: # otherwise, evaluate all data
- self._lock.acquire()
- try:
+ data_objs.append(found)
+
+ else: # otherwise, evaluate all data
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -746,10 +1070,7 @@
and name.get_package_name() == sid.get_package_name()):
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
+ data_objs.append(data)
else:
if sid:
db = self._described_data.get(sid)
@@ -759,23 +1080,28 @@
if db:
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
- finally:
- self._lock.release()
+ data_objs.append(data)
+ finally:
+ self._lock.release()
- if _idOnly:
- msgkey = MsgKey.object_id
- else:
- msgkey = MsgKey.data_obj
+ return data_objs
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- data_objs)
+
+
+ ##==============================================================================
+ ## EXTERNAL DATABASE AGENT
+ ##==============================================================================
+
+class AgentExternal(Agent):
+ """
+ An Agent which uses an external management database.
+ """
+ def __init__(self, name, _domain=None, _notifier=None,
+ _heartbeat_interval=30, _max_msg_size=0, _capacity=10):
+ super(AgentExternal, self).__init__(name, _domain, _notifier,
+ _heartbeat_interval,
+ _max_msg_size, _capacity)
+ logging.error("AgentExternal TBD")
@@ -823,9 +1149,11 @@
_schema_id=schema_id, _const=False)
self._agent = agent
self._validated = False
+ self._modified = True
def destroy(self):
self._dtime = long(time.time() * 1000)
+ self._touch()
# @todo: publish change
def is_deleted(self):
@@ -833,6 +1161,7 @@
def set_value(self, _name, _value, _subType=None):
super(QmfAgentData, self).set_value(_name, _value, _subType)
+ self._touch()
# @todo: publish change
def inc_value(self, name, delta=1):
@@ -849,6 +1178,7 @@
""" subtract the delta from the property """
# @todo: need to take write-lock
logging.error(" TBD!!!")
+ self._touch()
def validate(self):
"""
@@ -868,6 +1198,13 @@
raise Exception("Required property '%s' not present." % name)
self._validated = True
+ def _touch(self):
+ """
+ Mark this object as modified. Used to force a publish of this object
+ if on subscription.
+ """
+ self._modified = True
+
################################################################################
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py Wed Feb 24 19:40:31 2010
@@ -34,61 +34,44 @@
## Constants
##
-AMQP_QMF_SUBJECT = "qmf"
-AMQP_QMF_VERSION = 4
-AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
-
-class MsgKey(object):
- agent_info = "agent_info"
- query = "query"
- package_info = "package_info"
- schema_id = "schema_id"
- schema = "schema"
- object_id="object_id"
- data_obj="object"
- method="method"
- event="event"
+QMF_APP_ID="qmf2"
-class OpCode(object):
- noop = "noop"
-
- # codes sent by a console and processed by the agent
- agent_locate = "agent-locate"
- cancel_subscription = "cancel-subscription"
- create_subscription = "create-subscription"
- get_query = "get-query"
- method_req = "method"
- renew_subscription = "renew-subscription"
- schema_query = "schema-query" # @todo: deprecate
-
- # codes sent by the agent to a console
- agent_ind = "agent"
- data_ind = "data"
- event_ind = "event"
- managed_object = "managed-object"
- object_ind = "object"
- response = "response"
- schema_ind="schema" # @todo: deprecate
+class ContentType(object):
+ """ Values for the 'qmf.content' message header
+ """
+ schema_package = "_schema_package"
+ schema_id = "_schema_id"
+ schema_class = "_schema_class"
+ object_id = "_object_id"
+ data = "_data"
+ event = "_event"
+class OpCode(object):
+ """ Values for the 'qmf.opcode' message header.
+ """
+ noop = "_noop"
+ # codes sent by a console and processed by the agent
+ agent_locate_req = "_agent_locate_request"
+ subscribe_req = "_subscribe_request"
+ subscribe_cancel_ind = "_subscribe_cancel_indication"
+ subscribe_refresh_req = "_subscribe_refresh_indication"
+ query_req = "_query_request"
+ method_req = "_method_request"
-def make_subject(_code):
- """
- Create a message subject field value.
- """
- return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
+ # codes sent by the agent to a console
+ agent_locate_rsp = "_agent_locate_response"
+ agent_heartbeat_ind = "_agent_heartbeat_indication"
+ query_rsp = "_query_response"
+ subscribe_rsp = "_subscribe_response"
+ subscribe_refresh_rsp = "_subscribe_refresh_response"
+ data_ind = "_data_indication"
+ method_rsp = "_method_response"
-def parse_subject(_sub):
- """
- Deconstruct a subject field, return version,opcode values
- """
- if _sub[:3] != "qmf":
- raise Exception("Non-QMF message received")
- return _sub[3:].split('.', 1)
def timedelta_to_secs(td):
"""
@@ -133,11 +116,15 @@
AGENT_HEARTBEAT=8
QUERY_COMPLETE=9
METHOD_RESPONSE=10
+ SUBSCRIBE_RESPONSE=11
+ SUBSCRIBE_INDICATION=12
+ RESUBSCRIBE_RESPONSE=13
# Enumeration of the types of WorkItems produced on the Agent
METHOD_CALL=1000
QUERY=1001
- SUBSCRIBE=1002
- UNSUBSCRIBE=1003
+ SUBSCRIBE_REQUEST=1002
+ RESUBSCRIBE_REQUEST=1003
+ UNSUBSCRIBE_REQUEST=1004
def __init__(self, kind, handle, _params=None):
"""
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Wed Feb 24 19:40:31 2010
@@ -24,14 +24,14 @@
import datetime
import Queue
from threading import Thread, Event
-from threading import Lock
+from threading import RLock
from threading import currentThread
from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
-from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId,
SchemaEventClass, SchemaObjectClass, WorkItem,
SchemaMethod, QmfEvent, timedelta_to_secs)
@@ -141,6 +141,7 @@
console._lock.acquire()
try:
console._async_mboxes[self.cid] = self
+ console._next_mbox_expire = None
finally:
console._lock.release()
@@ -177,7 +178,7 @@
def __init__(self, console,
agent_name,
context,
- target, msgkey,
+ target,
_timeout=None):
"""
Invoked by application thread.
@@ -186,7 +187,6 @@
_timeout)
self.agent_name = agent_name
self.target = target
- self.msgkey = msgkey
self.context = context
self.result = []
@@ -195,11 +195,8 @@
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
- done = False
- objects = reply.content.get(self.msgkey)
- if not objects:
- done = True
- else:
+ objects = reply.content
+ if isinstance(objects, type([])):
# convert from map to native types if needed
if self.target == QmfQuery.TARGET_SCHEMA_ID:
for sid_map in objects:
@@ -237,8 +234,7 @@
# no conversion needed.
self.result += objects
- if done:
- # create workitem
+ if not "partial" in reply.properties:
# logging.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -278,8 +274,8 @@
Process schema response messages.
"""
done = False
- schemas = reply.content.get(MsgKey.schema)
- if schemas:
+ schemas = reply.content
+ if schemas and isinstance(schemas, type([])):
for schema_map in schemas:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
@@ -319,8 +315,8 @@
Invoked by Console Management thread only.
"""
- _map = reply.content.get(MsgKey.method)
- if not _map:
+ _map = reply.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
result = None
else:
@@ -355,6 +351,128 @@
+class _SubscriptionMailbox(_AsyncMailbox):
+ """
+ A Mailbox for a single subscription.
+ """
+ def __init__(self, console, lifetime, context, agent):
+ """
+ Invoked by application thread.
+ """
+ super(_SubscriptionMailbox, self).__init__(console, lifetime)
+ self.cv = Condition()
+ self.data = []
+ self.result = []
+ self.context = context
+ self.agent_name = agent.get_name()
+ self.agent_subscription_id = None # from agent
+
+ def deliver(self, msg):
+ """
+ """
+ opcode = msg.properties.get("qmf.opcode")
+ if (opcode == OpCode.subscribe_rsp or
+ opcode == OpCode.subscribe_refresh_rsp):
+ #
+ # sync only - just deliver the msg
+ #
+ self.cv.acquire()
+ try:
+ self.data.append(msg)
+ # if was empty, notify waiters
+ if len(self.data) == 1:
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ return
+
+ # sid = msg.content.get("_subscription_id")
+ # lifetime = msg.content.get("_duration")
+ # error = msg.content.get("_error")
+ # sp = SubscribeParams(sid,
+ # msg.content.get("_interval"),
+ # lifetime, error)
+ # if sid and self.subscription_id is None:
+ # self.subscription_id = sid
+ # if lifetime:
+ # self.console._lock.acquire()
+ # try:
+ # self.expiration_date = (datetime.datetime.utcnow() +
+ # datetime.timedelta(seconds=lifetime))
+ # finally:
+ # self.console._lock.release()
+
+ # if self.waiting:
+ # self.cv.acquire()
+ # try:
+ # self.data.append(sp)
+ # # if was empty, notify waiters
+ # if len(self._data) == 1:
+ # self._cv.notify()
+ # finally:
+ # self._cv.release()
+ # else:
+ # if opcode == OpCode.subscribe_rsp:
+ # wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ # self.context, sp)
+ # else:
+ # wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ # self.context, sp)
+ # self.console._work_q.put(wi)
+ # self.console._work_q_put = True
+ # if error:
+ # self.destroy()
+
+ agent_name = msg.properties.get("qmf.agent")
+ if not agent_name:
+ logging.warning("Ignoring data_ind - no agent name given: %s" %
+ msg)
+ return
+ agent = self.console.get_agent(agent_name)
+ if not agent:
+ logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ agent_name)
+ return
+
+ objects = msg.content
+ for obj_map in objects:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self.console._prefetch_schema(sid, agent)
+ self.result.append(obj)
+
+ if not "partial" in msg.properties:
+ wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result)
+ self.result = []
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ def fetch(self, timeout=None):
+ """
+ Get one data item from a mailbox, with timeout.
+ Invoked by application thread.
+ """
+ self.cv.acquire()
+ try:
+ if len(self.data) == 0:
+ self.cv.wait(timeout)
+ if len(self.data):
+ return self.data.pop(0)
+ return None
+ finally:
+ self.cv.release()
+
+ def expire(self):
+ """ The subscription expired.
+ """
+ self.destroy()
+
+
+
+
+
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -481,8 +599,8 @@
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -650,8 +768,8 @@
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -676,20 +794,66 @@
def _send_query(self, query, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.get_query)},
- content={MsgKey.query: query.map_encode()})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.query_req},
+ content=query.map_encode())
self._send_msg( msg, correlation_id )
def _send_method_req(self, mr_map, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.method_req)},
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.method_req},
content=mr_map)
self._send_msg( msg, correlation_id )
+ def _send_subscribe_req(self, query, correlation_id, _interval=None,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_query":query.map_encode()}
+ if _interval is not None:
+ sr_map["_interval"] = _interval
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_resubscribe_req(self, correlation_id,
+ subscription_id,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_refresh_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_unsubscribe_ind(self, correlation_id, subscription_id):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_cancel_ind},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
##==============================================================================
## METHOD CALL
@@ -716,6 +880,36 @@
return arg
+
+ ##==============================================================================
+ ## SUBSCRIPTION
+ ##==============================================================================
+
+class SubscribeParams(object):
+ """ Represents a standing subscription for this console.
+ """
+ def __init__(self, sid, interval, duration, _error=None):
+ self._sid = sid
+ self._interval = interval
+ self._duration = duration
+ self._error = _error
+
+ def succeeded(self):
+ return self._error is None
+
+ def get_error(self):
+ return self._error
+
+ def get_subscription_id(self):
+ return self._sid
+
+ def get_publish_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+
##==============================================================================
## CONSOLE
##==============================================================================
@@ -753,7 +947,7 @@
self._domain = _domain
self._address = QmfAddress.direct(self._name, self._domain)
self._notifier = notifier
- self._lock = Lock()
+ self._lock = RLock()
self._conn = None
self._session = None
# dict of "agent-direct-address":class Agent entries
@@ -766,6 +960,7 @@
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
+ self._subscribe_timeout = 300 # @todo: parameterize
self._next_agent_expire = None
self._next_mbox_expire = None
# for passing WorkItems to the application
@@ -776,18 +971,6 @@
self._post_office = {} # indexed by cid
self._async_mboxes = {} # indexed by cid, used to expire them
- ## Old stuff below???
- #self._broker_list = []
- #self.impl = qmfengine.Console()
- #self._event = qmfengine.ConsoleEvent()
- ##self._cv = Condition()
- ##self._sync_count = 0
- ##self._sync_result = None
- ##self._select = {}
- ##self._cb_cond = Condition()
-
-
-
def destroy(self, timeout=None):
"""
Must be called before the Console is deleted.
@@ -801,8 +984,6 @@
self.remove_connection(self._conn, timeout)
logging.debug("Console Destroyed")
-
-
def add_connection(self, conn):
"""
Add an AMQP connection to the console. The console will set up a session over the
@@ -934,10 +1115,11 @@
cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject="console.ind.locate." + name,
+ msg = Message(id=QMF_APP_ID,
+ subject="console.ind.locate." + name,
properties={"method":"request",
- "qmf.subject":make_subject(OpCode.agent_locate)},
- content={MsgKey.query: query.map_encode()})
+ "qmf.opcode":OpCode.agent_locate_req},
+ content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
logging.debug("Sending Agent Locate (%s)" % time.time())
@@ -995,23 +1177,13 @@
def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
"""
"""
- query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
- QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
- QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
- QmfQuery.TARGET_SCHEMA: MsgKey.schema,
- QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
- QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
target = query.get_target()
- msgkey = query_keymap.get(target)
- if not msgkey:
- raise Exception("Invalid target for query: %s" % str(query))
if _reply_handle is not None:
mbox = _QueryMailbox(self,
agent.get_name(),
_reply_handle,
- target, msgkey,
+ target,
_timeout)
else:
mbox = _SyncMailbox(self)
@@ -1045,9 +1217,8 @@
logging.debug("Query wait timed-out.")
break
- objects = reply.content.get(msgkey)
- if not objects:
- # last response is empty
+ objects = reply.content
+ if not objects or not isinstance(objects, type([])):
break
# convert from map to native types if needed
@@ -1081,21 +1252,154 @@
# no conversion needed.
response += objects
+ if not "partial" in reply.properties:
+ # reply not broken up over multiple msgs
+ break
+
now = datetime.datetime.utcnow()
mbox.destroy()
return response
+
+ def create_subscription(self, agent, query, console_handle,
+ _interval=None, _duration=None,
+ _reply_handle=None, _timeout=None):
+ if not _duration:
+ _duration = self._subscribe_timeout
+
+ if _reply_handle is not None:
+ assert(False) # async TBD
+ else:
+ mbox = _SubscriptionMailbox(self, _duration, console_handle, agent)
+
+ cid = mbox.get_address()
+
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % time.time())
+ agent._send_subscribe_req(query, cid, _interval, _duration)
+ except SendError, e:
+ logging.error(str(e))
+ mbox.destroy()
+ return None
+
+ if _reply_handle is not None:
+ return True
+
+ # wait for reply
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ # @todo: what if mbox expires here?
+ replyMsg = mbox.fetch(_timeout)
+
+ if not replyMsg:
+ logging.debug("Subscription request wait timed-out.")
+ mbox.destroy()
+ return None
+
+ error = replyMsg.content.get("_error")
+ if error:
+ mbox.destroy()
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ e_map = QmfData.create({"error":"Unknown error"})
+ return SubscribeParams(None, None, None, e_map)
+
+ mbox.agent_subscription_id = replyMsg.content.get("_subscription_id")
+ return SubscribeParams(mbox.get_address(),
+ replyMsg.content.get("_interval"),
+ replyMsg.content.get("_duration"),
+ None)
+
+ def refresh_subscription(self, subscription_id,
+ _duration=None,
+ _reply_handle=None, _timeout=None):
+ if _reply_handle is not None:
+ assert(False) # async TBD
+
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ logging.warning("Subscription %s not found." % subscription_id)
+ return None
+
+ agent = self.get_agent(mbox.agent_name)
+ if not agent:
+ logging.warning("Subscription %s agent %s not found." %
+ (mbox.agent_name, subscription_id))
+ return None
+
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % time.time())
+ agent._send_resubscribe_req(subscription_id,
+ mbox.agent_subscription_id,
+ _duration)
+ except SendError, e:
+ logging.error(str(e))
+ # @todo ???? mbox.destroy()
+ return None
+
+ if _reply_handle is not None:
+ return True
+
+ # wait for reply
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ replyMsg = mbox.fetch(_timeout)
+
+ if not replyMsg:
+ logging.debug("Subscription request wait timed-out.")
+ # @todo???? mbox.destroy()
+ return None
+
+ error = replyMsg.content.get("_error")
+ if error:
+ # @todo mbox.destroy()
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ e_map = QmfData.create({"error":"Unknown error"})
+ return SubscribeParams(None, None, None, e_map)
+
+ return SubscribeParams(mbox.get_address(),
+ replyMsg.content.get("_interval"),
+ replyMsg.content.get("_duration"),
+ None)
+
+ def cancel_subscription(self, subscription_id):
+ """
+ """
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ return None
+
+ agent = self.get_agent(mbox.agent_name)
+ if agent:
+ try:
+ logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ agent._send_unsubscribe_ind(subscription_id,
+ mbox.agent_subscription_id)
+ except SendError, e:
+ logging.error(str(e))
+
+ mbox.destroy()
+
+
def _wake_thread(self):
"""
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
logging.debug("Sending noop to wake up [%s]" % self._address)
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self._name,
- content={"noop":"noop"})
+ properties={"method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
@@ -1152,9 +1456,17 @@
# to expire, or a mailbox request to time out
now = datetime.datetime.utcnow()
next_expire = self._next_agent_expire
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
+
+ # the mailbox expire flag may be cleared by the
+ # app thread(s)
+ self._lock.acquire()
+ try:
+ if (self._next_mbox_expire and
+ self._next_mbox_expire < next_expire):
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
+
if next_expire > now:
timeout = timedelta_to_secs(next_expire - now)
try:
@@ -1268,13 +1580,14 @@
"""
PRIVATE: Process a message received from an Agent
"""
- logging.debug( "Message received from Agent! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- # @todo: deal with version mismatch!!!
- except:
+ #logging.debug( "Message received from Agent! [%s]" % msg )
+ #logging.error( "Message received from Agent! [%s]" % msg )
+
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
logging.error("Ignoring unrecognized message '%s'" % msg)
return
+ version = 2 # @todo: fix me
cmap = {}; props = {}
if msg.content_type == "amqp/map":
@@ -1282,20 +1595,21 @@
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_ind:
+ if opcode == OpCode.agent_heartbeat_ind:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
- elif opcode == OpCode.data_ind:
- self._handle_data_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.event_ind:
- self._handle_event_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.managed_object:
- logging.warning("!!! managed_object TBD !!!")
- elif opcode == OpCode.object_ind:
- logging.warning("!!! object_ind TBD !!!")
- elif opcode == OpCode.response:
+ elif opcode == OpCode.agent_locate_rsp:
+ self._handle_agent_ind_msg( msg, cmap, version, _direct )
+ elif opcode == OpCode.query_rsp:
self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.schema_ind:
- logging.warning("!!! schema_ind TBD !!!")
+ elif opcode == OpCode.subscribe_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.method_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.data_ind:
+ if msg.correlation_id:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ else:
+ self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -1309,7 +1623,7 @@
"""
logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
- ai_map = cmap.get(MsgKey.agent_info)
+ ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
logging.warning("Bad agent-ind message received: '%s'" % msg)
return
@@ -1359,29 +1673,10 @@
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_data_ind_msg(self, msg, cmap, version, direct):
- """
- Process a received data-ind message.
- """
- logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
-
- mbox = self._get_mailbox(msg.correlation_id)
- if not mbox:
- logging.debug("Data indicate received with unknown correlation_id"
- " msg='%s'" % str(msg))
- return
-
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" %
- msg.correlation_id)
- mbox.deliver(msg)
-
-
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received response message (query, subscribe, or method response, or a correlated data-ind).
"""
- # @todo code replication - clean me.
logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
@@ -1394,19 +1689,22 @@
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_event_ind_msg(self, msg, cmap, version, _direct):
- ei_map = cmap.get(MsgKey.event)
- if not ei_map or not isinstance(ei_map, type({})):
- logging.warning("Bad event indication message received: '%s'" % msg)
- return
+ def _handle_indication_msg(self, msg, cmap, version, _direct):
- aname = ei_map.get("_name")
- emap = ei_map.get("_event")
+ aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No '_name' field in event indication message.")
+ logging.debug("No agent name field in indication message.")
return
- if not emap:
- logging.debug("No '_event' field in event indication message.")
+
+ content_type = msg.properties.get("qmf.content")
+ if (content_type != ContentType.event or
+ not isinstance(msg.content, type([]))):
+ logging.warning("Bad event indication message received: '%s'" % msg)
+ return
+
+ emap = msg.content[0]
+ if not isinstance(emap, type({})):
+ logging.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1439,13 +1737,13 @@
"""
Check all async mailboxes for outstanding requests that have expired.
"""
- now = datetime.datetime.utcnow()
- if self._next_mbox_expire and now < self._next_mbox_expire:
- return
- expired_mboxes = []
- self._next_mbox_expire = None
self._lock.acquire()
try:
+ now = datetime.datetime.utcnow()
+ if self._next_mbox_expire and now < self._next_mbox_expire:
+ return
+ expired_mboxes = []
+ self._next_mbox_expire = None
for mbox in self._async_mboxes.itervalues():
if now >= mbox.expiration_date:
expired_mboxes.append(mbox)
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/__init__.py Wed Feb 24 19:40:31 2010
@@ -27,3 +27,4 @@
import multi_response
import async_query
import async_method
+import subscriptions
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py Wed Feb 24 19:40:31 2010
@@ -52,8 +52,8 @@
self.broker_url = broker_url
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat)
# No database needed for this test
self.running = False
self.ready = Event()
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_method.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/async_query.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py Wed Feb 24 19:40:31 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/events.py Wed Feb 24 19:40:31 2010
@@ -58,7 +58,7 @@
self.notifier = _testNotifier()
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py Wed Feb 24 19:40:31 2010
@@ -60,8 +60,8 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat,
- _max_msg_size=_MAX_OBJS_PER_MSG)
+ heartbeat_interval=heartbeat,
+ max_msg_size=_MAX_OBJS_PER_MSG)
# Dynamically construct a management database
for i in range(self.schema_count):
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py?rev=915946&r1=915945&r2=915946&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py Wed Feb 24 19:40:31 2010
@@ -54,7 +54,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Management Database
# - two different schema packages,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org