You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/06/11 16:34:19 UTC

svn commit: r953703 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Author: kgiusti
Date: Fri Jun 11 14:34:18 2010
New Revision: 953703

URL: http://svn.apache.org/viewvc?rev=953703&view=rev
Log:
QPID-2663: scale and performance optimizations.

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=953703&r1=953702&r2=953703&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Fri Jun 11 14:34:18 2010
@@ -34,8 +34,9 @@ from qpid.connection import Connection, 
 from qpid.datatypes  import Message, RangedSet, UUID
 from qpid.util       import connect, ssl, URL
 from qpid.codec010   import StringCodec as Codec
-from threading       import Lock, Condition, Thread
-from time            import time, strftime, gmtime
+from threading       import Lock, Condition, Thread, Semaphore
+from Queue           import Queue, Empty
+from time            import time, strftime, gmtime, sleep
 from cStringIO       import StringIO
 
 #import qpid.log
@@ -382,7 +383,7 @@ class Object(object):
           mp.user_id = self._broker.authUser
           mp.correlation_id = str(seq)
           mp.app_id = "qmf2"
-          mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+          mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue)
           mp.application_headers = {'qmf.opcode':'_method_request'}
           sendCodec.write_map(call)
           smsg = Message(dp, mp, sendCodec.encoded)
@@ -671,7 +672,8 @@ class Session:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
         if broker.brokerSupportsV2:
-           broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key)
+          # data indications should arrive on the lo priority queue
+          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, bindingkey=v2key)
 
 
   def bindClass(self, pname, cname):
@@ -686,7 +688,8 @@ class Session:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
         if broker.brokerSupportsV2:
-           broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key)
+          # data indications should arrive on the lo priority queue
+          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, bindingkey=v2key)
 
       
   def bindClassKey(self, classKey):
@@ -1200,7 +1203,7 @@ class Session:
     if Session.ENCODINGS.has_key(klass):
       return self.ENCODINGS[klass]
     for base in klass.__bases__:
-      result = self._encoding(base, obj)
+      result = self._encoding(base)
       if result != None:
         return result
             
@@ -1333,7 +1336,7 @@ class Session:
           mp.user_id = broker.authUser
           mp.correlation_id = str(seq)
           mp.app_id = "qmf2"
-          mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+          mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_direct_queue)
           mp.application_headers = {'qmf.opcode':'_method_request'}
           sendCodec.write_map(call)
           msg = Message(dp, mp, sendCodec.encoded)
@@ -1880,82 +1883,32 @@ class MethodResult(object):
 
 
 #===================================================================================================
-# ManagedConnection
-#===================================================================================================
-class ManagedConnection(Thread):
-  """ Thread class for managing a connection. """
-  DELAY_MIN = 1
-  DELAY_MAX = 128
-  DELAY_FACTOR = 2
-
-  def __init__(self, broker):
-    Thread.__init__(self)
-    self.daemon = True
-    self.name = "Connection for broker: %s:%d" % (broker.host, broker.port)
-    self.broker = broker
-    self.cv = Condition()
-    self.canceled = False
-
-  def stop(self):
-    """ Tell this thread to stop running and return. """
-    try:
-      self.cv.acquire()
-      self.canceled = True
-      self.cv.notify()
-    finally:
-      self.cv.release()
-
-  def disconnected(self):
-    """ Notify the thread that the connection was lost. """
-    try:
-      self.cv.acquire()
-      self.cv.notify()
-    finally:
-      self.cv.release()
-
-  def run(self):
-    """ Main body of the running thread. """
-    delay = self.DELAY_MIN
-    while True:
-      try:
-        self.broker._tryToConnect()
-        try:
-          self.cv.acquire()
-          while (not self.canceled) and self.broker.connected:
-            self.cv.wait()
-          if self.canceled:
-            return
-          delay = self.DELAY_MIN
-        finally:
-          self.cv.release()
-      except socket.error:
-        if delay < self.DELAY_MAX:
-          delay *= self.DELAY_FACTOR
-      except SessionDetached:
-        if delay < self.DELAY_MAX:
-          delay *= self.DELAY_FACTOR
-      except Closed:
-        if delay < self.DELAY_MAX:
-          delay *= self.DELAY_FACTOR
-
-      try:
-        self.cv.acquire()
-        self.cv.wait(delay)
-        if self.canceled:
-          return
-      finally:
-        self.cv.release()
-
-
-#===================================================================================================
 # Broker
 #===================================================================================================
