You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/08/20 00:12:27 UTC

svn commit: r987330 - in /qpid/trunk/qpid: extras/qmf/src/py/qmf/console.py tools/src/py/qpid-cluster tools/src/py/qpid-config tools/src/py/qpid-printevents tools/src/py/qpid-route tools/src/py/qpid-stat tools/src/py/qpid-tool

Author: kgiusti
Date: Thu Aug 19 22:12:27 2010
New Revision: 987330

URL: http://svn.apache.org/viewvc?rev=987330&view=rev
Log:
QPID-2810: clean up the broker thread properly on shutdown.

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
    qpid/trunk/qpid/tools/src/py/qpid-cluster
    qpid/trunk/qpid/tools/src/py/qpid-config
    qpid/trunk/qpid/tools/src/py/qpid-printevents
    qpid/trunk/qpid/tools/src/py/qpid-route
    qpid/trunk/qpid/tools/src/py/qpid-stat
    qpid/trunk/qpid/tools/src/py/qpid-tool

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Thu Aug 19 22:12:27 2010
@@ -586,6 +586,20 @@ class Session:
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
 
+  def close(self):
+    """ Releases all resources held by the session.  Must be called by the
+    application when it is done with the Session object.
+    """
+    self.cv.acquire()
+    try:
+      while len(self.brokers):
+        b = self.brokers.pop()
+        try:
+          b._shutdown()
+        except:
+          pass
+    finally:
+      self.cv.release()
 
   def _getBrokerForAgentAddr(self, agent_addr):
     try:
@@ -616,23 +630,23 @@ class Session:
 
 
   def addBroker(self, target="localhost", timeout=None, mechanisms=None):
-    """ Connect to a Qpid broker.  Returns an object of type Broker. """
+    """ Connect to a Qpid broker.  Returns an object of type Broker.
+    Will raise an exception if the session is not managing the connection and
+    the connection setup to the broker fails.
+    """
     url = BrokerURL(target)
     broker = Broker(self, url.host, url.port, mechanisms, url.authName, url.authPass,
                     ssl = url.scheme == URL.AMQPS, connTimeout=timeout)
 
     self.brokers.append(broker)
-    if not self.manageConnections:
-      broker._waitForStable()
-      agent = broker.getBrokerAgent()
-      if agent:
-        agent.getObjects(_class="agent")
     return broker
 
 
   def delBroker(self, broker):
-    """ Disconnect from a broker.  The 'broker' argument is the object
-    returned from the addBroker call """
+    """ Disconnect from a broker, and deallocate the broker proxy object.  The
+    'broker' argument is the object returned from the addBroker call.  Errors
+    are ignored.
+    """
     if self.console:
       for agent in broker.getAgents():
         self.console.delAgent(agent)
@@ -2053,6 +2067,13 @@ class Broker(Thread):
       self.data = data
 
   def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None):
+    """ Create a broker proxy and setup a connection to the broker.  Will raise
+    an exception if the connection fails and the session is not configured to
+    retry connection setup (manageConnections = False).
+
+    Spawns a thread to manage the broker connection.  Call _shutdown() to
+    shutdown the thread when releasing the broker.
+    """
     Thread.__init__(self)
     self.session  = session
     self.host = host
@@ -2071,6 +2092,8 @@ class Broker(Thread):
     self.brokerAgent = None
     self.brokerSupportsV2 = None
     self.rcv_queue = Queue() # for msg received on session
+    self.conn = None
+    self.amqpSession = None
     self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
     Broker.nextSeq += 1
     self.last_age_check = time()
@@ -2083,10 +2106,20 @@ class Broker(Thread):
     self.start()
     if not self.session.manageConnections:
       # wait for connection setup to complete in subthread.
-      # On failure, propagate exeception to caller
+      # On failure, propagate exception to caller
       self.ready.acquire()
       if self.conn_exc:
+        self._shutdown()   # wait for the subthread to clean up...
         raise self.conn_exc
+      # connection up - wait for stable...
+      try:
+        self._waitForStable()
+        agent = self.getBrokerAgent()
+        if agent:
+          agent.getObjects(_class="agent")
+      except:
+        self._shutdown()   # wait for the subthread to clean up...
+        raise
 
 
   def isConnected(self):
@@ -2173,6 +2206,10 @@ class Broker(Thread):
       self.cv.release()
 
   def _tryToConnect(self):
+    """ Connect to the broker.  Returns True if connection setup completes
+    successfully, otherwise returns False and sets self.error/self.conn_exc
+    with error info.  Does not raise exceptions.
+    """
     self.error = None
     self.conn_exc = None
     try:
@@ -2188,6 +2225,20 @@ class Broker(Thread):
       self.syncResult = None
       self.reqsOutstanding = 1
 
