You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/13 19:41:45 UTC
svn commit: r1384425 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Author: chirino
Date: Thu Sep 13 17:41:45 2012
New Revision: 1384425
URL: http://svn.apache.org/viewvc?rev=1384425&view=rev
Log:
Fixing a bug in SessionSinkMux that caused duplicate deliveries to occur. The bug was introduced in rev 1374667.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1384425&r1=1384424&r2=1384425&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Thu Sep 13 17:41:45 2012
@@ -346,7 +346,7 @@ class SessionSinkMux[T](val downstream:S
var sessions = HashSet[Session[T]]()
var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
var overflow_size = 0L
- var high_overflow_size = 64*1024
+ var high_overflow_size = 4*1024
def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
val session = new Session[T](this, producer_queue)
@@ -369,37 +369,49 @@ class SessionSinkMux[T](val downstream:S
def time_stamp = 0L
- downstream.refiller = ^{ drain_overflow }
+ downstream.refiller = ^{ drain_source.merge(0x01) }
+
+ val drain_source = createSource(EventAggregators.INTEGER_OR, consumer_queue)
+ drain_source.setEventHandler(^{
+ var data = drain_source.getData
+ if( data > 0 ) {
+ drain_overflow
+ }
+ })
+ drain_source.resume()
def drain_overflow:Unit = {
- while( !overflowed_sessions.isEmpty) {
+ consumer_queue.assertExecuting()
+ while( !downstream.full && !overflowed_sessions.isEmpty) {
+
val session = overflowed_sessions.getHead.session
- val value = session.overflow.getFirst()
- if( downstream.offer((session, value)) ) {
- session.overflow.removeFirst()
- overflow_size -= sizer.size(value)
- if( session.overflow.isEmpty ) {
- session.overflow_node.unlink()
- session.on_overflow_drain()
- if( session.pending_delivery_credits!=0 || session.pending_size_credits!=0 ) {
- session.credit(session.pending_delivery_credits, session.pending_size_credits)
- session.pending_delivery_credits = 0
- session.pending_size_credits = 0
- }
- } else {
- // to fairly consume values from all sessions.
- overflowed_sessions.rotate()
+ val consumer_side = session.consumer_side
+ val value = consumer_side.overlfow.getFirst()
+
+ val accepted = downstream.offer((session, value))
+ assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
+
+ consumer_side.overlfow.removeFirst()
+ overflow_size -= sizer.size(value)
+ if( consumer_side.overlfow.isEmpty ) {
+ consumer_side.overflow_node.unlink()
+ if( consumer_side.pending_delivery_credits!=0 || consumer_side.pending_size_credits!=0 ) {
+ session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
+ consumer_side.pending_delivery_credits = 0
+ consumer_side.pending_size_credits = 0
}
} else {
- return
+ // to fairly consume values from all sessions.
+ overflowed_sessions.rotate()
}
}
}
def delivered(session:Session[Delivery], size:Int) = {
- if( overflow_size >= high_overflow_size && !session.overflow.isEmpty) {
- session.pending_delivery_credits += 1
- session.pending_size_credits += size
+ val consumer_side = session.consumer_side
+ if( overflow_size >= high_overflow_size && !consumer_side.overlfow.isEmpty) {
+ consumer_side.pending_delivery_credits += 1
+ consumer_side.pending_size_credits += size
} else {
session.credit(1, size)
}
@@ -414,27 +426,35 @@ case class SessionLinkedNode[T](session:
*/
class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
- val overflow = new LinkedList[T]()
- var pending_delivery_credits = 0
- var pending_size_credits = 0
-
- // use a event aggregating source to coalesce multiple events from the same thread.
- val overflow_source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
- overflow_source.setEventHandler(^{
- for( value <- overflow_source.getData ) {
- if( overflow.isEmpty ) {
- mux.overflowed_sessions.addLast(overflow_node);
- }
- overflow.add(value)
- mux.overflow_size += sizer.size(value)
- }
- mux.drain_overflow
- });
- overflow_source.resume
+ // Access to the consumer_side must be done from the consumer dispatch queue.
+ object consumer_side {
+
+ val overlfow = new LinkedList[T]()
+ var pending_delivery_credits = 0
+ var pending_size_credits = 0
+ val overflow_node = SessionLinkedNode[T](Session.this)
+
+ // use an event-aggregating source to coalesce multiple events from the same thread.
+ val source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
+
+ source.setEventHandler(^{
+
+ mux.consumer_queue.assertExecuting()
+ for( value <- source.getData ) {
+ if( overlfow.isEmpty ) {
+ mux.overflowed_sessions.addLast(overflow_node);
+ }
+ overlfow.add(value)
+ mux.overflow_size += sizer.size(value)
+ }
+ mux.drain_source.merge(0x02)
+ });
+ source.resume
+
+ }
var refiller:Task = NOOP
var rejection_handler: (T)=>Unit = _
- val overflow_node = SessionLinkedNode[T](this)
private def sizer = mux.sizer
var delivery_credits = 0
@@ -447,10 +467,7 @@ class Session[T](mux:SessionSinkMux[T],
@volatile
var enqueue_ts = mux.time_stamp
- def credit(delivery_credits:Int, size_credits:Int) = {
- mux.consumer_queue.assertExecuting()
- credit_adder.merge((delivery_credits, size_credits))
- }
+ def credit(delivery_credits:Int, size_credits:Int) = credit_adder.merge((delivery_credits, size_credits))
// create a source to coalesce credit events back to the producer side...
val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
@@ -464,7 +481,7 @@ class Session[T](mux:SessionSinkMux[T],
def mergeEvents(previous:(Int, Int), event:(Int, Int)) = (previous._1+event._1, previous._2+event._2)
}, producer_queue)
- credit_adder.onEvent{
+ credit_adder.onEvent {
val (count, size) = credit_adder.getData
add_credits(count, size)
if( (size > 0 || count>0) && !_full ) {
@@ -508,14 +525,12 @@ class Session[T](mux:SessionSinkMux[T],
enqueue_ts = mux.time_stamp
add_credits(-1, -size)
- overflow_source.merge(value)
+ consumer_side.source.merge(value)
}
true
}
}
- var on_overflow_drain = ()=>{}
-
def close(rejection_handler:(T)=>Unit) = {
producer_queue {
this.rejection_handler=rejection_handler