You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/28 16:42:33 UTC
svn commit: r830640 - /qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py
Author: ritchiem
Date: Wed Oct 28 15:42:33 2009
New Revision: 830640
URL: http://svn.apache.org/viewvc?rev=830640&view=rev
Log:
QPID-2156 - Add thread shutdown to python QMF bindings, additional logging, native console test.
Committed patch from Ken Giusti.
Modified:
qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py
Modified: qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py?rev=830640&r1=830639&r2=830640&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py (original)
+++ qpid/branches/0.5.x-dev/qpid/cpp/bindings/qmf/python/qmf.py Wed Oct 28 15:42:33 2009
@@ -19,6 +19,7 @@
import sys
import socket
import os
+import logging
from threading import Thread
from threading import RLock
from threading import Condition
@@ -109,8 +110,19 @@
self._conn_handlers_to_delete = []
self._conn_handlers = []
self._connected = False
+ self._operational = True
self.start()
-
+
+
+ def destroy(self, timeout=None):
+ logging.debug("Destroying Connection...")
+ self._operational = False
+ self.kick()
+ self.join(timeout)
+ logging.debug("... Conn Destroyed!" )
+ if self.isAlive():
+ logging.error("Error: Connection thread '%s' is hung..." % self.getName())
+
def connected(self):
return self._connected
@@ -145,8 +157,8 @@
del_handlers = []
bt_count = 0
- while True:
- # print "Waiting for socket data"
+ while self._operational:
+ logging.debug("Connection thread waiting for socket data...")
self._sock.recv(1)
self._lock.acquire()
@@ -173,24 +185,31 @@
while valid:
try:
if eventImpl.kind == qmfengine.ResilientConnectionEvent.CONNECTED:
+ logging.debug("Connection thread: CONNECTED event received.")
self._connected = True
for h in self._conn_handlers:
h.conn_event_connected()
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.DISCONNECTED:
+ logging.debug("Connection thread: DISCONNECTED event received.")
self._connected = False
for h in self._conn_handlers:
h.conn_event_disconnected(eventImpl.errorText)
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.SESSION_CLOSED:
+ logging.debug("Connection thread: SESSION_CLOSED event received.")
eventImpl.sessionContext.handler.sess_event_session_closed(eventImpl.sessionContext, eventImpl.errorText)
elif eventImpl.kind == qmfengine.ResilientConnectionEvent.RECV:
+ logging.debug("Connection thread: RECV event received.")
eventImpl.sessionContext.handler.sess_event_recv(eventImpl.sessionContext, eventImpl.message)
+ else:
+ logging.debug("Connection thread received unknown event: '%s'" % str(eventImpl.kind))
except:
import traceback
- print "Event Exception:", sys.exc_info()
+ logging.error( "Exception occurred during Connection event processing:" )
+ logging.error( str(sys.exc_info()) )
if bt_count < 2:
traceback.print_exc()
traceback.print_stack()
@@ -202,6 +221,8 @@
for h in self._conn_handlers:
h.conn_event_visit()
+ logging.debug("Shutting down Connection thread")
+
class Session:
@@ -296,7 +317,7 @@
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported type for get_attr?", val.getType()
+ logging.error( "Unsupported type for get_attr? '%s'" % str(val.getType()) )
return None
@@ -329,7 +350,7 @@
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported type for get_attr?", val.getType()
+ logging.error("Unsupported type for get_attr? '%s'" % str(val.getType()))
return None
@@ -626,7 +647,7 @@
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported Type for Get?", val.getType()
+ logging.error( "Unsupported Type for Get? '%s'" % str(val.getType()))
return None
@@ -663,7 +684,7 @@
# when TYPE_OBJECT
# when TYPE_LIST
# when TYPE_ARRAY
- print "Unsupported Type for Set?", val.getType()
+ logging.error("Unsupported Type for Set? '%s'" % str(val.getType()))
return None
@@ -960,8 +981,19 @@
self._sync_result = None
self._select = {}
self._cb_cond = Condition()
+ self._operational = True
self.start()
+
+ def destroy(self, timeout=None):
+ logging.debug("Destroying Console...")
+ self._operational = False
+ self.start_console_events() # wake thread up
+ self.join(timeout)
+ logging.debug("... Console Destroyed!")
+ if self.isAlive():
+ logging.error( "Console thread '%s' is hung..." % self.getName() )
+
def add_connection(self, conn):
broker = Broker(self, conn)
@@ -974,12 +1006,15 @@
def del_connection(self, broker):
+ logging.debug("shutting down broker...")
broker.shutdown()
+ logging.debug("...broker down.")
self._cv.acquire()
try:
self._broker_list.remove(broker)
finally:
self._cv.release()
+ logging.debug("del_connection() finished")
def packages(self):
@@ -1129,14 +1164,15 @@
def run(self):
- while True:
- self._cb_cond.acquire()
- try:
- self._cb_cond.wait(1)
- while self.do_console_events():
- pass
- finally:
- self._cb_cond.release()
+ while self._operational:
+ self._cb_cond.acquire()
+ try:
+ self._cb_cond.wait(1)
+ while self._do_console_events():
+ pass
+ finally:
+ self._cb_cond.release()
+ logging.debug("Shutting down Console thread")
def start_console_events(self):
@@ -1147,39 +1183,47 @@
self._cb_cond.release()
- def do_console_events(self):
+ def _do_console_events(self):
'''
- Called by Broker proxy to poll for Console events. Passes the events
- onto the ConsoleHandler associated with this Console.
+ Called by the Console thread to poll for events. Passes the events
+ onto the ConsoleHandler associated with this Console. Is called
+ periodically, but can also be kicked by Console.start_console_events().
'''
count = 0
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- # print "Console Event:", self._event.kind
if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED:
+ logging.debug("Console Event AGENT_ADDED received")
if self._handler:
self._handler.agent_added(AgentProxy(self._event.agent, None))
elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED:
+ logging.debug("Console Event AGENT_DELETED received")
if self._handler:
self._handler.agent_deleted(AgentProxy(self._event.agent, None))
elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE:
+ logging.debug("Console Event NEW_PACKAGE received")
if self._handler:
self._handler.new_package(self._event.name)
elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS:
+ logging.debug("Console Event NEW_CLASS received")
if self._handler:
self._handler.new_class(SchemaClassKey(self._event.classKey))
elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE:
+ logging.debug("Console Event OBJECT_UPDATE received")
if self._handler:
self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}),
self._event.hasProps, self._event.hasStats)
elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED:
- pass
+ logging.debug("Console Event EVENT_RECEIVED received")
elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT:
+ logging.debug("Console Event AGENT_HEARTBEAT received")
if self._handler:
self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp)
elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE:
- pass
+ logging.debug("Console Event METHOD_RESPONSE received")
+ else:
+ logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind))
self.impl.popEvent()
valid = self.impl.getEvent(self._event)
@@ -1210,7 +1254,6 @@
class Broker(ConnectionHandler):
# attr_reader :impl :conn, :console, :broker_bank
def __init__(self, console, conn):
- ConnectionHandler.__init__(self)
self.broker_bank = 1
self.console = console
self.conn = conn
@@ -1226,9 +1269,17 @@
def shutdown(self):
+ logging.debug("broker.shutdown() called.")
self.console.impl.delConnection(self.impl)
self.conn.del_conn_handler(self)
+ if self._session:
+ self.impl.sessionClosed()
+ logging.debug("broker.shutdown() sessionClosed done.")
+ self._session.destroy()
+ logging.debug("broker.shutdown() session destroy done.")
+ self._session = None
self._operational = False
+ logging.debug("broker.shutdown() done.")
def wait_for_stable(self, timeout = None):
@@ -1252,26 +1303,31 @@
self.conn.kick()
- def do_broker_events(self):
+ def _do_broker_events(self):
count = 0
valid = self.impl.getEvent(self._event)
while valid:
count += 1
- # print "Broker Event: ", self._event.kind
if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO:
- pass
+ logging.debug("Broker Event BROKER_INFO received");
elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE:
+ logging.debug("Broker Event DECLARE_QUEUE received");
self.conn.impl.declareQueue(self._session.handle, self._event.name)
elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE:
+ logging.debug("Broker Event DELETE_QUEUE received");
self.conn.impl.deleteQueue(self._session.handle, self._event.name)
elif self._event.kind == qmfengine.BrokerEvent.BIND:
+ logging.debug("Broker Event BIND received");
self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
elif self._event.kind == qmfengine.BrokerEvent.UNBIND:
+ logging.debug("Broker Event UNBIND received");
self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey)
elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE:
+ logging.debug("Broker Event SETUP_COMPLETE received");
self.impl.startProtocol()
elif self._event.kind == qmfengine.BrokerEvent.STABLE:
- self_.cv.acquire()
+ logging.debug("Broker Event STABLE received");
+ self._cv.acquire()
try:
self._stable = True
self._cv.notify()
@@ -1292,11 +1348,12 @@
return count
- def do_broker_messages(self):
+ def _do_broker_messages(self):
count = 0
valid = self.impl.getXmtMessage(self._xmtMessage)
while valid:
count += 1
+ logging.debug("Broker: sending msg on connection")
self.conn.impl.sendMessage(self._session.handle, self._xmtMessage)
self.impl.popXmt()
valid = self.impl.getXmtMessage(self._xmtMessage)
@@ -1304,41 +1361,42 @@
return count
- def do_events(self):
+ def _do_events(self):
while True:
self.console.start_console_events()
- bcnt = do_broker_events()
- mcnt = do_broker_messages()
+ bcnt = self._do_broker_events()
+ mcnt = self._do_broker_messages()
if bcnt == 0 and mcnt == 0:
break;
def conn_event_connected(self):
- print "Console Connection Established..."
+ logging.debug("Broker: Connection event CONNECTED")
self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self)
self.impl.sessionOpened(self._session.handle)
- self.do_events()
+ self._do_events()
def conn_event_disconnected(self, error):
- print "Console Connection Lost"
+ logging.debug("Broker: Connection event DISCONNECTED")
pass
def conn_event_visit(self):
- self.do_events()
+ self._do_events()
def sess_event_session_closed(self, context, error):
- print "Console Session Lost"
+ logging.debug("Broker: Session event CLOSED")
self.impl.sessionClosed()
def sess_event_recv(self, context, message):
+ logging.debug("Broker: Session event MSG_RECV")
if not self._operational:
- print "Unexpected RECV Event"
+ logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context))
self.impl.handleRcvMessage(message)
- self.do_events()
+ self._do_events()
@@ -1454,7 +1512,7 @@
def conn_event_connected(self):
- print "Agent Connection Established..."
+ logging.debug("Agent Connection Established...")
self._session = Session(self._conn,
"qmfa-%s.%d" % (socket.gethostname(), os.getpid()),
self)
@@ -1463,7 +1521,7 @@
def conn_event_disconnected(self, error):
- print "Agent Connection Lost"
+ logging.debug("Agent Connection Lost")
pass
@@ -1472,7 +1530,7 @@
def sess_event_session_closed(self, context, error):
- print "Agent Session Lost"
+ logging.debug("Agent Session Lost")
pass
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org