You are viewing a plain text version of this content. The canonical link is not included in this plain-text copy.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/01/21 22:50:39 UTC

svn commit: r901871 - in /qpid/branches/qmfv2/qpid/python/qmf: qmfAgent.py qmfCommon.py qmfConsole.py test/events.py

Author: kgiusti
Date: Thu Jan 21 21:50:39 2010
New Revision: 901871

URL: http://svn.apache.org/viewvc?rev=901871&view=rev
Log:
QPID-2261: Checkpoint initial (brain-dead) event implementation.

Added:
    qpid/branches/qmfv2/qpid/python/qmf/test/events.py
Modified:
    qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py?rev=901871&r1=901870&r2=901871&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py Thu Jan 21 21:50:39 2010
@@ -92,6 +92,11 @@
 
         self._conn = None
         self._session = None
+        self._direct_receiver = None
+        self._locate_receiver = None
+        self._ind_sender = None
+        self._event_sender = None
+
         self._lock = Lock()
         self._packages = {}
         self._schema_timestamp = long(0)
@@ -119,26 +124,41 @@
         return self.name
 
     def set_connection(self, conn):
-        my_addr = QmfAddress.direct(self.name, self._domain)
-        locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
-        ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
-
-        logging.debug("my direct addr=%s" % my_addr)
-        logging.debug("agent.locate addr=%s" % locate_addr)
-        logging.debug("agent.ind addr=%s" % ind_addr)
-
         self._conn = conn
         self._session = self._conn.session()
+
+        my_addr = QmfAddress.direct(self.name, self._domain)
         self._direct_receiver = self._session.receiver(str(my_addr) +
                                                        ";{create:always,"
                                                        " node-properties:"
-                                                       " {type:topic, x-properties: {type:direct}}}", 
+                                                       " {type:topic,"
+                                                       " x-properties:"
+                                                       " {type:direct}}}",
                                                        capacity=self._capacity)
-        self._locate_receiver = self._session.receiver(str(locate_addr) + 
-                                                       ";{create:always, node-properties:{type:topic}}",
+        logging.debug("my direct addr=%s" % self._direct_receiver.source)
+
+        locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
+        self._locate_receiver = self._session.receiver(str(locate_addr) +
+                                                       ";{create:always,"
+                                                       " node-properties:"
+                                                       " {type:topic}}",
                                                        capacity=self._capacity)
+        logging.debug("agent.locate addr=%s" % self._locate_receiver.source)
+
+
+        ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
         self._ind_sender = self._session.sender(str(ind_addr) +
-                                                ";{create:always, node-properties:{type:topic}}")
+                                                ";{create:always,"
+                                                " node-properties:"
+                                                " {type:topic}}")
+        logging.debug("agent.ind addr=%s" % self._ind_sender.target)
+
+        my_events = QmfAddress.topic(self.name, self._domain)
+        self._event_sender = self._session.sender(str(my_events) +
+                                                  ";{create:always,"
+                                                  " node-properties:"
+                                                  " {type:topic}}")
+        logging.debug("my event addr=%s" % self._event_sender.target)
 
         self._running = True
         self.start()
@@ -161,8 +181,13 @@
             if self.isAlive():
                 logging.error( "Agent thread '%s' is hung..." % self.name)
         self._direct_receiver.close()
+        self._direct_receiver = None
         self._locate_receiver.close()
+        self._locate_receiver = None
         self._ind_sender.close()
+        self._ind_sender = None
+        self._event_sender.close()
+        self._event_sender = None
         self._session.close()
         self._session = None
         self._conn = None
@@ -195,8 +220,20 @@
     def register_event_class(self, schema):
         return self.register_object_class(schema)
 
