You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/19 00:12:57 UTC

svn commit: r1374667 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Author: chirino
Date: Sat Aug 18 22:12:57 2012
New Revision: 1374667

URL: http://svn.apache.org/viewvc?rev=1374667&view=rev
Log:
Adjust session sinks so that they credit in larger batches when the sink is under load by multiple producers.  It also will round robin the messages from multiple sessions to improve delivery fairness.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1374667&r1=1374666&r2=1374667&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Sat Aug 18 22:12:57 2012
@@ -19,7 +19,8 @@ package org.apache.activemq.apollo.broke
 import org.fusesource.hawtdispatch._
 import java.util.LinkedList
 import org.fusesource.hawtdispatch.transport.Transport
-import collection.mutable.HashSet
+import collection.mutable.{ListBuffer, HashSet}
+import org.apache.activemq.apollo.util.list.{LinkedNodeList, LinkedNode}
 
 /**
  * <p>
@@ -343,31 +344,14 @@ trait SessionSinkFilter[T] extends Sessi
 class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
 
   var sessions = HashSet[Session[T]]()
-  val overflow = new OverflowSink[(Session[T],T)](downstream)
-
-  def delivered(session:Session[Delivery], size:Int) = {
-    consumer_queue.assertExecuting()
-    session.credit_adder.merge((1, size));
-  }
-
-  // use a event aggregating source to coalesce multiple events from the same thread.
-  // all the sessions send to the same source.
-  val source = createSource(new ListEventAggregator[(Session[T],T)](), consumer_queue)
-  source.setEventHandler(^{drain_source});
-  source.resume
-
-  def drain_source = {
-    source.getData.foreach { event =>
-      // overflow sinks can always accept more values.
-      val f1 = overflow.full
-      overflow.offer(event)
-    }
-  }
+  var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
+  var overflow_size = 0L
+  var high_overflow_size = 64*1024
 
   def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
     val session = new Session[T](this, producer_queue)
     consumer_queue <<| ^{
-      session.credit_adder.merge((delivery_credits, size_credits));
+      session.credit(delivery_credits, size_credits);
       sessions += session
     }
     session
@@ -378,25 +362,81 @@ class SessionSinkMux[T](val downstream:S
       session match {
         case s:Session[T] =>
           sessions -= s
-          s.producer_queue {
-            s.close(rejection_handler)
-          }
+          s.close(rejection_handler)
       }
     }
   }
 
   def time_stamp = 0L
+
+  downstream.refiller = ^{ drain_overflow }
+
+  def drain_overflow:Unit = {
+    while( !overflowed_sessions.isEmpty) {
+      val session = overflowed_sessions.getHead.session
+      val value = session.overflow.getFirst()
+      if( downstream.offer((session, value)) ) {
+        session.overflow.removeFirst()
+        overflow_size -= sizer.size(value)
+        if( session.overflow.isEmpty ) {
+          session.overflow_node.unlink()
+          session.on_overflow_drain()
+          if( session.pending_delivery_credits!=0 || session.pending_size_credits!=0 ) {
+            session.credit(session.pending_delivery_credits, session.pending_size_credits)
+            session.pending_delivery_credits = 0
+            session.pending_size_credits = 0
+          }
+        } else {
+          // to fairly consume values from all sessions.
+          overflowed_sessions.rotate()
+        }
+      } else {
+        return
+      }
+    }
+  }
+
+  def delivered(session:Session[Delivery], size:Int) = {
+    if( overflow_size >= high_overflow_size && !session.overflow.isEmpty) {
+      session.pending_delivery_credits += 1
+      session.pending_size_credits += size
+    } else {
+      session.credit(1, size)
+    }
+  }
+
 }
 
+case class SessionLinkedNode[T](session:Session[T]) extends LinkedNode[SessionLinkedNode[T]]
+
 /**
  * tracks one producer to consumer session / credit window.
  */
 class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
 
+  val overflow = new LinkedList[T]()
+  var pending_delivery_credits = 0
+  var pending_size_credits = 0
+
+  // use a event aggregating source to coalesce multiple events from the same thread.
+  val overflow_source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
+  overflow_source.setEventHandler(^{
+    for( value <- overflow_source.getData ) {
+      if( overflow.isEmpty ) {
+        mux.overflowed_sessions.addLast(overflow_node);
+      }
+      overflow.add(value)
+      mux.overflow_size += sizer.size(value)
+    }
+    mux.drain_overflow
+  });
+  overflow_source.resume
+
   var refiller:Task = NOOP
+  var rejection_handler: (T)=>Unit = _
+  val overflow_node = SessionLinkedNode[T](this)
 
   private def sizer = mux.sizer
