You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/21 03:04:16 UTC

svn commit: r1291553 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-sto...

Author: chirino
Date: Tue Feb 21 02:04:15 2012
New Revision: 1291553

URL: http://svn.apache.org/viewvc?rev=1291553&view=rev
Log:
Use better session buffering defaults so that we perform better when transferring larger messages.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Feb 21 02:04:15 2012
@@ -132,11 +132,6 @@ class Queue(val router: LocalRouter, val
   var tune_swap_range_size = 0
 
   /**
-   *  The amount of memory buffer space to use per subscription.
-   */
-  var tune_consumer_buffer = 0
-
-  /**
    *  The max memory to allow this queue to grow to.
    */
   var tune_quota = -1L
@@ -220,20 +215,20 @@ class Queue(val router: LocalRouter, val
   var config:QueueDTO = _
 
   def configure(update:QueueDTO) = {
-    def mem_size(value:String, default:Int) = Option(value).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(default)
+    def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
 
-    producer_swapped_in.size_max += mem_size(update.tail_buffer, 1024*64) - Option(config).map{ config=>
-      mem_size(config.tail_buffer, 1024*64)
+    producer_swapped_in.size_max += mem_size(update.tail_buffer, "640k") - Option(config).map{ config=>
+      mem_size(config.tail_buffer, "640k")
     }.getOrElse(0)
 
     tune_persistent = virtual_host.store !=null && update.persistent.getOrElse(true)
     tune_swap = tune_persistent && update.swap.getOrElse(true)
     tune_swap_range_size = update.swap_range_size.getOrElse(10000)
-    tune_consumer_buffer = mem_size(update.consumer_buffer, 256*1024)
-    tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,1024*1024)
-    tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,-1)
-    tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,-1)
-    tune_quota = mem_size(update.quota,-1)
+    tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,"1M")
+    tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,"-1")
+    tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,"-1")
+    tune_quota = mem_size(update.quota,"-1")
+
     auto_delete_after = update.auto_delete_after.getOrElse(30)
     if( auto_delete_after!= 0 ) {
       // we don't auto delete explicitly configured queues,
@@ -374,22 +369,9 @@ class Queue(val router: LocalRouter, val
   def update(on_completed:Runnable) = dispatch_queue {
 
     val prev_persistent = tune_persistent
-    val prev_consumer_size = tune_consumer_buffer
 
     configure(binding.config(virtual_host))
 
-    val consumer_buffer_change = tune_consumer_buffer-prev_consumer_size
-    if( consumer_buffer_change!=0 ) {
-      // for each
-      all_subscriptions.values.foreach { sub =>
-        // open session
-        if( sub.session!=null ) {
-          // change the queue capacity, by the change in consumer buffer change.
-          change_consumer_capacity(consumer_buffer_change)
-        }
-      }
-    }
-
     restore_from_store {
       check_idle
       trigger_swap
@@ -2028,6 +2010,8 @@ class Subscription(val queue:Queue, val 
   def browser = consumer.browser
   def exclusive = consumer.exclusive
 
+  val consumer_buffer = consumer.receive_buffer_size
+  
   // This opens up the consumer
   def open() = {
     consumer.retain
@@ -2049,10 +2033,10 @@ class Subscription(val queue:Queue, val 
       }
     }
     pos ::= this
-
+    
     queue.all_subscriptions += consumer -> this
     queue.consumer_counter += 1
-    queue.change_consumer_capacity( queue.tune_consumer_buffer )
+    queue.change_consumer_capacity( consumer_buffer )
 
     if( exclusive ) {
       queue.exclusive_subscriptions.append(this)
@@ -2092,7 +2076,7 @@ class Subscription(val queue:Queue, val 
       // The following action gets executed once all acquired messages
       // ared acked or nacked.
       pending_close_action = ()=> {
-        queue.change_consumer_capacity( - queue.tune_consumer_buffer )
+        queue.change_consumer_capacity( - consumer_buffer )
 
         if( exclusive ) {
           // rewind all the subs to the start of the queue.
@@ -2209,7 +2193,7 @@ class Subscription(val queue:Queue, val 
       pos // start prefetching from the current position.
     }
 
-    var remaining = queue.tune_consumer_buffer;
+    var remaining = consumer_buffer;
     while( remaining>0 && cursor!=null ) {
       val next = cursor.getNext
       // Browsers prefetch all messages..

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Tue Feb 21 02:04:15 2012
@@ -58,12 +58,6 @@ public class QueueDTO extends StringIdDT
     public String tail_buffer;
 
     /**
-     *  The amount of memory buffer space to use per consumer.
-     */
-    @XmlAttribute(name="consumer_buffer")
-    public String consumer_buffer;
-
-    /**
      * Should this queue persistently store it's entries?
      */
     @XmlAttribute(name="persistent")
@@ -141,8 +135,6 @@ public class QueueDTO extends StringIdDT
             return false;
         if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
             return false;
-        if (consumer_buffer != null ? !consumer_buffer.equals(queueDTO.consumer_buffer) : queueDTO.consumer_buffer != null)
-            return false;
         if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
             return false;
         if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
@@ -161,7 +153,6 @@ public class QueueDTO extends StringIdDT
         int result = super.hashCode();
         result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
         result = 31 * result + (mirrored != null ? mirrored.hashCode() : 0);
-        result = 31 * result + (consumer_buffer != null ? consumer_buffer.hashCode() : 0);
         result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
         result = 31 * result + (swap != null ? swap.hashCode() : 0);
         result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Feb 21 02:04:15 2012
@@ -440,7 +440,7 @@ class StompProtocolHandler extends Proto
     def dispatch_queue = StompProtocolHandler.this.dispatchQueue
 
     override def connection = Some(StompProtocolHandler.this.connection)
-    override def receive_buffer_size = codec.write_buffer_size
+    override val receive_buffer_size = buffer_size
 
     def is_persistent = false
 
@@ -476,7 +476,7 @@ class StompProtocolHandler extends Proto
       producer.dispatch_queue.assertExecuting()
       retain
 
-      val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue, buffer_size)
 
       override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress
 
@@ -605,6 +605,10 @@ class StompProtocolHandler extends Proto
     config.die_delay.getOrElse(DEFAULT_DIE_DELAY)
   }
 
+  def buffer_size = {
+    MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+  }
+
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
     import collection.JavaConversions._
@@ -1007,7 +1011,7 @@ class StompProtocolHandler extends Proto
     val addresses = decode_addresses(dest)
     val key = addresses.toList
 
-    override def send_buffer_size = codec.read_buffer_size
+    override def send_buffer_size = buffer_size
 
     override def connection = Some(StompProtocolHandler.this.connection)
 
@@ -1230,15 +1234,15 @@ class StompProtocolHandler extends Proto
       case Some(value) =>
         value.toString.split(",").toList match {
           case x :: Nil =>
-            (codec.write_buffer_size, x.toInt, true)
+            (buffer_size, x.toInt, true)
           case x :: y :: Nil =>
             (y.toInt, x.toInt, true)
           case x :: y :: z :: _ =>
             (y.toInt, x.toInt, z.toBoolean)
-          case _ => (codec.write_buffer_size, 1, true)
+          case _ => (buffer_size, 1, true)
         }
       case None =>
-        (codec.write_buffer_size, 1, true)
+        (buffer_size, 1, true)
     }
 
     val selector = get(headers, SELECTOR) match {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java Tue Feb 21 02:04:15 2012
@@ -104,14 +104,21 @@ public class StompDTO extends ProtocolDT
     @XmlAttribute(name="die_delay")
     public Long die_delay;
 
+    @XmlAttribute(name="buffer_size")
+    public String buffer_size;
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (!(o instanceof StompDTO)) return false;
         if (!super.equals(o)) return false;
 
         StompDTO stompDTO = (StompDTO) o;
 
+        if (add_redeliveries_header != null ? !add_redeliveries_header.equals(stompDTO.add_redeliveries_header) : stompDTO.add_redeliveries_header != null)
+            return false;
+        if (add_timestamp_header != null ? !add_timestamp_header.equals(stompDTO.add_timestamp_header) : stompDTO.add_timestamp_header != null)
+            return false;
         if (add_user_header != null ? !add_user_header.equals(stompDTO.add_user_header) : stompDTO.add_user_header != null)
             return false;
         if (add_user_headers != null ? !add_user_headers.equals(stompDTO.add_user_headers) : stompDTO.add_user_headers != null)
@@ -120,8 +127,11 @@ public class StompDTO extends ProtocolDT
             return false;
         if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(stompDTO.any_descendant_wildcard) : stompDTO.any_descendant_wildcard != null)
             return false;
+        if (buffer_size != null ? !buffer_size.equals(stompDTO.buffer_size) : stompDTO.buffer_size != null)
+            return false;
         if (destination_separator != null ? !destination_separator.equals(stompDTO.destination_separator) : stompDTO.destination_separator != null)
             return false;
+        if (die_delay != null ? !die_delay.equals(stompDTO.die_delay) : stompDTO.die_delay != null) return false;
         if (max_data_length != null ? !max_data_length.equals(stompDTO.max_data_length) : stompDTO.max_data_length != null)
             return false;
         if (max_header_length != null ? !max_header_length.equals(stompDTO.max_header_length) : stompDTO.max_header_length != null)
@@ -138,6 +148,10 @@ public class StompDTO extends ProtocolDT
             return false;
         if (regex_wildcard_start != null ? !regex_wildcard_start.equals(stompDTO.regex_wildcard_start) : stompDTO.regex_wildcard_start != null)
             return false;
+        if (temp_queue_prefix != null ? !temp_queue_prefix.equals(stompDTO.temp_queue_prefix) : stompDTO.temp_queue_prefix != null)
+            return false;
+        if (temp_topic_prefix != null ? !temp_topic_prefix.equals(stompDTO.temp_topic_prefix) : stompDTO.temp_topic_prefix != null)
+            return false;
         if (topic_prefix != null ? !topic_prefix.equals(stompDTO.topic_prefix) : stompDTO.topic_prefix != null)
             return false;
 
@@ -149,18 +163,24 @@ public class StompDTO extends ProtocolDT
         int result = super.hashCode();
         result = 31 * result + (add_user_header != null ? add_user_header.hashCode() : 0);
         result = 31 * result + (add_user_headers != null ? add_user_headers.hashCode() : 0);
+        result = 31 * result + (add_timestamp_header != null ? add_timestamp_header.hashCode() : 0);
+        result = 31 * result + (add_redeliveries_header != null ? add_redeliveries_header.hashCode() : 0);
         result = 31 * result + (max_header_length != null ? max_header_length.hashCode() : 0);
         result = 31 * result + (max_headers != null ? max_headers.hashCode() : 0);
         result = 31 * result + (max_data_length != null ? max_data_length.hashCode() : 0);
         result = 31 * result + (protocol_filters != null ? protocol_filters.hashCode() : 0);
         result = 31 * result + (queue_prefix != null ? queue_prefix.hashCode() : 0);
         result = 31 * result + (topic_prefix != null ? topic_prefix.hashCode() : 0);
+        result = 31 * result + (temp_queue_prefix != null ? temp_queue_prefix.hashCode() : 0);
+        result = 31 * result + (temp_topic_prefix != null ? temp_topic_prefix.hashCode() : 0);
         result = 31 * result + (destination_separator != null ? destination_separator.hashCode() : 0);
         result = 31 * result + (path_separator != null ? path_separator.hashCode() : 0);
         result = 31 * result + (any_child_wildcard != null ? any_child_wildcard.hashCode() : 0);
         result = 31 * result + (any_descendant_wildcard != null ? any_descendant_wildcard.hashCode() : 0);
         result = 31 * result + (regex_wildcard_start != null ? regex_wildcard_start.hashCode() : 0);
         result = 31 * result + (regex_wildcard_end != null ? regex_wildcard_end.hashCode() : 0);
+        result = 31 * result + (die_delay != null ? die_delay.hashCode() : 0);
+        result = 31 * result + (buffer_size != null ? buffer_size.hashCode() : 0);
         return result;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java Tue Feb 21 02:04:15 2012
@@ -25,21 +25,21 @@ import java.util.regex.Pattern;
  */
 public class MemoryPropertyEditor extends PropertyEditorSupport {
 
-    private static final Pattern BYTE_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*b?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern BYTE_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*b?\\s*$", Pattern.CASE_INSENSITIVE);
 
-    private static final Pattern KIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*k(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern MIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*m(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern GIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*g(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern TIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*t(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern PIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*p(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern EIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*e(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-
-    private static final Pattern KB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*kb?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern MB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*mb?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern GB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*gb?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern TB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*tb?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern PB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*pb?\\s*$", Pattern.CASE_INSENSITIVE);
-    private static final Pattern EB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*eb?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern KIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*k(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern MIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*m(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern GIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*g(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern TIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*t(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern PIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*p(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern EIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*e(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+
+    private static final Pattern KB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*kb?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern MB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*mb?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern GB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*gb?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern TB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*tb?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern PB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*pb?\\s*$", Pattern.CASE_INSENSITIVE);
+    private static final Pattern EB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*eb?\\s*$", Pattern.CASE_INSENSITIVE);
 
     public void setAsText(String text) throws IllegalArgumentException {
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Tue Feb 21 02:04:15 2012
@@ -314,10 +314,7 @@ A `queue` element may be configured with
   details.  Defaults to false.
 
 * `tail_buffer` : The amount of memory buffer space allocated for holding
-freshly enqueued message.  Defaults to 64k.
-
-* `consumer_buffer` : The amount of memory buffer space allocated to each
-subscription for receiving messages.  Defaults to 256k.
+freshly enqueued message.  Defaults to `640k`.
 
 * `persistent` : If set to false, then the queue will not persistently
 store it's message.  Defaults to true.
@@ -343,7 +340,7 @@ memory.  Defaults to true.
 * `fast_delivery_rate`: The message delivery rate (in bytes/sec) at which             
   the queue considers the consumers fast enough to start slowing down enqueue
   rate to match the consumption rate if the consumers are at the 
-  tail of the queue.           
+  tail of the queue.  Defaults to `1M`           
   
 * `catchup_enqueue_rate`:  If set, and the the current delivery
    rate is exceeding the configured value of `fast_delivery_rate` and 
@@ -1017,6 +1014,8 @@ in the `apollo.xml` configuration file t
 in the STOMP protocol implementation.  The `stomp` element supports the 
 following configuration attributes:
 
+* `buffer_size` : How much each producer or subscription will buffer between
+   the client and the broker. Defaults to `640k`.
 * `add_user_header` :  Name of the header which will be added to every received 
   message received.  The value of the header will be set to the id of user that 
   sent the message.  Not set by default.