You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/03 19:25:03 UTC
svn commit: r1099136 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Tue May 3 17:25:03 2011
New Revision: 1099136
URL: http://svn.apache.org/viewvc?rev=1099136&view=rev
Log:
playing with optimizing the session window sizes.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue May 3 17:25:03 2011
@@ -75,6 +75,7 @@ trait DeliveryConsumer extends Retained
trait DeliverySession extends Sink[Delivery] {
def producer:DeliveryProducer
def consumer:DeliveryConsumer
+ def remaining_capacity:Int
def close:Unit
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue May 3 17:25:03 2011
@@ -95,7 +95,7 @@ class Queue(val router: LocalRouter, val
/**
* The amount of memory buffer space for receiving messages.
*/
- def tune_producer_buffer = config.producer_buffer.getOrElse(32*1024)
+ def tune_producer_buffer = config.producer_buffer.getOrElse(256*1024)
/**
* The amount of memory buffer space for the queue..
@@ -135,7 +135,7 @@ class Queue(val router: LocalRouter, val
tune_persistent = virtual_host.store !=null && config.persistent.getOrElse(true)
tune_swap = tune_persistent && config.swap.getOrElse(true)
tune_swap_range_size = config.swap_range_size.getOrElse(10000)
- tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024)
+ tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024)
}
configure(config)
@@ -469,6 +469,7 @@ class Queue(val router: LocalRouter, val
addCapacity( tune_producer_buffer )
}
+ def remaining_capacity = session.remaining_capacity
def close = {
session_manager.close(session)
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue May 3 17:25:03 2011
@@ -179,6 +179,10 @@ class MutableSink[T] extends Sink[T] {
}
+trait SessionSink[T] extends Sink[T] {
+ def remaining_capacity:Int
+}
+
/**
* <p>
* A SinkMux multiplexes access to a target sink so that multiple
@@ -230,7 +234,7 @@ class SinkMux[T](val downstream:Sink[T],
sessions.foreach(_.credit_adder.resume)
}
- def open(producer_queue:DispatchQueue):Sink[T] = {
+ def open(producer_queue:DispatchQueue):SessionSink[T] = {
val session = new Session[T](producer_queue, 0, this)
consumer_queue <<| ^{
if( overflow.full ) {
@@ -259,7 +263,7 @@ class SinkMux[T](val downstream:Sink[T],
/**
* tracks one producer to consumer session / credit window.
*/
-class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends Sink[T] {
+class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends SessionSink[T] {
var refiller:Runnable = NOOP
@@ -295,6 +299,7 @@ class Session[T](val producer_queue:Disp
// producer serial dispatch queue
///////////////////////////////////////////////////
+ def remaining_capacity = credits
override def full = {
assert(producer_queue.isExecuting)
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue May 3 17:25:03 2011
@@ -532,6 +532,8 @@ class OpenwireProtocolHandler extends Pr
release
}
+ def remaining_capacity = outbound_session.remaining_capacity
+
// Delegate all the flow control stuff to the session
def offer(delivery:Delivery) = {
if( outbound_session.full ) {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue May 3 17:25:03 2011
@@ -260,6 +260,8 @@ class StompProtocolHandler extends Proto
val session = session_manager.open(producer.dispatch_queue)
+ def remaining_capacity = session.remaining_capacity
+
def close = {
assert(producer.dispatch_queue.isExecuting)
if( !closed ) {