You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/02/27 01:38:16 UTC
svn commit: r916887 [2/4] - in /qpid/branches/qmf-devel0.7: ./
qpid/cpp/include/qmf/engine/ qpid/cpp/include/qpid/
qpid/cpp/include/qpid/sys/posix/ qpid/cpp/include/qpid/sys/windows/
qpid/cpp/rubygen/framing.0-10/ qpid/cpp/src/ qpid/cpp/src/qmf/engine/...
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/agent.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/agent.py Sat Feb 27 00:38:13 2010
@@ -21,18 +21,19 @@
import datetime
import time
import Queue
-from threading import Thread, Lock, currentThread, Event
+from threading import Thread, RLock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
-from common import (make_subject, parse_subject, OpCode, QmfQuery,
- SchemaObjectClass, MsgKey, QmfData, QmfAddress,
- SchemaClass, SchemaClassId, WorkItem, SchemaMethod,
- timedelta_to_secs)
+from common import (OpCode, QmfQuery, ContentType, SchemaObjectClass,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId, WorkItem,
+ SchemaMethod, timedelta_to_secs, QMF_APP_ID)
# global flag that indicates which thread (if any) is
# running the agent notifier callback
_callback_thread=None
+
+
##==============================================================================
## METHOD CALL
##==============================================================================
@@ -78,14 +79,73 @@
return self._user_id
+ ##==============================================================================
+ ## SUBSCRIPTIONS
+ ##==============================================================================
+
+class _ConsoleHandle(object):
+ """
+ """
+ def __init__(self, handle, reply_to):
+ self.console_handle = handle
+ self.reply_to = reply_to
+
+class SubscriptionParams(object):
+ """
+ """
+ def __init__(self, console_handle, query, interval, duration, user_id):
+ self._console_handle = console_handle
+ self._query = query
+ self._interval = interval
+ self._duration = duration
+ self._user_id = user_id
+
+ def get_console_handle(self):
+ return self._console_handle
+
+ def get_query(self):
+ return self._query
+
+ def get_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+ def get_user_id(self):
+ return self._user_id
+
+class _SubscriptionState(object):
+ """
+ An internally-managed subscription.
+ """
+ def __init__(self, reply_to, cid, query, interval, duration):
+ self.reply_to = reply_to
+ self.correlation_id = cid
+ self.query = query
+ self.interval = interval
+ self.duration = duration
+ now = datetime.datetime.utcnow()
+ self.next_update = now # do an immediate update
+ self.expiration = now + datetime.timedelta(seconds=duration)
+ self.id = 0
+
+ def resubscribe(self, now, _duration=None):
+ if _duration is not None:
+ self.duration = _duration
+ self.expiration = now + datetime.timedelta(seconds=self.duration)
+
+ def reset_interval(self, now):
+ self.next_update = now + datetime.timedelta(seconds=self.interval)
+
+
##==============================================================================
## AGENT
##==============================================================================
class Agent(Thread):
- def __init__(self, name, _domain=None, _notifier=None, _heartbeat_interval=30,
- _max_msg_size=0, _capacity=10):
+ def __init__(self, name, _domain=None, _notifier=None, **options):
Thread.__init__(self)
self._running = False
self._ready = Event()
@@ -94,11 +154,20 @@
self._domain = _domain
self._address = QmfAddress.direct(self.name, self._domain)
self._notifier = _notifier
- self._heartbeat_interval = _heartbeat_interval
+
+ # configurable parameters
+ #
+ self._heartbeat_interval = options.get("heartbeat_interval", 30)
+ self._capacity = options.get("capacity", 10)
+ self._default_duration = options.get("default_duration", 300)
+ self._max_duration = options.get("max_duration", 3600)
+ self._min_duration = options.get("min_duration", 10)
+ self._default_interval = options.get("default_interval", 30)
+ self._min_interval = options.get("min_interval", 5)
+
# @todo: currently, max # of objects in a single reply message, would
# be better if it were max bytesize of per-msg content...
- self._max_msg_size = _max_msg_size
- self._capacity = _capacity
+ self._max_msg_size = options.get("max_msg_size", 0)
self._conn = None
self._session = None
@@ -107,7 +176,7 @@
self._direct_sender = None
self._topic_sender = None
- self._lock = Lock()
+ self._lock = RLock()
self._packages = {}
self._schema_timestamp = long(0)
self._schema = {}
@@ -119,6 +188,10 @@
self._undescribed_data = {}
self._work_q = Queue.Queue()
self._work_q_put = False
+ # subscriptions
+ self._subscription_id = long(time.time())
+ self._subscriptions = {}
+ self._next_subscribe_event = None
def destroy(self, timeout=None):
@@ -192,10 +265,11 @@
if self.isAlive():
# kick my thread to wake it up
try:
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self.name,
- content={"noop":"noop"})
+ properties={ "method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
# TRACE
#logging.error("!!! sending wakeup to myself: %s" % msg)
@@ -258,13 +332,14 @@
raise Exception("No connection available")
# @todo: should we validate against the schema?
- _map = {"_name": self.get_name(),
- "_event": qmfEvent.map_encode()}
- msg = Message(subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
+ msg = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_EVENT + "." +
qmfEvent.get_severity() + "." + self.name,
- properties={"method":"response",
- "qmf.subject":make_subject(OpCode.event_ind)},
- content={MsgKey.event:_map})
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content": ContentType.event,
+ "qmf.agent":self.name},
+ content=[qmfEvent.map_encode()])
# TRACE
# logging.error("!!! Agent %s sending Event (%s)" %
# (self.name, str(msg)))
@@ -330,9 +405,10 @@
raise TypeError("Invalid type for error - must be QmfData")
_map[SchemaMethod.KEY_ERROR] = _error.map_encode()
- msg = Message( properties={"method":"response",
- "qmf.subject":make_subject(OpCode.response)},
- content={MsgKey.method:_map})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.method_rsp},
+ content=_map)
msg.correlation_id = handle.correlation_id
self._send_reply(msg, handle.reply_to)
@@ -370,25 +446,10 @@
while self._running:
- now = datetime.datetime.utcnow()
- # print("now=%s next_heartbeat=%s" % (now, next_heartbeat))
- if now >= next_heartbeat:
- ind = self._makeAgentIndMsg()
- ind.subject = QmfAddress.SUBJECT_AGENT_HEARTBEAT
- # TRACE
- #logging.error("!!! Agent %s sending Heartbeat (%s)" %
- # (self.name, str(ind)))
- self._topic_sender.send(ind)
- logging.debug("Agent Indication Sent")
- next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
-
- timeout = timedelta_to_secs(next_heartbeat - now)
- # print("now=%s next_heartbeat=%s timeout=%s" % (now, next_heartbeat, timeout))
- try:
- self._session.next_receiver(timeout=timeout)
- except Empty:
- continue
-
+ #
+ # Process inbound messages
+ #
+ logging.debug("%s processing inbound messages..." % self.name)
for i in range(batch_limit):
try:
msg = self._topic_receiver.fetch(timeout=0)
@@ -409,7 +470,71 @@
# (self.name, self._direct_receiver.source, msg))
self._dispatch(msg, _direct=True)
+ #
+ # Send Heartbeat Notification
+ #
+ now = datetime.datetime.utcnow()
+ if now >= next_heartbeat:
+ logging.debug("%s sending heartbeat..." % self.name)
+ ind = Message(id=QMF_APP_ID,
+ subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
+ properties={"method":"indication",
+ "qmf.opcode":OpCode.agent_heartbeat_ind,
+ "qmf.agent":self.name},
+ content=self._makeAgentInfoBody())
+ # TRACE
+ #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ # (self.name, str(ind)))
+ self._topic_sender.send(ind)
+ logging.debug("Agent Indication Sent")
+ next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
+
+
+
+ #
+ # Monitor Subscriptions
+ #
+ if (self._next_subscribe_event is None or
+ now >= self._next_subscribe_event):
+
+ logging.debug("%s polling subscriptions..." % self.name)
+ self._next_subscribe_event = now + datetime.timedelta(seconds=
+ self._max_duration)
+ self._lock.acquire()
+ try:
+ dead_ss = []
+ for sid,ss in self._subscriptions.iteritems():
+ if now >= ss.expiration:
+ dead_ss.append(sid)
+ continue
+ if now >= ss.next_update:
+ response = []
+ objs = self._queryData(ss.query)
+ if objs:
+ for obj in objs:
+ response.append(obj.map_encode())
+ logging.debug("!!! %s publishing %s!!!" % (self.name, ss.correlation_id))
+ self._send_query_response( ContentType.data,
+ ss.correlation_id,
+ ss.reply_to,
+ response)
+ ss.reset_interval(now)
+
+ next_timeout = min(ss.expiration, ss.next_update)
+ if next_timeout < self._next_subscribe_event:
+ self._next_subscribe_event = next_timeout
+
+ for sid in dead_ss:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+ #
+ # notify application of pending WorkItems
+ #
+
if self._work_q_put and self._notifier:
+ logging.debug("%s notifying application..." % self.name)
# new stuff on work queue, kick the application...
self._work_q_put = False
_callback_thread = currentThread()
@@ -417,19 +542,33 @@
self._notifier.indication()
_callback_thread = None
+ #
+ # Sleep until messages arrive or something times out
+ #
+ next_timeout = min(next_heartbeat, self._next_subscribe_event)
+ timeout = timedelta_to_secs(next_timeout -
+ datetime.datetime.utcnow())
+ if timeout > 0.0:
+ logging.debug("%s sleeping %s seconds..." % (self.name,
+ timeout))
+ try:
+ self._session.next_receiver(timeout=timeout)
+ except Empty:
+ pass
+
+
+
+
#
# Private:
#
- def _makeAgentIndMsg(self):
+ def _makeAgentInfoBody(self):
"""
- Create an agent indication message identifying this agent
+ Create an agent indication message body identifying this agent
"""
- _map = {"_name": self.get_name(),
+ return {"_name": self.get_name(),
"_schema_timestamp": self._schema_timestamp}
- return Message(properties={"method":"response",
- "qmf.subject":make_subject(OpCode.agent_ind)},
- content={MsgKey.agent_info: _map})
def _send_reply(self, msg, reply_to):
"""
@@ -458,7 +597,7 @@
except SendError, e:
logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
- def _send_query_response(self, subject, msgkey, cid, reply_to, objects):
+ def _send_query_response(self, content_type, cid, reply_to, objects):
"""
Send a response to a query, breaking the result into multiple
messages based on the agent's _max_msg_size config parameter
@@ -472,24 +611,28 @@
start = 0
end = min(total, max_count)
- while end <= total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
+ # send partial response if too many objects present
+ while end < total:
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "partial":None,
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
correlation_id = cid,
- content={msgkey:objects[start:end]})
+ content=objects[start:end])
self._send_reply(m, reply_to)
- if end == total:
- break;
start = end
end = min(total, end + max_count)
- # response terminator - last message has empty object array
- if total:
- m = Message(properties={"qmf.subject":subject,
- "method":"response"},
- correlation_id = cid,
- content={msgkey: []} )
- self._send_reply(m, reply_to)
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.data_ind,
+ "qmf.content":content_type,
+ "qmf.agent":self.name},
+ correlation_id = cid,
+ content=objects[start:end])
+ self._send_reply(m, reply_to)
def _dispatch(self, msg, _direct=False):
"""
@@ -497,33 +640,32 @@
@param _direct: True if msg directly addressed to this agent.
"""
- logging.debug( "Message received from Console! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- except:
- logging.warning("Ignoring unrecognized message '%s'" % msg.subject)
- return
+ # logging.debug( "Message received from Console! [%s]" % msg )
+ # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) )
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
+ logging.warning("Ignoring unrecognized message '%s'" % msg)
+ return
+ version = 2 # @todo: fix me
cmap = {}; props={}
if msg.content_type == "amqp/map":
cmap = msg.content
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_locate:
+ if opcode == OpCode.agent_locate_req:
self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
- elif opcode == OpCode.get_query:
+ elif opcode == OpCode.query_req:
self._handleQueryMsg( msg, cmap, props, version, _direct )
elif opcode == OpCode.method_req:
self._handleMethodReqMsg(msg, cmap, props, version, _direct)
- elif opcode == OpCode.cancel_subscription:
- logging.warning("!!! CANCEL_SUB TBD !!!")
- elif opcode == OpCode.create_subscription:
- logging.warning("!!! CREATE_SUB TBD !!!")
- elif opcode == OpCode.renew_subscription:
- logging.warning("!!! RENEW_SUB TBD !!!")
- elif opcode == OpCode.schema_query:
- logging.warning("!!! SCHEMA_QUERY TBD !!!")
+ elif opcode == OpCode.subscribe_req:
+ self._handleSubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_refresh_req:
+ self._handleResubscribeReqMsg(msg, cmap, props, version, _direct)
+ elif opcode == OpCode.subscribe_cancel_ind:
+ self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -536,18 +678,28 @@
"""
logging.debug("_handleAgentLocateMsg")
- reply = True
- if "method" in props and props["method"] == "request":
- query = cmap.get(MsgKey.query)
- if query is not None:
- # fake a QmfData containing my identifier for the query compare
- tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
- self.get_name()},
- _object_id="my-name")
- reply = QmfQuery.from_map(query).evaluate(tmpData)
+ reply = False
+ if props.get("method") == "request":
+ # if the message is addressed to me or wildcard, process it
+ if (msg.subject == "console.ind" or
+ msg.subject == "console.ind.locate" or
+ msg.subject == "console.ind.locate." + self.name):
+ pred = msg.content
+ if not pred:
+ reply = True
+ elif isinstance(pred, type([])):
+ # fake a QmfData containing my identifier for the query compare
+ query = QmfQuery.create_predicate(QmfQuery.TARGET_AGENT, pred)
+ tmpData = QmfData.create({QmfQuery.KEY_AGENT_NAME:
+ self.get_name()},
+ _object_id="my-name")
+ reply = query.evaluate(tmpData)
if reply:
- m = self._makeAgentIndMsg()
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.agent_locate_rsp},
+ content=self._makeAgentInfoBody())
m.correlation_id = msg.correlation_id
self._send_reply(m, msg.reply_to)
else:
@@ -561,22 +713,25 @@
logging.debug("_handleQueryMsg")
if "method" in props and props["method"] == "request":
- qmap = cmap.get(MsgKey.query)
- if qmap:
- query = QmfQuery.from_map(qmap)
+ if cmap:
+ try:
+ query = QmfQuery.from_map(cmap)
+ except TypeError:
+ logging.error("Invalid Query format: '%s'" % str(cmap))
+ return
target = query.get_target()
if target == QmfQuery.TARGET_PACKAGES:
- self._queryPackages( msg, query )
+ self._queryPackagesReply( msg, query )
elif target == QmfQuery.TARGET_SCHEMA_ID:
- self._querySchema( msg, query, _idOnly=True )
+ self._querySchemaReply( msg, query, _idOnly=True )
elif target == QmfQuery.TARGET_SCHEMA:
- self._querySchema( msg, query)
+ self._querySchemaReply( msg, query)
elif target == QmfQuery.TARGET_AGENT:
logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
elif target == QmfQuery.TARGET_OBJECT_ID:
- self._queryData(msg, query, _idOnly=True)
+ self._queryDataReply(msg, query, _idOnly=True)
elif target == QmfQuery.TARGET_OBJECT:
- self._queryData(msg, query)
+ self._queryDataReply(msg, query)
else:
logging.warning("Unrecognized query target: '%s'" % str(target))
@@ -634,7 +789,173 @@
self._work_q.put(WorkItem(WorkItem.METHOD_CALL, handle, param))
self._work_q_put = True
- def _queryPackages(self, msg, query):
+ def _handleSubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Subscription Request
+ """
+ if "method" in props and props["method"] == "request":
+ query_map = cmap.get("_query")
+ interval = cmap.get("_interval")
+ duration = cmap.get("_duration")
+
+ try:
+ query = QmfQuery.from_map(query_map)
+ except TypeError:
+ logging.warning("Invalid query for subscription: %s" %
+ str(query_map))
+ return
+
+ if isinstance(self, AgentExternal):
+ # param = SubscriptionParams(_ConsoleHandle(console_handle,
+ # msg.reply_to),
+ # query,
+ # interval,
+ # duration,
+ # msg.user_id)
+ # self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
+ # msg.correlation_id, param))
+ # self._work_q_put = True
+ logging.error("External Subscription TBD")
+ return
+
+ # validate the query - only specific objects, or
+ # objects wildcard, are currently supported.
+ if (query.get_target() != QmfQuery.TARGET_OBJECT or
+ (query.get_selector() == QmfQuery.PREDICATE and
+ query.get_predicate())):
+ logging.error("Subscriptions only support (wildcard) Object"
+ " Queries.")
+ err = QmfData.create(
+ {"reason": "Unsupported Query type for subscription.",
+ "query": str(query.map_encode())})
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content={"_error": err.map_encode()})
+ self._send_reply(m, msg.reply_to)
+ return
+
+ if duration is None:
+ duration = self._default_duration
+ else:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.warning("Bad duration value: %s" % str(msg))
+ duration = self._default_duration
+
+ if interval is None:
+ interval = self._default_interval
+ else:
+ try:
+ interval = float(interval)
+ if interval < self._min_interval:
+ interval = self._min_interval
+ except:
+ logging.warning("Bad interval value: %s" % str(msg))
+ interval = self._default_interval
+
+ ss = _SubscriptionState(msg.reply_to,
+ msg.correlation_id,
+ query,
+ interval,
+ duration)
+ self._lock.acquire()
+ try:
+ sid = self._subscription_id
+ self._subscription_id += 1
+ ss.id = sid
+ self._subscriptions[sid] = ss
+ self._next_subscribe_event = None
+ finally:
+ self._lock.release()
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": interval,
+ "_duration": duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+
+ def _handleResubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Renew Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.error("Invalid subscription refresh msg: %s" %
+ str(msg))
+ return
+
+ self._lock.acquire()
+ try:
+ ss = self._subscriptions.get(sid)
+ if not ss:
+ logging.error("Ignoring unknown subscription: %s" %
+ str(sid))
+ return
+ duration = cmap.get("_duration")
+ if duration is not None:
+ try:
+ duration = float(duration)
+ if duration > self._max_duration:
+ duration = self._max_duration
+ elif duration < self._min_duration:
+ duration = self._min_duration
+ except:
+ logging.error("Bad duration value: %s" % str(msg))
+ duration = None # use existing duration
+
+ ss.resubscribe(datetime.datetime.utcnow(), duration)
+
+ new_duration = ss.duration
+ new_interval = ss.interval
+
+ finally:
+ self._lock.release()
+
+
+ sr_map = {"_subscription_id": sid,
+ "_interval": new_interval,
+ "_duration": new_duration}
+ m = Message(id=QMF_APP_ID,
+ properties={"method":"response",
+ "qmf.opcode":OpCode.subscribe_refresh_rsp},
+ correlation_id = msg.correlation_id,
+ content=sr_map)
+ self._send_reply(m, msg.reply_to)
+
+
+ def _handleUnsubscribeReqMsg(self, msg, cmap, props, version, _direct):
+ """
+ Process received Cancel Subscription Request
+ """
+ if props.get("method") == "request":
+ sid = cmap.get("_subscription_id")
+ if not sid:
+ logging.warning("No subscription id supplied: %s" % msg)
+ return
+
+ self._lock.acquire()
+ try:
+ if sid in self._subscriptions:
+ del self._subscriptions[sid]
+ finally:
+ self._lock.release()
+
+
+ def _queryPackagesReply(self, msg, query):
"""
Run a query against the list of known packages
"""
@@ -646,58 +967,83 @@
_object_id="_package")
if query.evaluate(qmfData):
pnames.append(name)
+
+ self._send_query_response(ContentType.schema_package,
+ msg.correlation_id,
+ msg.reply_to,
+ pnames)
finally:
self._lock.release()
- self._send_query_response(make_subject(OpCode.data_ind),
- MsgKey.package_info,
- msg.correlation_id,
- msg.reply_to,
- pnames)
- def _querySchema( self, msg, query, _idOnly=False ):
+ def _querySchemaReply( self, msg, query, _idOnly=False ):
"""
"""
schemas = []
- # if querying for a specific schema, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific schema, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
found = self._schema.get(query.get_id())
- finally:
- self._lock.release()
- if found:
- if _idOnly:
- schemas.append(query.get_id().map_encode())
- else:
- schemas.append(found.map_encode())
- else: # otherwise, evaluate all schema
- self._lock.acquire()
- try:
+ if found:
+ if _idOnly:
+ schemas.append(query.get_id().map_encode())
+ else:
+ schemas.append(found.map_encode())
+ else: # otherwise, evaluate all schema
for sid,val in self._schema.iteritems():
if query.evaluate(val):
if _idOnly:
schemas.append(sid.map_encode())
else:
schemas.append(val.map_encode())
- finally:
- self._lock.release()
+ if _idOnly:
+ msgkey = ContentType.schema_id
+ else:
+ msgkey = ContentType.schema_class
- if _idOnly:
- msgkey = MsgKey.schema_id
- else:
- msgkey = MsgKey.schema
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ schemas)
+ finally:
+ self._lock.release()
+
+
+ def _queryDataReply( self, msg, query, _idOnly=False ):
+ """
+ """
+ # hold the (recursive) lock for the duration so the Agent
+ # won't send data that is currently being modified by the
+ # app.
+ self._lock.acquire()
+ try:
+ response = []
+ data_objs = self._queryData(query)
+ if _idOnly:
+ for obj in data_objs:
+ response.append(obj.get_object_id())
+ else:
+ for obj in data_objs:
+ response.append(obj.map_encode())
+
+ if _idOnly:
+ msgkey = ContentType.object_id
+ else:
+ msgkey = ContentType.data
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- schemas)
+ self._send_query_response(msgkey,
+ msg.correlation_id,
+ msg.reply_to,
+ response)
+ finally:
+ self._lock.release()
- def _queryData( self, msg, query, _idOnly=False ):
+ def _queryData(self, query):
"""
+ Return a list of QmfData objects that match a given query
"""
data_objs = []
# extract optional schema_id from target params
@@ -705,12 +1051,12 @@
t_params = query.get_target_param()
if t_params:
sid = t_params.get(QmfData.KEY_SCHEMA_ID)
- # if querying for a specific object, do a direct lookup
- if query.get_selector() == QmfQuery.ID:
- oid = query.get_id()
- found = None
- self._lock.acquire()
- try:
+
+ self._lock.acquire()
+ try:
+ # if querying for a specific object, do a direct lookup
+ if query.get_selector() == QmfQuery.ID:
+ oid = query.get_id()
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -718,11 +1064,9 @@
and name.get_package_name() == sid.get_package_name()):
found = db.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
+ data_objs.append(found)
else:
+ found = None
if sid:
db = self._described_data.get(sid)
if db:
@@ -730,15 +1074,9 @@
else:
found = self._undescribed_data.get(oid)
if found:
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(found.map_encode())
- finally:
- self._lock.release()
- else: # otherwise, evaluate all data
- self._lock.acquire()
- try:
+ data_objs.append(found)
+
+ else: # otherwise, evaluate all data
if sid and not sid.get_hash_string():
# wildcard schema_id match, check each schema
for name,db in self._described_data.iteritems():
@@ -746,10 +1084,7 @@
and name.get_package_name() == sid.get_package_name()):
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
+ data_objs.append(data)
else:
if sid:
db = self._described_data.get(sid)
@@ -759,23 +1094,28 @@
if db:
for oid,data in db.iteritems():
if query.evaluate(data):
- if _idOnly:
- data_objs.append(oid)
- else:
- data_objs.append(data.map_encode())
- finally:
- self._lock.release()
+ data_objs.append(data)
+ finally:
+ self._lock.release()
- if _idOnly:
- msgkey = MsgKey.object_id
- else:
- msgkey = MsgKey.data_obj
+ return data_objs
- self._send_query_response(make_subject(OpCode.data_ind),
- msgkey,
- msg.correlation_id,
- msg.reply_to,
- data_objs)
+
+
+ ##==============================================================================
+ ## EXTERNAL DATABASE AGENT
+ ##==============================================================================
+
+class AgentExternal(Agent):
+ """
+ An Agent which uses an external management database.
+ """
+ def __init__(self, name, _domain=None, _notifier=None,
+ _heartbeat_interval=30, _max_msg_size=0, _capacity=10):
+ super(AgentExternal, self).__init__(name, _domain, _notifier,
+ _heartbeat_interval,
+ _max_msg_size, _capacity)
+ logging.error("AgentExternal TBD")
@@ -823,9 +1163,11 @@
_schema_id=schema_id, _const=False)
self._agent = agent
self._validated = False
+ self._modified = True
def destroy(self):
self._dtime = long(time.time() * 1000)
+ self._touch()
# @todo: publish change
def is_deleted(self):
@@ -833,6 +1175,7 @@
def set_value(self, _name, _value, _subType=None):
super(QmfAgentData, self).set_value(_name, _value, _subType)
+ self._touch()
# @todo: publish change
def inc_value(self, name, delta=1):
@@ -849,6 +1192,7 @@
""" subtract the delta from the property """
# @todo: need to take write-lock
logging.error(" TBD!!!")
+ self._touch()
def validate(self):
"""
@@ -868,6 +1212,13 @@
raise Exception("Required property '%s' not present." % name)
self._validated = True
+ def _touch(self):
+ """
+ Mark this object as modified. Used to force a publish of this object
+ if it is on a subscription.
+ """
+ self._modified = True
+
################################################################################
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py Sat Feb 27 00:38:13 2010
@@ -34,61 +34,44 @@
## Constants
##
-AMQP_QMF_SUBJECT = "qmf"
-AMQP_QMF_VERSION = 4
-AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
-
-class MsgKey(object):
- agent_info = "agent_info"
- query = "query"
- package_info = "package_info"
- schema_id = "schema_id"
- schema = "schema"
- object_id="object_id"
- data_obj="object"
- method="method"
- event="event"
+QMF_APP_ID="qmf2"
-class OpCode(object):
- noop = "noop"
-
- # codes sent by a console and processed by the agent
- agent_locate = "agent-locate"
- cancel_subscription = "cancel-subscription"
- create_subscription = "create-subscription"
- get_query = "get-query"
- method_req = "method"
- renew_subscription = "renew-subscription"
- schema_query = "schema-query" # @todo: deprecate
-
- # codes sent by the agent to a console
- agent_ind = "agent"
- data_ind = "data"
- event_ind = "event"
- managed_object = "managed-object"
- object_ind = "object"
- response = "response"
- schema_ind="schema" # @todo: deprecate
+class ContentType(object):
+ """ Values for the 'qmf.content' message header
+ """
+ schema_package = "_schema_package"
+ schema_id = "_schema_id"
+ schema_class = "_schema_class"
+ object_id = "_object_id"
+ data = "_data"
+ event = "_event"
+class OpCode(object):
+ """ Values for the 'qmf.opcode' message header.
+ """
+ noop = "_noop"
+ # codes sent by a console and processed by the agent
+ agent_locate_req = "_agent_locate_request"
+ subscribe_req = "_subscribe_request"
+ subscribe_cancel_ind = "_subscribe_cancel_indication"
+ subscribe_refresh_req = "_subscribe_refresh_indication"
+ query_req = "_query_request"
+ method_req = "_method_request"
-def make_subject(_code):
- """
- Create a message subject field value.
- """
- return AMQP_QMF_SUBJECT_FMT % (AMQP_QMF_SUBJECT, AMQP_QMF_VERSION, _code)
+ # codes sent by the agent to a console
+ agent_locate_rsp = "_agent_locate_response"
+ agent_heartbeat_ind = "_agent_heartbeat_indication"
+ query_rsp = "_query_response"
+ subscribe_rsp = "_subscribe_response"
+ subscribe_refresh_rsp = "_subscribe_refresh_response"
+ data_ind = "_data_indication"
+ method_rsp = "_method_response"
-def parse_subject(_sub):
- """
- Deconstruct a subject field, return version,opcode values
- """
- if _sub[:3] != "qmf":
- raise Exception("Non-QMF message received")
- return _sub[3:].split('.', 1)
def timedelta_to_secs(td):
"""
@@ -133,11 +116,15 @@
AGENT_HEARTBEAT=8
QUERY_COMPLETE=9
METHOD_RESPONSE=10
+ SUBSCRIBE_RESPONSE=11
+ SUBSCRIBE_INDICATION=12
+ RESUBSCRIBE_RESPONSE=13
# Enumeration of the types of WorkItems produced on the Agent
METHOD_CALL=1000
QUERY=1001
- SUBSCRIBE=1002
- UNSUBSCRIBE=1003
+ SUBSCRIBE_REQUEST=1002
+ RESUBSCRIBE_REQUEST=1003
+ UNSUBSCRIBE_REQUEST=1004
def __init__(self, kind, handle, _params=None):
"""
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py Sat Feb 27 00:38:13 2010
@@ -24,14 +24,14 @@
import datetime
import Queue
from threading import Thread, Event
-from threading import Lock
+from threading import RLock
from threading import currentThread
from threading import Condition
from qpid.messaging import Connection, Message, Empty, SendError
-from common import (make_subject, parse_subject, OpCode, QmfQuery, Notifier,
- MsgKey, QmfData, QmfAddress, SchemaClass, SchemaClassId,
+from common import (QMF_APP_ID, OpCode, QmfQuery, Notifier, ContentType,
+ QmfData, QmfAddress, SchemaClass, SchemaClassId,
SchemaEventClass, SchemaObjectClass, WorkItem,
SchemaMethod, QmfEvent, timedelta_to_secs)
@@ -141,6 +141,7 @@
console._lock.acquire()
try:
console._async_mboxes[self.cid] = self
+ console._next_mbox_expire = None
finally:
console._lock.release()
@@ -150,6 +151,24 @@
console._wake_thread()
+ def reset_timeout(self, _timeout=None):
+ """ Reset the expiration date for this mailbox.
+ """
+ if _timeout is None:
+ _timeout = self.console._reply_timeout
+ self.console._lock.acquire()
+ try:
+ self.expiration_date = (datetime.datetime.utcnow() +
+ datetime.timedelta(seconds=_timeout))
+ self.console._next_mbox_expire = None
+ finally:
+ self.console._lock.release()
+
+ # wake the console mgmt thread so it will learn about the mbox
+ # expiration date (and adjust its idle sleep period correctly)
+
+ self.console._wake_thread()
+
def deliver(self, msg):
"""
"""
@@ -177,7 +196,7 @@
def __init__(self, console,
agent_name,
context,
- target, msgkey,
+ target,
_timeout=None):
"""
Invoked by application thread.
@@ -186,7 +205,6 @@
_timeout)
self.agent_name = agent_name
self.target = target
- self.msgkey = msgkey
self.context = context
self.result = []
@@ -195,11 +213,8 @@
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
- done = False
- objects = reply.content.get(self.msgkey)
- if not objects:
- done = True
- else:
+ objects = reply.content
+ if isinstance(objects, type([])):
# convert from map to native types if needed
if self.target == QmfQuery.TARGET_SCHEMA_ID:
for sid_map in objects:
@@ -237,8 +252,7 @@
# no conversion needed.
self.result += objects
- if done:
- # create workitem
+ if not "partial" in reply.properties:
# logging.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -278,8 +292,8 @@
Process schema response messages.
"""
done = False
- schemas = reply.content.get(MsgKey.schema)
- if schemas:
+ schemas = reply.content
+ if schemas and isinstance(schemas, type([])):
for schema_map in schemas:
# extract schema id, convert based on schema type
sid_map = schema_map.get(SchemaClass.KEY_SCHEMA_ID)
@@ -319,8 +333,8 @@
Invoked by Console Management thread only.
"""
- _map = reply.content.get(MsgKey.method)
- if not _map:
+ _map = reply.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
result = None
else:
@@ -355,6 +369,207 @@
+class _SubscriptionMailbox(_AsyncMailbox):
+ """
+ A Mailbox for a single subscription. Allows only sychronous "subscribe"
+ and "refresh" requests.
+ """
+ def __init__(self, console, context, agent, duration, interval):
+ """
+ Invoked by application thread.
+ """
+ super(_SubscriptionMailbox, self).__init__(console, duration)
+ self.cv = Condition()
+ self.data = []
+ self.result = []
+ self.context = context
+ self.duration = duration
+ self.interval = interval
+ self.agent_name = agent.get_name()
+ self.agent_subscription_id = None # from agent
+
+ def subscribe(self, query):
+ agent = self.console.get_agent(self.agent_name)
+ if not agent:
+ logging.warning("subscribed failed - unknown agent '%s'" %
+ self.agent_name)
+ return False
+ try:
+ logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+ agent._send_subscribe_req(query, self.get_address(), self.interval,
+ self.duration)
+ except SendError, e:
+ logging.error(str(e))
+ return False
+ return True
+
+ def resubscribe(self, duration):
+ agent = self.console.get_agent(self.agent_name)
+ if not agent:
+ logging.warning("resubscribed failed - unknown agent '%s'" %
+ self.agent_name)
+ return False
+ try:
+ logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ agent._send_resubscribe_req(self.get_address(),
+ self.agent_subscription_id, duration)
+ except SendError, e:
+ logging.error(str(e))
+ return False
+ return True
+
+ def deliver(self, msg):
+ """
+ """
+ opcode = msg.properties.get("qmf.opcode")
+ if (opcode == OpCode.subscribe_rsp or
+ opcode == OpCode.subscribe_refresh_rsp):
+
+ error = msg.content.get("_error")
+ if error:
+ try:
+ e_map = QmfData.from_map(error)
+ except TypeError:
+ logging.warning("Invalid QmfData map received: '%s'"
+ % str(error))
+ e_map = QmfData.create({"error":"Unknown error"})
+ sp = SubscribeParams(None, None, None, e_map)
+ else:
+ self.agent_subscription_id = msg.content.get("_subscription_id")
+ self.duration = msg.content.get("_duration", self.duration)
+ self.interval = msg.content.get("_interval", self.interval)
+ self.reset_timeout(self.duration)
+ sp = SubscribeParams(self.get_address(),
+ self.interval,
+ self.duration,
+ None)
+ self.cv.acquire()
+ try:
+ self.data.append(sp)
+ # if was empty, notify waiters
+ if len(self.data) == 1:
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ return
+
+ # else: data indication
+ agent_name = msg.properties.get("qmf.agent")
+ if not agent_name:
+ logging.warning("Ignoring data_ind - no agent name given: %s" %
+ msg)
+ return
+ agent = self.console.get_agent(agent_name)
+ if not agent:
+ logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ agent_name)
+ return
+
+ objects = msg.content
+ for obj_map in objects:
+ obj = QmfConsoleData(map_=obj_map, agent=agent)
+ # start fetch of schema if not known
+ sid = obj.get_schema_class_id()
+ if sid:
+ self.console._prefetch_schema(sid, agent)
+ self.result.append(obj)
+
+ if not "partial" in msg.properties:
+ wi = WorkItem(WorkItem.SUBSCRIBE_INDICATION, self.context, self.result)
+ self.result = []
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ def fetch(self, timeout=None):
+ """
+ Get one data item from a mailbox, with timeout.
+ Invoked by application thread.
+ """
+ self.cv.acquire()
+ try:
+ if len(self.data) == 0:
+ self.cv.wait(timeout)
+ if len(self.data):
+ return self.data.pop(0)
+ return None
+ finally:
+ self.cv.release()
+
+ def expire(self):
+ """ The subscription expired.
+ """
+ self.destroy()
+
+
+
+
+class _AsyncSubscriptionMailbox(_SubscriptionMailbox):
+ """
+ A Mailbox for a single subscription. Allows only asychronous "subscribe"
+ and "refresh" requests.
+ """
+ def __init__(self, console, context, agent, duration, interval):
+ """
+ Invoked by application thread.
+ """
+ super(_AsyncSubscriptionMailbox, self).__init__(console, context,
+ agent, duration,
+ interval)
+ self.subscribe_pending = False
+ self.resubscribe_pending = False
+
+
+ def subscribe(self, query, reply_timeout):
+ if super(_AsyncSubscriptionMailbox, self).subscribe(query):
+ self.subscribe_pending = True
+ self.reset_timeout(reply_timeout)
+ return True
+ return False
+
+ def resubscribe(self, duration, reply_timeout):
+ if super(_AsyncSubscriptionMailbox, self).resubscribe(duration):
+ self.resubscribe_pending = True
+ self.reset_timeout(reply_timeout)
+ return True
+ return False
+
+ def deliver(self, msg):
+ """
+ """
+ super(_AsyncSubscriptionMailbox, self).deliver(msg)
+ sp = self.fetch(0)
+ if sp:
+ # if the message was a reply to a subscribe or
+ # re-subscribe, then we get here.
+ if self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ self.context, sp)
+ else:
+ wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ self.context, sp)
+
+ self.subscribe_pending = False
+ self.resubscribe_pending = False
+
+ self.console._work_q.put(wi)
+ self.console._work_q_put = True
+
+ if not sp.succeeded():
+ self.destroy()
+
+
+ def expire(self):
+ """ Either the subscription expired, or a request timedout.
+ """
+ if self.subscribe_pending:
+ wi = WorkItem(WorkItem.SUBSCRIBE_RESPONSE,
+ self.context, None)
+ elif self.resubscribe_pending:
+ wi = WorkItem(WorkItem.RESUBSCRIBE_RESPONSE,
+ self.context, None)
+ self.destroy()
+
+
##==============================================================================
## DATA MODEL
##==============================================================================
@@ -481,8 +696,8 @@
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -650,8 +865,8 @@
logging.debug("Agent method req wait timed-out.")
return None
- _map = replyMsg.content.get(MsgKey.method)
- if not _map:
+ _map = replyMsg.content
+ if not _map or not isinstance(_map, type({})):
logging.error("Invalid method call reply message")
return None
@@ -676,20 +891,66 @@
def _send_query(self, query, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.get_query)},
- content={MsgKey.query: query.map_encode()})
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.query_req},
+ content=query.map_encode())
self._send_msg( msg, correlation_id )
def _send_method_req(self, mr_map, correlation_id=None):
"""
"""
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.method_req)},
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.method_req},
content=mr_map)
self._send_msg( msg, correlation_id )
+ def _send_subscribe_req(self, query, correlation_id, _interval=None,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_query":query.map_encode()}
+ if _interval is not None:
+ sr_map["_interval"] = _interval
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_resubscribe_req(self, correlation_id,
+ subscription_id,
+ _lifetime=None):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+ if _lifetime is not None:
+ sr_map["_duration"] = _lifetime
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_refresh_req},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
+
+ def _send_unsubscribe_ind(self, correlation_id, subscription_id):
+ """
+ """
+ sr_map = {"_subscription_id":subscription_id}
+
+ msg = Message(id=QMF_APP_ID,
+ properties={"method":"request",
+ "qmf.opcode":OpCode.subscribe_cancel_ind},
+ content=sr_map)
+ self._send_msg(msg, correlation_id)
+
##==============================================================================
## METHOD CALL
@@ -716,6 +977,36 @@
return arg
+
+ ##==============================================================================
+ ## SUBSCRIPTION
+ ##==============================================================================
+
+class SubscribeParams(object):
+ """ Represents a standing subscription for this console.
+ """
+ def __init__(self, sid, interval, duration, _error=None):
+ self._sid = sid
+ self._interval = interval
+ self._duration = duration
+ self._error = _error
+
+ def succeeded(self):
+ return self._error is None
+
+ def get_error(self):
+ return self._error
+
+ def get_subscription_id(self):
+ return self._sid
+
+ def get_publish_interval(self):
+ return self._interval
+
+ def get_duration(self):
+ return self._duration
+
+
##==============================================================================
## CONSOLE
##==============================================================================
@@ -753,7 +1044,7 @@
self._domain = _domain
self._address = QmfAddress.direct(self._name, self._domain)
self._notifier = notifier
- self._lock = Lock()
+ self._lock = RLock()
self._conn = None
self._session = None
# dict of "agent-direct-address":class Agent entries
@@ -766,6 +1057,7 @@
self._agent_discovery_filter = None
self._reply_timeout = reply_timeout
self._agent_timeout = agent_timeout
+ self._subscribe_timeout = 300 # @todo: parameterize
self._next_agent_expire = None
self._next_mbox_expire = None
# for passing WorkItems to the application
@@ -776,18 +1068,6 @@
self._post_office = {} # indexed by cid
self._async_mboxes = {} # indexed by cid, used to expire them
- ## Old stuff below???
- #self._broker_list = []
- #self.impl = qmfengine.Console()
- #self._event = qmfengine.ConsoleEvent()
- ##self._cv = Condition()
- ##self._sync_count = 0
- ##self._sync_result = None
- ##self._select = {}
- ##self._cb_cond = Condition()
-
-
-
def destroy(self, timeout=None):
"""
Must be called before the Console is deleted.
@@ -801,8 +1081,6 @@
self.remove_connection(self._conn, timeout)
logging.debug("Console Destroyed")
-
-
def add_connection(self, conn):
"""
Add a AMQP connection to the console. The console will setup a session over the
@@ -934,10 +1212,11 @@
cid = mbox.get_address()
query = QmfQuery.create_id(QmfQuery.TARGET_AGENT, name)
- msg = Message(subject="console.ind.locate." + name,
+ msg = Message(id=QMF_APP_ID,
+ subject="console.ind.locate." + name,
properties={"method":"request",
- "qmf.subject":make_subject(OpCode.agent_locate)},
- content={MsgKey.query: query.map_encode()})
+ "qmf.opcode":OpCode.agent_locate_req},
+ content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
logging.debug("Sending Agent Locate (%s)" % time.time())
@@ -995,23 +1274,13 @@
def do_query(self, agent, query, _reply_handle=None, _timeout=None ):
"""
"""
- query_keymap={QmfQuery.TARGET_PACKAGES: MsgKey.package_info,
- QmfQuery.TARGET_OBJECT_ID: MsgKey.object_id,
- QmfQuery.TARGET_SCHEMA_ID: MsgKey.schema_id,
- QmfQuery.TARGET_SCHEMA: MsgKey.schema,
- QmfQuery.TARGET_OBJECT: MsgKey.data_obj,
- QmfQuery.TARGET_AGENT: MsgKey.agent_info}
-
target = query.get_target()
- msgkey = query_keymap.get(target)
- if not msgkey:
- raise Exception("Invalid target for query: %s" % str(query))
if _reply_handle is not None:
mbox = _QueryMailbox(self,
agent.get_name(),
_reply_handle,
- target, msgkey,
+ target,
_timeout)
else:
mbox = _SyncMailbox(self)
@@ -1045,9 +1314,8 @@
logging.debug("Query wait timed-out.")
break
- objects = reply.content.get(msgkey)
- if not objects:
- # last response is empty
+ objects = reply.content
+ if not objects or not isinstance(objects, type([])):
break
# convert from map to native types if needed
@@ -1081,21 +1349,116 @@
# no conversion needed.
response += objects
+ if not "partial" in reply.properties:
+ # reply not broken up over multiple msgs
+ break
+
now = datetime.datetime.utcnow()
mbox.destroy()
return response
+
+ def create_subscription(self, agent, query, console_handle,
+ _interval=None, _duration=None,
+ _blocking=True, _timeout=None):
+ if not _duration:
+ _duration = self._subscribe_timeout
+
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ if not _blocking:
+ mbox = _AsyncSubscriptionMailbox(self, console_handle, agent,
+ _duration, _interval)
+ if not mbox.subscribe(query, _timeout):
+ mbox.destroy()
+ return False
+ return True
+ else:
+ mbox = _SubscriptionMailbox(self, console_handle, agent, _duration,
+ _interval)
+
+ if not mbox.subscribe(query):
+ mbox.destroy()
+ return None
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ # @todo: what if mbox expires here?
+ sp = mbox.fetch(_timeout)
+
+ if not sp:
+ logging.debug("Subscription request wait timed-out.")
+ mbox.destroy()
+ return None
+
+ if not sp.succeeded():
+ mbox.destroy()
+
+ return sp
+
+ def refresh_subscription(self, subscription_id,
+ _duration=None,
+ _timeout=None):
+ if _timeout is None:
+ _timeout = self._reply_timeout
+
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ logging.warning("Subscription %s not found." % subscription_id)
+ return None
+
+ if isinstance(mbox, _AsyncSubscriptionMailbox):
+ return mbox.resubscribe(_duration, _timeout)
+ else:
+ # synchronous - wait for reply
+ if not mbox.resubscribe(_duration):
+ # @todo ???? mbox.destroy()
+ return None
+
+ # wait for reply
+
+ logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ sp = mbox.fetch(_timeout)
+
+ if not sp:
+ logging.debug("re-subscribe request wait timed-out.")
+ # @todo???? mbox.destroy()
+ return None
+
+ return sp
+
+
+ def cancel_subscription(self, subscription_id):
+ """
+ """
+ mbox = self._get_mailbox(subscription_id)
+ if not mbox:
+ return
+
+ agent = self.get_agent(mbox.agent_name)
+ if agent:
+ try:
+ logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ agent._send_unsubscribe_ind(subscription_id,
+ mbox.agent_subscription_id)
+ except SendError, e:
+ logging.error(str(e))
+
+ mbox.destroy()
+
+
def _wake_thread(self):
"""
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
logging.debug("Sending noop to wake up [%s]" % self._address)
- msg = Message(properties={"method":"request",
- "qmf.subject":make_subject(OpCode.noop)},
+ msg = Message(id=QMF_APP_ID,
subject=self._name,
- content={"noop":"noop"})
+ properties={"method":"request",
+ "qmf.opcode":OpCode.noop},
+ content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
@@ -1152,9 +1515,17 @@
# to expire, or a mailbox requrest to time out
now = datetime.datetime.utcnow()
next_expire = self._next_agent_expire
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
+
+ # the mailbox expire flag may be cleared by the
+ # app thread(s)
+ self._lock.acquire()
+ try:
+ if (self._next_mbox_expire and
+ self._next_mbox_expire < next_expire):
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
+
if next_expire > now:
timeout = timedelta_to_secs(next_expire - now)
try:
@@ -1268,13 +1639,14 @@
"""
PRIVATE: Process a message received from an Agent
"""
- logging.debug( "Message received from Agent! [%s]" % msg )
- try:
- version,opcode = parse_subject(msg.properties.get("qmf.subject"))
- # @todo: deal with version mismatch!!!
- except:
+ #logging.debug( "Message received from Agent! [%s]" % msg )
+ #logging.error( "Message received from Agent! [%s]" % msg )
+
+ opcode = msg.properties.get("qmf.opcode")
+ if not opcode:
logging.error("Ignoring unrecognized message '%s'" % msg)
return
+ version = 2 # @todo: fix me
cmap = {}; props = {}
if msg.content_type == "amqp/map":
@@ -1282,20 +1654,23 @@
if msg.properties:
props = msg.properties
- if opcode == OpCode.agent_ind:
+ if opcode == OpCode.agent_heartbeat_ind:
self._handle_agent_ind_msg( msg, cmap, version, _direct )
- elif opcode == OpCode.data_ind:
- self._handle_data_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.event_ind:
- self._handle_event_ind_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.managed_object:
- logging.warning("!!! managed_object TBD !!!")
- elif opcode == OpCode.object_ind:
- logging.warning("!!! object_ind TBD !!!")
- elif opcode == OpCode.response:
+ elif opcode == OpCode.agent_locate_rsp:
+ self._handle_agent_ind_msg( msg, cmap, version, _direct )
+ elif opcode == OpCode.query_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.subscribe_rsp:
self._handle_response_msg(msg, cmap, version, _direct)
- elif opcode == OpCode.schema_ind:
- logging.warning("!!! schema_ind TBD !!!")
+ elif opcode == OpCode.subscribe_refresh_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.method_rsp:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ elif opcode == OpCode.data_ind:
+ if msg.correlation_id:
+ self._handle_response_msg(msg, cmap, version, _direct)
+ else:
+ self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
logging.debug("No-op msg received.")
else:
@@ -1309,7 +1684,7 @@
"""
logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
- ai_map = cmap.get(MsgKey.agent_info)
+ ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
logging.warning("Bad agent-ind message received: '%s'" % msg)
return
@@ -1359,29 +1734,10 @@
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_data_ind_msg(self, msg, cmap, version, direct):
- """
- Process a received data-ind message.
- """
- logging.debug("_handle_data_ind_msg '%s' (%s)" % (msg, time.time()))
-
- mbox = self._get_mailbox(msg.correlation_id)
- if not mbox:
- logging.debug("Data indicate received with unknown correlation_id"
- " msg='%s'" % str(msg))
- return
-
- # wake up all waiters
- logging.debug("waking waiters for correlation id %s" %
- msg.correlation_id)
- mbox.deliver(msg)
-
-
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- # @todo code replication - clean me.
logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
@@ -1394,19 +1750,22 @@
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
- def _handle_event_ind_msg(self, msg, cmap, version, _direct):
- ei_map = cmap.get(MsgKey.event)
- if not ei_map or not isinstance(ei_map, type({})):
- logging.warning("Bad event indication message received: '%s'" % msg)
- return
+ def _handle_indication_msg(self, msg, cmap, version, _direct):
- aname = ei_map.get("_name")
- emap = ei_map.get("_event")
+ aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No '_name' field in event indication message.")
+ logging.debug("No agent name field in indication message.")
return
- if not emap:
- logging.debug("No '_event' field in event indication message.")
+
+ content_type = msg.properties.get("qmf.content")
+ if (content_type != ContentType.event or
+ not isinstance(msg.content, type([]))):
+ logging.warning("Bad event indication message received: '%s'" % msg)
+ return
+
+ emap = msg.content[0]
+ if not isinstance(emap, type({})):
+ logging.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1439,13 +1798,13 @@
"""
Check all async mailboxes for outstanding requests that have expired.
"""
- now = datetime.datetime.utcnow()
- if self._next_mbox_expire and now < self._next_mbox_expire:
- return
- expired_mboxes = []
- self._next_mbox_expire = None
self._lock.acquire()
try:
+ now = datetime.datetime.utcnow()
+ if self._next_mbox_expire and now < self._next_mbox_expire:
+ return
+ expired_mboxes = []
+ self._next_mbox_expire = None
for mbox in self._async_mboxes.itervalues():
if now >= mbox.expiration_date:
expired_mboxes.append(mbox)
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/__init__.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/__init__.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/__init__.py Sat Feb 27 00:38:13 2010
@@ -27,3 +27,4 @@
import multi_response
import async_query
import async_method
+import subscriptions
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/agent_discovery.py Sat Feb 27 00:38:13 2010
@@ -52,8 +52,8 @@
self.broker_url = broker_url
self.notifier = _testNotifier()
self.agent = qmf2.agent.Agent(name,
- _notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ _notifier=self.notifier,
+ heartbeat_interval=heartbeat)
# No database needed for this test
self.running = False
self.ready = Event()
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_method.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_method.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_method.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_method.py Sat Feb 27 00:38:13 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_query.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_query.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_query.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/async_query.py Sat Feb 27 00:38:13 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_method.py Sat Feb 27 00:38:13 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/basic_query.py Sat Feb 27 00:38:13 2010
@@ -53,7 +53,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/events.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/events.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/events.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/events.py Sat Feb 27 00:38:13 2010
@@ -58,7 +58,7 @@
self.notifier = _testNotifier()
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Dynamically construct a management database
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/multi_response.py Sat Feb 27 00:38:13 2010
@@ -60,8 +60,8 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat,
- _max_msg_size=_MAX_OBJS_PER_MSG)
+ heartbeat_interval=heartbeat,
+ max_msg_size=_MAX_OBJS_PER_MSG)
# Dynamically construct a management database
for i in range(self.schema_count):
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/obj_gets.py Sat Feb 27 00:38:13 2010
@@ -54,7 +54,7 @@
self.broker_url = broker_url
self.agent = Agent(name,
_notifier=self.notifier,
- _heartbeat_interval=heartbeat)
+ heartbeat_interval=heartbeat)
# Management Database
# - two different schema packages,
Propchange: qpid/branches/qmf-devel0.7/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,6 +1,6 @@
/qpid/branches/0.5.x-dev:886720-886722
-/qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145,892761,894875
+/qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145,892761,894875,916304,916325
/qpid/branches/java-broker-0-10/qpid/java:795950-829653
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:911618-912022
+/qpid/trunk/qpid/java:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-912022
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/qpid/branches/0.5-release/qpid/java/broker/bin:757268
/qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-912022
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-912022
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-912022
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
Modified: qpid/branches/qmf-devel0.7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties?rev=916887&r1=916886&r2=916887&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties (original)
+++ qpid/branches/qmf-devel0.7/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties Sat Feb 27 00:38:13 2010
@@ -32,6 +32,7 @@
# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.ibmStocks = stocks.nyse.ibm
+topic.MyTopic = example.MyTopic
# Register an AMQP destination in JNDI
# NOTE: Qpid currently only supports direct,topics and headers
Propchange: qpid/branches/qmf-devel0.7/qpid/java/lib/org.osgi.core_1.0.0.jar
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:443187-720930
/qpid/branches/java-broker-0-10/qpid/java/lib/org.osgi.core_1.0.0.jar:795950-829653
/qpid/branches/java-network-refactor/qpid/java/lib/org.osgi.core_1.0.0.jar:805429-821809
-/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:911618-912022
+/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/client/src/main/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:443187-703176
/qpid/branches/java-broker-0-10/qpid/java/management/client/src/main/java/org/apache/qpid/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/client/src/main/java/org/apache/qpid/management:805429-821809
-/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:911618-912022
+/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:911618-916854
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/client/src/test/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Feb 27 00:38:13 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:443187-703176
/qpid/branches/java-broker-0-10/qpid/java/management/client/src/test/java/org/apache/qpid/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/client/src/test/java/org/apache/qpid/management:805429-821809
-/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:911618-912022
+/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:911618-916854
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org