You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/21 03:04:16 UTC
svn commit: r1291553 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-sto...
Author: chirino
Date: Tue Feb 21 02:04:15 2012
New Revision: 1291553
URL: http://svn.apache.org/viewvc?rev=1291553&view=rev
Log:
Use better session buffering defaults so that we perform better when transferring larger messages.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Feb 21 02:04:15 2012
@@ -132,11 +132,6 @@ class Queue(val router: LocalRouter, val
var tune_swap_range_size = 0
/**
- * The amount of memory buffer space to use per subscription.
- */
- var tune_consumer_buffer = 0
-
- /**
* The max memory to allow this queue to grow to.
*/
var tune_quota = -1L
@@ -220,20 +215,20 @@ class Queue(val router: LocalRouter, val
var config:QueueDTO = _
def configure(update:QueueDTO) = {
- def mem_size(value:String, default:Int) = Option(value).map(MemoryPropertyEditor.parse(_).toInt).getOrElse(default)
+ def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
- producer_swapped_in.size_max += mem_size(update.tail_buffer, 1024*64) - Option(config).map{ config=>
- mem_size(config.tail_buffer, 1024*64)
+ producer_swapped_in.size_max += mem_size(update.tail_buffer, "640k") - Option(config).map{ config=>
+ mem_size(config.tail_buffer, "640k")
}.getOrElse(0)
tune_persistent = virtual_host.store !=null && update.persistent.getOrElse(true)
tune_swap = tune_persistent && update.swap.getOrElse(true)
tune_swap_range_size = update.swap_range_size.getOrElse(10000)
- tune_consumer_buffer = mem_size(update.consumer_buffer, 256*1024)
- tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,1024*1024)
- tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,-1)
- tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,-1)
- tune_quota = mem_size(update.quota,-1)
+ tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,"1M")
+ tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,"-1")
+ tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,"-1")
+ tune_quota = mem_size(update.quota,"-1")
+
auto_delete_after = update.auto_delete_after.getOrElse(30)
if( auto_delete_after!= 0 ) {
// we don't auto delete explicitly configured queues,
@@ -374,22 +369,9 @@ class Queue(val router: LocalRouter, val
def update(on_completed:Runnable) = dispatch_queue {
val prev_persistent = tune_persistent
- val prev_consumer_size = tune_consumer_buffer
configure(binding.config(virtual_host))
- val consumer_buffer_change = tune_consumer_buffer-prev_consumer_size
- if( consumer_buffer_change!=0 ) {
- // for each
- all_subscriptions.values.foreach { sub =>
- // open session
- if( sub.session!=null ) {
- // change the queue capacity, by the change in consumer buffer change.
- change_consumer_capacity(consumer_buffer_change)
- }
- }
- }
-
restore_from_store {
check_idle
trigger_swap
@@ -2028,6 +2010,8 @@ class Subscription(val queue:Queue, val
def browser = consumer.browser
def exclusive = consumer.exclusive
+ val consumer_buffer = consumer.receive_buffer_size
+
// This opens up the consumer
def open() = {
consumer.retain
@@ -2049,10 +2033,10 @@ class Subscription(val queue:Queue, val
}
}
pos ::= this
-
+
queue.all_subscriptions += consumer -> this
queue.consumer_counter += 1
- queue.change_consumer_capacity( queue.tune_consumer_buffer )
+ queue.change_consumer_capacity( consumer_buffer )
if( exclusive ) {
queue.exclusive_subscriptions.append(this)
@@ -2092,7 +2076,7 @@ class Subscription(val queue:Queue, val
// The following action gets executed once all acquired messages
// are acked or nacked.
pending_close_action = ()=> {
- queue.change_consumer_capacity( - queue.tune_consumer_buffer )
+ queue.change_consumer_capacity( - consumer_buffer )
if( exclusive ) {
// rewind all the subs to the start of the queue.
@@ -2209,7 +2193,7 @@ class Subscription(val queue:Queue, val
pos // start prefetching from the current position.
}
- var remaining = queue.tune_consumer_buffer;
+ var remaining = consumer_buffer;
while( remaining>0 && cursor!=null ) {
val next = cursor.getNext
// Browsers prefetch all messages..
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Tue Feb 21 02:04:15 2012
@@ -58,12 +58,6 @@ public class QueueDTO extends StringIdDT
public String tail_buffer;
/**
- * The amount of memory buffer space to use per consumer.
- */
- @XmlAttribute(name="consumer_buffer")
- public String consumer_buffer;
-
- /**
* Should this queue persistently store its entries?
*/
@XmlAttribute(name="persistent")
@@ -141,8 +135,6 @@ public class QueueDTO extends StringIdDT
return false;
if (catchup_enqueue_rate != null ? !catchup_enqueue_rate.equals(queueDTO.catchup_enqueue_rate) : queueDTO.catchup_enqueue_rate != null)
return false;
- if (consumer_buffer != null ? !consumer_buffer.equals(queueDTO.consumer_buffer) : queueDTO.consumer_buffer != null)
- return false;
if (max_enqueue_rate != null ? !max_enqueue_rate.equals(queueDTO.max_enqueue_rate) : queueDTO.max_enqueue_rate != null)
return false;
if (other != null ? !other.equals(queueDTO.other) : queueDTO.other != null) return false;
@@ -161,7 +153,6 @@ public class QueueDTO extends StringIdDT
int result = super.hashCode();
result = 31 * result + (auto_delete_after != null ? auto_delete_after.hashCode() : 0);
result = 31 * result + (mirrored != null ? mirrored.hashCode() : 0);
- result = 31 * result + (consumer_buffer != null ? consumer_buffer.hashCode() : 0);
result = 31 * result + (persistent != null ? persistent.hashCode() : 0);
result = 31 * result + (swap != null ? swap.hashCode() : 0);
result = 31 * result + (swap_range_size != null ? swap_range_size.hashCode() : 0);
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Feb 21 02:04:15 2012
@@ -440,7 +440,7 @@ class StompProtocolHandler extends Proto
def dispatch_queue = StompProtocolHandler.this.dispatchQueue
override def connection = Some(StompProtocolHandler.this.connection)
- override def receive_buffer_size = codec.write_buffer_size
+ override val receive_buffer_size = buffer_size
def is_persistent = false
@@ -476,7 +476,7 @@ class StompProtocolHandler extends Proto
producer.dispatch_queue.assertExecuting()
retain
- val downstream = session_manager.open(producer.dispatch_queue, receive_buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue, buffer_size)
override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress
@@ -605,6 +605,10 @@ class StompProtocolHandler extends Proto
config.die_delay.getOrElse(DEFAULT_DIE_DELAY)
}
+ def buffer_size = {
+ MemoryPropertyEditor.parse(Option(config.buffer_size).getOrElse("640k")).toInt
+ }
+
override def set_connection(connection: BrokerConnection) = {
super.set_connection(connection)
import collection.JavaConversions._
@@ -1007,7 +1011,7 @@ class StompProtocolHandler extends Proto
val addresses = decode_addresses(dest)
val key = addresses.toList
- override def send_buffer_size = codec.read_buffer_size
+ override def send_buffer_size = buffer_size
override def connection = Some(StompProtocolHandler.this.connection)
@@ -1230,15 +1234,15 @@ class StompProtocolHandler extends Proto
case Some(value) =>
value.toString.split(",").toList match {
case x :: Nil =>
- (codec.write_buffer_size, x.toInt, true)
+ (buffer_size, x.toInt, true)
case x :: y :: Nil =>
(y.toInt, x.toInt, true)
case x :: y :: z :: _ =>
(y.toInt, x.toInt, z.toBoolean)
- case _ => (codec.write_buffer_size, 1, true)
+ case _ => (buffer_size, 1, true)
}
case None =>
- (codec.write_buffer_size, 1, true)
+ (buffer_size, 1, true)
}
val selector = get(headers, SELECTOR) match {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/dto/StompDTO.java Tue Feb 21 02:04:15 2012
@@ -104,14 +104,21 @@ public class StompDTO extends ProtocolDT
@XmlAttribute(name="die_delay")
public Long die_delay;
+ @XmlAttribute(name="buffer_size")
+ public String buffer_size;
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (!(o instanceof StompDTO)) return false;
if (!super.equals(o)) return false;
StompDTO stompDTO = (StompDTO) o;
+ if (add_redeliveries_header != null ? !add_redeliveries_header.equals(stompDTO.add_redeliveries_header) : stompDTO.add_redeliveries_header != null)
+ return false;
+ if (add_timestamp_header != null ? !add_timestamp_header.equals(stompDTO.add_timestamp_header) : stompDTO.add_timestamp_header != null)
+ return false;
if (add_user_header != null ? !add_user_header.equals(stompDTO.add_user_header) : stompDTO.add_user_header != null)
return false;
if (add_user_headers != null ? !add_user_headers.equals(stompDTO.add_user_headers) : stompDTO.add_user_headers != null)
@@ -120,8 +127,11 @@ public class StompDTO extends ProtocolDT
return false;
if (any_descendant_wildcard != null ? !any_descendant_wildcard.equals(stompDTO.any_descendant_wildcard) : stompDTO.any_descendant_wildcard != null)
return false;
+ if (buffer_size != null ? !buffer_size.equals(stompDTO.buffer_size) : stompDTO.buffer_size != null)
+ return false;
if (destination_separator != null ? !destination_separator.equals(stompDTO.destination_separator) : stompDTO.destination_separator != null)
return false;
+ if (die_delay != null ? !die_delay.equals(stompDTO.die_delay) : stompDTO.die_delay != null) return false;
if (max_data_length != null ? !max_data_length.equals(stompDTO.max_data_length) : stompDTO.max_data_length != null)
return false;
if (max_header_length != null ? !max_header_length.equals(stompDTO.max_header_length) : stompDTO.max_header_length != null)
@@ -138,6 +148,10 @@ public class StompDTO extends ProtocolDT
return false;
if (regex_wildcard_start != null ? !regex_wildcard_start.equals(stompDTO.regex_wildcard_start) : stompDTO.regex_wildcard_start != null)
return false;
+ if (temp_queue_prefix != null ? !temp_queue_prefix.equals(stompDTO.temp_queue_prefix) : stompDTO.temp_queue_prefix != null)
+ return false;
+ if (temp_topic_prefix != null ? !temp_topic_prefix.equals(stompDTO.temp_topic_prefix) : stompDTO.temp_topic_prefix != null)
+ return false;
if (topic_prefix != null ? !topic_prefix.equals(stompDTO.topic_prefix) : stompDTO.topic_prefix != null)
return false;
@@ -149,18 +163,24 @@ public class StompDTO extends ProtocolDT
int result = super.hashCode();
result = 31 * result + (add_user_header != null ? add_user_header.hashCode() : 0);
result = 31 * result + (add_user_headers != null ? add_user_headers.hashCode() : 0);
+ result = 31 * result + (add_timestamp_header != null ? add_timestamp_header.hashCode() : 0);
+ result = 31 * result + (add_redeliveries_header != null ? add_redeliveries_header.hashCode() : 0);
result = 31 * result + (max_header_length != null ? max_header_length.hashCode() : 0);
result = 31 * result + (max_headers != null ? max_headers.hashCode() : 0);
result = 31 * result + (max_data_length != null ? max_data_length.hashCode() : 0);
result = 31 * result + (protocol_filters != null ? protocol_filters.hashCode() : 0);
result = 31 * result + (queue_prefix != null ? queue_prefix.hashCode() : 0);
result = 31 * result + (topic_prefix != null ? topic_prefix.hashCode() : 0);
+ result = 31 * result + (temp_queue_prefix != null ? temp_queue_prefix.hashCode() : 0);
+ result = 31 * result + (temp_topic_prefix != null ? temp_topic_prefix.hashCode() : 0);
result = 31 * result + (destination_separator != null ? destination_separator.hashCode() : 0);
result = 31 * result + (path_separator != null ? path_separator.hashCode() : 0);
result = 31 * result + (any_child_wildcard != null ? any_child_wildcard.hashCode() : 0);
result = 31 * result + (any_descendant_wildcard != null ? any_descendant_wildcard.hashCode() : 0);
result = 31 * result + (regex_wildcard_start != null ? regex_wildcard_start.hashCode() : 0);
result = 31 * result + (regex_wildcard_end != null ? regex_wildcard_end.hashCode() : 0);
+ result = 31 * result + (die_delay != null ? die_delay.hashCode() : 0);
+ result = 31 * result + (buffer_size != null ? buffer_size.hashCode() : 0);
return result;
}
}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/MemoryPropertyEditor.java Tue Feb 21 02:04:15 2012
@@ -25,21 +25,21 @@ import java.util.regex.Pattern;
*/
public class MemoryPropertyEditor extends PropertyEditorSupport {
- private static final Pattern BYTE_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*b?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern BYTE_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*b?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern KIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*k(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern MIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*m(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern GIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*g(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern TIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*t(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern PIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*p(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern EIB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*e(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
-
- private static final Pattern KB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*kb?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern MB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*mb?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern GB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*gb?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern TB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*tb?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern PB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*pb?\\s*$", Pattern.CASE_INSENSITIVE);
- private static final Pattern EB_PATTERN = Pattern.compile("^\\s*(\\d+)\\s*eb?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern KIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*k(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern MIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*m(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern GIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*g(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*t(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern PIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*p(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern EIB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*e(ib)?\\s*$", Pattern.CASE_INSENSITIVE);
+
+ private static final Pattern KB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*kb?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern MB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*mb?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern GB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*gb?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern TB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*tb?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern PB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*pb?\\s*$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern EB_PATTERN = Pattern.compile("^\\s*(-?\\d+)\\s*eb?\\s*$", Pattern.CASE_INSENSITIVE);
public void setAsText(String text) throws IllegalArgumentException {
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1291553&r1=1291552&r2=1291553&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Tue Feb 21 02:04:15 2012
@@ -314,10 +314,7 @@ A `queue` element may be configured with
details. Defaults to false.
* `tail_buffer` : The amount of memory buffer space allocated for holding
-freshly enqueued message. Defaults to 64k.
-
-* `consumer_buffer` : The amount of memory buffer space allocated to each
-subscription for receiving messages. Defaults to 256k.
+freshly enqueued messages. Defaults to `640k`.
* `persistent` : If set to false, then the queue will not persistently
store its messages. Defaults to true.
@@ -343,7 +340,7 @@ memory. Defaults to true.
* `fast_delivery_rate`: The message delivery rate (in bytes/sec) at which
the queue considers the consumers fast enough to start slowing down enqueue
rate to match the consumption rate if the consumers are at the
- tail of the queue.
+ tail of the queue. Defaults to `1M`.
* `catchup_enqueue_rate`: If set, and the current delivery
rate is exceeding the configured value of `fast_delivery_rate` and
@@ -1017,6 +1014,8 @@ in the `apollo.xml` configuration file t
in the STOMP protocol implementation. The `stomp` element supports the
following configuration attributes:
+* `buffer_size` : How much each producer or subscription will buffer between
+ the client and the broker. Defaults to `640k`.
* `add_user_header` : Name of the header which will be added to every
message received. The value of the header will be set to the id of the user that
sent the message. Not set by default.