You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/18 21:11:55 UTC

svn commit: r1387327 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ a...

Author: chirino
Date: Tue Sep 18 19:11:54 2012
New Revision: 1387327

URL: http://svn.apache.org/viewvc?rev=1387327&view=rev
Log:
Further improvements for APLO-244, when there are multiple producers to a single consumer we shared the session buffer size across the producers (allocating more buffer space to fast producers).

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Tue Sep 18 19:11:54 2012
@@ -572,7 +572,7 @@ class AmqpProtocolHandler extends Protoc
       }
     }
 
-    val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery) {
+    val session_manager = new SessionSinkMux[Delivery](sink, queue, Delivery, 1, buffer_size) {
       override def time_stamp = broker.now
     }
 
@@ -580,7 +580,7 @@ class AmqpProtocolHandler extends Protoc
 
       def producer = p
       def consumer = AMQPConsumer.this
-      val downstream = session_manager.open(producer.dispatch_queue, 1, buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue)
 
       // Delegate all the flow control stuff to the session
       override def full = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Sep 18 19:11:54 2012
@@ -92,7 +92,7 @@ class Queue(val router: LocalRouter, val
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery) {
+  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, virtual_host.broker.auto_tuned_send_receiver_buffer_size) {
     override def time_stamp = now
   }
 
@@ -554,9 +554,6 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  def change_producer_capacity(amount:Int) = might_unfill {
-    // producer_swapped_in.size_max += amount
-  }
   def change_consumer_capacity(amount:Int) = might_unfill {
     consumer_swapped_in.size_max += amount
   }
@@ -1101,13 +1098,11 @@ class Queue(val router: LocalRouter, val
     override def toString = Queue.this.toString
     override def consumer = Queue.this
 
-    val session_max = producer.send_buffer_size
-    val downstream = session_manager.open(producer.dispatch_queue, Integer.MAX_VALUE, session_max)
+    val downstream = session_manager.open(producer.dispatch_queue)
 
     dispatch_queue {
       inbound_sessions += this
       producer_counter += 1
-      change_producer_capacity( session_max )
     }
 
     def close = dispatch_queue {
@@ -1117,7 +1112,6 @@ class Queue(val router: LocalRouter, val
           delivery.ack(Undelivered, delivery.uow)
         }
       })
-      change_producer_capacity( -session_max )
       inbound_sessions -= this
       release
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue Sep 18 19:11:54 2012
@@ -21,6 +21,7 @@ import java.util.LinkedList
 import org.fusesource.hawtdispatch.transport.Transport
 import collection.mutable.{ListBuffer, HashSet}
 import org.apache.activemq.apollo.util.list.{LinkedNodeList, LinkedNode}
+import java.util.concurrent.TimeUnit
 
 /**
  * <p>
@@ -341,18 +342,20 @@ trait SessionSinkFilter[T] extends Sessi
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T]) {
+class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T], delivery_credits:Int, val size_credits:Int) {
 
   var sessions = HashSet[Session[T]]()
   var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
   var overflow_size = 0L
-  var high_overflow_size = 4*1024
 
-  def open(producer_queue:DispatchQueue, delivery_credits:Int, size_credits:Int):SessionSink[T] = {
+  def open(producer_queue:DispatchQueue):SessionSink[T] = {
     val session = new Session[T](this, producer_queue)
     consumer_queue <<| ^{
-      session.credit(delivery_credits, size_credits);
       sessions += session
+      val bonus = size_credits / sessions.size
+      session.consumer_side.size_bonus = bonus
+      session.credit(delivery_credits, 1+bonus);
+      schedual_rebalance
     }
     session
   }
@@ -374,32 +377,83 @@ class SessionSinkMux[T](val downstream:S
   val drain_source = createSource(EventAggregators.INTEGER_OR, consumer_queue)
   drain_source.setEventHandler(^{
     var data = drain_source.getData
-    if( data > 0 ) {
+    if( data.intValue() > 0 ) {
       drain_overflow
     }
   })
   drain_source.resume()
 
+  var rebalance_schedualed = false
+  def schedual_rebalance = {
+    if ( !rebalance_schedualed ) {
+      rebalance_schedualed  = true
+      consumer_queue.after(550, TimeUnit.MILLISECONDS) {
+        rebalance_schedualed = false
+        rebalance_check
+      }
+    }
+  }
+
+  var last_rebalance_ts = time_stamp
+  def rebalance_check = {
+    // re-balance periodically since it's a bit expensive.
+    var now = time_stamp
+    if ( (now - last_rebalance_ts) > 500 && sessions.size>0) {
+      last_rebalance_ts = now
+      rebalance
+    }
+  }
+
+  def rebalance = {
+    val bonus_size = size_credits / sessions.size
+
+    // allocate bonus credits to reduce session stalls.
+    val sessions_copy = sessions.toArray
+
+    var stalled_sessions = List[Session[T]]()
+    var total_stalls = 0L
+    for ( session <- sessions ) {
+      val consumer_side = session.consumer_side
+      if ( consumer_side.stall_counter > 0) {
+        total_stalls += consumer_side.stall_counter
+        stalled_sessions ::= session
+      }
+    }
+
+    for ( session <- stalled_sessions ) {
+      val consumer_side = session.consumer_side
+      var slice_percent = consumer_side.stall_counter.toFloat / total_stalls
+      val new_size_bonus = (size_credits * slice_percent).toInt
+      val change = new_size_bonus - consumer_side.size_bonus
+      consumer_side.stall_counter = 0
+      consumer_side.size_bonus += change
+      session.credit(0, change)
+    }
+  }
+
   def drain_overflow:Unit = {
     consumer_queue.assertExecuting()
+    rebalance_check
     while( !downstream.full && !overflowed_sessions.isEmpty) {
 
       val session = overflowed_sessions.getHead.session
       val consumer_side = session.consumer_side
-      val value = consumer_side.overlfow.getFirst()
+      val value = consumer_side.overflow.removeFirst()
+      assert(value!=null)
 
       val accepted = downstream.offer((session, value))
       assert(accepted, "The downstream sink violated it's contract, an offer was not accepted but it had told us it was not full")
 
-      consumer_side.overlfow.removeFirst()
+      if (  consumer_side.stall_counter > 0  ) {
+        schedual_rebalance
+      }
+
       overflow_size -= sizer.size(value)
-      if( consumer_side.overlfow.isEmpty ) {
+      if( consumer_side.overflow.isEmpty ) {
         consumer_side.overflow_node.unlink()
-        if( consumer_side.pending_delivery_credits!=0 || consumer_side.pending_size_credits!=0 ) {
-          session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
-          consumer_side.pending_delivery_credits = 0
-          consumer_side.pending_size_credits = 0
-        }
+        session.credit(consumer_side.pending_delivery_credits, consumer_side.pending_size_credits)
+        consumer_side.pending_delivery_credits = 0
+        consumer_side.pending_size_credits = 0
       } else {
         // to fairly consume values from all sessions.
         overflowed_sessions.rotate()
@@ -409,7 +463,7 @@ class SessionSinkMux[T](val downstream:S
 
   def delivered(session:Session[Delivery], size:Int) = {
     val consumer_side = session.consumer_side
-    if( overflow_size >= high_overflow_size && !consumer_side.overlfow.isEmpty) {
+    if( overflow_size >= size_credits && !consumer_side.overflow.isEmpty) {
       consumer_side.pending_delivery_credits += 1
       consumer_side.pending_size_credits += size
     } else {
@@ -426,26 +480,32 @@ case class SessionLinkedNode[T](session:
  */
 class Session[T](mux:SessionSinkMux[T], val producer_queue:DispatchQueue) extends SessionSink[T] {
 
-  // Access to the consumer_side must be done from the consumer dispatch queue..
+  // the consumer_side object is mutated from the consumer dispatch queue...
+  // we should think about field padding this object to avoid false sharing on the cache lines.
   object consumer_side {
 
-    val overlfow = new LinkedList[T]()
+    val overflow = new LinkedList[T]()
     var pending_delivery_credits = 0
     var pending_size_credits = 0
     val overflow_node = SessionLinkedNode[T](Session.this)
+    var stall_counter = 0
+    var size_bonus = 0
 
     // use a event aggregating source to coalesce multiple events from the same thread.
-    val source = createSource(new ListEventAggregator[T](), mux.consumer_queue)
+    val source = createSource(new ListEventAggregator[(T, Boolean)](), mux.consumer_queue)
 
     source.setEventHandler(^{
-
       mux.consumer_queue.assertExecuting()
       for( value <- source.getData ) {
-        if( overlfow.isEmpty ) {
+        if( overflow.isEmpty ) {
           mux.overflowed_sessions.addLast(overflow_node);
         }
-        overlfow.add(value)
-        mux.overflow_size += sizer.size(value)
+        overflow.add(value._1)
+        val size = sizer.size(value._1)
+        mux.overflow_size += size
+        if (value._2) {
+          stall_counter += 1
+        }
       }
       mux.drain_source.merge(0x02)
     });
@@ -453,6 +513,7 @@ class Session[T](mux:SessionSinkMux[T], 
 
   }
 
+  // the rest of the Session fields are mutated from the producer dispatch queue...
   var refiller:Task = NOOP
   var rejection_handler: (T)=>Unit = _
 
@@ -467,7 +528,11 @@ class Session[T](mux:SessionSinkMux[T], 
   @volatile
   var enqueue_ts = mux.time_stamp
 
-  def credit(delivery_credits:Int, size_credits:Int) = credit_adder.merge((delivery_credits, size_credits))
+  def credit(delivery_credits:Int, size_credits:Int) = {
+    if( delivery_credits!=0 || size_credits!=0 ) {
+      credit_adder.merge((delivery_credits, size_credits))
+    }
+  }
 
   // create a source to coalesce credit events back to the producer side...
   val credit_adder = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
@@ -525,7 +590,8 @@ class Session[T](mux:SessionSinkMux[T], 
         enqueue_ts = mux.time_stamp
   
         add_credits(-1, -size)
-        consumer_side.source.merge(value)
+        val stalled = size_credits <= 0 || delivery_credits<=0
+        consumer_side.source.merge((value, stalled))
       }
       true
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue Sep 18 19:11:54 2012
@@ -832,7 +832,7 @@ class OpenwireProtocolHandler extends Pr
 
     credit_window_filter.credit(info.getPrefetchSize, 0)
 
-    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
+    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE) {
       override def time_stamp = broker.now
     }
 
@@ -945,7 +945,7 @@ class OpenwireProtocolHandler extends Pr
       producer.dispatch_queue.assertExecuting()
       retain
 
-      val downstream = session_manager.open(producer.dispatch_queue, info.getCurrentPrefetchSize.max(1), Integer.MAX_VALUE)
+      val downstream = session_manager.open(producer.dispatch_queue)
       var closed = false
 
       def consumer = ConsumerContext.this

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1387327&r1=1387326&r2=1387327&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue Sep 18 19:11:54 2012
@@ -486,7 +486,7 @@ class StompProtocolHandler extends Proto
 
     credit_window_filter.credit(initial_credit_window.count, initial_credit_window.size)
 
-    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery) {
+    val session_manager:SessionSinkMux[Delivery] = new SessionSinkMux[Delivery](credit_window_filter, dispatchQueue, Delivery, initial_credit_window.count.max(1), buffer_size) {
       override def time_stamp = broker.now
     }
 
@@ -538,7 +538,7 @@ class StompProtocolHandler extends Proto
       producer.dispatch_queue.assertExecuting()
       retain
 
-      val downstream = session_manager.open(producer.dispatch_queue, initial_credit_window.count.max(1), buffer_size)
+      val downstream = session_manager.open(producer.dispatch_queue)
 
       override def toString = "connection to "+StompProtocolHandler.this.connection.transport.getRemoteAddress