-  private def downstream = mux.source
   var delivery_credits = 0
   var size_credits = 0
 
@@ -407,6 +447,11 @@ class Session[T](mux:SessionSinkMux[T], 
   @volatile
   var enqueue_ts = mux.time_stamp
 
+  def credit(delivery_credits:Int, size_credits:Int) = {
+    mux.consumer_queue.assertExecuting()
+    credit_adder.merge((delivery_credits, size_credits))
+  }
+
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
     def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
@@ -428,8 +473,7 @@ class Session[T](mux:SessionSinkMux[T], 
   }
   credit_adder.resume
 
-  private var rejection_handler: (T)=>Unit = _
-  
+
   private def add_credits(count:Int, size:Int) = {
     delivery_credits += count
     size_credits += size
@@ -464,17 +508,19 @@ class Session[T](mux:SessionSinkMux[T], 
         enqueue_ts = mux.time_stamp
   
         add_credits(-1, -size)
-        downstream.merge((this, value))
+        overflow_source.merge(value)
       }
       true
     }
   }
 
+  var on_overflow_drain = ()=>{}
+
   def close(rejection_handler:(T)=>Unit) = {
-    producer_queue.assertExecuting()
-    assert(this.rejection_handler==null)
-    this.rejection_handler=rejection_handler
-    refiller.run
+    producer_queue {
+      this.rejection_handler=rejection_handler
+      refiller.run
+    }
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1374667&r1=1374666&r2=1374667&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Sat Aug 18 22:12:57 2012
@@ -201,8 +201,9 @@ class StompParallelTest extends StompTes
     client.socket.setSoTimeout(1 * 1000)
     var block_count = 0
     try {
+      receipt_counter.set(0L)
       while (true) {
-        sync_send("/queue/quota.assured1", "%01024d".format(block_count))
+        sync_send("/queue/quota.assured1", "%01024d".format(block_count), "message-id:"+block_count+"\n")
         block_count += 1
       }
     } catch {
@@ -213,9 +214,11 @@ class StompParallelTest extends StompTes
     // Send 5 more messages which do not fit in the queue, they will be
     // held in the producer connection's delivery session buffer..
     connect("1.1")
-    for (i <- 0 until (block_count + 5)) {
-      async_send("/queue/quota.assured2", "%01024d".format(i))
+    receipt_counter.set(0L)
+    for (i <- 0 until (block_count-1)) {
+      sync_send("/queue/quota.assured2", "%01024d".format(i), "message-id:"+i+"\n")
     }
+    async_send("/queue/quota.assured2", "%01024d".format(block_count-1))
 
     // Even though we disconnect, those 5 that did not fit should still
     // get delivered once the queue unblocks..
@@ -224,7 +227,7 @@ class StompParallelTest extends StompTes
     // Lets make sure non of the messages were dropped.
     connect("1.1")
     subscribe("0", "/queue/quota.assured2")
-    for (i <- 0 until (block_count + 5)) {
+    for (i <- 0 until block_count) {
       assert_received("%01024d".format(i))
     }
 
@@ -242,8 +245,9 @@ class StompParallelTest extends StompTes
     client.socket.setSoTimeout(1 * 1000)
     var block_count = 0
     try {
+      receipt_counter.set(0L)
       while (true) {
-        sync_send("/topic/quota.assured1", "%01024d".format(block_count))
+        sync_send("/topic/quota.assured1", "%01024d".format(block_count), "message-id:"+block_count+"\n")
         block_count += 1
       }
     } catch {
@@ -253,21 +257,23 @@ class StompParallelTest extends StompTes
     close(consumer)
 
     connect("1.1", consumer)
-    subscribe("0", "/topic/quota.assured2", "client", headers = "credit:1,0\n", c = consumer)
+    subscribe("1", "/topic/quota.assured2", "client", headers = "credit:1,0\n", c = consumer)
 
     // Send 5 more messages which do not fit in the consumer buffer, they will be
     // held in the producer connection's delivery session buffer..
     connect("1.1")
-    for (i <- 0 until (block_count + 5)) {
-      async_send("/topic/quota.assured2", "%01024d".format(i))
+    receipt_counter.set(0L)
+    for (i <- 0 until (block_count-1)) {
+      sync_send("/topic/quota.assured2", "%01024d".format(i), "message-id:"+i+"\n")
     }
+    async_send("/topic/quota.assured2", "%01024d".format(block_count-1))
 
     // Even though we disconnect, those 5 that did not fit should still
     // get delivered once the queue unblocks..
     disconnect()
 
     // Lets make sure non of the messages were dropped.
-    for (i <- 0 until (block_count + 5)) {
+    for (i <- 0 until block_count) {
       assert_received("%01024d".format(i), c = consumer)(true)
     }