You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/01/21 22:50:39 UTC
svn commit: r901871 - in /qpid/branches/qmfv2/qpid/python/qmf: qmfAgent.py qmfCommon.py qmfConsole.py test/events.py
Author: kgiusti
Date: Thu Jan 21 21:50:39 2010
New Revision: 901871
URL: http://svn.apache.org/viewvc?rev=901871&view=rev
Log:
QPID-2261: Checkpoint initial (brain-dead) event implementation.
Added:
qpid/branches/qmfv2/qpid/python/qmf/test/events.py
Modified:
qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py?rev=901871&r1=901870&r2=901871&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py Thu Jan 21 21:50:39 2010
@@ -92,6 +92,11 @@
self._conn = None
self._session = None
+ self._direct_receiver = None
+ self._locate_receiver = None
+ self._ind_sender = None
+ self._event_sender = None
+
self._lock = Lock()
self._packages = {}
self._schema_timestamp = long(0)
@@ -119,26 +124,41 @@
return self.name
def set_connection(self, conn):
- my_addr = QmfAddress.direct(self.name, self._domain)
- locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
-
- logging.debug("my direct addr=%s" % my_addr)
- logging.debug("agent.locate addr=%s" % locate_addr)
- logging.debug("agent.ind addr=%s" % ind_addr)
-
self._conn = conn
self._session = self._conn.session()
+
+ my_addr = QmfAddress.direct(self.name, self._domain)
self._direct_receiver = self._session.receiver(str(my_addr) +
";{create:always,"
" node-properties:"
- " {type:topic, x-properties: {type:direct}}}",
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}",
capacity=self._capacity)
- self._locate_receiver = self._session.receiver(str(locate_addr) +
- ";{create:always, node-properties:{type:topic}}",
+ logging.debug("my direct addr=%s" % self._direct_receiver.source)
+
+ locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+ self._locate_receiver = self._session.receiver(str(locate_addr) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic}}",
capacity=self._capacity)
+ logging.debug("agent.locate addr=%s" % self._locate_receiver.source)
+
+
+ ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
self._ind_sender = self._session.sender(str(ind_addr) +
- ";{create:always, node-properties:{type:topic}}")
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic}}")
+ logging.debug("agent.ind addr=%s" % self._ind_sender.target)
+
+ my_events = QmfAddress.topic(self.name, self._domain)
+ self._event_sender = self._session.sender(str(my_events) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic}}")
+ logging.debug("my event addr=%s" % self._event_sender.target)
self._running = True
self.start()
@@ -161,8 +181,13 @@
if self.isAlive():
logging.error( "Agent thread '%s' is hung..." % self.name)
self._direct_receiver.close()
+ self._direct_receiver = None
self._locate_receiver.close()
+ self._locate_receiver = None
self._ind_sender.close()
+ self._ind_sender = None
+ self._event_sender.close()
+ self._event_sender = None
self._session.close()
self._session = None
self._conn = None
@@ -195,8 +220,20 @@
def register_event_class(self, schema):
return self.register_object_class(schema)
- def raiseEvent(self, qmfEvent):
- logging.error("!!!Agent.raiseEvent() TBD!!!")
+ def raise_event(self, qmfEvent):
+ """
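+ Wrap qmfEvent in an event_ind message and send it on this agent's event sender; raises Exception if no event sender exists.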
+ """
+ if not self._event_sender:
+ raise Exception("No connection available")
+
+ # @todo: should we validate against the schema?
+ _map = {"_name": self.get_name(),
+ "_event": qmfEvent.map_encode()}
+ msg = Message(subject=makeSubject(OpCode.event_ind),
+ properties={"method":"response"},
+ content={MsgKey.event:_map})
+ self._event_sender.send(msg)
def add_object(self, data ):
"""
Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py?rev=901871&r1=901870&r2=901871&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py Thu Jan 21 21:50:39 2010
@@ -37,7 +37,11 @@
AMQP_QMF_AGENT_LOCATE = "agent.locate"
AMQP_QMF_AGENT_INDICATION = "agent.ind"
-
+AMQP_QMF_AGENT_EVENT="agent.event"
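+# Topic key layouts (NOTE(review): inferred from the patterns below -- confirm):
+#   agent.ind[.<agent-name>]
+#   agent.event.<sev>.<agent-name>
+# where <sev> is a severity string, presumably one of QmfEvent.SEV_*.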
AMQP_QMF_SUBJECT = "qmf"
AMQP_QMF_VERSION = 4
@@ -52,6 +56,7 @@
object_id="object_id"
data_obj="object"
method="method"
+ event="event"
class OpCode(object):
@@ -544,8 +549,19 @@
change in some aspect of the system under managment.
"""
KEY_TIMESTAMP = "_timestamp"
+ KEY_SEVERITY = "_severity"
+
+ SEV_EMERG = "emerg"
+ SEV_ALERT = "alert"
+ SEV_CRIT = "crit"
+ SEV_ERR = "err"
+ SEV_WARNING = "warning"
+ SEV_NOTICE = "notice"
+ SEV_INFO = "info"
+ SEV_DEBUG = "debug"
- def __init__(self, _timestamp=None, _values={}, _subtypes={}, _tag=None,
+ def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={},
+ _subtypes={}, _tag=None,
_map=None,
_schema=None, _const=True):
"""
@@ -567,6 +583,7 @@
super(QmfEvent, self).__init__(_map=_map, _schema=_schema,
_const=_const)
_timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp)
+ _sev = _map.get(self.KEY_SEVERITY, _sev)
else:
super(QmfEvent, self).__init__(_values=_values,
_subtypes=_subtypes, _tag=_tag,
@@ -579,10 +596,12 @@
except:
raise TypeError("QmfEvent: a numeric timestamp is required.")
- def _create(cls, timestamp, values,
+ self._severity = _sev
+
+ def _create(cls, timestamp, severity, values,
_subtypes={}, _tag=None, _schema=None, _const=False):
- return cls(_timestamp=timestamp, _values=values, _subtypes=_subtypes,
- _tag=_tag, _schema=_schema, _const=_const)
+ return cls(_timestamp=timestamp, _sev=severity, _values=values,
+ _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const)
create = classmethod(_create)
def _from_map(cls, map_, _schema=None, _const=False):
@@ -592,9 +611,13 @@
def get_timestamp(self):
return self._timestamp
+ def get_severity(self):
+ return self._severity
+
def map_encode(self):
_map = super(QmfEvent, self).map_encode()
_map[self.KEY_TIMESTAMP] = self._timestamp
+ _map[self.KEY_SEVERITY] = self._severity
return _map
Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=901871&r1=901870&r2=901871&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Thu Jan 21 21:50:39 2010
@@ -34,7 +34,7 @@
QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
SchemaClass, SchemaClassId, SchemaEventClass,
- SchemaObjectClass, WorkItem, SchemaMethod)
+ SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
@@ -89,7 +89,7 @@
def __init__(self):
self.lock = Lock()
- self.sequence = 1L
+ self.sequence = long(time.time()) # pseudo-randomize seq start
self.pending = {}
@@ -512,6 +512,15 @@
return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
_error=_map.get(SchemaMethod.KEY_ERROR))
+ def enable_events(self):
+ raise Exception("enable_events tbd")
+
+ def disable_events(self):
+ raise Exception("disable_events tbd")
+
+ def destroy(self):
+ raise Exception("destroy tbd")
+
def __repr__(self):
return str(self._address)
@@ -662,18 +671,21 @@
" x-properties:"
" {type:direct}}}",
capacity=1)
- logging.debug("local addr=%s" % self._address)
+ logging.debug("my direct addr=%s" % self._direct_recvr.source)
+
ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
- logging.debug("agent.ind addr=%s" % ind_addr)
self._announce_recvr = self._session.receiver(str(ind_addr) +
";{create:always,"
" node-properties:{type:topic}}",
capacity=1)
+ logging.debug("agent.ind addr=%s" % self._announce_recvr.source)
+
locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
- logging.debug("agent.locate addr=%s" % locate_addr)
self._locate_sender = self._session.sender(str(locate_addr) +
";{create:always,"
" node-properties:{type:topic}}")
+ logging.debug("agent.locate addr=%s" % self._locate_sender.target)
+
#
# Now that receivers are created, fire off the receive thread...
#
@@ -898,6 +910,14 @@
break
self._dispatch(msg, _direct=True)
+ for agent in self._agent_map.itervalues():
+ try:
+ msg = agent._event_recvr.fetch(timeout = 0)
+ except Empty:
+ continue
+ self._dispatch(msg, _direct=False)
+
+
self._expireAgents() # check for expired agents
#if qLen == 0 and self._work_q.qsize() and self._notifier:
@@ -917,7 +937,7 @@
timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
try:
logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
- self._session.next_receiver(timeout = timeout)
+ xxx = self._session.next_receiver(timeout = timeout)
except Empty:
pass
@@ -1004,9 +1024,7 @@
"""
PRIVATE: Process a message received from an Agent
"""
-
logging.debug( "Message received from Agent! [%s]" % msg )
-
try:
version,opcode = parseSubject(msg.subject)
# @todo: deal with version mismatch!!!
@@ -1025,7 +1043,7 @@
elif opcode == OpCode.data_ind:
self._handleDataIndMsg(msg, cmap, version, _direct)
elif opcode == OpCode.event_ind:
- logging.warning("!!! event_ind TBD !!!")
+ self._handleEventIndMsg(msg, cmap, version, _direct)
elif opcode == OpCode.managed_object:
logging.warning("!!! managed_object TBD !!!")
elif opcode == OpCode.object_ind:
@@ -1082,6 +1100,8 @@
if not agent:
# need to create and add a new agent
agent = self._createAgent(name)
+ if not agent:
+ return # failed to add agent
# lock out expiration scanning code
self._lock.acquire()
@@ -1137,6 +1157,41 @@
logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
self._req_correlation.put_data(msg.correlation_id, msg)
+ def _handleEventIndMsg(self, msg, cmap, version, _direct):
+ ei_map = cmap.get(MsgKey.event)
+ if not ei_map or not isinstance(ei_map, type({})):
+ logging.warning("Bad event indication message received: '%s'" % msg)
+ return
+
+ aname = ei_map.get("_name")
+ emap = ei_map.get("_event")
+ if not aname:
+ logging.debug("No '_name' field in event indication message.")
+ return
+ if not emap:
+ logging.debug("No '_event' field in event indication message.")
+ return
+ # @todo: confirm whether self._lock must be held while reading self._agent_map here
+ agent = self._agent_map.get(aname)
+ if not agent:
+ logging.debug("Agent '%s' not known." % aname)
+ return
+ try:
+ # @todo: schema???
+ event = QmfEvent.from_map(emap)
+ except TypeError:
+ logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+ return
+
+ # @todo: schema? Need to fetch it, but not from this thread!
+ # This thread can not pend on a request.
+ logging.debug("Publishing event received from agent %s" % aname)
+ wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
+ {"agent":agent,
+ "event":event})
+ self._work_q.put(wi)
+ self._work_q_put = True
+
def _expireAgents(self):
"""
@@ -1158,6 +1213,7 @@
agent._announce_timestamp = None
wi = WorkItem(WorkItem.AGENT_DELETED, None,
{"agent":agent})
+ # @todo: remove agent from self._agent_map
self._work_q.put(wi)
self._work_q_put = True
else:
@@ -1175,7 +1231,7 @@
"""
Factory to create/retrieve an agent for this console
"""
-
+ logging.debug("creating agent %s" % name)
self._lock.acquire()
try:
agent = self._agent_map.get(name)
@@ -1183,12 +1239,28 @@
return agent
agent = Agent(name, self)
- agent._sender = self._session.sender(str(agent._address) +
- ";{create:always,"
- " node-properties:"
- " {type:topic,"
- " x-properties:"
- " {type:direct}}}")
+ try:
+ agent._sender = self._session.sender(str(agent._address) +
+ ";{create:always,"
+ " node-properties:"
+ " {type:topic,"
+ " x-properties:"
+ " {type:direct}}}")
+ except:
+ logging.warning("Unable to create sender for %s" % name)
+ return None
+ logging.debug("created agent sender %s" % agent._sender.target)
+
+ events_addr = QmfAddress.topic(name, self._domain)
+ try:
+ agent._event_recvr = self._session.receiver(str(events_addr) +
+ ";{create:always,"
+ " node-properties:{type:topic}}",
+ capacity=1)
+ except:
+ logging.warning("Unable to create event receiver for %s" % name)
+ return None
+ logging.debug("created agent event receiver %s" % agent._event_recvr.source)
self._agent_map[name] = agent
finally:
Added: qpid/branches/qmfv2/qpid/python/qmf/test/events.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/events.py?rev=901871&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/events.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/events.py Thu Jan 21 21:50:39 2010
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import time
+import datetime
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf.qmfCommon import (Notifier, SchemaObjectClass, SchemaClassId,
+ SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+ QmfData, QmfQueryPredicate, SchemaEventClass,
+ QmfEvent)
+import qmf.qmfConsole
+from qmf.qmfAgent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+ def __init__(self):
+ self._event = Event()
+
+ def indication(self):
+ # note: called by qmf daemon thread
+ self._event.set()
+
+ def wait_for_work(self, timeout):
+ # note: called by application thread to wait
+ # for qmf to generate work
+ self._event.wait(timeout)
+ timed_out = self._event.isSet() == False
+ if not timed_out:
+ self._event.clear()
+ return True
+ return False
+
+
+class _agentApp(Thread):
+ def __init__(self, name, broker_url, heartbeat):
+ Thread.__init__(self)
+ self.timeout = 3
+ self.broker_url = broker_url
+ self.notifier = _testNotifier()
+ self.agent = Agent(name,
+ _notifier=self.notifier,
+ _heartbeat_interval=heartbeat)
+
+ # Dynamically construct the event schema this agent will register
+
+ _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage",
+ "MyClass",
+ stype=SchemaClassId.TYPE_EVENT),
+ _desc="A test event schema")
+ # add properties
+ _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8))
+ _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+ # Add schema to Agent
+ self.schema = _schema
+ self.agent.register_object_class(_schema)
+
+ self.running = False
+
+ def start_app(self):
+ self.running = True
+ self.start()
+
+ def stop_app(self):
+ self.running = False
+ # wake main thread
+ self.notifier.indication() # NOTE(review): the qmf daemon thread also calls indication(); confirm the two cannot collide
+ self.join(self.timeout)
+ if self.isAlive():
+ raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+ def run(self):
+ # broker_url = "user/passwd@hostname:port"
+ conn = qpid.messaging.Connection(self.broker_url.host,
+ self.broker_url.port,
+ self.broker_url.user,
+ self.broker_url.password)
+ conn.connect()
+ self.agent.set_connection(conn)
+
+ counter = 1
+ while self.running:
+ # post an event every second
+ event = QmfEvent.create(long(time.time() * 1000),
+ QmfEvent.SEV_WARNING,
+ {"prop-1": counter,
+ "prop-2": str(datetime.datetime.utcnow())},
+ _schema=self.schema)
+ counter += 1
+ self.agent.raise_event(event)
+ wi = self.agent.get_next_workitem(timeout=0)
+ while wi is not None:
+ logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+ self.agent.release_workitem(wi)
+ wi = self.agent.get_next_workitem(timeout=0)
+ self.notifier.wait_for_work(1)
+
+ self.agent.remove_connection(self.timeout)
+ self.agent.destroy(self.timeout)
+
+
+
+class BaseTest(unittest.TestCase):
+ def configure(self, config):
+ self.config = config
+ self.broker = config.broker
+ self.defines = self.config.defines
+
+ def setUp(self):
+ # one second agent indication interval
+ self.agent1 = _agentApp("agent1", self.broker, 1)
+ self.agent1.start_app()
+ self.agent2 = _agentApp("agent2", self.broker, 1)
+ self.agent2.start_app()
+
+ def tearDown(self):
+ if self.agent1:
+ self.agent1.stop_app()
+ self.agent1 = None
+ if self.agent2:
+ self.agent2.stop_app()
+ self.agent2 = None
+
+ def test_get_events(self):
+ # create console
+ # find agents
+
+ self.notifier = _testNotifier()
+ self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+ agent_timeout=3)
+ self.conn = qpid.messaging.Connection(self.broker.host,
+ self.broker.port,
+ self.broker.user,
+ self.broker.password)
+ self.conn.connect()
+ self.console.addConnection(self.conn)
+
+ # find the agents
+ for aname in ["agent1", "agent2"]:
+ agent = self.console.find_agent(aname, timeout=3)
+ self.assertTrue(agent and agent.get_name() == aname)
+
+ # now wait for events
+ agent1_events = agent2_events = 0
+ wi = self.console.get_next_workitem(timeout=4)
+ while wi:
+ if wi.get_type() == wi.EVENT_RECEIVED:
+ event = wi.get_params().get("event")
+ self.assertTrue(isinstance(event, QmfEvent))
+ self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING)
+ self.assertTrue(event.get_value("prop-1") > 0)
+
+ agent = wi.get_params().get("agent")
+ if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+ self.fail("Unexpected workitem from agent")
+ else:
+ if agent.get_name() == "agent1":
+ agent1_events += 1
+ elif agent.get_name() == "agent2":
+ agent2_events += 1
+ else:
+ self.fail("Unexpected agent name received: %s" %
+ agent.get_name())
+ if agent1_events and agent2_events:
+ break;
+
+ wi = self.console.get_next_workitem(timeout=4)
+
+ self.assertTrue(agent1_events > 0 and agent2_events > 0)
+
+ self.console.destroy(10)
+
+
+
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org