+      try:
+        if self.amqpSession:
+          self.amqpSession.close()
+      except:
+        pass
+      self.amqpSession = None
+
+      try:
+        if self.conn:
+          self.conn.close()
+      except:
+        pass
+      self.conn = None
+
       sock = connect(self.host, self.port)
       sock.settimeout(5)
       oldTimeout = sock.gettimeout()
@@ -2426,22 +2477,27 @@ class Broker(Thread):
     self.amqpSession.message_transfer(destination=dest, message=msg)
 
   def _shutdown(self, _timeout=10):
+    """ Disconnect from a broker, and release its resources.   Errors are
+    ignored.
+    """
     if self.isAlive():
       # kick the thread
       self.canceled = True
       self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
       self.join(_timeout)
-    if self.connected:
-      self.amqpSession.incoming("rdest").stop()
-      if self.session.console != None:
-        self.amqpSession.incoming("tdest").stop()
-      if self.brokerSupportsV2:
-        self.amqpSession.incoming("v2dest").stop()
-        self.amqpSession.incoming("v2TopicUI").stop()
-        self.amqpSession.incoming("v2TopicHB").stop()
-      self.amqpSession.close()
-      self.conn.close()
-      self.connected = False
+    try:
+      if self.amqpSession:
+        self.amqpSession.close();
+    except:
+      pass
+    self.amqpSession = None
+    try:
+      if self.conn:
+        self.conn.close()
+    except:
+      pass
+    self.conn = None
+    self.connected = False
 
   def _waitForStable(self):
     try:

Modified: qpid/trunk/qpid/tools/src/py/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-cluster?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-cluster (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-cluster Thu Aug 19 22:12:27 2010
@@ -90,6 +90,7 @@ class BrokerManager:
         self.brokerName = None
         self.qmf        = None
         self.broker     = None
+        self.brokers    = []
 
     def SetBroker(self, brokerUrl):
         self.url = brokerUrl
@@ -101,8 +102,18 @@ class BrokerManager:
                 self.brokerAgent = a
 
     def Disconnect(self):
-        if self.broker:
-            self.qmf.delBroker(self.broker)
+        """ Release any allocated brokers.  Ignore any failures as the tool is
+        shutting down.
+        """
+        try:
+            if self.broker:
+                self.qmf.delBroker(self.broker)
+                self.broker = None
+            while len(self.brokers):
+                b = self.brokers.pop()
+                self.qmf.delBroker(b)
+        except:
+            pass
 
     def _getClusters(self):
         packages = self.qmf.getPackages()
@@ -195,7 +206,6 @@ class BrokerManager:
         hostList = self._getHostList(memberList)
         self.qmf.delBroker(self.broker)
         self.broker = None
-        self.brokers = []
 
         idx = 0
         for host in hostList:
@@ -238,10 +248,10 @@ class BrokerManager:
         if self.config._delConn and not found:
             print "Client connection '%s' not found" % self.config._delConn
 
-        for broker in self.brokers:
+        while len(self.brokers):
+            broker = self.brokers.pop()
             self.qmf.delBroker(broker)
 
-
 def main(argv=None):
     if argv is None: argv = sys.argv
     try:
@@ -322,6 +332,7 @@ def main(argv=None):
         except KeyboardInterrupt:
             print
         except Exception,e:
+            bm.Disconnect()   # try to deallocate brokers - ignores errors
             if str(e).find("connection aborted") > 0:
                 # we expect this when asking the connected broker to shut down
                 return 0

Modified: qpid/trunk/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-config (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-config Thu Aug 19 22:12:27 2010
@@ -574,13 +574,16 @@ except KeyboardInterrupt:
     print
 except IOError, e:
     print e
+    bm.Disconnect()
     sys.exit(1)
 except SystemExit, e:
+    bm.Disconnect()
     sys.exit(1)
 except Exception,e:
     if e.__class__.__name__ != "Timeout":
         # ignore Timeout exception, handle in the loop below
         print "Failed: %s: %s" % (e.__class__.__name__, e)
+        bm.Disconnect()
         sys.exit(1)
  
 while True:

Modified: qpid/trunk/qpid/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-printevents?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-printevents (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-printevents Thu Aug 19 22:12:27 2010
@@ -59,18 +59,20 @@ ex: localhost, 10.1.1.7:10000, broker-ho
   console = EventConsole()
   session = Session(console, rcvObjects=False, rcvHeartbeats=False, manageConnections=True)
   brokers = []
-  for host in arguments:
-    brokers.append(session.addBroker(host))
-
   try:
-    while (True):
-      sleep(10)
-  except KeyboardInterrupt:
-    for broker in brokers:
-      session.delBroker(broker)
-    print
-    sys.exit(0)
- 
+    for host in arguments:
+      brokers.append(session.addBroker(host))
+    try:
+      while (True):
+        sleep(10)
+    except KeyboardInterrupt:
+      print
+      sys.exit(0)
+  finally:
+    while len(brokers):
+      b = brokers.pop()
+      session.delBroker(b)
+
 if __name__ == '__main__':
   main()
 

Modified: qpid/trunk/qpid/tools/src/py/qpid-route
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-route?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-route (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-route Thu Aug 19 22:12:27 2010
@@ -71,6 +71,7 @@ _connTimeout = 10
 
 class RouteManager:
     def __init__(self, localBroker):
+        self.brokerList = {}
         self.local = BrokerURL(localBroker)
         self.remote  = None
         self.qmf = Session()
@@ -79,7 +80,16 @@ class RouteManager:
         self.agent = self.broker.getBrokerAgent()
 
     def disconnect(self):
-        self.qmf.delBroker(self.broker)
+        try:
+            if self.broker:
+                self.qmf.delBroker(self.broker)
+                self.broker = None
+            while len(self.brokerList):
+                b = self.brokerList.popitem()
+                if b[0] != self.local.name():
+                    self.qmf.delBroker(b[1])
+        except:
+            pass  # ignore errors while shutting down
 
     def getLink(self):
         links = self.agent.getObjects(_class="link")
@@ -135,8 +145,7 @@ class RouteManager:
         print
         print "Finding Linked Brokers:"
 
-        brokerList = {}
-        brokerList[self.local.name()] = self.broker
+        self.brokerList[self.local.name()] = self.broker
         print "    %s... Ok" % self.local
 
         added = True
@@ -145,11 +154,11 @@ class RouteManager:
             links = self.qmf.getObjects(_class="link")
             for link in links:
                 url = BrokerURL("%s:%d" % (link.host, link.port))
-                if url.name() not in brokerList:
+                if url.name() not in self.brokerList:
                     print "    %s..." % url.name(),
                     try:
                         b = self.qmf.addBroker("%s:%d" % (link.host, link.port), _connTimeout)
-                        brokerList[url.name()] = b
+                        self.brokerList[url.name()] = b
                         added = True
                         print "Ok"
                     except Exception, e:
@@ -217,10 +226,10 @@ class RouteManager:
                     (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
         print
 
-        for broker in brokerList:
-            if broker != self.local.name():
-                self.qmf.delBroker(brokerList[broker])
-
+        while len(self.brokerList):
+            b = self.brokerList.popitem()
+            if b[0] != self.local.name():
+                self.qmf.delBroker(b[1])
 
     def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
         if dynamic and _srclocal:
@@ -463,6 +472,7 @@ else:
 group = cargs[0]
 cmd   = cargs[1]
 
+rm = None
 try:
     rm = RouteManager(localBroker)
     if group == "link":
@@ -528,6 +538,8 @@ try:
             Usage()
 
 except Exception,e:
+    if rm:
+        rm.disconnect()  # try to release broker resources
     print "Failed: %s - %s" % (e.__class__.__name__, e)
     sys.exit(1)
 

Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Thu Aug 19 22:12:27 2010
@@ -161,10 +161,16 @@ class BrokerManager(Console):
                 self.brokerAgent = a
 
     def Disconnect(self):
-        if self.broker:
-            self.qmf.delBroker(self.broker)
-        else:
-            for b in self.brokers: self.qmf.delBroker(b.broker)
+        """ Release any allocated brokers.  Ignore any failures as the tool is
+        shutting down.
+        """
+        try:
+            if self.broker:
+                self.qmf.delBroker(self.broker)
+            else:
+                for b in self.brokers: self.qmf.delBroker(b.broker)
+        except:
+            pass
 
     def _getCluster(self):
         packages = self.qmf.getPackages()
@@ -524,6 +530,7 @@ except KeyboardInterrupt:
     print
 except Exception,e:
     print "Failed: %s - %s" % (e.__class__.__name__, e)
+    bm.Disconnect()   # try to deallocate brokers
     raise                               # FIXME aconway 2010-03-03: 
     sys.exit(1)
 

Modified: qpid/trunk/qpid/tools/src/py/qpid-tool
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-tool?rev=987330&r1=987329&r2=987330&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-tool (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-tool Thu Aug 19 22:12:27 2010
@@ -191,8 +191,12 @@ class QmfData(Console):
     self.cli = cli
 
   def close(self):
-    self.closing = True
-    self.session.delBroker(self.broker)
+    try:
+      self.closing = True
+      if self.session and self.broker:
+        self.session.delBroker(self.broker)
+    except:
+      pass   # we're shutting down - ignore any errors
 
   def classCompletions(self, text):
     pass
@@ -645,4 +649,8 @@ try:
 except KeyboardInterrupt:
   print
   print "Exiting..."
-  data.close()
+except Exception, e:
+  print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+# alway attempt to cleanup broker resources
+data.close()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org