You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/07 13:54:30 UTC

svn commit: r1132963 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java

Author: chirino
Date: Tue Jun  7 11:54:30 2011
New Revision: 1132963

URL: http://svn.apache.org/viewvc?rev=1132963&view=rev
Log:
exposing more tuning options.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1132963&r1=1132962&r2=1132963&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Jun  7 11:54:30 2011
@@ -183,6 +183,10 @@ trait SessionSink[T] extends Sink[T] {
   def remaining_capacity:Int
 }
 
+object SinkMux {
+  val default_session_max_credits = System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
+}
+
 /**
  *  <p>
  * A SinkMux multiplexes access to a target sink so that multiple
@@ -197,7 +201,6 @@ trait SessionSink[T] extends Sink[T] {
 class SinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
 
   var sessions = HashSet[Session[T]]()
-  var session_max_credits = 1024*32;
 
   val overflow = new OverflowSink[(Session[T],T)](downstream.map(_._2)) {
     // Once a value leaves the overflow, then we can credit the
@@ -234,13 +237,13 @@ class SinkMux[T](val downstream:Sink[T],
     sessions.foreach(_.credit_adder.resume)
   }
 
-  def open(producer_queue:DispatchQueue):SessionSink[T] = {
+  def open(producer_queue:DispatchQueue, credits:Int=SinkMux.default_session_max_credits):SessionSink[T] = {
     val session = new Session[T](producer_queue, 0, this)
     consumer_queue <<| ^{
       if( overflow.full ) {
         session.credit_adder.suspend
       }
-      session.credit_adder.merge(session_max_credits);
+      session.credit_adder.merge(credits);
       sessions += session
     }
     session
@@ -267,7 +270,6 @@ class Session[T](val producer_queue:Disp
 
   var refiller:Runnable = NOOP
 
-  private def session_max_credits = mux.session_max_credits
   private def sizer = mux.sizer
   private def downstream = mux.source
 
@@ -285,8 +287,7 @@ class Session[T](val producer_queue:Disp
     credits += value;
     if( closed || credits <= 0 ) {
       _full = true
-    } else if( credits==session_max_credits ) {
-      // refill once we are empty.
+    } else if( credits >= 0 ) {
       if( _full ) {
         _full  = false
         refiller.run

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=1132963&r1=1132962&r2=1132963&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Tue Jun  7 11:54:30 2011
@@ -80,6 +80,6 @@ public class BrokerDTO {
     @XmlElement(name="service")
     public List<String> services = new ArrayList<String>();
 
-
+    @XmlAttribute(name="sticky_dispatching")
     public Boolean sticky_dispatching;
 }