You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/08/20 00:12:27 UTC
svn commit: r987330 - in /qpid/trunk/qpid: extras/qmf/src/py/qmf/console.py
tools/src/py/qpid-cluster tools/src/py/qpid-config
tools/src/py/qpid-printevents tools/src/py/qpid-route
tools/src/py/qpid-stat tools/src/py/qpid-tool
Author: kgiusti
Date: Thu Aug 19 22:12:27 2010
New Revision: 987330
URL: http://svn.apache.org/viewvc?rev=987330&view=rev
Log:
QPID-2810: clean up the broker thread properly on shutdown.
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
qpid/trunk/qpid/tools/src/py/qpid-cluster
qpid/trunk/qpid/tools/src/py/qpid-config
qpid/trunk/qpid/tools/src/py/qpid-printevents
qpid/trunk/qpid/tools/src/py/qpid-route
qpid/trunk/qpid/tools/src/py/qpid-stat
qpid/trunk/qpid/tools/src/py/qpid-tool
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Thu Aug 19 22:12:27 2010
@@ -586,6 +586,20 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+ def close(self):
+ """ Releases all resources held by the session. Must be called by the
+ application when it is done with the Session object.
+ """
+ self.cv.acquire()
+ try:
+ while len(self.brokers):
+ b = self.brokers.pop()
+ try:
+ b._shutdown()
+ except:
+ pass
+ finally:
+ self.cv.release()
def _getBrokerForAgentAddr(self, agent_addr):
try:
@@ -616,23 +630,23 @@ class Session:
def addBroker(self, target="localhost", timeout=None, mechanisms=None):
- """ Connect to a Qpid broker. Returns an object of type Broker. """
+ """ Connect to a Qpid broker. Returns an object of type Broker.
+ Will raise an exception if the session is not managing the connection and
+ the connection setup to the broker fails.
+ """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass,
ssl = url.scheme == URL.AMQPS, connTimeout=timeout)
self.brokers.append(broker)
- if not self.manageConnections:
- broker._waitForStable()
- agent = broker.getBrokerAgent()
- if agent:
- agent.getObjects(_class="agent")
return broker
def delBroker(self, broker):
- """ Disconnect from a broker. The 'broker' argument is the object
- returned from the addBroker call """
+ """ Disconnect from a broker, and deallocate the broker proxy object. The
+ 'broker' argument is the object returned from the addBroker call. Errors
+ are ignored.
+ """
if self.console:
for agent in broker.getAgents():
self.console.delAgent(agent)
@@ -2053,6 +2067,13 @@ class Broker(Thread):
self.data = data
def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None):
+ """ Create a broker proxy and setup a connection to the broker. Will raise
+ an exception if the connection fails and the session is not configured to
+ retry connection setup (manageConnections = False).
+
+ Spawns a thread to manage the broker connection. Call _shutdown() to
+ shutdown the thread when releasing the broker.
+ """
Thread.__init__(self)
self.session = session
self.host = host
@@ -2071,6 +2092,8 @@ class Broker(Thread):
self.brokerAgent = None
self.brokerSupportsV2 = None
self.rcv_queue = Queue() # for msg received on session
+ self.conn = None
+ self.amqpSession = None
self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
Broker.nextSeq += 1
self.last_age_check = time()
@@ -2083,10 +2106,20 @@ class Broker(Thread):
self.start()
if not self.session.manageConnections:
# wait for connection setup to complete in subthread.
- # On failure, propagate exeception to caller
+ # On failure, propagate exception to caller
self.ready.acquire()
if self.conn_exc:
+ self._shutdown() # wait for the subthread to clean up...
raise self.conn_exc
+ # connection up - wait for stable...
+ try:
+ self._waitForStable()
+ agent = self.getBrokerAgent()
+ if agent:
+ agent.getObjects(_class="agent")
+ except:
+ self._shutdown() # wait for the subthread to clean up...
+ raise
def isConnected(self):
@@ -2173,6 +2206,10 @@ class Broker(Thread):
self.cv.release()
def _tryToConnect(self):
+ """ Connect to the broker. Returns True if connection setup completes
+ successfully, otherwise returns False and sets self.error/self.conn_exc
+ with error info. Does not raise exceptions.
+ """
self.error = None
self.conn_exc = None
try:
@@ -2188,6 +2225,20 @@ class Broker(Thread):
self.syncResult = None
self.reqsOutstanding = 1
+ try:
+ if self.amqpSession:
+ self.amqpSession.close()
+ except:
+ pass
+ self.amqpSession = None
+
+ try:
+ if self.conn:
+ self.conn.close()
+ except:
+ pass
+ self.conn = None
+
sock = connect(self.host, self.port)
sock.settimeout(5)
oldTimeout = sock.gettimeout()
@@ -2426,22 +2477,27 @@ class Broker(Thread):
self.amqpSession.message_transfer(destination=dest, message=msg)
def _shutdown(self, _timeout=10):
+ """ Disconnect from a broker, and release its resources. Errors are
+ ignored.
+ """
if self.isAlive():
# kick the thread
self.canceled = True
self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
self.join(_timeout)
- if self.connected:
- self.amqpSession.incoming("rdest").stop()
- if self.session.console != None:
- self.amqpSession.incoming("tdest").stop()
- if self.brokerSupportsV2:
- self.amqpSession.incoming("v2dest").stop()
- self.amqpSession.incoming("v2TopicUI").stop()
- self.amqpSession.incoming("v2TopicHB").stop()
- self.amqpSession.close()
- self.conn.close()
- self.connected = False
+ try:
+ if self.amqpSession:
+ self.amqpSession.close();
+ except:
+ pass
+ self.amqpSession = None
+ try:
+ if self.conn:
+ self.conn.close()
+ except:
+ pass
+ self.conn = None
+ self.connected = False
def _waitForStable(self):
try:
Modified: qpid/trunk/qpid/tools/src/py/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-cluster?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-cluster (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-cluster Thu Aug 19 22:12:27 2010
@@ -90,6 +90,7 @@ class BrokerManager:
self.brokerName = None
self.qmf = None
self.broker = None
+ self.brokers = []
def SetBroker(self, brokerUrl):
self.url = brokerUrl
@@ -101,8 +102,18 @@ class BrokerManager:
self.brokerAgent = a
def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokers):
+ b = self.brokers.pop()
+ self.qmf.delBroker(b)
+ except:
+ pass
def _getClusters(self):
packages = self.qmf.getPackages()
@@ -195,7 +206,6 @@ class BrokerManager:
hostList = self._getHostList(memberList)
self.qmf.delBroker(self.broker)
self.broker = None
- self.brokers = []
idx = 0
for host in hostList:
@@ -238,10 +248,10 @@ class BrokerManager:
if self.config._delConn and not found:
print "Client connection '%s' not found" % self.config._delConn
- for broker in self.brokers:
+ while len(self.brokers):
+ broker = self.brokers.pop()
self.qmf.delBroker(broker)
-
def main(argv=None):
if argv is None: argv = sys.argv
try:
@@ -322,6 +332,7 @@ def main(argv=None):
except KeyboardInterrupt:
print
except Exception,e:
+ bm.Disconnect() # try to deallocate brokers - ignores errors
if str(e).find("connection aborted") > 0:
# we expect this when asking the connected broker to shut down
return 0
Modified: qpid/trunk/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-config (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-config Thu Aug 19 22:12:27 2010
@@ -574,13 +574,16 @@ except KeyboardInterrupt:
print
except IOError, e:
print e
+ bm.Disconnect()
sys.exit(1)
except SystemExit, e:
+ bm.Disconnect()
sys.exit(1)
except Exception,e:
if e.__class__.__name__ != "Timeout":
# ignore Timeout exception, handle in the loop below
print "Failed: %s: %s" % (e.__class__.__name__, e)
+ bm.Disconnect()
sys.exit(1)
while True:
Modified: qpid/trunk/qpid/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-printevents?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-printevents (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-printevents Thu Aug 19 22:12:27 2010
@@ -59,18 +59,20 @@ ex: localhost, 10.1.1.7:10000, broker-ho
console = EventConsole()
session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True)
brokers = []
- for host in arguments:
- brokers.append(session.addBroker(host))
-
try:
- while (True):
- sleep(10)
- except KeyboardInterrupt:
- for broker in brokers:
- session.delBroker(broker)
- print
- sys.exit(0)
-
+ for host in arguments:
+ brokers.append(session.addBroker(host))
+ try:
+ while (True):
+ sleep(10)
+ except KeyboardInterrupt:
+ print
+ sys.exit(0)
+ finally:
+ while len(brokers):
+ b = brokers.pop()
+ session.delBroker(b)
+
if __name__ == '__main__':
main()
Modified: qpid/trunk/qpid/tools/src/py/qpid-route
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-route?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-route (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-route Thu Aug 19 22:12:27 2010
@@ -71,6 +71,7 @@ _connTimeout = 10
class RouteManager:
def __init__(self, localBroker):
+ self.brokerList = {}
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
@@ -79,7 +80,16 @@ class RouteManager:
self.agent = self.broker.getBrokerAgent()
def disconnect(self):
- self.qmf.delBroker(self.broker)
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
+ except:
+ pass # ignore errors while shutting down
def getLink(self):
links = self.agent.getObjects(_class="link")
@@ -135,8 +145,7 @@ class RouteManager:
print
print "Finding Linked Brokers:"
- brokerList = {}
- brokerList[self.local.name()] = self.broker
+ self.brokerList[self.local.name()] = self.broker
print " %s... Ok" % self.local
added = True
@@ -145,11 +154,11 @@ class RouteManager:
links = self.qmf.getObjects(_class="link")
for link in links:
url = BrokerURL("%s:%d" % (link.host, link.port))
- if url.name() not in brokerList:
+ if url.name() not in self.brokerList:
print " %s..." % url.name(),
try:
b = self.qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
- brokerList[url.name()] = b
+ self.brokerList[url.name()] = b
added = True
print "Ok"
except Exception, e:
@@ -217,10 +226,10 @@ class RouteManager:
(toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
print
- for broker in brokerList:
- if broker != self.local.name():
- self.qmf.delBroker(brokerList[broker])
-
+ while len(self.brokerList):
+ b = self.brokerList.popitem()
+ if b[0] != self.local.name():
+ self.qmf.delBroker(b[1])
def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
if dynamic and _srclocal:
@@ -463,6 +472,7 @@ else:
group = cargs[0]
cmd = cargs[1]
+rm = None
try:
rm = RouteManager(localBroker)
if group == "link":
@@ -528,6 +538,8 @@ try:
Usage()
except Exception,e:
+ if rm:
+ rm.disconnect() # try to release broker resources
print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Thu Aug 19 22:12:27 2010
@@ -161,10 +161,16 @@ class BrokerManager(Console):
self.brokerAgent = a
def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
- else:
- for b in self.brokers: self.qmf.delBroker(b.broker)
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+ else:
+ for b in self.brokers: self.qmf.delBroker(b.broker)
+ except:
+ pass
def _getCluster(self):
packages = self.qmf.getPackages()
@@ -524,6 +530,7 @@ except KeyboardInterrupt:
print
except Exception,e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
+ bm.Disconnect() # try to deallocate brokers
raise # FIXME aconway 2010-03-03:
sys.exit(1)
Modified: qpid/trunk/qpid/tools/src/py/qpid-tool
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-tool?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-tool (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-tool Thu Aug 19 22:12:27 2010
@@ -191,8 +191,12 @@ class QmfData(Console):
self.cli = cli
def close(self):
- self.closing = True
- self.session.delBroker(self.broker)
+ try:
+ self.closing = True
+ if self.session and self.broker:
+ self.session.delBroker(self.broker)
+ except:
+ pass # we're shutting down - ignore any errors
def classCompletions(self, text):
pass
@@ -645,4 +649,8 @@ try:
except KeyboardInterrupt:
print
print "Exiting..."
- data.close()
+except Exception, e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# alway attempt to cleanup broker resources
+data.close()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org