You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/24 20:49:57 UTC

svn commit: r720275 - /incubator/qpid/trunk/qpid/python/qmf/console.py

Author: tross
Date: Mon Nov 24 11:49:57 2008
New Revision: 720275

URL: http://svn.apache.org/viewvc?rev=720275&view=rev
Log:
QPID-1483 - Connection handling in the Python QMF console API

Modified:
    incubator/qpid/trunk/qpid/python/qmf/console.py

Modified: incubator/qpid/trunk/qpid/python/qmf/console.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qmf/console.py?rev=720275&r1=720274&r2=720275&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qmf/console.py (original)
+++ incubator/qpid/trunk/qpid/python/qmf/console.py Mon Nov 24 11:49:57 2008
@@ -29,7 +29,7 @@
 from qpid.datatypes  import UUID, uuid4, Message, RangedSet
 from qpid.util       import connect, ssl, URL
 from qpid.codec010   import StringCodec as Codec
-from threading       import Lock, Condition
+from threading       import Lock, Condition, Thread
 from time            import time, strftime, gmtime
 from cStringIO       import StringIO
 
@@ -165,22 +165,18 @@
     if self.userBindings and not self.rcvObjects:
       raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
 
-    if manageConnections:
-      raise Exception("manageConnections - not yet implemented")
-
   def __repr__(self):
-    return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
+    return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
 
   def addBroker(self, target="localhost"):
     """ Connect to a Qpid broker.  Returns an object of type Broker. """
     url = BrokerURL(target)
     broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
                     ssl = url.scheme == URL.AMQPS)
-    if not broker.isConnected and not self.manageConnections:
-      raise Exception(broker.error)
 
     self.brokers.append(broker)
-    self.getObjects(broker=broker, _class="agent")
+    if not self.manageConnections:
+      self.getObjects(broker=broker, _class="agent")
     return broker
 
   def delBroker(self, broker):
@@ -220,22 +216,32 @@
         return self.packages[pname][pkey]
 
   def bindPackage(self, packageName):
-    """ """
+    """ Request object updates for all table classes within a package. """
     if not self.userBindings or not self.rcvObjects:
       raise Exception("userBindings option not set for Session")
+    key = "console.obj.*.*.%s.#" % packageName
+    self.bindingKeyList.append(key)
     for broker in self.brokers:
-      broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
-                                       binding_key="console.obj.*.*.%s.#" % packageName)
+      if broker.isConnected():
+        broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+                                         binding_key=key)
 
-  def bindClass(self, classKey):
-    """ """
+  def bindClass(self, pname, cname):
+    """ Request object updates for a particular table class by package and class name. """
     if not self.userBindings or not self.rcvObjects:
       raise Exception("userBindings option not set for Session")
+    key = "console.obj.*.*.%s.%s.#" % (pname, cname)
+    self.bindingKeyList.append(key)
+    for broker in self.brokers:
+      if broker.isConnected():
+        broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
+                                         binding_key=key)
+      
+  def bindClassKey(self, classKey):
+    """ Request object updates for a particular table class by class key. """
     pname = classKey.getPackageName()
     cname = classKey.getClassName()
-    for broker in self.brokers:
-      broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
-                                       binding_key="console.obj.*.*.%s.%s.#" % (pname, cname))
+    self.bindClass(pname, cname)
 
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
@@ -293,11 +299,16 @@
       agent = kwargs["_agent"]
       if agent.broker not in brokerList:
         raise Exception("Supplied agent is not accessible through the supplied broker")
-      agentList.append(agent)
+      if agent.broker.isConnected():
+        agentList.append(agent)
     else:
       for broker in brokerList:
         for agent in broker.getAgents():
-          agentList.append(agent)
+          if agent.broker.isConnected():
+            agentList.append(agent)
+
+    if len(agentList) == 0:
+      return []
 
     pname = None
     cname = None
@@ -387,10 +398,12 @@
     return keyList
 
   def _handleBrokerConnect(self, broker):
-    pass
+    if self.console:
+      self.console.brokerConnected(broker)
 
   def _handleBrokerDisconnect(self, broker):
-    pass
+    if self.console:
+      self.console.brokerDisconnected(broker)
 
   def _handleBrokerResp(self, broker, codec, seq):
     broker.brokerId = UUID(codec.read_uuid())
@@ -1127,8 +1140,63 @@
   def __repr__(self):
     return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
 
