You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/03 19:25:03 UTC

svn commit: r1099136 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Tue May  3 17:25:03 2011
New Revision: 1099136

URL: http://svn.apache.org/viewvc?rev=1099136&view=rev
Log:
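Experimenting with session window size tuning: raise the default producer and consumer buffer sizes from 32*1024 to 256*1024 and expose remaining_capacity on delivery sessions and session sinks.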

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue May  3 17:25:03 2011
@@ -75,6 +75,7 @@ trait DeliveryConsumer extends Retained 
 trait DeliverySession extends Sink[Delivery] {
   def producer:DeliveryProducer
   def consumer:DeliveryConsumer
+  def remaining_capacity:Int
   def close:Unit
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue May  3 17:25:03 2011
@@ -95,7 +95,7 @@ class Queue(val router: LocalRouter, val
   /**
    *  The amount of memory buffer space for receiving messages.
    */
-  def tune_producer_buffer = config.producer_buffer.getOrElse(32*1024)
+  def tune_producer_buffer = config.producer_buffer.getOrElse(256*1024)
 
   /**
    *  The amount of memory buffer space for the queue..
@@ -135,7 +135,7 @@ class Queue(val router: LocalRouter, val
     tune_persistent = virtual_host.store !=null && config.persistent.getOrElse(true)
     tune_swap = tune_persistent && config.swap.getOrElse(true)
     tune_swap_range_size = config.swap_range_size.getOrElse(10000)
-    tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024)
+    tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024)
   }
   configure(config)
 
@@ -469,6 +469,7 @@ class Queue(val router: LocalRouter, val
       addCapacity( tune_producer_buffer )
     }
 
+    def remaining_capacity = session.remaining_capacity
 
     def close = {
       session_manager.close(session)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue May  3 17:25:03 2011
@@ -179,6 +179,10 @@ class MutableSink[T] extends Sink[T] {
 }
 
 
+trait SessionSink[T] extends Sink[T] {
+  def remaining_capacity:Int
+}
+
 /**
  *  <p>
  * A SinkMux multiplexes access to a target sink so that multiple
@@ -230,7 +234,7 @@ class SinkMux[T](val downstream:Sink[T],
     sessions.foreach(_.credit_adder.resume)
   }
 
-  def open(producer_queue:DispatchQueue):Sink[T] = {
+  def open(producer_queue:DispatchQueue):SessionSink[T] = {
     val session = new Session[T](producer_queue, 0, this)
     consumer_queue <<| ^{
       if( overflow.full ) {
@@ -259,7 +263,7 @@ class SinkMux[T](val downstream:Sink[T],
 /**
  * tracks one producer to consumer session / credit window.
  */
-class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends Sink[T] {
+class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends SessionSink[T] {
 
   var refiller:Runnable = NOOP
 
@@ -295,6 +299,7 @@ class Session[T](val producer_queue:Disp
   // producer serial dispatch queue
   ///////////////////////////////////////////////////
 
+  def remaining_capacity = credits
 
   override def full = {
     assert(producer_queue.isExecuting)

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue May  3 17:25:03 2011
@@ -532,6 +532,8 @@ class OpenwireProtocolHandler extends Pr
         release
       }
 
+      def remaining_capacity = outbound_session.remaining_capacity
+
       // Delegate all the flow control stuff to the session
       def offer(delivery:Delivery) = {
         if( outbound_session.full ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1099136&r1=1099135&r2=1099136&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue May  3 17:25:03 2011
@@ -260,6 +260,8 @@ class StompProtocolHandler extends Proto
 
       val session = session_manager.open(producer.dispatch_queue)
 
+      def remaining_capacity = session.remaining_capacity
+
       def close = {
         assert(producer.dispatch_queue.isExecuting)
         if( !closed ) {