You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/28 16:42:33 UTC

svn commit: r830640 - /qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py

Author: ritchiem
Date: Wed Oct 28 15:42:33 2009
New Revision: 830640

URL: http://svn.apache.org/viewvc?rev=830640&view=rev
Log:
QPID-2156 - Add thread shutdown to python QMF bindings, additional logging, native console test.
Committed patch from Ken Giusti.

Modified:
    qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py

Modified: qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py?rev=830640&r1=830639&r2=830640&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py Wed Oct 28 15:42:33 2009
@@ -19,6 +19,7 @@
 import sys
 import socket
 import os
+import logging
 from threading import Thread
 from threading import RLock
 from threading import Condition
@@ -109,8 +110,19 @@
         self._conn_handlers_to_delete = []
         self._conn_handlers = []
         self._connected = False
+        self._operational = True
         self.start()
-    
+
+
+    def destroy(self, timeout=None):
+        logging.debug("Destroying Connection...")
+        self._operational = False
+        self.kick()
+        self.join(timeout)
+        logging.debug("... Conn Destroyed!" )
+        if self.isAlive():
+            logging.error("Error: Connection thread '%s' is hung..." % self.getName())
+
 
     def connected(self):
         return self._connected
@@ -145,8 +157,8 @@
         del_handlers = []
         bt_count = 0
         
-        while True:
-            # print "Waiting for socket data"
+        while self._operational:
+            logging.debug("Connection thread waiting for socket data...")
             self._sock.recv(1)
             
             self._lock.acquire()
@@ -173,24 +185,31 @@
             while valid:
                 try:
                     if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED:
+                        logging.debug("Connection thread: CONNECTED event received.")
                         self._connected = True
                         for h in self._conn_handlers:
                             h.conn_event_connected()
                         
                     elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED:
+                        logging.debug("Connection thread: DISCONNECTED event received.")
                         self._connected = False
                         for h in self._conn_handlers:
                             h.conn_event_disconnected(eventImpl.errorText)
                         
                     elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED:
+                        logging.debug("Connection thread: SESSION_CLOSED event received.")
                         eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
                         
                     elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV:
+                        logging.debug("Connection thread: RECV event received.")
                         eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
+                    else:
+                        logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind))
                         
                 except:
                     import traceback
-                    print "Event Exception:", sys.exc_info()
+                    logging.error( "Exception occurred during Connection event processing:" )
+                    logging.error( str(sys.exc_info()) )
                     if bt_count < 2:
                         traceback.print_exc()
                         traceback.print_stack()
@@ -202,6 +221,8 @@
             for h in self._conn_handlers:
                 h.conn_event_visit()
 
+        logging.debug("Shutting down Connection thread")
+
 
 
 class Session:
@@ -296,7 +317,7 @@
             # when TYPE_OBJECT
             # when TYPE_LIST
             # when TYPE_ARRAY
-            print "Unsupported type for get_attr?", val.getType()
+            logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) )
             return None
     
     
@@ -329,7 +350,7 @@
             # when TYPE_OBJECT
             # when TYPE_LIST
             # when TYPE_ARRAY
-            print "Unsupported type for get_attr?", val.getType()
+            logging.error("Unsupported type for get_attr? '%s'" % str(val.getType()))
             return None
     
     
@@ -626,7 +647,7 @@
             # when TYPE_OBJECT
             # when TYPE_LIST
             # when TYPE_ARRAY
-            print "Unsupported Type for Get?", val.getType()
+            logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
             return None
     
     
@@ -663,7 +684,7 @@
             # when TYPE_OBJECT
             # when TYPE_LIST
             # when TYPE_ARRAY
-            print "Unsupported Type for Set?", val.getType()
+            logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
             return None
 
 
@@ -960,8 +981,19 @@
         self._sync_result = None
         self._select = {}
         self._cb_cond = Condition()
