You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/03/01 21:05:22 UTC
svn commit: r917688 - in /qpid/trunk/qpid/extras/qmf/src/py/qmf2: agent.py common.py console.py
Author: kgiusti
Date: Mon Mar 1 20:05:22 2010
New Revision: 917688
URL: http://svn.apache.org/viewvc?rev=917688&view=rev
Log:
QPID-2261: split error logging and debug tracing into separate loggers
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py?rev=917688&r1=917687&r2=917688&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/agent.py Mon Mar 1 20:05:22 2010
@@ -17,10 +17,10 @@
#
import sys
-import logging
import datetime
import time
import Queue
+from logging import getLogger
from threading import Thread, RLock, currentThread, Event
from qpid.messaging import Connection, Message, Empty, SendError
from uuid import uuid4
@@ -32,6 +32,8 @@
# running the agent notifier callback
_callback_thread=None
+log = getLogger("qmf")
+trace = getLogger("qmf.agent")
##==============================================================================
@@ -175,10 +177,10 @@
@type timeout: float
@param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
"""
- logging.debug("Destroying Agent %s" % self.name)
+ trace.debug("Destroying Agent %s" % self.name)
if self._conn:
self.remove_connection(timeout)
- logging.debug("Agent Destroyed")
+ trace.debug("Agent Destroyed")
def get_name(self):
@@ -196,7 +198,7 @@
" x-properties:"
" {type:direct}}}",
capacity=self._capacity)
- logging.debug("my direct addr=%s" % self._direct_receiver.source)
+ trace.debug("my direct addr=%s" % self._direct_receiver.source)
# for sending directly addressed messages.
self._direct_sender = self._session.sender(str(self._address.get_node()) +
@@ -205,7 +207,7 @@
" {type:topic,"
" x-properties:"
" {type:direct}}}")
- logging.debug("my default direct send addr=%s" % self._direct_sender.target)
+ trace.debug("my default direct send addr=%s" % self._direct_sender.target)
# for receiving "broadcast" messages from consoles
default_addr = QmfAddress.topic(QmfAddress.SUBJECT_CONSOLE_IND + ".#",
@@ -215,7 +217,7 @@
" node-properties:"
" {type:topic}}",
capacity=self._capacity)
- logging.debug("console.ind addr=%s" % self._topic_receiver.source)
+ trace.debug("console.ind addr=%s" % self._topic_receiver.source)
# for sending to topic subscribers
ind_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND,
@@ -224,7 +226,7 @@
";{create:always,"
" node-properties:"
" {type:topic}}")
- logging.debug("agent.ind addr=%s" % self._topic_sender.target)
+ trace.debug("agent.ind addr=%s" % self._topic_sender.target)
self._running = True
self.start()
@@ -238,10 +240,10 @@
if self.isAlive():
# kick my thread to wake it up
self._wake_thread()
- logging.debug("waiting for agent receiver thread to exit")
+ trace.debug("waiting for agent receiver thread to exit")
self.join(timeout)
if self.isAlive():
- logging.error( "Agent thread '%s' is hung..." % self.name)
+ log.error( "Agent thread '%s' is hung..." % self.name)
self._direct_receiver.close()
self._direct_receiver = None
self._direct_sender.close()
@@ -253,7 +255,7 @@
self._session.close()
self._session = None
self._conn = None
- logging.debug("agent connection removal complete")
+ trace.debug("agent connection removal complete")
def register_object_class(self, schema):
"""
@@ -303,7 +305,7 @@
"qmf.agent":self.name},
content=[qmfEvent.map_encode()])
# TRACE
- # logging.error("!!! Agent %s sending Event (%s)" %
+ # log.error("!!! Agent %s sending Event (%s)" %
# (self.name, str(msg)))
self._topic_sender.send(msg)
@@ -422,14 +424,14 @@
#
# Process inbound messages
#
- logging.debug("%s processing inbound messages..." % self.name)
+ trace.debug("%s processing inbound messages..." % self.name)
for i in range(batch_limit):
try:
msg = self._topic_receiver.fetch(timeout=0)
except Empty:
break
# TRACE
- # logging.error("!!! Agent %s: msg on %s [%s]" %
+ # log.error("!!! Agent %s: msg on %s [%s]" %
# (self.name, self._topic_receiver.source, msg))
self._dispatch(msg, _direct=False)
@@ -439,7 +441,7 @@
except Empty:
break
# TRACE
- # logging.error("!!! Agent %s: msg on %s [%s]" %
+ # log.error("!!! Agent %s: msg on %s [%s]" %
# (self.name, self._direct_receiver.source, msg))
self._dispatch(msg, _direct=True)
@@ -448,7 +450,7 @@
#
now = datetime.datetime.utcnow()
if now >= next_heartbeat:
- logging.debug("%s sending heartbeat..." % self.name)
+ trace.debug("%s sending heartbeat..." % self.name)
ind = Message(id=QMF_APP_ID,
subject=QmfAddress.SUBJECT_AGENT_HEARTBEAT,
properties={"method":"indication",
@@ -456,10 +458,10 @@
"qmf.agent":self.name},
content=self._makeAgentInfoBody())
# TRACE
- #logging.error("!!! Agent %s sending Heartbeat (%s)" %
+ #log.error("!!! Agent %s sending Heartbeat (%s)" %
# (self.name, str(ind)))
self._topic_sender.send(ind)
- logging.debug("Agent Indication Sent")
+ trace.debug("Agent Indication Sent")
next_heartbeat = now + datetime.timedelta(seconds = self._heartbeat_interval)
#
@@ -470,7 +472,7 @@
now = datetime.datetime.utcnow()
if (self._next_subscribe_event is None or
now >= self._next_subscribe_event):
- logging.debug("%s polling subscriptions..." % self.name)
+ trace.debug("%s polling subscriptions..." % self.name)
self._next_subscribe_event = now + datetime.timedelta(seconds=
self._max_duration)
dead_ss = {}
@@ -494,11 +496,11 @@
# notify application of pending WorkItems
#
if self._work_q_put and self._notifier:
- logging.debug("%s notifying application..." % self.name)
+ trace.debug("%s notifying application..." % self.name)
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
- logging.info("Calling agent notifier.indication")
+ trace.debug("Calling agent notifier.indication")
self._notifier.indication()
_callback_thread = None
@@ -521,7 +523,7 @@
timeout = timedelta_to_secs(next_timeout - now)
if self._running and timeout > 0.0:
- logging.debug("%s sleeping %s seconds..." % (self.name,
+ trace.debug("%s sleeping %s seconds..." % (self.name,
timeout))
try:
self._session.next_receiver(timeout=timeout)
@@ -529,7 +531,7 @@
pass
- logging.debug("Shutting down Agent %s thread" % self.name)
+ trace.debug("Shutting down Agent %s thread" % self.name)
#
# Private:
@@ -550,24 +552,24 @@
try:
reply_to = QmfAddress.from_string(str(reply_to))
except ValueError:
- logging.error("Invalid reply-to address '%s'" % reply_to)
+ log.error("Invalid reply-to address '%s'" % reply_to)
msg.subject = reply_to.get_subject()
try:
if reply_to.is_direct():
# TRACE
- #logging.error("!!! Agent %s direct REPLY-To:%s (%s)" %
+ #log.error("!!! Agent %s direct REPLY-To:%s (%s)" %
# (self.name, str(reply_to), str(msg)))
self._direct_sender.send(msg)
else:
# TRACE
- # logging.error("!!! Agent %s topic REPLY-To:%s (%s)" %
+ # log.error("!!! Agent %s topic REPLY-To:%s (%s)" %
# (self.name, str(reply_to), str(msg)))
self._topic_sender.send(msg)
- logging.debug("reply msg sent to [%s]" % str(reply_to))
+ trace.debug("reply msg sent to [%s]" % str(reply_to))
except SendError, e:
- logging.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
+ log.error("Failed to send reply msg '%s' (%s)" % (msg, str(e)))
def _send_query_response(self, content_type, cid, reply_to, objects):
"""
@@ -612,12 +614,11 @@
@param _direct: True if msg directly addressed to this agent.
"""
- # logging.debug( "Message received from Console! [%s]" % msg )
- # logging.error( "%s Message received from Console! [%s]" % (self.name, msg) )
+ trace.debug( "Message received from Console! [%s]" % msg )
opcode = msg.properties.get("qmf.opcode")
if not opcode:
- logging.warning("Ignoring unrecognized message '%s'" % msg)
+ log.warning("Ignoring unrecognized message '%s'" % msg)
return
version = 2 # @todo: fix me
cmap = {}; props={}
@@ -640,16 +641,16 @@
self._handleUnsubscribeReqMsg(msg, cmap, props, version, _direct)
elif opcode == OpCode.noop:
self._noop_pending = False
- logging.debug("No-op msg received.")
+ trace.debug("No-op msg received.")
else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+ log.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
% opcode)
def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
"""
Process a received agent-locate message
"""
- logging.debug("_handleAgentLocateMsg")
+ trace.debug("_handleAgentLocateMsg")
reply = False
if props.get("method") == "request":
@@ -676,21 +677,21 @@
m.correlation_id = msg.correlation_id
self._send_reply(m, msg.reply_to)
else:
- logging.debug("agent-locate msg not mine - no reply sent")
+ trace.debug("agent-locate msg not mine - no reply sent")
def _handleQueryMsg(self, msg, cmap, props, version, _direct ):
"""
Handle received query message
"""
- logging.debug("_handleQueryMsg")
+ trace.debug("_handleQueryMsg")
if "method" in props and props["method"] == "request":
if cmap:
try:
query = QmfQuery.from_map(cmap)
except TypeError:
- logging.error("Invalid Query format: '%s'" % str(cmap))
+ log.error("Invalid Query format: '%s'" % str(cmap))
return
target = query.get_target()
if target == QmfQuery.TARGET_PACKAGES:
@@ -700,13 +701,13 @@
elif target == QmfQuery.TARGET_SCHEMA:
self._querySchemaReply( msg, query)
elif target == QmfQuery.TARGET_AGENT:
- logging.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
+ log.warning("!!! @todo: Query TARGET=AGENT TBD !!!")
elif target == QmfQuery.TARGET_OBJECT_ID:
self._queryDataReply(msg, query, _idOnly=True)
elif target == QmfQuery.TARGET_OBJECT:
self._queryDataReply(msg, query)
else:
- logging.warning("Unrecognized query target: '%s'" % str(target))
+ log.warning("Unrecognized query target: '%s'" % str(target))
@@ -717,7 +718,7 @@
if "method" in props and props["method"] == "request":
mname = cmap.get(SchemaMethod.KEY_NAME)
if not mname:
- logging.warning("Invalid method call from '%s': no name"
+ log.warning("Invalid method call from '%s': no name"
% msg.reply_to)
return
@@ -774,7 +775,7 @@
try:
query = QmfQuery.from_map(query_map)
except TypeError:
- logging.warning("Invalid query for subscription: %s" %
+ log.warning("Invalid query for subscription: %s" %
str(query_map))
return
@@ -788,7 +789,7 @@
# self._work_q.put(WorkItem(WorkItem.SUBSCRIBE_REQUEST,
# msg.correlation_id, param))
# self._work_q_put = True
- logging.error("External Subscription TBD")
+ log.error("External Subscription TBD")
return
# validate the query - only specific objects, or
@@ -796,7 +797,7 @@
if (query.get_target() != QmfQuery.TARGET_OBJECT or
(query.get_selector() == QmfQuery.PREDICATE and
query.get_predicate())):
- logging.error("Subscriptions only support (wildcard) Object"
+ log.error("Subscriptions only support (wildcard) Object"
" Queries.")
err = QmfData.create(
{"reason": "Unsupported Query type for subscription.",
@@ -819,7 +820,7 @@
elif duration < self._min_duration:
duration = self._min_duration
except:
- logging.warning("Bad duration value: %s" % str(msg))
+ log.warning("Bad duration value: %s" % str(msg))
duration = self._default_duration
if interval is None:
@@ -830,7 +831,7 @@
if interval < self._min_interval:
interval = self._min_interval
except:
- logging.warning("Bad interval value: %s" % str(msg))
+ log.warning("Bad interval value: %s" % str(msg))
interval = self._default_interval
ss = _SubscriptionState(msg.reply_to,
@@ -867,7 +868,7 @@
if props.get("method") == "request":
sid = cmap.get("_subscription_id")
if not sid:
- logging.error("Invalid subscription refresh msg: %s" %
+ log.error("Invalid subscription refresh msg: %s" %
str(msg))
return
@@ -875,7 +876,7 @@
try:
ss = self._subscriptions.get(sid)
if not ss:
- logging.error("Ignoring unknown subscription: %s" %
+ log.error("Ignoring unknown subscription: %s" %
str(sid))
return
duration = cmap.get("_duration")
@@ -887,7 +888,7 @@
elif duration < self._min_duration:
duration = self._min_duration
except:
- logging.error("Bad duration value: %s" % str(msg))
+ log.error("Bad duration value: %s" % str(msg))
duration = None # use existing duration
ss.resubscribe(datetime.datetime.utcnow(), duration)
@@ -917,7 +918,7 @@
if props.get("method") == "request":
sid = cmap.get("_subscription_id")
if not sid:
- logging.warning("No subscription id supplied: %s" % msg)
+ log.warning("No subscription id supplied: %s" % msg)
return
self._lock.acquire()
@@ -1100,7 +1101,7 @@
response.append(obj.map_encode())
if response:
- logging.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id))
+ trace.debug("!!! %s publishing %s!!!" % (self.name, sub.correlation_id))
self._send_query_response( ContentType.data,
sub.correlation_id,
sub.reply_to,
@@ -1127,7 +1128,7 @@
self._lock.acquire()
try:
if not self._noop_pending:
- logging.debug("Sending noop to wake up [%s]" % self._address)
+ trace.debug("Sending noop to wake up [%s]" % self._address)
msg = Message(id=QMF_APP_ID,
subject=self.name,
properties={"method":"indication",
@@ -1137,7 +1138,7 @@
self._direct_sender.send( msg, sync=True )
self._noop_pending = True
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
finally:
self._lock.release()
@@ -1155,7 +1156,7 @@
super(AgentExternal, self).__init__(name, _domain, _notifier,
_heartbeat_interval,
_max_msg_size, _capacity)
- logging.error("AgentExternal TBD")
+ log.error("AgentExternal TBD")
@@ -1294,6 +1295,7 @@
if __name__ == '__main__':
# static test cases - no message passing, just exercise API
+ import logging
from common import (AgentName, SchemaProperty, qmfTypes, SchemaEventClass)
logging.getLogger().setLevel(logging.INFO)
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py?rev=917688&r1=917687&r2=917688&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/common.py Mon Mar 1 20:05:22 2010
@@ -620,208 +620,9 @@
-
-
-#==============================================================================
-#==============================================================================
-#==============================================================================
-
-
-
-
-class Arguments(object):
- def __init__(self, map):
- pass
-# self.map = map
-# self._by_hash = {}
-# key_count = self.map.keyCount()
-# a = 0
-# while a < key_count:
-# self._by_hash[self.map.key(a)] = self.by_key(self.map.key(a))
-# a += 1
-
-
-# def __getitem__(self, key):
-# return self._by_hash[key]
-
-
-# def __setitem__(self, key, value):
-# self._by_hash[key] = value
-# self.set(key, value)
-
-
-# def __iter__(self):
-# return self._by_hash.__iter__
-
-
-# def __getattr__(self, name):
-# if name in self._by_hash:
-# return self._by_hash[name]
-# return super.__getattr__(self, name)
-
-
-# def __setattr__(self, name, value):
-# #
-# # ignore local data members
-# #
-# if (name[0] == '_' or
-# name == 'map'):
-# return super.__setattr__(self, name, value)
-
-# if name in self._by_hash:
-# self._by_hash[name] = value
-# return self.set(name, value)
-
-# return super.__setattr__(self, name, value)
-
-
-# def by_key(self, key):
-# val = self.map.byKey(key)
-# vType = val.getType()
-# if vType == TYPE_UINT8: return val.asUint()
-# elif vType == TYPE_UINT16: return val.asUint()
-# elif vType == TYPE_UINT32: return val.asUint()
-# elif vType == TYPE_UINT64: return val.asUint64()
-# elif vType == TYPE_SSTR: return val.asString()
-# elif vType == TYPE_LSTR: return val.asString()
-# elif vType == TYPE_ABSTIME: return val.asInt64()
-# elif vType == TYPE_DELTATIME: return val.asUint64()
-# elif vType == TYPE_REF: return ObjectId(val.asObjectId())
-# elif vType == TYPE_BOOL: return val.asBool()
-# elif vType == TYPE_FLOAT: return val.asFloat()
-# elif vType == TYPE_DOUBLE: return val.asDouble()
-# elif vType == TYPE_UUID: return val.asUuid()
-# elif vType == TYPE_INT8: return val.asInt()
-# elif vType == TYPE_INT16: return val.asInt()
-# elif vType == TYPE_INT32: return val.asInt()
-# elif vType == TYPE_INT64: return val.asInt64()
-# else:
-# # when TYPE_MAP
-# # when TYPE_OBJECT
-# # when TYPE_LIST
-# # when TYPE_ARRAY
-# logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
-# return None
-
-
-# def set(self, key, value):
-# val = self.map.byKey(key)
-# vType = val.getType()
-# if vType == TYPE_UINT8: return val.setUint(value)
-# elif vType == TYPE_UINT16: return val.setUint(value)
-# elif vType == TYPE_UINT32: return val.setUint(value)
-# elif vType == TYPE_UINT64: return val.setUint64(value)
-# elif vType == TYPE_SSTR:
-# if value:
-# return val.setString(value)
-# else:
-# return val.setString('')
-# elif vType == TYPE_LSTR:
-# if value:
-# return val.setString(value)
-# else:
-# return val.setString('')
-# elif vType == TYPE_ABSTIME: return val.setInt64(value)
-# elif vType == TYPE_DELTATIME: return val.setUint64(value)
-# elif vType == TYPE_REF: return val.setObjectId(value.impl)
-# elif vType == TYPE_BOOL: return val.setBool(value)
-# elif vType == TYPE_FLOAT: return val.setFloat(value)
-# elif vType == TYPE_DOUBLE: return val.setDouble(value)
-# elif vType == TYPE_UUID: return val.setUuid(value)
-# elif vType == TYPE_INT8: return val.setInt(value)
-# elif vType == TYPE_INT16: return val.setInt(value)
-# elif vType == TYPE_INT32: return val.setInt(value)
-# elif vType == TYPE_INT64: return val.setInt64(value)
-# else:
-# # when TYPE_MAP
-# # when TYPE_OBJECT
-# # when TYPE_LIST
-# # when TYPE_ARRAY
-# logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
-# return None
-
-
-
-#class MethodResponse(object):
-# def __init__(self, impl):
-# pass
-# self.impl = qmfengine.MethodResponse(impl)
-
-
-# def status(self):
-# return self.impl.getStatus()
-
-
-# def exception(self):
-# return self.impl.getException()
-
-
-# def text(self):
-# return exception().asString()
-
-
-# def args(self):
-# return Arguments(self.impl.getArgs())
-
-
-# def __getattr__(self, name):
-# myArgs = self.args()
-# return myArgs.__getattr__(name)
-
-
-# def __setattr__(self, name, value):
-# if name == 'impl':
-# return super.__setattr__(self, name, value)
-
-# myArgs = self.args()
-# return myArgs.__setattr__(name, value)
-
-
-
-# ##==============================================================================
-# ## QUERY
-# ##==============================================================================
-
-
-
-# def _doQuery(predicate, params ):
-# """
-# Given the predicate from a query, and a map of named parameters, apply the predicate
-# to the parameters, and return True or False.
-# """
-# if type(predicate) != list or len(predicate) < 1:
-# return False
-
-# elif opr == Query._LOGIC_AND:
-# logging.debug("_doQuery() AND: [%s]" % predicate )
-# rc = False
-# for exp in predicate[1:]:
-# rc = _doQuery( exp, params )
-# if not rc:
-# break
-# return rc
-
-# elif opr == Query._LOGIC_OR:
-# logging.debug("_doQuery() OR: [%s]" % predicate )
-# rc = False
-# for exp in predicate[1:]:
-# rc = _doQuery( exp, params )
-# if rc:
-# break
-# return rc
-
-# elif opr == Query._LOGIC_NOT:
-# logging.debug("_doQuery() NOT: [%s]" % predicate )
-# if len(predicate) != 2:
-# logging.warning("Malformed query not-expression received: '%s'" % predicate)
-# return False
-# return not _doQuery( predicate[1:], params )
-
-
-
-# else:
-# logging.warning("Unknown query operator received: '%s'" % opr)
-# return False
+##==============================================================================
+## QUERY
+##==============================================================================
@@ -1095,7 +896,7 @@
raise TypeError("Query expects to evaluate QmfData types.")
if not isinstance(pred, type([])):
- log_query.warning("Invalid type for predicate expression: '%s'" % str(pred))
+ log.warning("Invalid type for predicate expression: '%s'" % str(pred))
return False
# empty predicate - match all???
@@ -1140,8 +941,8 @@
if oper == QmfQuery.EXISTS:
if len(pred) != 2:
- log_query.warning("Malformed query: 'exists' operator"
- " - bad arguments '%s'" % str(pred))
+ log.warning("Malformed query: 'exists' operator"
+ " - bad arguments '%s'" % str(pred))
return False
### Q: Should we assume "quote", or should it be explicit?
### "foo" or ["quote" "foo"]
@@ -1150,7 +951,7 @@
try:
arg = self._fetch_pred_arg(pred[1], qmfData)
except AttributeError:
- log_query.debug("query parameter not found: '%s'" % str(pred))
+ log.warning("query parameter not found: '%s'" % str(pred))
return False
v = qmfData.has_value(arg)
log_query.debug("---> %s" % str(v))
@@ -1161,9 +962,9 @@
QmfQuery.LE, QmfQuery.GT, QmfQuery.GE,
QmfQuery.RE_MATCH]:
if len(pred) != 3:
- log_query.warning("Malformed query: '%s' operator"
- " - requires 2 arguments '%s'" %
- (oper, str(pred)))
+ log.warning("Malformed query: '%s' operator"
+ " - requires 2 arguments '%s'" %
+ (oper, str(pred)))
return False
# @todo: support regular expression match
log_query.debug("query evaluate binary op: [%s]" % str(pred))
@@ -1171,7 +972,7 @@
arg1 = self._fetch_pred_arg(pred[1], qmfData)
arg2 = self._fetch_pred_arg(pred[2], qmfData)
except AttributeError:
- log_query.debug("query parameter not found: '%s'" % str(pred))
+ log.warning("query parameter not found: '%s'" % str(pred))
return False
log_query.debug("query evaluate %s: %s, %s" % (oper, str(arg1), str(arg2)))
v = False
@@ -1183,11 +984,11 @@
elif oper == QmfQuery.GT: v = arg1 > arg2
elif oper == QmfQuery.GE: v = arg1 >= arg2
except TypeError:
- log_query.warning("query comparison failed: '%s'" % str(pred))
+ log.warning("query comparison failed: '%s'" % str(pred))
log_query.debug("---> %s" % str(v))
return v
- log_query.warning("Unrecognized query operator: [%s]" % str(pred[0]))
+ log.warning("Unrecognized query operator: [%s]" % str(pred[0]))
return False
def _fetch_pred_arg(self, arg, qmfData):
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py?rev=917688&r1=917687&r2=917688&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf2/console.py Mon Mar 1 20:05:22 2010
@@ -18,11 +18,11 @@
#
import sys
import os
-import logging
import platform
import time
import datetime
import Queue
+from logging import getLogger
from threading import Thread, Event
from threading import RLock
from threading import currentThread
@@ -41,6 +41,8 @@
_callback_thread=None
+log = getLogger("qmf")
+trace = getLogger("qmf.console")
##==============================================================================
@@ -213,6 +215,7 @@
Process query response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
+ trace.debug("Delivering to query mailbox (agent=%s)." % self.agent_name)
objects = reply.content
if isinstance(objects, type([])):
# convert from map to native types if needed
@@ -253,7 +256,7 @@
self.result += objects
if not "partial" in reply.properties:
- # logging.error("QUERY COMPLETE for %s" % str(self.context))
+ # log.error("QUERY COMPLETE for %s" % str(self.context))
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
self.console._work_q_put = True
@@ -262,8 +265,7 @@
def expire(self):
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
+ trace.debug("Expiring query mailbox (agent=%s)." % self.agent_name)
# send along whatever (possibly none) has been received so far
wi = WorkItem(WorkItem.QUERY_COMPLETE, self.context, self.result)
self.console._work_q.put(wi)
@@ -291,6 +293,7 @@
"""
Process schema response messages.
"""
+ trace.debug("Delivering schema mailbox (id=%s)." % self.schema_id)
done = False
schemas = reply.content
if schemas and isinstance(schemas, type([])):
@@ -309,6 +312,7 @@
def expire(self):
+ trace.debug("Expiring schema mailbox (id=%s)." % self.schema_id)
self.destroy()
@@ -332,10 +336,10 @@
Process method response messages delivered to this mailbox.
Invoked by Console Management thread only.
"""
-
+ trace.debug("Delivering to method mailbox.")
_map = reply.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
result = None
else:
error=_map.get(SchemaMethod.KEY_ERROR)
@@ -358,8 +362,7 @@
The mailbox expired without receiving a reply.
Invoked by the Console Management thread only.
"""
- logging.debug("ASYNC MAILBOX EXPIRED @ %s!!!" %
- datetime.datetime.utcnow())
+ trace.debug("Expiring method mailbox.")
# send along an empty response
wi = WorkItem(WorkItem.METHOD_RESPONSE, self.context, None)
self.console._work_q.put(wi)
@@ -391,30 +394,30 @@
def subscribe(self, query):
agent = self.console.get_agent(self.agent_name)
if not agent:
- logging.warning("subscribed failed - unknown agent '%s'" %
+ log.warning("subscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
- logging.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending Subscribe to Agent (%s)" % self.agent_name)
agent._send_subscribe_req(query, self.get_address(), self.interval,
self.duration)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
return False
return True
def resubscribe(self, duration):
agent = self.console.get_agent(self.agent_name)
if not agent:
- logging.warning("resubscribed failed - unknown agent '%s'" %
+ log.warning("resubscribed failed - unknown agent '%s'" %
self.agent_name)
return False
try:
- logging.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
+ trace.debug("Sending resubscribe to Agent (%s)" % self.agent_name)
agent._send_resubscribe_req(self.get_address(),
self.agent_subscription_id, duration)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
return False
return True
@@ -430,7 +433,7 @@
try:
e_map = QmfData.from_map(error)
except TypeError:
- logging.warning("Invalid QmfData map received: '%s'"
+ log.warning("Invalid QmfData map received: '%s'"
% str(error))
e_map = QmfData.create({"error":"Unknown error"})
sp = SubscribeParams(None, None, None, e_map)
@@ -456,12 +459,12 @@
# else: data indication
agent_name = msg.properties.get("qmf.agent")
if not agent_name:
- logging.warning("Ignoring data_ind - no agent name given: %s" %
+ log.warning("Ignoring data_ind - no agent name given: %s" %
msg)
return
agent = self.console.get_agent(agent_name)
if not agent:
- logging.warning("Ignoring data_ind - unknown agent '%s'" %
+ log.warning("Ignoring data_ind - unknown agent '%s'" %
agent_name)
return
@@ -625,7 +628,7 @@
contents.
"""
if _reply_handle is not None:
- logging.error(" ASYNC REFRESH TBD!!!")
+ log.error(" ASYNC REFRESH TBD!!!")
return None
assert self._agent
@@ -677,28 +680,28 @@
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args
- logging.debug("Sending method req to Agent (%s)" % time.time())
+ trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._agent._send_method_req(_map, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
- logging.debug("Waiting for response to method req (%s)" % _timeout)
+ trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
+ trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
return None
error=_map.get(SchemaMethod.KEY_ERROR)
@@ -751,7 +754,7 @@
self._packages = {} # map of {package-name:[list of class-names], } for this agent
self._subscriptions = [] # list of active standing subscriptions for this agent
self._announce_timestamp = None # datetime when last announce received
- logging.debug( "Created Agent with address: [%s]" % self._address )
+ trace.debug( "Created Agent with address: [%s]" % self._address )
def get_name(self):
@@ -768,7 +771,7 @@
if correlation_id:
msg.correlation_id = str(correlation_id)
# TRACE
- #logging.error("!!! Console %s sending to agent %s (%s)" %
+ #log.error("!!! Console %s sending to agent %s (%s)" %
# (self._console._name, self._name, str(msg)))
self._sender.send(msg)
# return handle
@@ -846,28 +849,28 @@
if _in_args:
_map[SchemaMethod.KEY_ARGUMENTS] = _in_args.copy()
- logging.debug("Sending method req to Agent (%s)" % time.time())
+ trace.debug("Sending method req to Agent (%s)" % time.time())
try:
self._send_method_req(_map, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
if _reply_handle is not None:
return True
- logging.debug("Waiting for response to method req (%s)" % _timeout)
+ trace.debug("Waiting for response to method req (%s)" % _timeout)
replyMsg = mbox.fetch(_timeout)
mbox.destroy()
if not replyMsg:
- logging.debug("Agent method req wait timed-out.")
+ trace.debug("Agent method req wait timed-out.")
return None
_map = replyMsg.content
if not _map or not isinstance(_map, type({})):
- logging.error("Invalid method call reply message")
+ log.error("Invalid method call reply message")
return None
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
@@ -1076,10 +1079,10 @@
@type timeout: float
@param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever.
"""
- logging.debug("Destroying Console...")
+ trace.debug("Destroying Console...")
if self._conn:
self.remove_connection(self._conn, timeout)
- logging.debug("Console Destroyed")
+ trace.debug("Console Destroyed")
def add_connection(self, conn):
"""
@@ -1103,7 +1106,7 @@
" x-properties:"
" {type:direct}}}",
capacity=1)
- logging.debug("my direct addr=%s" % self._direct_recvr.source)
+ trace.debug("my direct addr=%s" % self._direct_recvr.source)
self._direct_sender = self._session.sender(str(self._address.get_node()) +
";{create:always,"
@@ -1111,7 +1114,7 @@
" {type:topic,"
" x-properties:"
" {type:direct}}}")
- logging.debug("my direct sender=%s" % self._direct_sender.target)
+ trace.debug("my direct sender=%s" % self._direct_sender.target)
# for receiving "broadcast" messages from agents
default_addr = QmfAddress.topic(QmfAddress.SUBJECT_AGENT_IND + ".#",
@@ -1120,7 +1123,7 @@
";{create:always,"
" node-properties:{type:topic}}",
capacity=1)
- logging.debug("default topic recv addr=%s" % self._topic_recvr.source)
+ trace.debug("default topic recv addr=%s" % self._topic_recvr.source)
# for sending to topic subscribers
@@ -1128,7 +1131,7 @@
self._topic_sender = self._session.sender(str(topic_addr) +
";{create:always,"
" node-properties:{type:topic}}")
- logging.debug("default topic send addr=%s" % self._topic_sender.target)
+ trace.debug("default topic send addr=%s" % self._topic_sender.target)
#
# Now that receivers are created, fire off the receive thread...
@@ -1150,17 +1153,17 @@
@param conn: connection previously added by add_connection()
"""
if self._conn and conn and conn != self._conn:
- logging.error( "Attempt to delete unknown connection: %s" % str(conn))
+ log.error( "Attempt to delete unknown connection: %s" % str(conn))
# tell connection thread to shutdown
self._operational = False
if self.isAlive():
# kick my thread to wake it up
self._wake_thread()
- logging.debug("waiting for console receiver thread to exit")
+ trace.debug("waiting for console receiver thread to exit")
self.join(timeout)
if self.isAlive():
- logging.error( "Console thread '%s' is hung..." % self.getName() )
+ log.error( "Console thread '%s' is hung..." % self.getName() )
self._direct_recvr.close()
self._direct_sender.close()
self._topic_recvr.close()
@@ -1168,7 +1171,7 @@
self._session.close()
self._session = None
self._conn = None
- logging.debug("console connection removal complete")
+ trace.debug("console connection removal complete")
def get_address(self):
@@ -1219,14 +1222,14 @@
content=query._predicate)
msg.reply_to = str(self._address)
msg.correlation_id = str(cid)
- logging.debug("Sending Agent Locate (%s)" % time.time())
+ trace.debug("Sending Agent Locate (%s)" % time.time())
# TRACE
- #logging.error("!!! Console %s sending agent locate (%s)" %
+ #log.error("!!! Console %s sending agent locate (%s)" %
# (self._name, str(msg)))
try:
self._topic_sender.send(msg)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
@@ -1234,10 +1237,10 @@
timeout = self._reply_timeout
new_agent = None
- logging.debug("Waiting for response to Agent Locate (%s)" % timeout)
+ trace.debug("Waiting for response to Agent Locate (%s)" % timeout)
mbox.fetch(timeout)
mbox.destroy()
- logging.debug("Agent Locate wait ended (%s)" % time.time())
+ trace.debug("Agent Locate wait ended (%s)" % time.time())
self._lock.acquire()
try:
new_agent = self._agent_map.get(name)
@@ -1288,10 +1291,10 @@
cid = mbox.get_address()
try:
- logging.debug("Sending Query to Agent (%s)" % time.time())
+ trace.debug("Sending Query to Agent (%s)" % time.time())
agent._send_query(query, cid)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
return None
@@ -1302,7 +1305,7 @@
if not _timeout:
_timeout = self._reply_timeout
- logging.debug("Waiting for response to Query (%s)" % _timeout)
+ trace.debug("Waiting for response to Query (%s)" % _timeout)
now = datetime.datetime.utcnow()
expire = now + datetime.timedelta(seconds=_timeout)
@@ -1311,7 +1314,7 @@
_timeout = timedelta_to_secs(expire - now)
reply = mbox.fetch(_timeout)
if not reply:
- logging.debug("Query wait timed-out.")
+ trace.debug("Query wait timed-out.")
break
objects = reply.content
@@ -1383,12 +1386,12 @@
mbox.destroy()
return None
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ trace.debug("Waiting for response to subscription (%s)" % _timeout)
# @todo: what if mbox expires here?
sp = mbox.fetch(_timeout)
if not sp:
- logging.debug("Subscription request wait timed-out.")
+ trace.debug("Subscription request wait timed-out.")
mbox.destroy()
return None
@@ -1405,7 +1408,7 @@
mbox = self._get_mailbox(subscription_id)
if not mbox:
- logging.warning("Subscription %s not found." % subscription_id)
+ log.warning("Subscription %s not found." % subscription_id)
return None
if isinstance(mbox, _AsyncSubscriptionMailbox):
@@ -1418,11 +1421,11 @@
# wait for reply
- logging.debug("Waiting for response to subscription (%s)" % _timeout)
+ trace.debug("Waiting for response to subscription (%s)" % _timeout)
sp = mbox.fetch(_timeout)
if not sp:
- logging.debug("re-subscribe request wait timed-out.")
+ trace.debug("re-subscribe request wait timed-out.")
# @todo???? mbox.destroy()
return None
@@ -1439,11 +1442,11 @@
agent = self.get_agent(mbox.agent_name)
if agent:
try:
- logging.debug("Sending UnSubscribe to Agent (%s)" % time.time())
+ trace.debug("Sending UnSubscribe to Agent (%s)" % time.time())
agent._send_unsubscribe_ind(subscription_id,
mbox.agent_subscription_id)
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
@@ -1453,7 +1456,7 @@
Make the console management thread loop wakeup from its next_receiver
sleep.
"""
- logging.debug("Sending noop to wake up [%s]" % self._address)
+ trace.debug("Sending noop to wake up [%s]" % self._address)
msg = Message(id=QMF_APP_ID,
subject=self._name,
properties={"method":"indication",
@@ -1462,7 +1465,7 @@
try:
self._direct_sender.send( msg, sync=True )
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
def run(self):
@@ -1484,7 +1487,7 @@
except Empty:
break
# TRACE:
- # logging.error("!!! Console %s: msg on %s [%s]" %
+ # log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._topic_recvr.source, msg))
self._dispatch(msg, _direct=False)
@@ -1494,7 +1497,7 @@
except Empty:
break
# TRACE
- #logging.error("!!! Console %s: msg on %s [%s]" %
+ #log.error("!!! Console %s: msg on %s [%s]" %
# (self._name, self._direct_recvr.source, msg))
self._dispatch(msg, _direct=True)
@@ -1506,7 +1509,7 @@
# new stuff on work queue, kick the the application...
self._work_q_put = False
_callback_thread = currentThread()
- logging.info("Calling console notifier.indication")
+ trace.debug("Calling console notifier.indication")
self._notifier.indication()
_callback_thread = None
@@ -1531,12 +1534,12 @@
if self._operational and timeout > 0.0:
try:
- logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
+ trace.debug("waiting for next rcvr (timeout=%s)..." % timeout)
self._session.next_receiver(timeout = timeout)
except Empty:
pass
- logging.debug("Shutting down Console thread")
+ trace.debug("Shutting down Console thread")
def get_objects(self,
_object_id=None,
@@ -1640,12 +1643,11 @@
"""
PRIVATE: Process a message received from an Agent
"""
- #logging.debug( "Message received from Agent! [%s]" % msg )
- #logging.error( "Message received from Agent! [%s]" % msg )
+ trace.debug( "Message received from Agent! [%s]" % msg )
opcode = msg.properties.get("qmf.opcode")
if not opcode:
- logging.error("Ignoring unrecognized message '%s'" % msg)
+ log.error("Ignoring unrecognized message '%s'" % msg)
return
version = 2 # @todo: fix me
@@ -1673,9 +1675,9 @@
else:
self._handle_indication_msg(msg, cmap, version, _direct)
elif opcode == OpCode.noop:
- logging.debug("No-op msg received.")
+ trace.debug("No-op msg received.")
else:
- logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
+ log.warning("Ignoring message with unrecognized 'opcode' value: '%s'" % opcode)
def _handle_agent_ind_msg(self, msg, cmap, version, direct):
@@ -1683,15 +1685,15 @@
Process a received agent-ind message. This message may be a response to a
agent-locate, or it can be an unsolicited agent announce.
"""
- logging.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("_handle_agent_ind_msg '%s' (%s)" % (msg, time.time()))
ai_map = msg.content
if not ai_map or not isinstance(ai_map, type({})):
- logging.warning("Bad agent-ind message received: '%s'" % msg)
+ log.warning("Bad agent-ind message received: '%s'" % msg)
return
name = ai_map.get("_name")
if not name:
- logging.warning("Bad agent-ind message received: agent name missing"
+ log.warning("Bad agent-ind message received: agent name missing"
" '%s'" % msg)
return
@@ -1725,48 +1727,48 @@
if matched:
# unsolicited, but newly discovered
- logging.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
+ trace.debug("AGENT_ADDED for %s (%s)" % (agent, time.time()))
wi = WorkItem(WorkItem.AGENT_ADDED, None, {"agent": agent})
self._work_q.put(wi)
self._work_q_put = True
if correlated:
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_response_msg(self, msg, cmap, version, direct):
"""
Process a received data-ind message.
"""
- logging.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
+ trace.debug("_handle_response_msg '%s' (%s)" % (msg, time.time()))
mbox = self._get_mailbox(msg.correlation_id)
if not mbox:
- logging.warning("Response msg received with unknown correlation_id"
+ log.warning("Response msg received with unknown correlation_id"
" msg='%s'" % str(msg))
return
# wake up all waiters
- logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+ trace.debug("waking waiters for correlation id %s" % msg.correlation_id)
mbox.deliver(msg)
def _handle_indication_msg(self, msg, cmap, version, _direct):
aname = msg.properties.get("qmf.agent")
if not aname:
- logging.debug("No agent name field in indication message.")
+ trace.debug("No agent name field in indication message.")
return
content_type = msg.properties.get("qmf.content")
if (content_type != ContentType.event or
not isinstance(msg.content, type([]))):
- logging.warning("Bad event indication message received: '%s'" % msg)
+ log.warning("Bad event indication message received: '%s'" % msg)
return
emap = msg.content[0]
if not isinstance(emap, type({})):
- logging.debug("Invalid event body in indication message: '%s'" % msg)
+ trace.debug("Invalid event body in indication message: '%s'" % msg)
return
agent = None
@@ -1776,18 +1778,18 @@
finally:
self._lock.release()
if not agent:
- logging.debug("Agent '%s' not known." % aname)
+ trace.debug("Agent '%s' not known." % aname)
return
try:
# @todo: schema???
event = QmfEvent.from_map(emap)
except TypeError:
- logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+ trace.debug("Invalid QmfEvent map received: %s" % str(emap))
return
# @todo: schema? Need to fetch it, but not from this thread!
# This thread can not pend on a request.
- logging.debug("Publishing event received from agent %s" % aname)
+ trace.debug("Publishing event received from agent %s" % aname)
wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
{"agent":agent,
"event":event})
@@ -1836,12 +1838,12 @@
next_expire_delta = lifetime_delta
self._lock.acquire()
try:
- logging.debug("!!! expiring agents '%s'" % now)
+ trace.debug("!!! expiring agents '%s'" % now)
for agent in self._agent_map.itervalues():
if agent._announce_timestamp:
agent_deathtime = agent._announce_timestamp + lifetime_delta
if agent_deathtime <= now:
- logging.debug("AGENT_DELETED for %s" % agent)
+ trace.debug("AGENT_DELETED for %s" % agent)
agent._announce_timestamp = None
wi = WorkItem(WorkItem.AGENT_DELETED, None,
{"agent":agent})
@@ -1853,7 +1855,7 @@
next_expire_delta = agent_deathtime - now
self._next_agent_expire = now + next_expire_delta
- logging.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
+ trace.debug("!!! next expire cycle = '%s'" % self._next_agent_expire)
finally:
self._lock.release()
@@ -1863,7 +1865,7 @@
"""
Factory to create/retrieve an agent for this console
"""
- logging.debug("creating agent %s" % name)
+ trace.debug("creating agent %s" % name)
self._lock.acquire()
try:
agent = self._agent_map.get(name)
@@ -1879,9 +1881,9 @@
" x-properties:"
" {type:direct}}}")
except:
- logging.warning("Unable to create sender for %s" % name)
+ log.warning("Unable to create sender for %s" % name)
return None
- logging.debug("created agent sender %s" % agent._sender.target)
+ trace.debug("created agent sender %s" % agent._sender.target)
self._agent_map[name] = agent
finally:
@@ -1985,11 +1987,11 @@
if need_fetch:
mbox = _SchemaPrefetchMailbox(self, schema_id)
query = QmfQuery.create_id(QmfQuery.TARGET_SCHEMA, schema_id)
- logging.debug("Sending Schema Query to Agent (%s)" % time.time())
+ trace.debug("Sending Schema Query to Agent (%s)" % time.time())
try:
agent._send_query(query, mbox.get_address())
except SendError, e:
- logging.error(str(e))
+ log.error(str(e))
mbox.destroy()
self._lock.acquire()
try:
@@ -2042,7 +2044,7 @@
try:
mid = long(mid)
except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
+ log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
@@ -2057,7 +2059,7 @@
try:
mid = long(mid)
except TypeError:
- logging.error("Invalid mailbox id: %s" % str(mid))
+ log.error("Invalid mailbox id: %s" % str(mid))
return None
self._lock.acquire()
@@ -2243,36 +2245,36 @@
# count += 1
# try:
# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
-# logging.debug("Console Event AGENT_ADDED received")
+# trace.debug("Console Event AGENT_ADDED received")
# if self._handler:
# self._handler.agent_added(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
-# logging.debug("Console Event AGENT_DELETED received")
+# trace.debug("Console Event AGENT_DELETED received")
# if self._handler:
# self._handler.agent_deleted(AgentProxy(self._event.agent, None))
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
-# logging.debug("Console Event NEW_PACKAGE received")
+# trace.debug("Console Event NEW_PACKAGE received")
# if self._handler:
# self._handler.new_package(self._event.name)
# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
-# logging.debug("Console Event NEW_CLASS received")
+# trace.debug("Console Event NEW_CLASS received")
# if self._handler:
# self._handler.new_class(SchemaClassKey(self._event.classKey))
# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
-# logging.debug("Console Event OBJECT_UPDATE received")
+# trace.debug("Console Event OBJECT_UPDATE received")
# if self._handler:
# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
# self._event.hasProps, self._event.hasStats)
# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
-# logging.debug("Console Event EVENT_RECEIVED received")
+# trace.debug("Console Event EVENT_RECEIVED received")
# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
-# logging.debug("Console Event AGENT_HEARTBEAT received")
+# trace.debug("Console Event AGENT_HEARTBEAT received")
# if self._handler:
# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
-# logging.debug("Console Event METHOD_RESPONSE received")
+# trace.debug("Console Event METHOD_RESPONSE received")
# else:
-# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
+# trace.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
# except e:
# print "Exception caught in callback thread:", e
# self.impl.popEvent()
@@ -2301,17 +2303,17 @@
# def shutdown(self):
-# logging.debug("broker.shutdown() called.")
+# trace.debug("broker.shutdown() called.")
# self.console.impl.delConnection(self.impl)
# self.conn.del_conn_handler(self)
# if self._session:
# self.impl.sessionClosed()
-# logging.debug("broker.shutdown() sessionClosed done.")
+# trace.debug("broker.shutdown() sessionClosed done.")
# self._session.destroy()
-# logging.debug("broker.shutdown() session destroy done.")
+# trace.debug("broker.shutdown() session destroy done.")
# self._session = None
# self._operational = False
-# logging.debug("broker.shutdown() done.")
+# trace.debug("broker.shutdown() done.")
# def wait_for_stable(self, timeout = None):
@@ -2344,24 +2346,24 @@
# while valid:
# count += 1
# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
-# logging.debug("Broker Event BROKER_INFO received");
+# trace.debug("Broker Event BROKER_INFO received");
# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
-# logging.debug("Broker Event DECLARE_QUEUE received");
+# trace.debug("Broker Event DECLARE_QUEUE received");
# self.conn.impl.declareQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
-# logging.debug("Broker Event DELETE_QUEUE received");
+# trace.debug("Broker Event DELETE_QUEUE received");
# self.conn.impl.deleteQueue(self._session.handle, self._event.name)
# elif self._event.kind == qmfengine.BrokerEvent.BIND:
-# logging.debug("Broker Event BIND received");
+# trace.debug("Broker Event BIND received");
# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
-# logging.debug("Broker Event UNBIND received");
+# trace.debug("Broker Event UNBIND received");
# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
-# logging.debug("Broker Event SETUP_COMPLETE received");
+# trace.debug("Broker Event SETUP_COMPLETE received");
# self.impl.startProtocol()
# elif self._event.kind == qmfengine.BrokerEvent.STABLE:
-# logging.debug("Broker Event STABLE received");
+# trace.debug("Broker Event STABLE received");
# self._cv.acquire()
# try:
# self._stable = True
@@ -2388,7 +2390,7 @@
# valid = self.impl.getXmtMessage(self._xmtMessage)
# while valid:
# count += 1
-# logging.debug("Broker: sending msg on connection")
+# trace.debug("Broker: sending msg on connection")
# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
# self.impl.popXmt()
# valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -2406,14 +2408,14 @@
# def conn_event_connected(self):
-# logging.debug("Broker: Connection event CONNECTED")
+# trace.debug("Broker: Connection event CONNECTED")
# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
# self.impl.sessionOpened(self._session.handle)
# self._do_events()
# def conn_event_disconnected(self, error):
-# logging.debug("Broker: Connection event DISCONNECTED")
+# trace.debug("Broker: Connection event DISCONNECTED")
# pass
@@ -2422,14 +2424,14 @@
# def sess_event_session_closed(self, context, error):
-# logging.debug("Broker: Session event CLOSED")
+# trace.debug("Broker: Session event CLOSED")
# self.impl.sessionClosed()
# def sess_event_recv(self, context, message):
-# logging.debug("Broker: Session event MSG_RECV")
+# trace.debug("Broker: Session event MSG_RECV")
# if not self._operational:
-# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
+# log.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
# self.impl.handleRcvMessage(message)
# self._do_events()
@@ -2447,6 +2449,7 @@
if __name__ == '__main__':
# temp test code
+ import logging
from common import (qmfTypes, SchemaProperty)
logging.getLogger().setLevel(logging.INFO)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org