You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/18 21:11:55 UTC
svn commit: r1387327 - in /activemq/activemq-apollo/trunk:
apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ a...
Author: chirino
Date: Tue Sep 18 19:11:54 2012
New Revision: 1387327
URL: http://svn.apache.org/viewvc?rev=1387327&view=rev
Log:
Further improvements for APLO-244, when there are multiple producers to a single consumer we shared the session buffer size across the producers (allocating more buffer space to fast producers).
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Sep 18 19:11:54 2012
@@ -572,7 +572,7 @@ class AmqpProtocolHandler extends Protoc
}
}
- val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery) {
+ val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery, 1, buffer_size) {
override def time_stamp = broker.now
}
@@ -580,7 +580,7 @@ class AmqpProtocolHandler extends Protoc
def producer = p
def consumer = AMQPConsumer.this
- val downstream = session_manager.open(producer.dispatch_queue, 1, buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue)
// Delegate all the flow control stuff to the session
override def full = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Sep 18 19:11:54 2012
@@ -92,7 +92,7 @@ class Queue(val router: LocalRouter, val
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
- val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery) {
+ val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, virtual_host.broker.auto_tuned_send_receiver_buffer_size) {
override def time_stamp = now
}
@@ -554,9 +554,6 @@ class Queue(val router: LocalRouter, val
}
}
- def change_producer_capacity(amount:Int) = might_unfill {
- // producer_swapped_in.size_max += amount
- }
def change_consumer_capacity(amount:Int) = might_unfill {
consumer_swapped_in.size_max += amount
}
@@ -1101,13 +1098,11 @@ class Queue(val router: LocalRouter, val
override def toString = Queue.this.toString
override def consumer = Queue.this
- val session_max = producer.send_buffer_size
- val downstream = session_manager.open(producer.dispatch_queue, Integer.MAX_VALUE, session_max)
+ val downstream = session_manager.open(producer.dispatch_queue)
dispatch_queue {
inbound_sessions += this
producer_counter += 1
- change_producer_capacity( session_max )
}
def close = dispatch_queue {
@@ -1117,7 +1112,6 @@ class Queue(val router: LocalRouter, val
delivery.ack(Undelivered, delivery.uow)
}
})
- change_producer_capacity( -session_max )
inbound_sessions -= this
release
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Sep 18 19:11:54 2012
@@ -21,6 +21,7 @@ import java.util.LinkedList
import org.fusesource.hawtdispatch.transport.Transport
import collection.mutable.{ListBuffer, HashSet}
import org.apache.activemq.apollo.util.list.{LinkedNodeList, LinkedNode}
+import java.util.concurrent.TimeUnit
/**
* <p>
@@ -341,18 +342,20 @@ trait SessionSinkFilter[T] extends Sessi
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
+class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T], delivery_credits:Int, val size_credits:Int) {
var sessions = HashSet[Session[T]]()
var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
var overflow_size = 0L
- var high_overflow_size = 4*1024
- def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
+ def open(producer_queue:DispatchQueue):SessionSink[T] = {
val session = new Session[T](this, producer_queue)
consumer_queue <<| ^{
- session.credit(delivery_credits, size_credits);
sessions += session
+ val bonus = size_credits / sessions.size
+ session.consumer_side.size_bonus = bonus
+ session.credit(delivery_credits, 1+bonus);
+ schedual_rebalance
}
session
}
@@ -374,32 +377,83 @@ class SessionSinkMux[T](val downstream:S
val drain_source = createSource(EventAggregators.INTEGER_OR, consumer_queue)
drain_source.setEventHandler(^{
var data = drain_source.getData
- if( data > 0 ) {
+ if( data.intValue() > 0 ) {
drain_overflow
}
})
drain_source.resume()
+ var rebalance_schedualed = false
+ def schedual_rebalance = {
+ if ( !rebalance_schedualed ) {
+ rebalance_schedualed = true
+ consumer_queue.after(550, TimeUnit.MILLISECONDS) {
+ rebalance_schedualed = false
+ rebalance_check
+ }
+ }
+ }
+
+ var last_rebalance_ts = time_stamp
+ def rebalance_check = {
+ // re-balance periodically since it's a bit expensive.
+ var now = time_stamp
+ if ( (now - last_rebalance_ts) > 500 && sessions.size>0) {
+ last_rebalance_ts = now
+ rebalance
+ }
+ }
+
+ def rebalance = {
+ val bonus_size = size_credits / sessions.size
+
+ // allocate bonus credits to reduce session stalls.
+ val sessions_copy = sessions.toArray
+
+ var stalled_sessions = List[Session[T]]()
+ var total_stalls = 0L
+ for ( session <- sessions ) {
+ val consumer_side = session.consumer_side
+ if ( consumer_side.stall_counter > 0) {
+ total_stalls += consumer_side.stall_counter
+ stalled_sessions ::= session
+ }
+ }
+
+ for ( session <- stalled_sessions ) {
+ val consumer_side = session.consumer_side
+ var slice_percent = consumer_side.stall_counter.toFloat / total_stalls
+ val new_size_bonus = (size_credits * slice_percent).toInt
+ val change = new_size_bonus - consumer_side.size_bonus
+ consumer_side.stall_counter = 0
+ consumer_side.size_bonus += change
+ session.credit(0, change)
+ }
+ }
+
def drain_overflow:Unit = {
consumer_queue.assertExecuting()
+ rebalance_check
while( !downstream.full && !overflowed_sessions.isEmpty) {
val session = overflowed_sessions.getHead.session
val consumer_side = session.consumer_side
- val value = consumer_side.overlfow.getFirst()
+ val value = consumer_side.overflow.removeFirst()
+ assert(value!=null)
val accepted = downstream.offer((session, value))
assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
- consumer_side.overlfow.removeFirst()
+ if ( consumer_side.stall_counter > 0 ) {
+ schedual_rebalance
+ }
+
overflow_size -= sizer.size(value)
- if( consumer_side.overlfow.isEmpty ) {
+ if( consumer_side.overflow.isEmpty ) {
consumer_side.overflow_node.unlink()
- if( consumer_side.pending_delivery_credits!=0 || consumer_side.pending_size_credits!=0 ) {
- session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
- consumer_side.pending_delivery_credits = 0
- consumer_side.pending_size_credits = 0
- }
+ session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
+ consumer_side.pending_delivery_credits = 0
+ consumer_side.pending_size_credits = 0
} else {
// to fairly consume values from all sessions.
overflowed_sessions.rotate()
@@ -409,7 +463,7 @@ class SessionSinkMux[T](val downstream:S
def delivered(session:Session[Delivery], size:Int) = {
val consumer_side = session.consumer_side
- if( overflow_size >= high_overflow_size && !consumer_side.overlfow.isEmpty) {
+ if( overflow_size >= size_credits && !consumer_side.overflow.isEmpty) {
consumer_side.pending_delivery_credits += 1
consumer_side.pending_size_credits += size
} else {
@@ -426,26 +480,32 @@ case class SessionLinkedNode[T](session:
*/
class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
- // Access to the consumer_side must be done from the consumer dispatch queue..
+ // the consumer_side object is mutated from the consumer dispatch queue...
+ // we should think about field padding this object to avoid false sharing on the cache lines.
object consumer_side {
- val overlfow = new LinkedList[T]()
+ val overflow = new LinkedList[T]()
var pending_delivery_credits = 0
var pending_size_credits = 0
val overflow_node = SessionLinkedNode[T](Session.this)
+ var stall_counter = 0
+ var size_bonus = 0
// use a event aggregating source to coalesce multiple events from the same thread.
- val source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
+ val source = createSource(new ListEventAggregator[(T, Boolean)](), mux.consumer_queue)
source.setEventHandler(^{
-
mux.consumer_queue.assertExecuting()
for( value <- source.getData ) {
- if( overlfow.isEmpty ) {
+ if( overflow.isEmpty ) {
mux.overflowed_sessions.addLast(overflow_node);
}
- overlfow.add(value)
- mux.overflow_size += sizer.size(value)
+ overflow.add(value._1)
+ val size = sizer.size(value._1)
+ mux.overflow_size += size
+ if (value._2) {
+ stall_counter += 1
+ }
}
mux.drain_source.merge(0x02)
});
@@ -453,6 +513,7 @@ class Session[T](mux:SessionSinkMux[T],
}
+ // the rest of the Session fields are mutated from the producer dispatch queue...
var refiller:Task = NOOP
var rejection_handler: (T)=>Unit = _
@@ -467,7 +528,11 @@ class Session[T](mux:SessionSinkMux[T],
@volatile
var enqueue_ts = mux.time_stamp
- def credit(delivery_credits:Int, size_credits:Int) = credit_adder.merge((delivery_credits, size_credits))
+ def credit(delivery_credits:Int, size_credits:Int) = {
+ if( delivery_credits!=0 || size_credits!=0 ) {
+ credit_adder.merge((delivery_credits, size_credits))
+ }
+ }
// create a source to coalesce credit events back to the producer side...
val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
@@ -525,7 +590,8 @@ class Session[T](mux:SessionSinkMux[T],
enqueue_ts = mux.time_stamp
add_credits(-1, -size)
- consumer_side.source.merge(value)
+ val stalled = size_credits <= 0 || delivery_credits<=0
+ consumer_side.source.merge((value, stalled))
}
true
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Sep 18 19:11:54 2012
@@ -832,7 +832,7 @@ class OpenwireProtocolHandler extends Pr
credit_window_filter.credit(info.getPrefetchSize, 0)
- val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
+ val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE) {
override def time_stamp = broker.now
}
@@ -945,7 +945,7 @@ class OpenwireProtocolHandler extends Pr
producer.dispatch_queue.assertExecuting()
retain
- val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE)
+ val downstream = session_manager.open(producer.dispatch_queue)
var closed = false
def consumer = ConsumerContext.this
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Sep 18 19:11:54 2012
@@ -486,7 +486,7 @@ class StompProtocolHandler extends Proto
credit_window_filter.credit(initial_credit_window.count, initial_credit_window.size)
- val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
+ val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, initial_credit_window.count.max(1), buffer_size) {
override def time_stamp = broker.now
}
@@ -538,7 +538,7 @@ class StompProtocolHandler extends Proto
producer.dispatch_queue.assertExecuting()
retain
- val downstream = session_manager.open(producer.dispatch_queue, initial_credit_window.count.max(1), buffer_size)
+ val downstream = session_manager.open(producer.dispatch_queue)
override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress