You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/02 02:02:33 UTC
svn commit: r917828 [3/4] - in /qpid/branches/qmf-devel0.7: ./
qpid/cpp/include/qmf/engine/ qpid/cpp/src/qmf/engine/
qpid/cpp/src/qpid/cluster/ qpid/cpp/src/qpid/management/
qpid/cpp/src/tests/ qpid/doc/book/src/ qpid/dotnet/
qpid/extras/qmf/src/py/qmf...
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py?rev=917828&r1=917827&r2=917828&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/common.py Tue Mar 2 01:02:30 2010
@@ -620,208 +620,9 @@
-
-
-#==============================================================================
-#==============================================================================
-#==============================================================================
-
-
-
-
-class Arguments(object):
- def __init__(self, map):
- pass
-# self.map = map
-# self._by_hash = {}
-# key_count = self.map.keyCount()
-# a = 0
-# while a < key_count:
-# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a))
-# a += 1
-
-
-# def __getitem__(self, key):
-# return self._by_hash[key]
-
-
-# def __setitem__(self, key, value):
-# self._by_hash[key] = value
-# self.set(key, value)
-
-
-# def __iter__(self):
-# return self._by_hash.__iter__
-
-
-# def __getattr__(self, name):
-# if name in self._by_hash:
-# return self._by_hash[name]
-# return super.__getattr__(self, name)
-
-
-# def __setattr__(self, name, value):
-# #
-# # ignore local data members
-# #
-# if (name[0] == '_' or
-# name == 'map'):
-# return super.__setattr__(self, name, value)
-
-# if name in self._by_hash:
-# self._by_hash[name] = value
-# return self.set(name, value)
-
-# return super.__setattr__(self, name, value)
-
-
-# def by_key(self, key):
-# val = self.map.byKey(key)
-# vType = val.getType()
-# if vType == TYPE_UINT8: return val.asUint()
-# elif vType == TYPE_UINT16: return val.asUint()
-# elif vType == TYPE_UINT32: return val.asUint()
-# elif vType == TYPE_UINT64: return val.asUint64()
-# elif vType == TYPE_SSTR: return val.asString()
-# elif vType == TYPE_LSTR: return val.asString()
-# elif vType == TYPE_ABSTIME: return val.asInt64()
-# elif vType == TYPE_DELTATIME: return val.asUint64()
-# elif vType == TYPE_REF: return ObjectId(val.asObjectId())
-# elif vType == TYPE_BOOL: return val.asBool()
-# elif vType == TYPE_FLOAT: return val.asFloat()
-# elif vType == TYPE_DOUBLE: return val.asDouble()
-# elif vType == TYPE_UUID: return val.asUuid()
-# elif vType == TYPE_INT8: return val.asInt()
-# elif vType == TYPE_INT16: return val.asInt()
-# elif vType == TYPE_INT32: return val.asInt()
-# elif vType == TYPE_INT64: return val.asInt64()
-# else:
-# # when TYPE_MAP
-# # when TYPE_OBJECT
-# # when TYPE_LIST
-# # when TYPE_ARRAY
-# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
-# return None
-
-
-# def set(self, key, value):
-# val = self.map.byKey(key)
-# vType = val.getType()
-# if vType == TYPE_UINT8: return val.setUint(value)
-# elif vType == TYPE_UINT16: return val.setUint(value)
-# elif vType == TYPE_UINT32: return val.setUint(value)
-# elif vType == TYPE_UINT64: return val.setUint64(value)
-# elif vType == TYPE_SSTR:
-# if value:
-# return val.setString(value)
-# else:
-# return val.setString('')
-# elif vType == TYPE_LSTR:
-# if value:
-# return val.setString(value)
-# else:
-# return val.setString('')
-# elif vType == TYPE_ABSTIME: return val.setInt64(value)
-# elif vType == TYPE_DELTATIME: return val.setUint64(value)
-# elif vType == TYPE_REF: return val.setObjectId(value.impl)
-# elif vType == TYPE_BOOL: return val.setBool(value)
-# elif vType == TYPE_FLOAT: return val.setFloat(value)
-# elif vType == TYPE_DOUBLE: return val.setDouble(value)
-# elif vType == TYPE_UUID: return val.setUuid(value)
-# elif vType == TYPE_INT8: return val.setInt(value)
-# elif vType == TYPE_INT16: return val.setInt(value)
-# elif vType == TYPE_INT32: return val.setInt(value)
-# elif vType == TYPE_INT64: return val.setInt64(value)
-# else:
-# # when TYPE_MAP
-# # when TYPE_OBJECT
-# # when TYPE_LIST
-# # when TYPE_ARRAY
-# logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
-# return None
-
-
-
-#class MethodResponse(object):
-# def __init__(self, impl):
-# pass
-# self.impl = qmfengine.MethodResponse(impl)
-
-
-# def status(self):
-# return self.impl.getStatus()
-
-
-# def exception(self):
-# return self.impl.getException()
-
-
-# def text(self):
-# return exception().asString()
-
-
-# def args(self):
-# return Arguments(self.impl.getArgs())
-
-
-# def __getattr__(self, name):
-# myArgs = self.args()
-# return myArgs.__getattr__(name)
-
-
-# def __setattr__(self, name, value):
-# if name == 'impl':
-# return super.__setattr__(self, name, value)
-
-# myArgs = self.args()
-# return myArgs.__setattr__(name, value)
-
-
-
-# ##==============================================================================
-# ## QUERY
-# ##==============================================================================
-
-
-
-# def _doQuery(predicate, params ):
-# """
-# Given the predicate from a query, and a map of named parameters, apply the predicate
-# to the parameters, and return True or False.
-# """
-# if type(predicate) != list or len(predicate) < 1:
-# return False
-
-# elif opr == Query._LOGIC_AND:
-# logging.debug("_doQuery() AND: [%s]" % predicate )
-# rc = False
-# for exp in predicate[1:]:
-# rc = _doQuery( exp, params )
-# if not rc:
-# break
-# return rc
-
-# elif opr == Query._LOGIC_OR:
-# logging.debug("_doQuery() OR: [%s]" % predicate )
-# rc = False
-# for exp in predicate[1:]:
-# rc = _doQuery( exp, params )
-# if rc:
-# break
-# return rc
-
-# elif opr == Query._LOGIC_NOT:
-# logging.debug("_doQuery() NOT: [%s]" % predicate )
-# if len(predicate) != 2:
-# logging.warning("Malformed query not-expression received: '%s'" % predicate)
-# return False
-# return not _doQuery( predicate[1:], params )
-
-
-
-# else:
-# logging.warning("Unknown query operator received: '%s'" % opr)
-# return False
+##==============================================================================
+## QUERY
+##==============================================================================
@@ -1095,7 +896,7 @@
raise TypeError("Query expects to evaluate QmfData types.")
if not isinstance(pred, type([])):
- log_query.warning("Invalid type for predicate expression: '%s'" % str(pred))
+ log.warning("Invalid type for predicate expression: '%s'" % str(pred))
return False
# empty predicate - match all???
@@ -1140,8 +941,8 @@
if oper == QmfQuery.EXISTS:
if len(pred) != 2:
- log_query.warning("Malformed query: 'exists' operator"
- " - bad arguments '%s'" % str(pred))
+ log.warning("Malformed query: 'exists' operator"
+ " - bad arguments '%s'" % str(pred))
return False
### Q: Should we assume "quote", or should it be explicit?
### "foo" or ["quote" "foo"]
@@ -1150,7 +951,7 @@
try:
arg = self._fetch_pred_arg(pred[1], qmfData)
except AttributeError:
- log_query.debug("query parameter not found: '%s'" % str(pred))
+ log.warning("query parameter not found: '%s'" % str(pred))
return False
v = qmfData.has_value(arg)
log_query.debug("---> %s" % str(v))
@@ -1161,9 +962,9 @@
QmfQuery.LE, QmfQuery.GT, QmfQuery.GE,
QmfQuery.RE_MATCH]:
if len(pred) != 3:
- log_query.warning("Malformed query: '%s' operator"
- " - requires 2 arguments '%s'" %
- (oper, str(pred)))
+ log.warning("Malformed query: '%s' operator"
+ " - requires 2 arguments '%s'" %
+ (oper, str(pred)))
return False
# @todo: support regular expression match
log_query.debug("query evaluate binary op: [%s]" % str(pred))
@@ -1171,7 +972,7 @@
arg1 = self._fetch_pred_arg(pred[1], qmfData)
arg2 = self._fetch_pred_arg(pred[2], qmfData)
except AttributeError:
- log_query.debug("query parameter not found: '%s'" % str(pred))
+ log.warning("query parameter not found: '%s'" % str(pred))
return False
log_query.debug("query evaluate %s: %s, %s" % (oper, str(arg1), str(arg2)))
v = False
@@ -1183,11 +984,11 @@
elif oper == QmfQuery.GT: v = arg1 > arg2
elif oper == QmfQuery.GE: v = arg1 >= arg2
except TypeError:
- log_query.warning("query comparison failed: '%s'" % str(pred))
+ log.warning("query comparison failed: '%s'" % str(pred))
log_query.debug("---> %s" % str(v))
return v
- log_query.warning("Unrecognized query operator: [%s]" % str(pred[0]))
+ log.warning("Unrecognized query operator: [%s]" % str(pred[0]))
return False
def _fetch_pred_arg(self, arg, qmfData):
@@ -1447,12 +1248,17 @@
map["unit"] = str, describes units used
map["min"] = int, minimum allowed value
map["max"] = int, maximun allowed value
- map["maxlen"] = int, if string type, this is the maximum length in bytes
+ map["maxlen"] = int, if string type, this is the maximum length in bytes
required to represent the longest instance of this string.
map["desc"] = str, human-readable description of this argument
map["reference"] = str, ???
map["parent_ref"] = bool, true if this property references an object in
which this object is in a child-parent relationship. Default False
+ map["continuous"] = bool, true if the value potentially changes too fast to
+ be directly monitorable. Example: fast changing statistic or random
+ number. Subscriptions to objects containing continuous data will publish
+ only on an interval basis, rather than every time the data changes. Default
+ False.
"""
__hash__ = None
_access_strings = ["RO","RW","RC"]
@@ -1479,6 +1285,7 @@
self._isParentRef = False
self._dir = None
self._default = None
+ self._is_continuous = False
for key, value in kwargs.items():
if key == "access":
@@ -1495,6 +1302,8 @@
elif key == "desc" : self._desc = value
elif key == "reference" : self._reference = value
elif key == "parent_ref" : self._isParentRef = _to_bool(value)
+ elif key == "parent_ref" : self._isParentRef = _to_bool(value)
+ elif key == "continuous" : self._is_continuous = _to_bool(value)
elif key == "dir":
value = str(value).upper()
if value not in self._dir_strings:
@@ -1503,7 +1312,7 @@
elif key == "default" : self._default = value
# constructor
- def _create(cls, type_code, kwargs={}):
+ def _create(cls, type_code, **kwargs):
return cls(_type_code=type_code, kwargs=kwargs)
create = classmethod(_create)
@@ -1538,6 +1347,8 @@
def get_default(self): return self._default
+ def is_continuous(self): return self._is_continuous
+
def map_encode(self):
"""
Return the map encoding of this schema.
@@ -1556,6 +1367,7 @@
_map["parent_ref"] = self._isParentRef
if self._dir: _map["dir"] = self._dir
if self._default: _map["default"] = self._default
+ if self._is_continuous: _map["continuous"] = self._is_continuous
return _map
def __repr__(self):
@@ -1568,6 +1380,7 @@
hasher.update(str(self._type))
hasher.update(str(self._isIndex))
hasher.update(str(self._isOptional))
+ hasher.update(str(self._is_continuous))
if self._access: hasher.update(self._access)
if self._unit: hasher.update(self._unit)
if self._desc: hasher.update(self._desc)
@@ -1575,7 +1388,6 @@
if self._default: hasher.update(self._default)
-
class SchemaMethod(_mapEncoder):
"""
The SchemaMethod class describes the method's structure, and contains a
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py?rev=917828&r1=917827&r2=917828&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/console.py Tue Mar 2 01:02:30 2010
@@ -18,11 +18,11 @@
#
import sys
import os
-import logging
import platform
import time
import datetime
import Queue
+from logging import getLogger
from threading import Thread, Event
from threading import RLock
from threading import currentThread
@@ -41,6 +41,8 @@
_callback_thread=None
+log = getLogger("qmf")
+trace = getLogger("qmf.console")
##==============================================================================
@@ -213,6 +215,7 @@
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
+ trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name)
objects = reply.content
if isinstance(objects, type([])):
# convert from map to native types if needed
@@ -253,7 +256,7 @@
self.result += objects
if not "partial" in reply.properties:
- # logging.error("QUERY COMPLETE for %s" % str(self.context))
+ # log.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
self.console._work_q_put = True
@@ -262,8 +265,7 @@
def expire(self):
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
+ trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name)
# send along whatever (possibly none) has been received so far
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -291,6 +293,7 @@
"""
Process schema response messages.
"""
+ trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id)
done = False
schemas = reply.content
if schemas and isinstance(schemas, type([])):
@@ -309,6 +312,7 @@
def expire(self):
+ trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id)
self.destroy()
@@ -332,10 +336,10 @@
Process method response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
-
+ trace.debug("Delivering to method mailbox.")
_map = reply.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
result = None
else:
error=_map.get(SchemaMethod.KEY_ERROR)
@@ -358,8 +362,7 @@
The mailbox expired without receiving a reply.
Invoked by the Console Management thread only.
"""
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
+ trace.debug("Expiring method mailbox.")
# send along an empty response
wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
self.console._work_q.put(wi)
@@ -391,30 +394,30 @@
def subscribe(self, query):
agent = self.console.get_agent(self.agent_name)
if not agent:
- logging.warning("subscribed failed - unknown agent '%s'" %
+ log.warning("subscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
- logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
agent._send_subscribe_req(query, self.get_address(), self.interval,
self.duration)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
return False
return True
def resubscribe(self, duration):
agent = self.console.get_agent(self.agent_name)
if not agent:
- logging.warning("resubscribed failed - unknown agent '%s'" %
+ log.warning("resubscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
- logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
agent._send_resubscribe_req(self.get_address(),
self.agent_subscription_id, duration)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
return False
return True
@@ -430,7 +433,7 @@
try:
e_map = QmfData.from_map(error)
except TypeError:
- logging.warning("Invalid QmfData map received: '%s'"
+ log.warning("Invalid QmfData map received: '%s'"
% str(error))
e_map = QmfData.create({"error":"Unknown error"})
sp = SubscribeParams(None, None, None, e_map)
@@ -456,12 +459,12 @@
# else: data indication
agent_name = msg.properties.get("qmf.agent")
if not agent_name:
- logging.warning("Ignoring data_ind - no agent name given: %s" %
+ log.warning("Ignoring data_ind - no agent name given: %s" %
msg)
return
agent = self.console.get_agent(agent_name)
if not agent:
- logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ log.warning("Ignoring data_ind - unknown agent '%s'" %
agent_name)
return
@@ -625,7 +628,7 @@
contents.
"""
if _reply_handle is not None:
- logging.error(" ASYNC REFRESH TBD!!!")
+ log.error(" ASYNC REFRESH TBD!!!")
return None
assert self._agent
@@ -677,28 +680,28 @@
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args
- logging.debug("Sending method req to Agent (%s)" % time.time())
+ trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._agent._send_method_req(_map, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
- logging.debug("Waiting for response to method req (%s)" % _timeout)
+ trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
+ trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
return None
error=_map.get(SchemaMethod.KEY_ERROR)
@@ -751,7 +754,7 @@
self._packages = {} # map of {package-name:[list of class-names], } for this agent
self._subscriptions = [] # list of active standing subscriptions for this agent
self._announce_timestamp = None # datetime when last announce received
- logging.debug( "Created Agent with address: [%s]" % self._address )
+ trace.debug( "Created Agent with address: [%s]" % self._address )
def get_name(self):
@@ -768,7 +771,7 @@
if correlation_id:
msg.correlation_id = str(correlation_id)
# TRACE
- #logging.error("!!! Console %s sending to agent %s (%s)" %
+ #log.error("!!! Console %s sending to agent %s (%s)" %
# (self._console._name, self._name, str(msg)))
self._sender.send(msg)
# return handle
@@ -846,28 +849,28 @@
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
- logging.debug("Sending method req to Agent (%s)" % time.time())
+ trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._send_method_req(_map, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
- logging.debug("Waiting for response to method req (%s)" % _timeout)
+ trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
+ trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
return None
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
@@ -1076,10 +1079,10 @@
@type timeout: float
@param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
"""
- logging.debug("Destroying Console...")
+ trace.debug("Destroying Console...")
if self._conn:
self.remove_connection(self._conn, timeout)
- logging.debug("Console Destroyed")
+ trace.debug("Console Destroyed")
def add_connection(self, conn):
"""
@@ -1103,7 +1106,7 @@
" x-properties:"
" {type:direct}}}",
capacity=1)
- logging.debug("my direct addr=%s" % self._direct_recvr.source)
+ trace.debug("my direct addr=%s" % self._direct_recvr.source)
self._direct_sender = self._session.sender(str(self._address.get_node()) +
";{create:always,"
@@ -1111,7 +1114,7 @@
" {type:topic,"
" x-properties:"
" {type:direct}}}")
- logging.debug("my direct sender=%s" % self._direct_sender.target)
+ trace.debug("my direct sender=%s" % self._direct_sender.target)
# for receiving "broadcast" messages from agents
default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
@@ -1120,7 +1123,7 @@
";{create:always,"
" node-properties:{type:topic}}",
capacity=1)
- logging.debug("default topic recv addr=%s" % self._topic_recvr.source)
+ trace.debug("default topic recv addr=%s" % self._topic_recvr.source)
# for sending to topic subscribers
@@ -1128,7 +1131,7 @@
self._topic_sender = self._session.sender(str(topic_addr) +
";{create:always,"
" node-properties:{type:topic}}")
- logging.debug("default topic send addr=%s" % self._topic_sender.target)
+ trace.debug("default topic send addr=%s" % self._topic_sender.target)
#
# Now that receivers are created, fire off the receive thread...
@@ -1150,17 +1153,17 @@
@param conn: connection previously added by add_connection()
"""
if self._conn and conn and conn != self._conn:
- logging.error( "Attempt to delete unknown connection: %s" % str(conn))
+ log.error( "Attempt to delete unknown connection: %s" % str(conn))
# tell connection thread to shutdown
self._operational = False
if self.isAlive():
# kick my thread to wake it up
self._wake_thread()
- logging.debug("waiting for console receiver thread to exit")
+ trace.debug("waiting for console receiver thread to exit")
self.join(timeout)
if self.isAlive():
- logging.error( "Console thread '%s' is hung..." % self.getName() )
+ log.error( "Console thread '%s' is hung..." % self.getName() )
self._direct_recvr.close()
self._direct_sender.close()
self._topic_recvr.close()
@@ -1168,7 +1171,7 @@
self._session.close()
self._session = None
self._conn = None
- logging.debug("console connection removal complete")
+ trace.debug("console connection removal complete")
def get_address(self):
@@ -1219,14 +1222,14 @@
content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
- logging.debug("Sending Agent Locate (%s)" % time.time())
+ trace.debug("Sending Agent Locate (%s)" % time.time())
# TRACE
- #logging.error("!!! Console %s sending agent locate (%s)" %
+ #log.error("!!! Console %s sending agent locate (%s)" %
# (self._name, str(msg)))
try:
self._topic_sender.send(msg)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
@@ -1234,10 +1237,10 @@
timeout = self._reply_timeout
new_agent = None
- logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
+ trace.debug("Waiting for response to Agent Locate (%s)" % timeout)
mbox.fetch(timeout)
mbox.destroy()
- logging.debug("Agent Locate wait ended (%s)" % time.time())
+ trace.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
new_agent = self._agent_map.get(name)
@@ -1288,10 +1291,10 @@
cid = mbox.get_address()
try:
- logging.debug("Sending Query to Agent (%s)" % time.time())
+ trace.debug("Sending Query to Agent (%s)" % time.time())
agent._send_query(query, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
@@ -1302,7 +1305,7 @@
if not _timeout:
_timeout = self._reply_timeout
- logging.debug("Waiting for response to Query (%s)" % _timeout)
+ trace.debug("Waiting for response to Query (%s)" % _timeout)
now = datetime.datetime.utcnow()
expire = now + datetime.timedelta(seconds=_timeout)
@@ -1311,7 +1314,7 @@
_timeout = timedelta_to_secs(expire - now)
reply = mbox.fetch(_timeout)
if not reply:
- logging.debug("Query wait timed-out.")
+ trace.debug("Query wait timed-out.")
break
objects = reply.content
@@ -1383,12 +1386,12 @@
mbox.destroy()
return None
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ trace.debug("Waiting for response to subscription (%s)" % _timeout)
# @todo: what if mbox expires here?
sp = mbox.fetch(_timeout)
if not sp:
- logging.debug("Subscription request wait timed-out.")
+ trace.debug("Subscription request wait timed-out.")
mbox.destroy()
return None
@@ -1405,7 +1408,7 @@
mbox = self._get_mailbox(subscription_id)
if not mbox:
- logging.warning("Subscription %s not found." % subscription_id)
+ log.warning("Subscription %s not found." % subscription_id)
return None
if isinstance(mbox, _AsyncSubscriptionMailbox):
@@ -1418,11 +1421,11 @@
# wait for reply
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ trace.debug("Waiting for response to subscription (%s)" % _timeout)
sp = mbox.fetch(_timeout)
if not sp:
- logging.debug("re-subscribe request wait timed-out.")
+ trace.debug("re-subscribe request wait timed-out.")
# @todo???? mbox.destroy()
return None
@@ -1439,11 +1442,11 @@
agent = self.get_agent(mbox.agent_name)
if agent:
try:
- logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ trace.debug("Sending UnSubscribe to Agent (%s)" % time.time())
agent._send_unsubscribe_ind(subscription_id,
mbox.agent_subscription_id)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
@@ -1453,16 +1456,16 @@
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
- logging.debug("Sending noop to wake up [%s]" % self._address)
+ trace.debug("Sending noop to wake up [%s]" % self._address)
msg = Message(id=QMF_APP_ID,
subject=self._name,
- properties={"method":"request",
+ properties={"method":"indication",
"qmf.opcode":OpCode.noop},
content={})
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
def run(self):
@@ -1484,7 +1487,7 @@
except Empty:
break
# TRACE:
- # logging.error("!!! Console %s: msg on %s [%s]" %
+ # log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._topic_recvr.source, msg))
self._dispatch(msg, _direct=False)
@@ -1494,7 +1497,7 @@
except Empty:
break
# TRACE
- #logging.error("!!! Console %s: msg on %s [%s]" %
+ #log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
@@ -1506,36 +1509,37 @@
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
- logging.info("Calling console notifier.indication")
+ trace.debug("Calling console notifier.indication")
self._notifier.indication()
_callback_thread = None
- if self._operational:
- # wait for a message to arrive, or an agent
- # to expire, or a mailbox requrest to time out
- now = datetime.datetime.utcnow()
- next_expire = self._next_agent_expire
- # the mailbox expire flag may be cleared by the
- # app thread(s)
- self._lock.acquire()
- try:
- if (self._next_mbox_expire and
- self._next_mbox_expire < next_expire):
- next_expire = self._next_mbox_expire
- finally:
- self._lock.release()
+ # wait for a message to arrive, or an agent
+ # to expire, or a mailbox request to time out
+ now = datetime.datetime.utcnow()
+ next_expire = self._next_agent_expire
- if next_expire > now:
- timeout = timedelta_to_secs(next_expire - now)
- try:
- logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
- xxx = self._session.next_receiver(timeout = timeout)
- except Empty:
- pass
+ self._lock.acquire()
+ try:
+ # the mailbox expire flag may be cleared by the
+ # app thread(s) to force an immediate mailbox scan
+ if self._next_mbox_expire is None:
+ next_expire = now
+ elif self._next_mbox_expire < next_expire:
+ next_expire = self._next_mbox_expire
+ finally:
+ self._lock.release()
+ timeout = timedelta_to_secs(next_expire - now)
+
+ if self._operational and timeout > 0.0:
+ try:
+ trace.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+ self._session.next_receiver(timeout = timeout)
+ except Empty:
+ pass
- logging.debug("Shutting down Console thread")
+ trace.debug("Shutting down Console thread")
def get_objects(self,
_object_id=None,
@@ -1639,12 +1643,11 @@
"""
PRIVATE: Process a message received from an Agent
"""
- #logging.debug( "Message received from Agent! [%s]" % msg )
- #logging.error( "Message received from Agent! [%s]" % msg )
+ trace.debug( "Message received from Agent! [%s]" % msg )
opcode = msg.properties.get("qmf.opcode")
if not opcode:
- logging.error("Ignoring unrecognized message '%s'" % msg)
+ log.error("Ignoring unrecognized message '%s'" % msg)
return
version = 2 # @todo: fix me
@@ -1672,9 +1675,9 @@
else:
self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
- logging.debug("No-op msg received.")
+ trace.debug("No-op msg received.")
else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
+ log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
def _handle_agent_ind_msg(self, msg, cmap, version, direct):
@@ -1682,15 +1685,15 @@
Process a received agent-ind message. This message may be a response to a
agent-locate, or it can be an unsolicited agent announce.
"""
- logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
- logging.warning("Bad agent-ind message received: '%s'" % msg)
+ log.warning("Bad agent-ind message received: '%s'" % msg)
return
name = ai_map.get("_name")
if not name:
- logging.warning("Bad agent-ind message received: agent name missing"
+ log.warning("Bad agent-ind message received: agent name missing"
" '%s'" % msg)
return
@@ -1724,48 +1727,48 @@
if matched:
# unsolicited, but newly discovered
- logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
self._work_q.put(wi)
self._work_q_put = True
if correlated:
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
if not mbox:
- logging.debug("Response msg received with unknown correlation_id"
- " msg='%s'" % str(msg))
+ log.warning("Response msg received with unknown correlation_id"
+ " msg='%s'" % str(msg))
return
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_indication_msg(self, msg, cmap, version, _direct):
aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No agent name field in indication message.")
+ trace.debug("No agent name field in indication message.")
return
content_type = msg.properties.get("qmf.content")
if (content_type != ContentType.event or
not isinstance(msg.content, type([]))):
- logging.warning("Bad event indication message received: '%s'" % msg)
+ log.warning("Bad event indication message received: '%s'" % msg)
return
emap = msg.content[0]
if not isinstance(emap, type({})):
- logging.debug("Invalid event body in indication message: '%s'" % msg)
+ trace.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1775,18 +1778,18 @@
finally:
self._lock.release()
if not agent:
- logging.debug("Agent '%s' not known." % aname)
+ trace.debug("Agent '%s' not known." % aname)
return
try:
# @todo: schema???
event = QmfEvent.from_map(emap)
except TypeError:
- logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+ trace.debug("Invalid QmfEvent map received: %s" % str(emap))
return
# @todo: schema? Need to fetch it, but not from this thread!
# This thread can not pend on a request.
- logging.debug("Publishing event received from agent %s" % aname)
+ trace.debug("Publishing event received from agent %s" % aname)
wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
{"agent":agent,
"event":event})
@@ -1835,12 +1838,12 @@
next_expire_delta = lifetime_delta
self._lock.acquire()
try:
- logging.debug("!!! expiring agents '%s'" % now)
+ trace.debug("!!! expiring agents '%s'" % now)
for agent in self._agent_map.itervalues():
if agent._announce_timestamp:
agent_deathtime = agent._announce_timestamp + lifetime_delta
if agent_deathtime <= now:
- logging.debug("AGENT_DELETED for %s" % agent)
+ trace.debug("AGENT_DELETED for %s" % agent)
agent._announce_timestamp = None
wi = WorkItem(WorkItem.AGENT_DELETED, None,
{"agent":agent})
@@ -1852,7 +1855,7 @@
next_expire_delta = agent_deathtime - now
self._next_agent_expire = now + next_expire_delta
- logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
+ trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
finally:
self._lock.release()
@@ -1862,7 +1865,7 @@
"""
Factory to create/retrieve an agent for this console
"""
- logging.debug("creating agent %s" % name)
+ trace.debug("creating agent %s" % name)
self._lock.acquire()
try:
agent = self._agent_map.get(name)
@@ -1878,9 +1881,9 @@
" x-properties:"
" {type:direct}}}")
except:
- logging.warning("Unable to create sender for %s" % name)
+ log.warning("Unable to create sender for %s" % name)
return None
- logging.debug("created agent sender %s" % agent._sender.target)
+ trace.debug("created agent sender %s" % agent._sender.target)
self._agent_map[name] = agent
finally:
@@ -1984,11 +1987,11 @@
if need_fetch:
mbox = _SchemaPrefetchMailbox(self, schema_id)
query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
- logging.debug("Sending Schema Query to Agent (%s)" % time.time())
+ trace.debug("Sending Schema Query to Agent (%s)" % time.time())
try:
agent._send_query(query, mbox.get_address())
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
self._lock.acquire()
try:
@@ -2041,7 +2044,7 @@
try:
mid = long(mid)
except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
+ log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
@@ -2056,7 +2059,7 @@
try:
mid = long(mid)
except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
+ log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
@@ -2242,36 +2245,36 @@
# count += 1
# try:
# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
-# logging.debug("Console Event AGENT_ADDED received")
+# trace.debug("Console Event AGENT_ADDED received")
# if self._handler:
# self._handler.agent_added(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
-# logging.debug("Console Event AGENT_DELETED received")
+# trace.debug("Console Event AGENT_DELETED received")
# if self._handler:
# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
-# logging.debug("Console Event NEW_PACKAGE received")
+# trace.debug("Console Event NEW_PACKAGE received")
# if self._handler:
# self._handler.new_package(self._event.name)
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
-# logging.debug("Console Event NEW_CLASS received")
+# trace.debug("Console Event NEW_CLASS received")
# if self._handler:
# self._handler.new_class(SchemaClassKey(self._event.classKey))
# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
-# logging.debug("Console Event OBJECT_UPDATE received")
+# trace.debug("Console Event OBJECT_UPDATE received")
# if self._handler:
# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
# self._event.hasProps, self._event.hasStats)
# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
-# logging.debug("Console Event EVENT_RECEIVED received")
+# trace.debug("Console Event EVENT_RECEIVED received")
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
-# logging.debug("Console Event AGENT_HEARTBEAT received")
+# trace.debug("Console Event AGENT_HEARTBEAT received")
# if self._handler:
# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
-# logging.debug("Console Event METHOD_RESPONSE received")
+# trace.debug("Console Event METHOD_RESPONSE received")
# else:
-# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
# except e:
# print "Exception caught in callback thread:", e
# self.impl.popEvent()
@@ -2300,17 +2303,17 @@
# def shutdown(self):
-# logging.debug("broker.shutdown() called.")
+# trace.debug("broker.shutdown() called.")
# self.console.impl.delConnection(self.impl)
# self.conn.del_conn_handler(self)
# if self._session:
# self.impl.sessionClosed()
-# logging.debug("broker.shutdown() sessionClosed done.")
+# trace.debug("broker.shutdown() sessionClosed done.")
# self._session.destroy()
-# logging.debug("broker.shutdown() session destroy done.")
+# trace.debug("broker.shutdown() session destroy done.")
# self._session = None
# self._operational = False
-# logging.debug("broker.shutdown() done.")
+# trace.debug("broker.shutdown() done.")
# def wait_for_stable(self, timeout = None):
@@ -2343,24 +2346,24 @@
# while valid:
# count += 1
# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
-# logging.debug("Broker Event BROKER_INFO received");
+# trace.debug("Broker Event BROKER_INFO received");
# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
-# logging.debug("Broker Event DECLARE_QUEUE received");
+# trace.debug("Broker Event DECLARE_QUEUE received");
# self.conn.impl.declareQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
-# logging.debug("Broker Event DELETE_QUEUE received");
+# trace.debug("Broker Event DELETE_QUEUE received");
# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.BIND:
-# logging.debug("Broker Event BIND received");
+# trace.debug("Broker Event BIND received");
# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
-# logging.debug("Broker Event UNBIND received");
+# trace.debug("Broker Event UNBIND received");
# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
-# logging.debug("Broker Event SETUP_COMPLETE received");
+# trace.debug("Broker Event SETUP_COMPLETE received");
# self.impl.startProtocol()
# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
-# logging.debug("Broker Event STABLE received");
+# trace.debug("Broker Event STABLE received");
# self._cv.acquire()
# try:
# self._stable = True
@@ -2387,7 +2390,7 @@
# valid = self.impl.getXmtMessage(self._xmtMessage)
# while valid:
# count += 1
-# logging.debug("Broker: sending msg on connection")
+# trace.debug("Broker: sending msg on connection")
# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
# self.impl.popXmt()
# valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -2405,14 +2408,14 @@
# def conn_event_connected(self):
-# logging.debug("Broker: Connection event CONNECTED")
+# trace.debug("Broker: Connection event CONNECTED")
# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
# self.impl.sessionOpened(self._session.handle)
# self._do_events()
# def conn_event_disconnected(self, error):
-# logging.debug("Broker: Connection event DISCONNECTED")
+# trace.debug("Broker: Connection event DISCONNECTED")
# pass
@@ -2421,14 +2424,14 @@
# def sess_event_session_closed(self, context, error):
-# logging.debug("Broker: Session event CLOSED")
+# trace.debug("Broker: Session event CLOSED")
# self.impl.sessionClosed()
# def sess_event_recv(self, context, message):
-# logging.debug("Broker: Session event MSG_RECV")
+# trace.debug("Broker: Session event MSG_RECV")
# if not self._operational:
-# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
+# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
# self.impl.handleRcvMessage(message)
# self._do_events()
@@ -2446,6 +2449,7 @@
if __name__ == '__main__':
# temp test code
+ import logging
from common import (qmfTypes, SchemaProperty)
logging.getLogger().setLevel(logging.INFO)
Modified: qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py?rev=917828&r1=917827&r2=917828&view=diff
==============================================================================
--- qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py (original)
+++ qpid/branches/qmf-devel0.7/qpid/extras/qmf/src/py/qmf2/tests/subscriptions.py Tue Mar 2 01:02:30 2010
@@ -75,7 +75,13 @@
_object_id_names=["key"] )
_schema.add_property( "key", SchemaProperty(qmfTypes.TYPE_LSTR))
- _schema.add_property( "count1", SchemaProperty(qmfTypes.TYPE_UINT32))
+
+ # note: count1 is continuous, count2 is not
+ count1_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32,
+ continuous=True)
+ _schema.add_property( "count1", count1_prop)
+ count2_prop = SchemaProperty.create(qmfTypes.TYPE_UINT32,
+ continuous=False)
_schema.add_property( "count2", SchemaProperty(qmfTypes.TYPE_UINT32))
self.agent.register_object_class(_schema)
@@ -224,7 +230,7 @@
# create console
# find all agents
# subscribe to changes to any object in package1/class1
- # should succeed
+ # should succeed - verify 1 publish
self.notifier = _testNotifier()
self.console = qmf2.console.Console(notifier=self.notifier,
agent_timeout=3)
@@ -288,10 +294,10 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 5 * len(subscriptions))
+ # expect 1 publish per subscription
+ self.assertTrue(r_count == 5)
for ii in range(len(subscriptions)):
- self.assertTrue(subscriptions[ii][1] == 5)
+ self.assertTrue(subscriptions[ii][1] == 1)
self.console.destroy(10)
@@ -349,21 +355,17 @@
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
self.assertTrue(reply[0].get_object_id() == "undesc-2")
- # print("!!! get_params() = %s" % wi.get_params())
self.assertTrue(wi.get_handle() < len(subscriptions))
subscriptions[wi.get_handle()][1] += 1
- # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
- # self.assertTrue(reply.succeeded())
- # self.assertTrue(reply.get_argument("cookie") ==
- # wi.get_handle())
+
self.console.release_workitem(wi)
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 5 * len(subscriptions))
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
+ # expect 1 publish per subscription
+ self.assertTrue(r_count == 5)
+ for ii in range(len(subscriptions)):
+ self.assertTrue(subscriptions[ii][1] == 1)
self.console.destroy(10)
@@ -426,18 +428,15 @@
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() < len(subscriptions))
subscriptions[wi.get_handle()][1] += 1
- # self.assertTrue(isinstance(reply, qmf2.console.MethodResult))
- # self.assertTrue(reply.succeeded())
- # self.assertTrue(reply.get_argument("cookie") ==
- # wi.get_handle())
+
self.console.release_workitem(wi)
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 5 * len(subscriptions))
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
+ # expect 1 publish per subscription
+ self.assertTrue(r_count == 5)
+ for ii in range(len(subscriptions)):
+ self.assertTrue(subscriptions[ii][1] == 1)
self.console.destroy(10)
@@ -459,9 +458,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -489,13 +488,20 @@
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
self.console.release_workitem(wi)
if r_count == 3:
@@ -504,11 +510,8 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
+ # expect 5 publish per subscription, more if refreshed
self.assertTrue(r_count > 5)
- # print("!!! total r_count=%d" % r_count)
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
self.console.destroy(10)
@@ -530,9 +533,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -560,13 +563,20 @@
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
self.console.release_workitem(wi)
if r_count == 3:
@@ -574,10 +584,8 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription full duration
- self.assertTrue(r_count < 5)
- #for ii in range(len(subscriptions)):
- # self.assertTrue(subscriptions[ii][1] == 5)
+ # expect only 3 publish received before cancel
+ self.assertTrue(r_count == 3)
self.console.destroy(10)
@@ -645,8 +653,8 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription
- self.assertTrue(r_count == 6)
+ # one response + one publish = 2
+ self.assertTrue(r_count == 2)
self.console.destroy(10)
@@ -665,9 +673,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -685,6 +693,7 @@
# refresh after three subscribe indications, count all
# indications to verify refresh worked
r_count = 0
+ i_count = 0
sp = None
rp = None
while self.notifier.wait_for_work(4):
@@ -706,20 +715,28 @@
else:
self.assertTrue(wi.get_type() ==
WorkItem.SUBSCRIBE_INDICATION)
+ i_count += 1
# sp better be set up by now!
self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
reply = wi.get_params()
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
- if r_count == 4: # + 1 for subscribe reply
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
+ if r_count == 4: # 3 data + 1 subscribe reply
rp = self.console.refresh_subscription(sp.get_subscription_id())
self.assertTrue(rp)
@@ -727,8 +744,9 @@
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 5 publish per subscription, + 2 replys
- self.assertTrue(r_count > 7)
+ # expect 5 publish per subscription, more if refreshed
+ self.assertTrue(sp is not None and rp is not None)
+ self.assertTrue(i_count > 5)
self.console.destroy(10)
@@ -748,9 +766,9 @@
self.conn.connect()
self.console.add_connection(self.conn)
- # query to match object "p2c1_key2" in schema package2/class1
- sid = SchemaClassId.create("package2", "class1")
- query = QmfQuery.create_id_object("p2c1_key2", sid)
+ # query to match object "p1c1_key2" in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ query = QmfQuery.create_id_object("p1c1_key2", sid)
agent_app = self.agents[0]
aname = agent_app.agent.get_name()
@@ -765,8 +783,6 @@
_blocking=False)
self.assertTrue(rc)
- # refresh after three subscribe indications, count all
- # indications to verify refresh worked
r_count = 0
sp = None
rp = None
@@ -789,20 +805,220 @@
self.assertTrue(isinstance(reply, type([])))
self.assertTrue(len(reply) == 1)
self.assertTrue(isinstance(reply[0], QmfData))
- self.assertTrue(reply[0].get_object_id() == "p2c1_key2")
+ self.assertTrue(reply[0].get_object_id() == "p1c1_key2")
sid = reply[0].get_schema_class_id()
self.assertTrue(isinstance(sid, SchemaClassId))
- self.assertTrue(sid.get_package_name() == "package2")
+ self.assertTrue(sid.get_package_name() == "package1")
self.assertTrue(sid.get_class_name() == "class1")
self.assertTrue(wi.get_handle() == "my-handle")
+ # count1 is continuous, touching it will force a
+ # publish on the interval
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
+ if r_count == 3:
self.console.cancel_subscription(sp.get_subscription_id())
self.console.release_workitem(wi)
wi = self.console.get_next_workitem(timeout=0)
- # for now, I expect 1 subscribe reply and 1 data_indication
- self.assertTrue(r_count == 2)
+ # expect cancel after 3 replies
+ self.assertTrue(r_count == 3)
+
+ self.console.destroy(10)
+
+
+
+
+ def test_sync_periodic_publish_continuous(self):
+ # create console
+ # find all agents
+ # subscribe to changes to any object in package1/class1
+ # should succeed - verify 1 publish
+ # Change continuous property on each publish,
+ # should only see 1 publish per interval
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ # find an agent
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "some-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ self.assertTrue(sp.get_duration() == 10)
+ self.assertTrue(sp.get_publish_interval() == 2)
+
+ # now wait for (2 * interval) and count the updates
+ r_count = 0
+ sid = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ self.assertTrue(wi.get_handle() == "some-handle")
+ if r_count == 1:
+ # first indication - returns all matching objects
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 2)
+ for obj in reply:
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+ obj.get_object_id() == "p1c1_key1")
+ sid = obj.get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package1")
+ self.assertTrue(sid.get_class_name() == "class1")
+
+ else:
+ # verify publish of modified object only!
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ obj = reply[0]
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2")
+ self.assertTrue(obj.get_value("count1") == r_count - 1)
+ # fail test if we receive more than expected
+ self.assertTrue(r_count < 10)
+
+
+ # now update one of the objects!
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count1", r_count)
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # expect at most 1 publish per interval seen
+ self.assertTrue(r_count < 10)
+
+ self.console.destroy(10)
+
+
+
+
+ def test_sync_periodic_publish_noncontinuous(self):
+ # create console, find agent
+ # subscribe to changes to any object in package1/class1
+ # should succeed - verify 1 publish
+ # Change noncontinuous property on each publish,
+ # should only see 1 publish per each update
+ self.notifier = _testNotifier()
+ self.console = qmf2.console.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.add_connection(self.conn)
+
+ subscriptions = []
+ index = 0
+
+ # query to match all objects in schema package1/class1
+ sid = SchemaClassId.create("package1", "class1")
+ t_params = {QmfData.KEY_SCHEMA_ID: sid}
+ query = QmfQuery.create_wildcard(QmfQuery.TARGET_OBJECT,
+ _target_params=t_params)
+ # find an agent
+ agent_app = self.agents[0]
+ aname = agent_app.agent.get_name()
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # setup subscription on agent
+
+ sp = self.console.create_subscription(agent,
+ query,
+ "some-handle")
+ self.assertTrue(isinstance(sp, qmf2.console.SubscribeParams))
+ self.assertTrue(sp.succeeded())
+ self.assertTrue(sp.get_error() == None)
+ self.assertTrue(sp.get_duration() == 10)
+ self.assertTrue(sp.get_publish_interval() == 2)
+
+ # now wait for (2 * interval) and count the updates
+ r_count = 0
+ sid = None
+ while self.notifier.wait_for_work(4):
+ wi = self.console.get_next_workitem(timeout=0)
+ while wi is not None:
+ r_count += 1
+ self.assertTrue(wi.get_type() == WorkItem.SUBSCRIBE_INDICATION)
+ self.assertTrue(wi.get_handle() == "some-handle")
+ if r_count == 1:
+ # first indication - returns all matching objects
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 2)
+ for obj in reply:
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2" or
+ obj.get_object_id() == "p1c1_key1")
+ sid = obj.get_schema_class_id()
+ self.assertTrue(isinstance(sid, SchemaClassId))
+ self.assertTrue(sid.get_package_name() == "package1")
+ self.assertTrue(sid.get_class_name() == "class1")
+
+ else:
+ # verify publish of modified object only!
+ reply = wi.get_params()
+ self.assertTrue(isinstance(reply, type([])))
+ self.assertTrue(len(reply) == 1)
+ obj = reply[0]
+ self.assertTrue(isinstance(obj, QmfData))
+ self.assertTrue(obj.get_object_id() == "p1c1_key2")
+ self.assertTrue(obj.get_value("count2") == r_count - 1)
+ # fail test if we receive more than expected
+ self.assertTrue(r_count < 30)
+
+
+ # now update the noncontinuous field of one of the objects!
+ if r_count < 20:
+ self.assertTrue(sid is not None)
+ test_obj = agent_app.agent.get_object("p1c1_key2", sid)
+ self.assertTrue(test_obj is not None)
+ test_obj.set_value("count2", r_count)
+
+ self.console.release_workitem(wi)
+
+ wi = self.console.get_next_workitem(timeout=0)
+
+ # expect at least 1 publish per update
+ self.assertTrue(r_count > 10)
self.console.destroy(10)
Propchange: qpid/branches/qmf-devel0.7/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java:795950-829653
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:911618-916854
+/qpid/trunk/qpid/java:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/qpid/branches/0.5-release/qpid/java/broker/bin:757268
/qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -3,4 +3,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-916854
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/lib/org.osgi.core_1.0.0.jar
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:443187-720930
/qpid/branches/java-broker-0-10/qpid/java/lib/org.osgi.core_1.0.0.jar:795950-829653
/qpid/branches/java-network-refactor/qpid/java/lib/org.osgi.core_1.0.0.jar:805429-821809
-/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:911618-916854
+/qpid/trunk/qpid/java/lib/org.osgi.core_1.0.0.jar:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/client/src/main/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:443187-703176
/qpid/branches/java-broker-0-10/qpid/java/management/client/src/main/java/org/apache/qpid/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/client/src/main/java/org/apache/qpid/management:805429-821809
-/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:911618-916854
+/qpid/trunk/qpid/java/management/client/src/main/java/org/apache/qpid/management:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/client/src/test/java/org/apache/qpid/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/incubator/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:443187-703176
/qpid/branches/java-broker-0-10/qpid/java/management/client/src/test/java/org/apache/qpid/management:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/client/src/test/java/org/apache/qpid/management:805429-821809
-/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:911618-916854
+/qpid/trunk/qpid/java/management/client/src/test/java/org/apache/qpid/management:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedBroker.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:911618-916854
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:911618-916854
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ManagedExchange.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:911618-916854
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:911618-916854
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -1,4 +1,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:911618-916854
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java:911618-917825
Propchange: qpid/branches/qmf-devel0.7/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar 2 01:02:30 2010
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:795950-829653
/qpid/branches/java-network-refactor/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:805429-821809
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/MBeanAttribute.java:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
-/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:911618-916854
+/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java:911618-917825
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org