-    def raiseEvent(self, qmfEvent):
-        logging.error("!!!Agent.raiseEvent() TBD!!!")
+    def raise_event(self, qmfEvent):
+        """
+        TBD
+        """
+        if not self._event_sender:
+            raise Exception("No connection available")
+
+        # @todo: should we validate against the schema?
+        _map = {"_name": self.get_name(),
+                "_event": qmfEvent.map_encode()}
+        msg = Message(subject=makeSubject(OpCode.event_ind),
+                      properties={"method":"response"},
+                      content={MsgKey.event:_map})
+        self._event_sender.send(msg)
 
     def add_object(self, data ):
         """

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py?rev=901871&r1=901870&r2=901871&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py Thu Jan 21 21:50:39 2010
@@ -37,7 +37,11 @@
 
 AMQP_QMF_AGENT_LOCATE = "agent.locate"
 AMQP_QMF_AGENT_INDICATION = "agent.ind"
-
+AMQP_QMF_AGENT_EVENT="agent.event"
+# agent.ind[.<agent-name>]
+# agent.event.<sev>.<agent-name>
+# sev="strings"
+#
 
 AMQP_QMF_SUBJECT = "qmf"
 AMQP_QMF_VERSION = 4
@@ -52,6 +56,7 @@
     object_id="object_id"
     data_obj="object"
     method="method"
+    event="event"
 
 
 class OpCode(object):
@@ -544,8 +549,19 @@
     change in some aspect of the system under managment.
     """
     KEY_TIMESTAMP = "_timestamp"
+    KEY_SEVERITY = "_severity"
+
+    SEV_EMERG = "emerg"
+    SEV_ALERT = "alert"
+    SEV_CRIT = "crit"
+    SEV_ERR = "err"
+    SEV_WARNING = "warning"
+    SEV_NOTICE = "notice"
+    SEV_INFO = "info"
+    SEV_DEBUG = "debug"
 