-class Broker:
+class Broker(Thread):
   """ This object represents a connection (or potential connection) to a QMF broker. """
   SYNC_TIME = 60
   nextSeq = 1
 
+  # for connection recovery
+  DELAY_MIN = 1
+  DELAY_MAX = 128
+  DELAY_FACTOR = 2
+
+  class _q_item:
+    """ Broker-private class to encapsulate data sent to the broker thread
+    queue.
+    """
+    type_wakeup = 0
+    type_v1msg = 1
+    type_v2msg = 2
+
+    def __init__(self, typecode, data):
+      self.typecode = typecode
+      self.data = data
+
   def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None):
+    Thread.__init__(self)
     self.session  = session
     self.host = host
     self.port = port
@@ -1967,18 +1920,28 @@ class Broker:
     self.cv = Condition()
     self.seqToAgentMap = {}
     self.error = None
+    self.conn_exc = None  # exception hit by _tryToConnect()
     self.brokerId = None
     self.connected = False
     self.brokerAgent = None
     self.brokerSupportsV2 = None
+    self.rcv_queue = Queue() # for msg received on session
     self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
     Broker.nextSeq += 1
-    if self.session.manageConnections:
-      self.thread = ManagedConnection(self)
-      self.thread.start()
-    else:
-      self.thread = None
-      self._tryToConnect()
+
+    # thread control
+    self.setDaemon(True)
+    self.setName("Thread for broker: %s:%d" % (host, port))
+    self.canceled = False
+    self.ready = Semaphore(0)
+    self.start()
+    if not self.session.manageConnections:
+      # wait for connection setup to complete in subthread.
+      # On failure, propagate exception to caller
+      self.ready.acquire()
+      if self.conn_exc:
+        raise self.conn_exc
+
 
   def isConnected(self):
     """ Return True if there is an active connection to the broker. """
@@ -2064,6 +2027,8 @@ class Broker:
       self.cv.release()
 
   def _tryToConnect(self):
+    self.error = None
+    self.conn_exc = None
     try:
       try:
         self.cv.acquire()
@@ -2106,9 +2071,9 @@ class Broker:
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
       self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
-      self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
-      self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
-      self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
+      self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=self.amqpSession.flow_mode.window)
+      self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+      self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200)
 
       self.topicName = "topic-%s" % self.amqpSessionId
       self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
@@ -2116,9 +2081,9 @@ class Broker:
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
       self.amqpSession.incoming("tdest").listen(self._v1Cb)
-      self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
-      self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
-      self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
+      self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=self.amqpSession.flow_mode.window)
+      self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+      self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.message, value=200)
 
       ##
       ## Check to see if the broker has QMFv2 exchanges configured
@@ -2139,21 +2104,44 @@ class Broker:
       ## Set up connectivity for QMFv2
       ##
       if self.brokerSupportsV2:
-        self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
-        self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+        # set up 3 queues:
+        # 1 direct queue - for responses destined to this console.
+        # 2 topic queues - one for heartbeats (hi priority), one for all other indications.
+        self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
+        self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
+        self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId
+        self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True)
+        self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId
+        self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True)
+
         self.amqpSession.exchange_bind(exchange="qmf.default.direct",
-                                       queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+                                       queue=self.v2_direct_queue, binding_key=self.v2_direct_queue)
         ## Other bindings here...
-        self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+
+        self.amqpSession.message_subscribe(queue=self.v2_direct_queue, destination="v2dest",
                                            accept_mode=self.amqpSession.accept_mode.none,
                                            acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
         self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
-        self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
-        self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
-        self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+        self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=self.amqpSession.flow_mode.window)
+        self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+        self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50)
+
+        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo",
+                                           accept_mode=self.amqpSession.accept_mode.none,
+                                           acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+        self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb)
+        self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window)
+        self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+        self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25)
 
