You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/08/18 20:05:10 UTC
svn commit: r986828 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Author: kgiusti
Date: Wed Aug 18 18:05:10 2010
New Revision: 986828
URL: http://svn.apache.org/viewvc?rev=986828&view=rev
Log:
QPID-2663: prevent slow consoles from causing agent disconnects due to full topic queues
Modified:
qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=986828&r1=986827&r2=986828&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Wed Aug 18 18:05:10 2010
@@ -678,8 +678,8 @@ class Session:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
for v2key in v2keys:
- # data indications should arrive on the lo priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+ # data indications should arrive on the unsolicited indication queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindClass(self, pname, cname=None):
@@ -701,8 +701,8 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
- # data indications should arrive on the lo priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+ # data indications should arrive on the unsolicited indication queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindClassKey(self, classKey):
@@ -731,8 +731,8 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
if broker.brokerSupportsV2:
- # event indications should arrive on the lo priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+ # event indications should arrive on the unsolicited indication queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
def bindEventKey(self, eventKey):
""" Request events for a particular class by class key. Only valid if
@@ -757,8 +757,8 @@ class Session:
for broker in self.brokers:
if broker.isConnected():
if broker.brokerSupportsV2:
- # heartbeats should arrive on the hi priority queue
- broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hi, binding_key=v2key)
+ # heartbeats should arrive on the heartbeat queue
+ broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key)
def getAgents(self, broker=None):
""" Get a list of currently known agents """
@@ -2206,6 +2206,10 @@ class Broker(Thread):
sock.settimeout(oldTimeout)
self.conn.aborted = oldAborted
+ # prevent topic queues from filling up (and causing the agents to
+ # disconnect) by discarding the oldest queued messages when full.
+ topic_queue_options = {"qpid.policy_type":"ring"}
+
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
self.amqpSession.timeout = self.SYNC_TIME
@@ -2222,7 +2226,9 @@ class Broker(Thread):
self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200)
self.topicName = "topic-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
+ self.amqpSession.queue_declare(queue=self.topicName, exclusive=True,
+ auto_delete=True,
+ arguments=topic_queue_options)
self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
@@ -2252,13 +2258,18 @@ class Broker(Thread):
if self.brokerSupportsV2:
# set up 3 queues:
# 1 direct queue - for responses destined to this console.
- # 2 topic queues - one for heartbeats (hi priority), one for all other indications.
+ # 2 topic queues - one for heartbeats (hb), one for unsolicited data
+ # and event indications (ui).
self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
- self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True)
- self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId
- self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True)
+ self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui,
+ exclusive=True, auto_delete=True,
+ arguments=topic_queue_options)
+ self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb,
+ exclusive=True, auto_delete=True,
+ arguments=topic_queue_options)
self.amqpSession.exchange_bind(exchange="qmf.default.direct",
queue=self.v2_direct_queue, binding_key=self.v2_direct_queue)
@@ -2272,22 +2283,22 @@ class Broker(Thread):
self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50)
- self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo",
+ self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window)
- self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25)
+ self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25)
- self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi",
+ self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb)
- self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window)
- self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
- self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100)
+ self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window)
+ self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100)
codec = Codec()
self._setHeader(codec, 'B')
@@ -2426,8 +2437,8 @@ class Broker(Thread):
self.amqpSession.incoming("tdest").stop()
if self.brokerSupportsV2:
self.amqpSession.incoming("v2dest").stop()
- self.amqpSession.incoming("v2TopicLo").stop()
- self.amqpSession.incoming("v2TopicHi").stop()
+ self.amqpSession.incoming("v2TopicUI").stop()
+ self.amqpSession.incoming("v2TopicHB").stop()
self.amqpSession.close()
self.conn.close()
self.connected = False
@@ -2472,11 +2483,11 @@ class Broker(Thread):
for key in self.session.v2BindingKeyList:
if key.startswith("agent.ind.heartbeat"):
self.amqpSession.exchange_bind(exchange="qmf.default.topic",
- queue=self.v2_topic_queue_hi,
+ queue=self.v2_topic_queue_hb,
binding_key=key)
else:
self.amqpSession.exchange_bind(exchange="qmf.default.topic",
- queue=self.v2_topic_queue_lo,
+ queue=self.v2_topic_queue_ui,
binding_key=key)
# solicit an agent locate now, after we bind to agent.ind.data,
# because the agent locate will cause the agent to publish a
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org