You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/06/11 16:34:19 UTC
svn commit: r953703 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Author: kgiusti
Date: Fri Jun 11 14:34:18 2010
New Revision: 953703
URL: http://svn.apache.org/viewvc?rev=953703&view=rev
Log:
QPID-2663: scale and performance optimizations.
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=953703&r1=953702&r2=953703&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Fri Jun 11 14:34:18 2010
@@ -34,8 +34,9 @@ from qpid.connection import Connection,
from qpid.datatypes import Message, RangedSet, UUID
from qpid.util import connect, ssl, URL
from qpid.codec010 import StringCodec as Codec
-from threading import Lock, Condition, Thread
-from time import time, strftime, gmtime
+from threading import Lock, Condition, Thread, Semaphore
+from Queue import Queue, Empty
+from time import time, strftime, gmtime, sleep
from cStringIO import StringIO
#import qpid.log
@@ -382,7 +383,7 @@ class Object(object):
mp.user_id = self._broker.authUser
mp.correlation_id = str(seq)
mp.app_id = "qmf2"
- mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+ mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_direct_queue)
mp.application_headers = {'qmf.opcode':'_method_request'}
sendCodec.write_map(call)
smsg = Message(dp, mp, sendCodec.encoded)
@@ -671,7 +672,8 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key)
+ # data indications should arrive on the lo priority queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, bindingkey=v2key)
def bindClass(self, pname, cname):
@@ -686,7 +688,8 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_queue_name, bindingkey=v2key)
+ # data indications should arrive on the lo priority queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, bindingkey=v2key)
def bindClassKey(self, classKey):
@@ -1200,7 +1203,7 @@ class Session:
if Session.ENCODINGS.has_key(klass):
return self.ENCODINGS[klass]
for base in klass.__bases__:
- result = self._encoding(base, obj)
+ result = self._encoding(base)
if result != None:
return result
@@ -1333,7 +1336,7 @@ class Session:
mp.user_id = broker.authUser
mp.correlation_id = str(seq)
mp.app_id = "qmf2"
- mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+ mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_direct_queue)
mp.application_headers = {'qmf.opcode':'_method_request'}
sendCodec.write_map(call)
msg = Message(dp, mp, sendCodec.encoded)
@@ -1880,82 +1883,32 @@ class MethodResult(object):
#===================================================================================================
-# ManagedConnection
-#===================================================================================================
-class ManagedConnection(Thread):
- """ Thread class for managing a connection. """
- DELAY_MIN = 1
- DELAY_MAX = 128
- DELAY_FACTOR = 2
-
- def __init__(self, broker):
- Thread.__init__(self)
- self.daemon = True
- self.name = "Connection for broker: %s:%d" % (broker.host, broker.port)
- self.broker = broker
- self.cv = Condition()
- self.canceled = False
-
- def stop(self):
- """ Tell this thread to stop running and return. """
- try:
- self.cv.acquire()
- self.canceled = True
- self.cv.notify()
- finally:
- self.cv.release()
-
- def disconnected(self):
- """ Notify the thread that the connection was lost. """
- try:
- self.cv.acquire()
- self.cv.notify()
- finally:
- self.cv.release()
-
- def run(self):
- """ Main body of the running thread. """
- delay = self.DELAY_MIN
- while True:
- try:
- self.broker._tryToConnect()
- try:
- self.cv.acquire()
- while (not self.canceled) and self.broker.connected:
- self.cv.wait()
- if self.canceled:
- return
- delay = self.DELAY_MIN
- finally:
- self.cv.release()
- except socket.error:
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
- except SessionDetached:
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
- except Closed:
- if delay < self.DELAY_MAX:
- delay *= self.DELAY_FACTOR
-
- try:
- self.cv.acquire()
- self.cv.wait(delay)
- if self.canceled:
- return
- finally:
- self.cv.release()
-
-
-#===================================================================================================
# Broker
#===================================================================================================
-class Broker:
+class Broker(Thread):
""" This object represents a connection (or potential connection) to a QMF broker. """
SYNC_TIME = 60
nextSeq = 1
+ # for connection recovery
+ DELAY_MIN = 1
+ DELAY_MAX = 128
+ DELAY_FACTOR = 2
+
+ class _q_item:
+ """ Broker-private class to encapsulate data sent to the broker thread
+ queue.
+ """
+ type_wakeup = 0
+ type_v1msg = 1
+ type_v2msg = 2
+
+ def __init__(self, typecode, data):
+ self.typecode = typecode
+ self.data = data
+
def __init__(self, session, host, port, authMechs, authUser, authPass, ssl=False, connTimeout=None):
+ Thread.__init__(self)
self.session = session
self.host = host
self.port = port
@@ -1967,18 +1920,28 @@ class Broker:
self.cv = Condition()
self.seqToAgentMap = {}
self.error = None
+ self.conn_exc = None # exception hit by _tryToConnect()
self.brokerId = None
self.connected = False
self.brokerAgent = None
self.brokerSupportsV2 = None
+ self.rcv_queue = Queue() # for msg received on session
self.amqpSessionId = "%s.%d.%d" % (platform.uname()[1], os.getpid(), Broker.nextSeq)
Broker.nextSeq += 1
- if self.session.manageConnections:
- self.thread = ManagedConnection(self)
- self.thread.start()
- else:
- self.thread = None
- self._tryToConnect()
+
+ # thread control
+ self.setDaemon(True)
+ self.setName("Thread for broker: %s:%d" % (host, port))
+ self.canceled = False
+ self.ready = Semaphore(0)
+ self.start()
+ if not self.session.manageConnections:
+ # wait for connection setup to complete in subthread.
+ # On failure, propagate exception to caller
+ self.ready.acquire()
+ if self.conn_exc:
+ raise self.conn_exc
+
def isConnected(self):
""" Return True if there is an active connection to the broker. """
@@ -2064,6 +2027,8 @@ class Broker:
self.cv.release()
def _tryToConnect(self):
+ self.error = None
+ self.conn_exc = None
try:
try:
self.cv.acquire()
@@ -2106,9 +2071,9 @@ class Broker:
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
- self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
+ self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200)
self.topicName = "topic-%s" % self.amqpSessionId
self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
@@ -2116,9 +2081,9 @@ class Broker:
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("tdest").listen(self._v1Cb)
- self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
- self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
+ self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.message, value=200)
##
## Check to see if the broker has QMFv2 exchanges configured
@@ -2139,21 +2104,44 @@ class Broker:
## Set up connectivity for QMFv2
##
if self.brokerSupportsV2:
- self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+ # set up 3 queues:
+ # 1 direct queue - for responses destined to this console.
+ # 2 topic queues - one for heartbeats (hi priority), one for all other indications.
+ self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
+ self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True)
+ self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True)
+
self.amqpSession.exchange_bind(exchange="qmf.default.direct",
- queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+ queue=self.v2_direct_queue, binding_key=self.v2_direct_queue)
## Other bindings here...
- self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+
+ self.amqpSession.message_subscribe(queue=self.v2_direct_queue, destination="v2dest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
- self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+ self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50)
+
+ self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+ self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25)
- self.connected = True
- self.session._handleBrokerConnect(self)
+
+ self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+ self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100)
codec = Codec()
self._setHeader(codec, 'B')
@@ -2162,15 +2150,20 @@ class Broker:
if self.brokerSupportsV2:
self._v2SendAgentLocate()
+ return True # connection complete
+
except socket.error, e:
self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
- raise
except Closed, e:
self.error = "Connect Failed %s - %s" % (e.__class__.__name__, e)
- raise
except ConnectionFailed, e:
self.error = "Connect Failed %s - %s" % (e.__class__.__name__, e)
- raise
+ except:
+ e = Exception("Unknown connection exception")
+ self.error = str(e)
+
+ self.conn_exc = e
+ return False # connection failed
def _updateAgent(self, obj):
bankKey = str(obj.agentBank)
@@ -2233,7 +2226,7 @@ class Broker:
mp.content_type = "amqp/map"
mp.user_id = self.authUser
mp.app_id = "qmf2"
- mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+ mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_direct_queue)
mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
sendCodec = Codec()
sendCodec.write_map(predicate)
@@ -2280,14 +2273,20 @@ class Broker:
def _send(self, msg, dest="qpid.management"):
self.amqpSession.message_transfer(destination=dest, message=msg)
- def _shutdown(self):
- if self.thread:
- self.thread.stop()
- self.thread.join()
+ def _shutdown(self, _timeout=10):
+ if self.isAlive():
+ # kick the thread
+ self.canceled = True
+ self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
+ self.join(_timeout)
if self.connected:
self.amqpSession.incoming("rdest").stop()
if self.session.console != None:
self.amqpSession.incoming("tdest").stop()
+ if self.brokerSupportsV2:
+ self.amqpSession.incoming("v2dest").stop()
+ self.amqpSession.incoming("v2TopicLo").stop()
+ self.amqpSession.incoming("v2TopicHi").stop()
self.amqpSession.close()
self.conn.close()
self.connected = False
@@ -2325,9 +2324,20 @@ class Broker:
self.amqpSession.exchange_bind(exchange="qpid.management",
queue=self.topicName, binding_key=key)
if self.brokerSupportsV2:
+ # do not drop heartbeat indications when under load from data
+ # or event indications. Put heartbeats on their own dedicated
+ # queue.
+ #
for key in self.session.v2BindingKeyList:
- self.amqpSession.exchange_bind(exchange="qmf.default.topic",
- queue=self.v2_queue_name, binding_key=key)
+ if key.startswith("agent.ind.heartbeat"):
+ self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=self.v2_topic_queue_hi,
+ binding_key=key)
+ else:
+ self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=self.v2_topic_queue_lo,
+ binding_key=key)
+
if self.reqsOutstanding == 0 and self.syncInFlight:
self.syncInFlight = False
self.cv.notify()
@@ -2335,14 +2345,19 @@ class Broker:
self.cv.release()
def _v1Cb(self, msg):
+ """ Callback from session receive thread for V1 messages
+ """
+ self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v1msg, msg))
+
+ def _v1Dispatch(self, msg):
try:
- self._v1CbProtected(msg)
+ self._v1DispatchProtected(msg)
except Exception, e:
print "EXCEPTION in Broker._v1Cb:", e
import traceback
traceback.print_exc()
- def _v1CbProtected(self, msg):
+ def _v1DispatchProtected(self, msg):
"""
This is the general message handler for messages received via the QMFv1 exchanges.
"""
@@ -2382,7 +2397,7 @@ class Broker:
finally:
self.cv.release()
- if opcode == None: return
+ if opcode == None: break
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
@@ -2400,6 +2415,11 @@ class Broker:
pass
def _v2Cb(self, msg):
+ """ Callback from session receive thread for V2 messages
+ """
+ self.rcv_queue.put(Broker._q_item(Broker._q_item.type_v2msg, msg))
+
+ def _v2Dispatch(self, msg):
"""
This is the general message handler for messages received via QMFv2 exchanges.
"""
@@ -2455,18 +2475,112 @@ class Broker:
pass
def _exceptionCb(self, data):
- self.connected = False
- self.error = data
+ """ Exception notification callback from session receive thread.
+ """
+ self.cv.acquire()
try:
- self.cv.acquire()
- if self.syncInFlight:
- self.cv.notify()
+ self.connected = False
+ self.error = data
finally:
self.cv.release()
- self.session._handleError(self.error)
- self.session._handleBrokerDisconnect(self)
- if self.thread:
- self.thread.disconnected()
+ self.rcv_queue.put(Broker._q_item(Broker._q_item.type_wakeup, None))
+
+ def run(self):
+ """ Main body of the running thread. """
+
+ # First, attempt a connection. In the unmanaged case,
+ # failure to connect needs to cause the Broker()
+ # constructor to raise an exception.
+ delay = self.DELAY_MIN
+ while not self.canceled:
+ if self._tryToConnect(): # connection up
+ break
+ # unmanaged connection - fail & wake up constructor
+ if not self.session.manageConnections:
+ self.ready.release()
+ return
+ # managed connection - try again
+ count = 0
+ while not self.canceled and count < delay:
+ sleep(1)
+ count += 1
+ if delay < self.DELAY_MAX:
+ delay *= self.DELAY_FACTOR
+
+ if self.canceled:
+ self.ready.release()
+ return
+
+ # connection successful!
+ self.cv.acquire()
+ try:
+ self.connected = True
+ finally:
+ self.cv.release()
+
+ self.session._handleBrokerConnect(self)
+ self.ready.release()
+
+ while not self.canceled:
+
+ item = self.rcv_queue.get()
+
+ if self.canceled:
+ return
+
+ if not self.connected:
+ # connection failure
+ while item:
+ # drain the queue
+ try:
+ item = self.rcv_queue.get(block=False)
+ except Empty:
+ break
+
+ # notify any waiters, and callback
+ self.cv.acquire()
+ try:
+ edata = self.error;
+ if self.syncInFlight:
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self.session._handleError(edata)
+ self.session._handleBrokerDisconnect(self)
+
+ if not self.session.manageConnections:
+ return # do not attempt recovery
+
+ # retry connection setup
+ delay = self.DELAY_MIN
+ while not self.canceled:
+ if self._tryToConnect():
+ break
+ # managed connection - try again
+ count = 0
+ while not self.canceled and count < delay:
+ sleep(1)
+ count += 1
+ if delay < self.DELAY_MAX:
+ delay *= self.DELAY_FACTOR
+
+ if self.canceled:
+ return
+
+ # connection successful!
+ self.cv.acquire()
+ try:
+ self.connected = True
+ finally:
+ self.cv.release()
+
+ self.session._handleBrokerConnect(self)
+
+ elif item.typecode == Broker._q_item.type_v1msg:
+ self._v1Dispatch(item.data)
+ elif item.typecode == Broker._q_item.type_v2msg:
+ self._v2Dispatch(item.data)
+
#===================================================================================================
@@ -2932,7 +3046,7 @@ class Agent:
mp.user_id = self.broker.authUser
mp.correlation_id = str(sequence)
mp.app_id = "qmf2"
- mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+ mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
mp.application_headers = {'qmf.opcode':'_query_request'}
sendCodec = Codec()
sendCodec.write_map(query)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org