You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/08/18 20:05:10 UTC

svn commit: r986828 - /qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Author: kgiusti
Date: Wed Aug 18 18:05:10 2010
New Revision: 986828

URL: http://svn.apache.org/viewvc?rev=986828&view=rev
Log:
QPID-2663: prevent slow consoles from causing agent disconnects due to full topic queues

Modified:
    qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py

Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=986828&r1=986827&r2=986828&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Wed Aug 18 18:05:10 2010
@@ -678,8 +678,8 @@ class Session:
           broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
         if broker.brokerSupportsV2:
           for v2key in v2keys:
-            # data indications should arrive on the lo priority queue
-            broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+            # data indications should arrive on the unsolicited indication queue
+            broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
 
 
   def bindClass(self, pname, cname=None):
@@ -701,8 +701,8 @@ class Session:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
         if broker.brokerSupportsV2:
-          # data indications should arrive on the lo priority queue
-          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+          # data indications should arrive on the unsolicited indication queue
+          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
 
       
   def bindClassKey(self, classKey):
@@ -731,8 +731,8 @@ class Session:
       if broker.isConnected():
         broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=v1key)
         if broker.brokerSupportsV2:
-          # event indications should arrive on the lo priority queue
-          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_lo, binding_key=v2key)
+          # event indications should arrive on the unsolicited indication queue
+          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_ui, binding_key=v2key)
 
   def bindEventKey(self, eventKey):
     """ Request events for a particular class by class key. Only valid if
@@ -757,8 +757,8 @@ class Session:
     for broker in self.brokers:
       if broker.isConnected():
         if broker.brokerSupportsV2:
-          # heartbeats should arrive on the hi priority queue
-          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hi, binding_key=v2key)
+          # heartbeats should arrive on the heartbeat queue
+          broker.amqpSession.exchange_bind(exchange="qmf.default.topic", queue=broker.v2_topic_queue_hb, binding_key=v2key)
 
   def getAgents(self, broker=None):
     """ Get a list of currently known agents """
@@ -2206,6 +2206,10 @@ class Broker(Thread):
       sock.settimeout(oldTimeout)
       self.conn.aborted = oldAborted
 
+      # prevent topic queues from filling up (and causing the agents to
+      # disconnect) by discarding the oldest queued messages when full.
+      topic_queue_options = {"qpid.policy_type":"ring"}
+
       self.replyName = "reply-%s" % self.amqpSessionId
       self.amqpSession = self.conn.session(self.amqpSessionId)
       self.amqpSession.timeout = self.SYNC_TIME
@@ -2222,7 +2226,9 @@ class Broker(Thread):
       self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200)
 
       self.topicName = "topic-%s" % self.amqpSessionId
-      self.amqpSession.queue_declare(queue=self.topicName, exclusive=True, auto_delete=True)
+      self.amqpSession.queue_declare(queue=self.topicName, exclusive=True,
+                                     auto_delete=True,
+                                     arguments=topic_queue_options)
       self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
@@ -2252,13 +2258,18 @@ class Broker(Thread):
       if self.brokerSupportsV2:
         # set up 3 queues:
         # 1 direct queue - for responses destined to this console.
-        # 2 topic queues - one for heartbeats (hi priority), one for all other indications.
+        # 2 topic queues - one for heartbeats (hb), one for unsolicited data
+        # and event indications (ui).
         self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
         self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
-        self.v2_topic_queue_lo = "qmfc-v2-lo-%s" % self.amqpSessionId
-        self.amqpSession.queue_declare(queue=self.v2_topic_queue_lo, exclusive=True, auto_delete=True)
-        self.v2_topic_queue_hi = "qmfc-v2-hi-%s" % self.amqpSessionId
-        self.amqpSession.queue_declare(queue=self.v2_topic_queue_hi, exclusive=True, auto_delete=True)
+        self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId
+        self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui,
+                                       exclusive=True, auto_delete=True,
+                                       arguments=topic_queue_options)
+        self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId
+        self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb,
+                                       exclusive=True, auto_delete=True,
+                                       arguments=topic_queue_options)
 
         self.amqpSession.exchange_bind(exchange="qmf.default.direct",
                                        queue=self.v2_direct_queue, binding_key=self.v2_direct_queue)
@@ -2272,22 +2283,22 @@ class Broker(Thread):
         self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
         self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50)
 
-        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_lo, destination="v2TopicLo",
+        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI",
                                            accept_mode=self.amqpSession.accept_mode.none,
                                            acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
-        self.amqpSession.incoming("v2TopicLo").listen(self._v2Cb, self._exceptionCb)
-        self.amqpSession.message_set_flow_mode(destination="v2TopicLo", flow_mode=self.amqpSession.flow_mode.window)
-        self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
-        self.amqpSession.message_flow(destination="v2TopicLo", unit=self.amqpSession.credit_unit.message, value=25)
+        self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb)
+        self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window)
+        self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+        self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25)
 
 
-        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hi, destination="v2TopicHi",
+        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB",
                                            accept_mode=self.amqpSession.accept_mode.none,
                                            acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
-        self.amqpSession.incoming("v2TopicHi").listen(self._v2Cb, self._exceptionCb)
-        self.amqpSession.message_set_flow_mode(destination="v2TopicHi", flow_mode=self.amqpSession.flow_mode.window)
-        self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
-        self.amqpSession.message_flow(destination="v2TopicHi", unit=self.amqpSession.credit_unit.message, value=100)
+        self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb)
+        self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window)
+        self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFFL)
+        self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100)
 
       codec = Codec()
       self._setHeader(codec, 'B')
@@ -2426,8 +2437,8 @@ class Broker(Thread):
         self.amqpSession.incoming("tdest").stop()
       if self.brokerSupportsV2:
         self.amqpSession.incoming("v2dest").stop()
-        self.amqpSession.incoming("v2TopicLo").stop()
-        self.amqpSession.incoming("v2TopicHi").stop()
+        self.amqpSession.incoming("v2TopicUI").stop()
+        self.amqpSession.incoming("v2TopicHB").stop()
       self.amqpSession.close()
       self.conn.close()
       self.connected = False
@@ -2472,11 +2483,11 @@ class Broker(Thread):
           for key in self.session.v2BindingKeyList:
             if key.startswith("agent.ind.heartbeat"):
               self.amqpSession.exchange_bind(exchange="qmf.default.topic",
-                                             queue=self.v2_topic_queue_hi,
+                                             queue=self.v2_topic_queue_hb,
                                              binding_key=key)
             else:
               self.amqpSession.exchange_bind(exchange="qmf.default.topic",
-                                             queue=self.v2_topic_queue_lo,
+                                             queue=self.v2_topic_queue_ui,
                                              binding_key=key)
           # solicit an agent locate now, after we bind to agent.ind.data,
           # because the agent locate will cause the agent to publish a



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org