+        self._operational = True
         self.start()
 
+
+    def destroy(self, timeout=None):
+        logging.debug("Destroying Console...")
+        self._operational = False
+        self.start_console_events()  # wake thread up
+        self.join(timeout)
+        logging.debug("... Console Destroyed!")
+        if self.isAlive():
+            logging.error( "Console thread '%s' is hung..." % self.getName() )
+
     
     def add_connection(self, conn):
         broker = Broker(self, conn)
@@ -974,12 +1006,15 @@
     
     
     def del_connection(self, broker):
+        logging.debug("shutting down broker...")
         broker.shutdown()
+        logging.debug("...broker down.")
         self._cv.acquire()
         try:
             self._broker_list.remove(broker)
         finally:
             self._cv.release()
+        logging.debug("del_connection() finished")
     
     
     def packages(self):
@@ -1129,14 +1164,15 @@
     
     
     def run(self):
-      while True:
-        self._cb_cond.acquire()
-        try:
-            self._cb_cond.wait(1)
-            while self.do_console_events():
-                pass
-        finally:
-            self._cb_cond.release()
+        while self._operational:
+            self._cb_cond.acquire()
+            try:
+                self._cb_cond.wait(1)
+                while self._do_console_events():
+                    pass
+            finally:
+                self._cb_cond.release()
+        logging.debug("Shutting down Console thread")
 
 
     def start_console_events(self):
@@ -1147,39 +1183,47 @@
             self._cb_cond.release()
 
 
-    def do_console_events(self):
+    def _do_console_events(self):
         '''
-        Called by Broker proxy to poll for Console events.  Passes the events
-        onto the ConsoleHandler associated with this Console.
+        Called by the Console thread to poll for events.  Passes the events
+        onto the ConsoleHandler associated with this Console.  Is called
+        periodically, but can also be kicked by Console.start_console_events().
         '''
         count = 0
         valid = self.impl.getEvent(self._event)
         while valid:
             count += 1
-            # print "Console Event:", self._event.kind
             if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+                logging.debug("Console Event AGENT_ADDED received")
                 if self._handler:
                     self._handler.agent_added(AgentProxy(self._event.agent, None))
             elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+                logging.debug("Console Event AGENT_DELETED received")
                 if self._handler:
                     self._handler.agent_deleted(AgentProxy(self._event.agent, None))
             elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+                logging.debug("Console Event NEW_PACKAGE received")
                 if self._handler:
                     self._handler.new_package(self._event.name)
             elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+                logging.debug("Console Event NEW_CLASS received")
                 if self._handler:
                     self._handler.new_class(SchemaClassKey(self._event.classKey))
             elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+                logging.debug("Console Event OBJECT_UPDATE received")
                 if self._handler:
                     self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
                                                 self._event.hasProps, self._event.hasStats)
             elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
-                pass
+                logging.debug("Console Event EVENT_RECEIVED received")
             elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+                logging.debug("Console Event AGENT_HEARTBEAT received")
                 if self._handler:
                     self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
             elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
-                pass
+                logging.debug("Console Event METHOD_RESPONSE received")
+            else:
+                logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
             
             self.impl.popEvent()
             valid = self.impl.getEvent(self._event)
@@ -1210,7 +1254,6 @@
 class Broker(ConnectionHandler):
     #   attr_reader :impl :conn, :console, :broker_bank
     def __init__(self, console, conn):
-        ConnectionHandler.__init__(self)
         self.broker_bank = 1
         self.console = console
         self.conn = conn
@@ -1226,9 +1269,17 @@
     
     
     def shutdown(self):
+        logging.debug("broker.shutdown() called.")
         self.console.impl.delConnection(self.impl)
         self.conn.del_conn_handler(self)
+        if self._session:
+            self.impl.sessionClosed()
+            logging.debug("broker.shutdown() sessionClosed done.")
+            self._session.destroy()
+            logging.debug("broker.shutdown() session destroy done.")
+            self._session = None
         self._operational = False