-    def __init__(self, _timestamp=None, _values={}, _subtypes={}, _tag=None,
+    def __init__(self, _timestamp=None, _sev=SEV_NOTICE, _values={},
+                 _subtypes={}, _tag=None,
                  _map=None,
                  _schema=None, _const=True):
         """
@@ -567,6 +583,7 @@
             super(QmfEvent, self).__init__(_map=_map, _schema=_schema,
                                            _const=_const)
             _timestamp = _map.get(self.KEY_TIMESTAMP, _timestamp)
+            _sev = _map.get(self.KEY_SEVERITY, _sev)
         else:
             super(QmfEvent, self).__init__(_values=_values,
                                            _subtypes=_subtypes, _tag=_tag,
@@ -579,10 +596,12 @@
         except:
             raise TypeError("QmfEvent: a numeric timestamp is required.")
 
-    def _create(cls, timestamp, values,
+        self._severity = _sev
+
+    def _create(cls, timestamp, severity, values,
                 _subtypes={}, _tag=None, _schema=None, _const=False):
-        return cls(_timestamp=timestamp, _values=values, _subtypes=_subtypes,
-                _tag=_tag, _schema=_schema, _const=_const)
+        return cls(_timestamp=timestamp, _sev=severity, _values=values,
+                _subtypes=_subtypes, _tag=_tag, _schema=_schema, _const=_const)
     create = classmethod(_create)
 
     def _from_map(cls, map_, _schema=None, _const=False):
@@ -592,9 +611,13 @@
     def get_timestamp(self): 
         return self._timestamp
 
+    def get_severity(self):
+        return self._severity
+
     def map_encode(self):
         _map = super(QmfEvent, self).map_encode()
         _map[self.KEY_TIMESTAMP] = self._timestamp
+        _map[self.KEY_SEVERITY] = self._severity
         return _map
 
 

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=901871&r1=901870&r2=901871&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Thu Jan 21 21:50:39 2010
@@ -34,7 +34,7 @@
                        QmfQueryPredicate, MsgKey, QmfData, QmfAddress,
                        AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION,
                        SchemaClass, SchemaClassId, SchemaEventClass,
-                       SchemaObjectClass, WorkItem, SchemaMethod)
+                       SchemaObjectClass, WorkItem, SchemaMethod, QmfEvent)
 
 
 
@@ -89,7 +89,7 @@
 
     def __init__(self):
         self.lock     = Lock()
-        self.sequence = 1L
+        self.sequence = long(time.time())  # pseudo-randomize seq start
         self.pending  = {}
 
 
@@ -512,6 +512,15 @@
         return MethodResult(_out_args=_map.get(SchemaMethod.KEY_ARGUMENTS),
                             _error=_map.get(SchemaMethod.KEY_ERROR))
 
+    def enable_events(self):
+        raise Exception("enable_events tbd")
+
+    def disable_events(self):
+        raise Exception("disable_events tbd")
+
+    def destroy(self):
+        raise Exception("destroy tbd")
+
     def __repr__(self):
         return str(self._address)
     
@@ -662,18 +671,21 @@
                                                     " x-properties:"
                                                     " {type:direct}}}", 
                                                     capacity=1)
-        logging.debug("local addr=%s" % self._address)
+        logging.debug("my direct addr=%s" % self._direct_recvr.source)
+
         ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain)
-        logging.debug("agent.ind addr=%s" % ind_addr)
         self._announce_recvr = self._session.receiver(str(ind_addr) +
                                                       ";{create:always,"
                                                       " node-properties:{type:topic}}",
                                                       capacity=1)
+        logging.debug("agent.ind addr=%s" % self._announce_recvr.source)
+
         locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain)
-        logging.debug("agent.locate addr=%s" % locate_addr)
         self._locate_sender = self._session.sender(str(locate_addr) +
                                                    ";{create:always,"
                                                    " node-properties:{type:topic}}")
+        logging.debug("agent.locate addr=%s" % self._locate_sender.target)
+
         #
         # Now that receivers are created, fire off the receive thread...
         #
@@ -898,6 +910,14 @@
                     break
                 self._dispatch(msg, _direct=True)
 
+            for agent in self._agent_map.itervalues():
+                try:
+                    msg = agent._event_recvr.fetch(timeout = 0)
+                except Empty:
+                    continue
+                self._dispatch(msg, _direct=False)
+
+
             self._expireAgents()   # check for expired agents
 
             #if qLen == 0 and self._work_q.qsize() and self._notifier:
@@ -917,7 +937,7 @@
                     timeout = ((self._next_agent_expire - now) + datetime.timedelta(microseconds=999999)).seconds
                     try:
                         logging.debug("waiting for next rcvr (timeout=%s)..." % timeout)
-                        self._session.next_receiver(timeout = timeout)
+                        xxx = self._session.next_receiver(timeout = timeout)
                     except Empty:
                         pass
 
@@ -1004,9 +1024,7 @@
         """
         PRIVATE: Process a message received from an Agent
         """
-
         logging.debug( "Message received from Agent! [%s]" % msg )
-
         try:
             version,opcode = parseSubject(msg.subject)
             # @todo: deal with version mismatch!!!
@@ -1025,7 +1043,7 @@
         elif opcode == OpCode.data_ind:
             self._handleDataIndMsg(msg, cmap, version, _direct)
         elif opcode == OpCode.event_ind:
-            logging.warning("!!! event_ind TBD !!!")
+            self._handleEventIndMsg(msg, cmap, version, _direct)
         elif opcode == OpCode.managed_object:
             logging.warning("!!! managed_object TBD !!!")
         elif opcode == OpCode.object_ind:
@@ -1082,6 +1100,8 @@
             if not agent:
                 # need to create and add a new agent
                 agent = self._createAgent(name)
+                if not agent:
+                    return   # failed to add agent
 
             # lock out expiration scanning code
             self._lock.acquire()
@@ -1137,6 +1157,41 @@
         logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
         self._req_correlation.put_data(msg.correlation_id, msg)
 
+    def _handleEventIndMsg(self, msg, cmap, version, _direct):
+        ei_map = cmap.get(MsgKey.event)
+        if not ei_map or not isinstance(ei_map, type({})):
+            logging.warning("Bad event indication message received: '%s'" % msg)
+            return
+
+        aname = ei_map.get("_name")
+        emap = ei_map.get("_event")
+        if not aname:
+            logging.debug("No '_name' field in event indication message.")
+            return
+        if not emap:
+            logging.debug("No '_event' field in event indication message.")
+            return
+        # @todo: do I need to lock this???
+        agent = self._agent_map.get(aname)
+        if not agent:
+            logging.debug("Agent '%s' not known." % aname)
+            return
+        try:
+            # @todo: schema???
+            event = QmfEvent.from_map(emap)
+        except TypeError:
+            logging.debug("Invalid QmfEvent map received: %s" % str(emap))
+            return
+
+        # @todo: schema?  Need to fetch it, but not from this thread!
+        # This thread can not pend on a request.
+        logging.debug("Publishing event received from agent %s" % aname)
+        wi = WorkItem(WorkItem.EVENT_RECEIVED, None,
+                      {"agent":agent,
+                       "event":event})
+        self._work_q.put(wi)
+        self._work_q_put = True
+
 
     def _expireAgents(self):
         """
@@ -1158,6 +1213,7 @@
                         agent._announce_timestamp = None
                         wi = WorkItem(WorkItem.AGENT_DELETED, None,
                                       {"agent":agent})
+                        # @todo: remove agent from self._agent_map
                         self._work_q.put(wi)
                         self._work_q_put = True
                     else:
@@ -1175,7 +1231,7 @@
         """
         Factory to create/retrieve an agent for this console
         """
-
+        logging.debug("creating agent %s" % name)
         self._lock.acquire()
         try:
             agent = self._agent_map.get(name)
@@ -1183,12 +1239,28 @@
                 return agent
 
             agent = Agent(name, self)
-            agent._sender = self._session.sender(str(agent._address) + 
-                                                    ";{create:always,"
-                                                    " node-properties:"
-                                                    " {type:topic,"
-                                                    " x-properties:"
-                                                    " {type:direct}}}") 
+            try:
+                agent._sender = self._session.sender(str(agent._address) + 
+                                                     ";{create:always,"
+                                                     " node-properties:"
+                                                     " {type:topic,"
+                                                     " x-properties:"
+                                                     " {type:direct}}}") 
+            except:
+                logging.warning("Unable to create sender for %s" % name)
+                return None
+            logging.debug("created agent sender %s" % agent._sender.target)
+
+            events_addr = QmfAddress.topic(name, self._domain)
+            try:
+                agent._event_recvr = self._session.receiver(str(events_addr) +
+                                                            ";{create:always,"
+                                                            " node-properties:{type:topic}}",
+                                                            capacity=1)
+            except:
+                logging.warning("Unable to create event receiver for %s" % name)
+                return None
+            logging.debug("created agent event receiver %s" % agent._event_recvr.source)
 
             self._agent_map[name] = agent
         finally:

Added: qpid/branches/qmfv2/qpid/python/qmf/test/events.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/events.py?rev=901871&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/events.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/events.py Thu Jan 21 21:50:39 2010
@@ -0,0 +1,193 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+import time
+import datetime
+import logging
+from threading import Thread, Event
+
+import qpid.messaging
+from qmf.qmfCommon import (Notifier, SchemaObjectClass, SchemaClassId,
+                           SchemaProperty, qmfTypes, SchemaMethod, QmfQuery,
+                           QmfData, QmfQueryPredicate, SchemaEventClass,
+                           QmfEvent)
+import qmf.qmfConsole
+from qmf.qmfAgent import(QmfAgentData, Agent)
+
+
+class _testNotifier(Notifier):
+    def __init__(self):
+        self._event = Event()
+
+    def indication(self):
+        # note: called by qmf daemon thread
+        self._event.set()
+
+    def wait_for_work(self, timeout):
+        # note: called by application thread to wait
+        # for qmf to generate work
+        self._event.wait(timeout)
+        timed_out = self._event.isSet() == False
+        if not timed_out:
+            self._event.clear()
+            return True
+        return False
+
+
+class _agentApp(Thread):
+    def __init__(self, name, broker_url, heartbeat):
+        Thread.__init__(self)
+        self.timeout = 3
+        self.broker_url = broker_url
+        self.notifier = _testNotifier()
+        self.agent = Agent(name,
+                           _notifier=self.notifier,
+                           _heartbeat_interval=heartbeat)
+
+        # Dynamically construct a management database
+
+        _schema = SchemaEventClass(_classId=SchemaClassId("MyPackage",
+                                                          "MyClass",
+                                                          stype=SchemaClassId.TYPE_EVENT),
+                                   _desc="A test event schema")
+        # add properties
+        _schema.add_property( "prop-1", SchemaProperty(qmfTypes.TYPE_UINT8))
+        _schema.add_property( "prop-2", SchemaProperty(qmfTypes.TYPE_LSTR))
+
+        # Add schema to Agent
+        self.schema = _schema
+        self.agent.register_object_class(_schema)
+
+        self.running = False
+
+    def start_app(self):
+        self.running = True
+        self.start()
+
+    def stop_app(self):
+        self.running = False
+        # wake main thread
+        self.notifier.indication() # hmmm... collide with daemon???
+        self.join(self.timeout)
+        if self.isAlive():
+            raise Exception("AGENT DID NOT TERMINATE AS EXPECTED!!!")
+
+    def run(self):
+        # broker_url = "user/passwd@hostname:port"
+        conn = qpid.messaging.Connection(self.broker_url.host,
+                                         self.broker_url.port,
+                                         self.broker_url.user,
+                                         self.broker_url.password)
+        conn.connect()
+        self.agent.set_connection(conn)
+
+        counter = 1
+        while self.running:
+            # post an event every second
+            event = QmfEvent.create(long(time.time() * 1000),
+                                    QmfEvent.SEV_WARNING,
+                                    {"prop-1": counter,
+                                     "prop-2": str(datetime.datetime.utcnow())},
+                                    _schema=self.schema)
+            counter += 1
+            self.agent.raise_event(event)
+            wi = self.agent.get_next_workitem(timeout=0)
+            while wi is not None:
+                logging.error("UNEXPECTED AGENT WORKITEM RECEIVED=%s" % wi.get_type())
+                self.agent.release_workitem(wi)
+                wi = self.agent.get_next_workitem(timeout=0)
+            self.notifier.wait_for_work(1)
+
+        self.agent.remove_connection(self.timeout)
+        self.agent.destroy(self.timeout)
+
+
+
+class BaseTest(unittest.TestCase):
+    def configure(self, config):
+        self.config = config
+        self.broker = config.broker
+        self.defines = self.config.defines
+
+    def setUp(self):
+        # one second agent indication interval
+        self.agent1 = _agentApp("agent1", self.broker, 1)
+        self.agent1.start_app()
+        self.agent2 = _agentApp("agent2", self.broker, 1)
+        self.agent2.start_app()
+
+    def tearDown(self):
+        if self.agent1:
+            self.agent1.stop_app()
+            self.agent1 = None
+        if self.agent2:
+            self.agent2.stop_app()
+            self.agent2 = None
+
+    def test_get_events(self):
+        # create console
+        # find agents
+
+        self.notifier = _testNotifier()
+        self.console = qmf.qmfConsole.Console(notifier=self.notifier,
+                                              agent_timeout=3)
+        self.conn = qpid.messaging.Connection(self.broker.host,
+                                              self.broker.port,
+                                              self.broker.user,
+                                              self.broker.password)
+        self.conn.connect()
+        self.console.addConnection(self.conn)
+
+        # find the agents
+        for aname in ["agent1", "agent2"]:
+            agent = self.console.find_agent(aname, timeout=3)
+            self.assertTrue(agent and agent.get_name() == aname)
+
+        # now wait for events
+        agent1_events = agent2_events = 0
+        wi = self.console.get_next_workitem(timeout=4)
+        while wi:
+            if wi.get_type() == wi.EVENT_RECEIVED:
+                event = wi.get_params().get("event")
+                self.assertTrue(isinstance(event, QmfEvent))
+                self.assertTrue(event.get_severity() == QmfEvent.SEV_WARNING)
+                self.assertTrue(event.get_value("prop-1") > 0)
+
+                agent = wi.get_params().get("agent")
+                if not agent or not isinstance(agent, qmf.qmfConsole.Agent):
+                    self.fail("Unexpected workitem from agent")
+                else:
+                    if agent.get_name() == "agent1":
+                        agent1_events += 1
+                    elif agent.get_name() == "agent2":
+                        agent2_events += 1
+                    else:
+                        self.fail("Unexpected agent name received: %s" %
+                                  agent.get_name())
+                    if agent1_events and agent2_events:
+                        break;
+
+            wi = self.console.get_next_workitem(timeout=4)
+
+        self.assertTrue(agent1_events > 0 and agent2_events > 0)
+
+        self.console.destroy(10)
+
+
+
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org