-      self.connected = True
-      self.session._handleBrokerConnect(self)
+
+        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi",
+                                           accept_mode=self.amqpSession.accept_mode.none,
+                                           acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+        self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb)
+        self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window)
+        self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+        self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100)
 
       codec = Codec()
       self._setHeader(codec, 'B')
@@ -2162,15 +2150,20 @@ class Broker:
       if self.brokerSupportsV2:
         self._v2SendAgentLocate()
 
+      return True  # connection complete
+
     except socket.error, e:
       self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
-      raise
     except Closed, e:
       self.error = "Connect Failed %s - %s" % (e.__class__.__name__, e)
-      raise
     except ConnectionFailed, e:
       self.error = "Connect Failed %s - %s" % (e.__class__.__name__, e)
-      raise
+    except:
+      e = Exception("Unknown connection exception")
+      self.error = str(e)
+
+    self.conn_exc = e
+    return False     # connection failed
 
   def _updateAgent(self, obj):
     bankKey = str(obj.agentBank)
@@ -2233,7 +2226,7 @@ class Broker:
     mp.content_type = "amqp/map"
     mp.user_id = self.authUser
     mp.app_id = "qmf2"
-    mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+    mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue)
     mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
     sendCodec = Codec()
     sendCodec.write_map(predicate)
@@ -2280,14 +2273,20 @@ class Broker:
   def _send(self, msg, dest="qpid.management"):
     self.amqpSession.message_transfer(destination=dest, message=msg)
 
-  def _shutdown(self):
-    if self.thread:
-      self.thread.stop()
-      self.thread.join()
+  def _shutdown(self, _timeout=10):
+    if self.isAlive():
+      # kick the thread
+      self.canceled = True
+      self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
+      self.join(_timeout)
     if self.connected:
       self.amqpSession.incoming("rdest").stop()
       if self.session.console != None:
         self.amqpSession.incoming("tdest").stop()
+      if self.brokerSupportsV2:
+        self.amqpSession.incoming("v2dest").stop()
+        self.amqpSession.incoming("v2TopicLo").stop()
+        self.amqpSession.incoming("v2TopicHi").stop()
       self.amqpSession.close()
       self.conn.close()
       self.connected = False
@@ -2325,9 +2324,20 @@ class Broker:
           self.amqpSession.exchange_bind(exchange="qpid.management",
                                          queue=self.topicName, binding_key=key)
         if self.brokerSupportsV2:
+          # do not drop heartbeat indications when under load from data
+          # or event indications.  Put heartbeats on their own dedicated
+          # queue.
+          #
           for key in self.session.v2BindingKeyList:
-            self.amqpSession.exchange_bind(exchange="qmf.default.topic",
-                                           queue=self.v2_queue_name, binding_key=key)
+            if key.startswith("agent.ind.heartbeat"):
+              self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+                                             queue=self.v2_topic_queue_hi,
+                                             binding_key=key)
+            else:
+              self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+                                             queue=self.v2_topic_queue_lo,
+                                             binding_key=key)
+
       if self.reqsOutstanding == 0 and self.syncInFlight:
         self.syncInFlight = False
         self.cv.notify()
@@ -2335,14 +2345,19 @@ class Broker:
       self.cv.release()
 
   def _v1Cb(self, msg):
+    """ Callback from session receive thread for V1 messages
+    """
+    self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v1msg, msg))
+
+  def _v1Dispatch(self, msg):
     try:
-      self._v1CbProtected(msg)
+      self._v1DispatchProtected(msg)
     except Exception, e:
       print "EXCEPTION in Broker._v1Cb:", e
       import traceback
       traceback.print_exc()
 
-  def _v1CbProtected(self, msg):
+  def _v1DispatchProtected(self, msg):
     """
     This is the general message handler for messages received via the QMFv1 exchanges.
     """
@@ -2382,7 +2397,7 @@ class Broker:
         finally:
           self.cv.release()
 