+        logging.debug("broker.shutdown() done.")
 
 
     def wait_for_stable(self, timeout = None):
@@ -1252,26 +1303,31 @@
         self.conn.kick()
 
 
-    def do_broker_events(self):
+    def _do_broker_events(self):
         count = 0
         valid = self.impl.getEvent(self._event)
         while valid:
             count += 1
-            # print "Broker Event: ", self._event.kind
             if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
-                pass
+                logging.debug("Broker Event BROKER_INFO received");
             elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
+                logging.debug("Broker Event DECLARE_QUEUE received");
                 self.conn.impl.declareQueue(self._session.handle, self._event.name)
             elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
+                logging.debug("Broker Event DELETE_QUEUE received");
                 self.conn.impl.deleteQueue(self._session.handle, self._event.name)
             elif self._event.kind == qmfengine.BrokerEvent.BIND:
+                logging.debug("Broker Event BIND received");
                 self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
             elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
+                logging.debug("Broker Event UNBIND received");
                 self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
             elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
+                logging.debug("Broker Event SETUP_COMPLETE received");
                 self.impl.startProtocol()
             elif self._event.kind == qmfengine.BrokerEvent.STABLE:
-                self_.cv.acquire()
+                logging.debug("Broker Event STABLE received");
+                self._cv.acquire()
                 try:
                     self._stable = True
                     self._cv.notify()
@@ -1292,11 +1348,12 @@
         return count
     
     
-    def do_broker_messages(self):
+    def _do_broker_messages(self):
         count = 0
         valid = self.impl.getXmtMessage(self._xmtMessage)
         while valid:
             count += 1
+            logging.debug("Broker: sending msg on connection")
             self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
             self.impl.popXmt()
             valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -1304,41 +1361,42 @@
         return count
     
     
-    def do_events(self):
+    def _do_events(self):
         while True:
             self.console.start_console_events()
-            bcnt = do_broker_events()
-            mcnt = do_broker_messages()
+            bcnt = self._do_broker_events()
+            mcnt = self._do_broker_messages()
             if bcnt == 0 and mcnt == 0:
                 break;
     
     
     def conn_event_connected(self):
-        print "Console Connection Established..."
+        logging.debug("Broker: Connection event CONNECTED")
         self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
         self.impl.sessionOpened(self._session.handle)
-        self.do_events()
+        self._do_events()
     
     
     def conn_event_disconnected(self, error):
-        print "Console Connection Lost"
+        logging.debug("Broker: Connection event DISCONNECTED")
         pass
     
     
     def conn_event_visit(self):
-        self.do_events()
+        self._do_events()
 
 
     def sess_event_session_closed(self, context, error):
-        print "Console Session Lost"
+        logging.debug("Broker: Session event CLOSED")
         self.impl.sessionClosed()
     
     
     def sess_event_recv(self, context, message):
+        logging.debug("Broker: Session event MSG_RECV")
         if not self._operational:
-            print "Unexpected RECV Event"
+            logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
         self.impl.handleRcvMessage(message)
-        self.do_events()
+        self._do_events()
 
 
 
@@ -1454,7 +1512,7 @@
     
     
     def conn_event_connected(self):
-        print "Agent Connection Established..."
+        logging.debug("Agent Connection Established...")
         self._session = Session(self._conn,
                                 "qmfa-%s.%d" % (socket.gethostname(), os.getpid()),
                                 self)
@@ -1463,7 +1521,7 @@
     
     
     def conn_event_disconnected(self, error):
-        print "Agent Connection Lost"
+        logging.debug("Agent Connection Lost")
         pass
     
     
@@ -1472,7 +1530,7 @@
 
 
     def sess_event_session_closed(self, context, error):
-        print "Agent Session Lost"
+        logging.debug("Agent Session Lost")
         pass
     
     



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org