You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/07 13:54:30 UTC
svn commit: r1132963 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
Author: chirino
Date: Tue Jun 7 11:54:30 2011
New Revision: 1132963
URL: http://svn.apache.org/viewvc?rev=1132963&view=rev
Log:
exposing more tuning options.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1132963&r1=1132962&r2=1132963&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Jun 7 11:54:30 2011
@@ -183,6 +183,10 @@ trait SessionSink[T] extends Sink[T] {
def remaining_capacity:Int
}
+object SinkMux {
+ val default_session_max_credits = System.getProperty("apollo.default_session_max_credits", ""+(1024*32)).toInt
+}
+
/**
* <p>
* A SinkMux multiplexes access to a target sink so that multiple
@@ -197,7 +201,6 @@ trait SessionSink[T] extends Sink[T] {
class SinkMux[T](val downstream:Sink[T], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
var sessions = HashSet[Session[T]]()
- var session_max_credits = 1024*32;
val overflow = new OverflowSink[(Session[T],T)](downstream.map(_._2)) {
// Once a value leaves the overflow, then we can credit the
@@ -234,13 +237,13 @@ class SinkMux[T](val downstream:Sink[T],
sessions.foreach(_.credit_adder.resume)
}
- def open(producer_queue:DispatchQueue):SessionSink[T] = {
+ def open(producer_queue:DispatchQueue, credits:Int=SinkMux.default_session_max_credits):SessionSink[T] = {
val session = new Session[T](producer_queue, 0, this)
consumer_queue <<| ^{
if( overflow.full ) {
session.credit_adder.suspend
}
- session.credit_adder.merge(session_max_credits);
+ session.credit_adder.merge(credits);
sessions += session
}
session
@@ -267,7 +270,6 @@ class Session[T](val producer_queue:Disp
var refiller:Runnable = NOOP
- private def session_max_credits = mux.session_max_credits
private def sizer = mux.sizer
private def downstream = mux.source
@@ -285,8 +287,7 @@ class Session[T](val producer_queue:Disp
credits += value;
if( closed || credits <= 0 ) {
_full = true
- } else if( credits==session_max_credits ) {
- // refill once we are empty.
+ } else if( credits >= 0 ) {
if( _full ) {
_full = false
refiller.run
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java?rev=1132963&r1=1132962&r2=1132963&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/BrokerDTO.java Tue Jun 7 11:54:30 2011
@@ -80,6 +80,6 @@ public class BrokerDTO {
@XmlElement(name="service")
public List<String> services = new ArrayList<String>();
-
+ @XmlAttribute(name="sticky_dispatching")
public Boolean sticky_dispatching;
}