-      if   opcode == None: return
+      if   opcode == None: break
       if   opcode == 'b': self.session._handleBrokerResp      (self, codec, seq)
       elif opcode == 'p': self.session._handlePackageInd      (self, codec, seq)
       elif opcode == 'q': self.session._handleClassInd        (self, codec, seq)
@@ -2400,6 +2415,11 @@ class Broker:
       pass
 
   def _v2Cb(self, msg):
+    """ Callback from session receive thread for V2 messages
+    """
+    self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg))
+
+  def _v2Dispatch(self, msg):
     """
     This is the general message handler for messages received via QMFv2 exchanges.
     """
@@ -2455,18 +2475,112 @@ class Broker:
       pass
 
   def _exceptionCb(self, data):
-    self.connected = False
-    self.error = data
+    """ Exception notification callback from session receive thread.
+    """
+    self.cv.acquire()
     try:
-      self.cv.acquire()
-      if self.syncInFlight:
-        self.cv.notify()
+      self.connected = False
+      self.error = data
     finally:
       self.cv.release()
-    self.session._handleError(self.error)
-    self.session._handleBrokerDisconnect(self)
-    if self.thread:
-      self.thread.disconnected()
+    self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
+
+  def run(self):
+    """ Main body of the running thread. """
+
+    # First, attempt a connection.  In the unmanaged case,
+    # failure to connect needs to cause the Broker()
+    # constructor to raise an exception.
+    delay = self.DELAY_MIN
+    while not self.canceled:
+      if self._tryToConnect(): # connection up
+        break
+      # unmanaged connection - fail & wake up constructor
+      if not self.session.manageConnections:
+        self.ready.release()
+        return
+      # managed connection - try again
+      count = 0
+      while not self.canceled and count < delay:
+        sleep(1)
+        count += 1
+      if delay < self.DELAY_MAX:
+        delay *= self.DELAY_FACTOR
+
+    if self.canceled:
+      self.ready.release()
+      return
+
+    # connection successful!
+    self.cv.acquire()
+    try:
+      self.connected = True
+    finally:
+      self.cv.release()
+
+    self.session._handleBrokerConnect(self)
+    self.ready.release()
+
+    while not self.canceled:
+
+      item = self.rcv_queue.get()
+
+      if self.canceled:
+        return
+
+      if not self.connected:
+        # connection failure
+        while item:
+          # drain the queue
+          try:
+            item = self.rcv_queue.get(block=False)
+          except Empty:
+            break
+
+        # notify any waiters, and callback
+        self.cv.acquire()
+        try:
+          edata = self.error;
+          if self.syncInFlight:
+            self.cv.notify()
+        finally:
+          self.cv.release()
+        self.session._handleError(edata)
+        self.session._handleBrokerDisconnect(self)
+
+        if not self.session.manageConnections:
+          return  # do not attempt recovery
+
+        # retry connection setup
+        delay = self.DELAY_MIN
+        while not self.canceled:
+          if self._tryToConnect():
+            break
+          # managed connection - try again
+          count = 0
+          while not self.canceled and count < delay:
+            sleep(1)
+            count += 1
+          if delay < self.DELAY_MAX:
+            delay *= self.DELAY_FACTOR
+
+        if self.canceled:
+          return
+
+        # connection successful!
+        self.cv.acquire()
+        try:
+          self.connected = True
+        finally:
+          self.cv.release()
+
+        self.session._handleBrokerConnect(self)
+
+      elif item.typecode == Broker._q_item.type_v1msg:
+        self._v1Dispatch(item.data)
+      elif item.typecode == Broker._q_item.type_v2msg:
+        self._v2Dispatch(item.data)
+
 
 
 #===================================================================================================
@@ -2932,7 +3046,7 @@ class Agent:
     mp.user_id = self.broker.authUser
     mp.correlation_id = str(sequence)
     mp.app_id = "qmf2"
-    mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+    mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
     mp.application_headers = {'qmf.opcode':'_query_request'}
     sendCodec = Codec()
     sendCodec.write_map(query)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org