You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/01/30 17:46:06 UTC

svn commit: r1440513 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/s...

Author: chirino
Date: Wed Jan 30 16:46:05 2013
New Revision: 1440513

URL: http://svn.apache.org/viewvc?rev=1440513&view=rev
Log:
In cases where pub/sub is used with large numbers of consumers / producers and destinations, the number of routing sessions can grow really fast. Let's shut down sessions once they are idle for over 2 seconds.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jan 30 16:46:05 2013
@@ -380,6 +380,7 @@ class Broker() extends BaseService with 
     }
     schedule_reoccurring(1, SECONDS) {
       virtualhost_maintenance
+      connection_maintenance
       roll_current_period
       tune_send_receive_buffers
     }
@@ -474,6 +475,13 @@ class Broker() extends BaseService with 
     }
   }
 
+  def connection_maintenance = {
+    val time = now
+    for ( c <- connections.values ) {
+      c.maintenance(time)
+    }
+  }
+
   protected def init_logs = {
     import OptionSupport._
     // Configure the logging categories...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jan 30 16:46:05 2013
@@ -111,7 +111,11 @@ class BrokerConnection(var connector: Co
   var protocol_handler: ProtocolHandler = null;
 
   def session_id = Option(protocol_handler).flatMap(_.session_id)
-  
+  def maintenance(now:Long) = {
+    if ( protocol_handler!=null ) {
+      protocol_handler.maintenance(now)
+    }
+  }
   override def toString = "id: "+id.toString
 
   protected override  def _start(on_completed:Task) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jan 30 16:46:05 2013
@@ -204,6 +204,7 @@ object DeliveryProducerRoute extends Log
 abstract class DeliveryProducerRoute(router:Router) extends Sink[Delivery] with BindableDeliveryProducer {
   import DeliveryProducerRoute._
 
+  var last_send = Broker.now
   val reained_base = new BaseRetained
   def release = reained_base.release
   def retain = reained_base.retain
@@ -281,7 +282,7 @@ abstract class DeliveryProducerRoute(rou
     if( full ) {
       false
     } else {
-
+      last_send = Broker.now
       // Do we need to store the message if we have a matching consumer?
       pendingAck = delivery.ack
       val copy = delivery.copy

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Jan 30 16:46:05 2013
@@ -84,6 +84,8 @@ trait ProtocolHandler {
 
   def protocol:String
 
+  def maintenance(now:Long) = {}
+
   def session_id:Option[String]
 
   var connection:BrokerConnection = null;

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jan 30 16:46:05 2013
@@ -673,14 +673,6 @@ class StompProtocolHandler extends Proto
   var closed = false
   var consumers = Map[AsciiBuffer, StompConsumer]()
 
-  var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
-      host.dispatch_queue {
-        host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
-      }
-    }
-  }
-
   var host:VirtualHost = null
 
   private def queue = connection.dispatch_queue
@@ -1149,8 +1141,7 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  class StompProducerRoute(dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
-
+  class StompProducerRoute(val dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
     val addresses = decode_addresses(dest)
     val key = addresses.toList
 
@@ -1165,6 +1156,29 @@ class StompProtocolHandler extends Proto
     }
   }
 
+  override def maintenance(now:Long) = dispatchQueue {
+    import collection.JavaConversions._
+    val expired = ListBuffer[StompProducerRoute]()
+    for( route <- producerRoutes.values() ) {
+      if( (now - route.last_send) > 2000 ) {
+        expired += route
+      }
+    }
+    for( route <- expired ) {
+      producerRoutes.remove(route.dest)
+      host.dispatch_queue {
+        host.router.disconnect(route.addresses, route)
+      }
+    }
+  }
+
+  var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
+      host.dispatch_queue {
+        host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
+      }
+    }
+  }
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
     val dest = get(frame.headers, DESTINATION).get
@@ -1293,6 +1307,7 @@ class StompProtocolHandler extends Proto
 
   def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
+
     // User might be asking for ack that we have processed the message..
     val receipt = frame.header(RECEIPT_REQUESTED)