You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/13 19:41:45 UTC

svn commit: r1384425 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Author: chirino
Date: Thu Sep 13 17:41:45 2012
New Revision: 1384425

URL: http://svn.apache.org/viewvc?rev=1384425&view=rev
Log:
Fixing a bug in SessionSinkMux that caused duplicate deliveries to occur.  The bug was introduced in rev 1374667.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1384425&r1=1384424&r2=1384425&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Thu Sep 13 17:41:45 2012
@@ -346,7 +346,7 @@ class SessionSinkMux[T](val downstream:S
   var sessions = HashSet[Session[T]]()
   var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
   var overflow_size = 0L
-  var high_overflow_size = 64*1024
+  var high_overflow_size = 4*1024
 
   def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
     val session = new Session[T](this, producer_queue)
@@ -369,37 +369,49 @@ class SessionSinkMux[T](val downstream:S
 
   def time_stamp = 0L
 
-  downstream.refiller = ^{ drain_overflow }
+  downstream.refiller = ^{ drain_source.merge(0x01) }
+
+  val drain_source = createSource(EventAggregators.INTEGER_OR, consumer_queue)
+  drain_source.setEventHandler(^{
+    var data = drain_source.getData
+    if( data > 0 ) {
+      drain_overflow
+    }
+  })
+  drain_source.resume()
 
   def drain_overflow:Unit = {
-    while( !overflowed_sessions.isEmpty) {
+    consumer_queue.assertExecuting()
+    while( !downstream.full && !overflowed_sessions.isEmpty) {
+
       val session = overflowed_sessions.getHead.session
-      val value = session.overflow.getFirst()
-      if( downstream.offer((session, value)) ) {
-        session.overflow.removeFirst()
-        overflow_size -= sizer.size(value)
-        if( session.overflow.isEmpty ) {
-          session.overflow_node.unlink()
-          session.on_overflow_drain()
-          if( session.pending_delivery_credits!=0 || session.pending_size_credits!=0 ) {
-            session.credit(session.pending_delivery_credits, session.pending_size_credits)
-            session.pending_delivery_credits = 0
-            session.pending_size_credits = 0
-          }
-        } else {
-          // to fairly consume values from all sessions.
-          overflowed_sessions.rotate()
+      val consumer_side = session.consumer_side
+      val value = consumer_side.overlfow.getFirst()
+
+      val accepted = downstream.offer((session, value))
+      assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
+
+      consumer_side.overlfow.removeFirst()
+      overflow_size -= sizer.size(value)
+      if( consumer_side.overlfow.isEmpty ) {
+        consumer_side.overflow_node.unlink()
+        if( consumer_side.pending_delivery_credits!=0 || consumer_side.pending_size_credits!=0 ) {
+          session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
+          consumer_side.pending_delivery_credits = 0
+          consumer_side.pending_size_credits = 0
         }
       } else {
-        return
+        // to fairly consume values from all sessions.
+        overflowed_sessions.rotate()
       }
     }
   }
 
   def delivered(session:Session[Delivery], size:Int) = {
-    if( overflow_size >= high_overflow_size && !session.overflow.isEmpty) {
-      session.pending_delivery_credits += 1
-      session.pending_size_credits += size
+    val consumer_side = session.consumer_side
+    if( overflow_size >= high_overflow_size && !consumer_side.overlfow.isEmpty) {
+      consumer_side.pending_delivery_credits += 1
+      consumer_side.pending_size_credits += size
     } else {
       session.credit(1, size)
     }
@@ -414,27 +426,35 @@ case class SessionLinkedNode[T](session:
  */
 class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
 
-  val overflow = new LinkedList[T]()
-  var pending_delivery_credits = 0
-  var pending_size_credits = 0
-
-  // use a event aggregating source to coalesce multiple events from the same thread.
-  val overflow_source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
-  overflow_source.setEventHandler(^{
-    for( value <- overflow_source.getData ) {
-      if( overflow.isEmpty ) {
-        mux.overflowed_sessions.addLast(overflow_node);
-      }
-      overflow.add(value)
-      mux.overflow_size += sizer.size(value)
-    }
-    mux.drain_overflow
-  });
-  overflow_source.resume
+  // Access to the consumer_side must be done from the consumer dispatch queue..
+  object consumer_side {
+
+    val overlfow = new LinkedList[T]()
+    var pending_delivery_credits = 0
+    var pending_size_credits = 0
+    val overflow_node = SessionLinkedNode[T](Session.this)
+
+    // use a event aggregating source to coalesce multiple events from the same thread.
+    val source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
+
+    source.setEventHandler(^{
+
+      mux.consumer_queue.assertExecuting()
+      for( value <- source.getData ) {
+        if( overlfow.isEmpty ) {
+          mux.overflowed_sessions.addLast(overflow_node);
+        }
+        overlfow.add(value)
+        mux.overflow_size += sizer.size(value)
+      }
+      mux.drain_source.merge(0x02)
+    });
+    source.resume
+
+  }
 
   var refiller:Task = NOOP
   var rejection_handler: (T)=>Unit = _
-  val overflow_node = SessionLinkedNode[T](this)
 
   private def sizer = mux.sizer
   var delivery_credits = 0
@@ -447,10 +467,7 @@ class Session[T](mux:SessionSinkMux[T], 
   @volatile
   var enqueue_ts = mux.time_stamp
 
-  def credit(delivery_credits:Int, size_credits:Int) = {
-    mux.consumer_queue.assertExecuting()
-    credit_adder.merge((delivery_credits, size_credits))
-  }
+  def credit(delivery_credits:Int, size_credits:Int) = credit_adder.merge((delivery_credits, size_credits))
 
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
@@ -464,7 +481,7 @@ class Session[T](mux:SessionSinkMux[T], 
     def mergeEvents(previous:(Int, Int), event:(Int, Int)) = (previous._1+event._1, previous._2+event._2)
   }, producer_queue)
 
-  credit_adder.onEvent{
+  credit_adder.onEvent {
     val (count, size) = credit_adder.getData
     add_credits(count, size)
     if( (size > 0 || count>0) && !_full ) {
@@ -508,14 +525,12 @@ class Session[T](mux:SessionSinkMux[T], 
         enqueue_ts = mux.time_stamp
   
         add_credits(-1, -size)
-        overflow_source.merge(value)
+        consumer_side.source.merge(value)
       }
       true
     }
   }
 
-  var on_overflow_drain = ()=>{}
-
   def close(rejection_handler:(T)=>Unit) = {
     producer_queue {
       this.rejection_handler=rejection_handler