+class ManagedConnection(Thread):
+  """ Thread that keeps one Broker connected, retrying failed attempts with exponential backoff. """
+  DELAY_MIN = 1      # initial wait between attempts (passed to Condition.wait, i.e. seconds)
+  DELAY_MAX = 128    # upper bound on the wait between attempts
+  DELAY_FACTOR = 2   # multiplier applied to the wait after each failed attempt
+
+  def __init__(self, broker):
+    Thread.__init__(self)
+    self.broker = broker     # object whose _tryToConnect() and 'connected' flag run() uses
+    self.cv = Condition()    # protects 'canceled' and wakes run() on stop()/disconnected()
+    self.canceled = False    # set by stop(); run() returns once it observes this
+
+  def stop(self):
+    """ Tell this thread to stop running and return.  Does not wait; callers join() afterwards. """
+    self.cv.acquire()
+    try:
+      self.canceled = True
+      self.cv.notify()
+    finally:
+      self.cv.release()
+
+  def disconnected(self):
+    """ Notify the thread that the connection was lost so run() re-checks broker.connected. """
+    self.cv.acquire()
+    try:
+      self.cv.notify()
+    finally:
+      self.cv.release()
+
+  def run(self):
+    """ Thread body: connect, sleep until disconnected or stopped, back off, then retry. """
+    delay = self.DELAY_MIN
+    while True:
+      try:
+        self.broker._tryToConnect()
+        self.cv.acquire()
+        try:
+          while (not self.canceled) and self.broker.connected:
+            self.cv.wait()
+          if self.canceled:
+            return
+          delay = self.DELAY_MIN   # the connection was up, so restart the backoff sequence
+        finally:
+          self.cv.release()        # always release, including on the canceled return above
+      except Exception:            # connect attempt failed: lengthen the wait, up to DELAY_MAX
+        delay = min(delay * self.DELAY_FACTOR, self.DELAY_MAX)
+      self.cv.acquire()
+      try:
+        if not self.canceled:      # stop() may have fired already; its notify would be missed
+          self.cv.wait(delay)
+        if self.canceled:
+          return
+      finally:
+        self.cv.release()
+
 class Broker:
-  """ """
+  """ This object represents a connection (or potential connection) to a QMF broker. """
   SYNC_TIME = 60
 
   def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
@@ -1138,24 +1206,24 @@
     self.ssl = ssl
     self.authUser = authUser
     self.authPass = authPass
-    self.agents = {}
-    self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
-    self.topicBound = False
     self.cv = Condition()
-    self.syncInFlight = False
-    self.syncRequest = 0
-    self.syncResult = None
-    self.reqsOutstanding = 1
     self.error = None
     self.brokerId = None
-    self.isConnected = False
+    self.connected = False
     self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
-    self._tryToConnect()
+    if self.session.manageConnections:
+      self.thread = ManagedConnection(self)
+      self.thread.start()
+    else:
+      self.thread = None
+      self._tryToConnect()
 
   def isConnected(self):
-    return self.isConnected
+    """ Return True if there is an active connection to the broker. """
+    return self.connected
 
   def getError(self):
+    """ Return the last error message seen while trying to connect to the broker. """
     return self.error
 
   def getBrokerId(self):
@@ -1163,9 +1231,13 @@
     return self.brokerId
 
   def getBrokerBank(self):
+    """ Return the broker-bank value.  This is the value that the broker assigns to
+        objects within its control.  This value appears as a field in the ObjectId
+        of objects created by agents controlled by this broker. """
     return 1
 
   def getAgent(self, brokerBank, agentBank):
+    """ Return the agent object associated with a particular broker and agent bank value."""
     bankKey = (brokerBank, agentBank)
     if bankKey in self.agents:
       return self.agents[bankKey]
@@ -1199,13 +1271,21 @@
     return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672)
 
   def __repr__(self):
-    if self.isConnected:
+    if self.connected:
       return "Broker connected at: %s" % self.getUrl()
     else:
       return "Disconnected Broker"
 
   def _tryToConnect(self):
     try:
+      self.agents = {}
+      self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+      self.topicBound = False
+      self.syncInFlight = False
+      self.syncRequest = 0
+      self.syncResult = None
+      self.reqsOutstanding = 1
+
       sock = connect(self.host, self.port)
       if self.ssl:
         sock = ssl(sock)
@@ -1235,7 +1315,7 @@
       self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
       self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)
 
-      self.isConnected = True
+      self.connected = True
       self.session._handleBrokerConnect(self)
 
       codec = Codec(self.conn.spec)
@@ -1245,10 +1325,13 @@
 
     except socket.error, e:
       self.error = "Socket Error %s - %s" % (e[0], e[1])
+      raise Exception(self.error)
     except Closed, e:
       self.error = "Connect Failed %d - %s" % (e[0], e[1])
+      raise Exception(self.error)
     except ConnectionFailed, e:
       self.error = "Connect Failed %d - %s" % (e[0], e[1])
+      raise Exception(self.error)
 
   def _updateAgent(self, obj):
     bankKey = (obj.brokerBank, obj.agentBank)
@@ -1301,19 +1384,22 @@
     self.amqpSession.message_transfer(destination=dest, message=msg)
 
   def _shutdown(self):
-    if self.isConnected:
+    if self.thread:
+      self.thread.stop()
+      self.thread.join()
+    if self.connected:
       self.amqpSession.incoming("rdest").stop()
       if self.session.console != None:
         self.amqpSession.incoming("tdest").stop()
       self.amqpSession.close()
       self.conn.close()
-      self.isConnected = False
-    else:
-      raise Exception("Broker already disconnected")
+      self.connected = False
 
   def _waitForStable(self):
     try:
       self.cv.acquire()
+      if not self.connected:
+        return
       if self.reqsOutstanding == 0:
         return
       self.syncInFlight = True
@@ -1365,7 +1451,7 @@
       elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
 
   def _exceptionCb(self, data):
-    self.isConnected = False
+    self.connected = False
     self.error = data
     try:
       self.cv.acquire()
@@ -1375,6 +1461,8 @@
       self.cv.release()
     self.session._handleError(self.error)
     self.session._handleBrokerDisconnect(self)
+    if self.thread:
+      self.thread.disconnected()
 
 class Agent:
   """ """