You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/01/30 17:46:06 UTC
svn commit: r1440513 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/s...
Author: chirino
Date: Wed Jan 30 16:46:05 2013
New Revision: 1440513
URL: http://svn.apache.org/viewvc?rev=1440513&view=rev
Log:
In cases were pub sub is used with large numbers of consumers / producers and destinations, the number of routing sessions can grow really fast. lets shutdown sessions once they are idle for over 2 seconds.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jan 30 16:46:05 2013
@@ -380,6 +380,7 @@ class Broker() extends BaseService with
}
schedule_reoccurring(1, SECONDS) {
virtualhost_maintenance
+ connection_maintenance
roll_current_period
tune_send_receive_buffers
}
@@ -474,6 +475,13 @@ class Broker() extends BaseService with
}
}
+ def connection_maintenance = {
+ val time = now
+ for ( c <- connections.values ) {
+ c.maintenance(time)
+ }
+ }
+
protected def init_logs = {
import OptionSupport._
// Configure the logging categories...
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jan 30 16:46:05 2013
@@ -111,7 +111,11 @@ class BrokerConnection(var connector: Co
var protocol_handler: ProtocolHandler = null;
def session_id = Option(protocol_handler).flatMap(_.session_id)
-
+ def maintenance(now:Long) = {
+ if ( protocol_handler!=null ) {
+ protocol_handler.maintenance(now)
+ }
+ }
override def toString = "id: "+id.toString
protected override def _start(on_completed:Task) = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jan 30 16:46:05 2013
@@ -204,6 +204,7 @@ object DeliveryProducerRoute extends Log
abstract class DeliveryProducerRoute(router:Router) extends Sink[Delivery] with BindableDeliveryProducer {
import DeliveryProducerRoute._
+ var last_send = Broker.now
val reained_base = new BaseRetained
def release = reained_base.release
def retain = reained_base.retain
@@ -281,7 +282,7 @@ abstract class DeliveryProducerRoute(rou
if( full ) {
false
} else {
-
+ last_send = Broker.now
// Do we need to store the message if we have a matching consumer?
pendingAck = delivery.ack
val copy = delivery.copy
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Wed Jan 30 16:46:05 2013
@@ -84,6 +84,8 @@ trait ProtocolHandler {
def protocol:String
+ def maintenance(now:Long) = {}
+
def session_id:Option[String]
var connection:BrokerConnection = null;
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1440513&r1=1440512&r2=1440513&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Wed Jan 30 16:46:05 2013
@@ -673,14 +673,6 @@ class StompProtocolHandler extends Proto
var closed = false
var consumers = Map[AsciiBuffer, StompConsumer]()
- var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
- host.dispatch_queue {
- host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
- }
- }
- }
-
var host:VirtualHost = null
private def queue = connection.dispatch_queue
@@ -1149,8 +1141,7 @@ class StompProtocolHandler extends Proto
}
}
- class StompProducerRoute(dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
-
+ class StompProducerRoute(val dest: AsciiBuffer) extends DeliveryProducerRoute(host.router) {
val addresses = decode_addresses(dest)
val key = addresses.toList
@@ -1165,6 +1156,29 @@ class StompProtocolHandler extends Proto
}
}
+ override def maintenance(now:Long) = dispatchQueue {
+ import collection.JavaConversions._
+ val expired = ListBuffer[StompProducerRoute]()
+ for( route <- producerRoutes.values() ) {
+ if( (now - route.last_send) > 2000 ) {
+ expired += route
+ }
+ }
+ for( route <- expired ) {
+ producerRoutes.remove(route.dest)
+ host.dispatch_queue {
+ host.router.disconnect(route.addresses, route)
+ }
+ }
+ }
+
+ var producerRoutes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
+ host.dispatch_queue {
+ host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
+ }
+ }
+ }
def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
val dest = get(frame.headers, DESTINATION).get
@@ -1293,6 +1307,7 @@ class StompProtocolHandler extends Proto
def send_via_route(addresses: Array[SimpleAddress], route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
var storeBatch:StoreUOW=null
+
// User might be asking for ack that we have processed the message..
val receipt = frame.header(RECEIPT_REQUESTED)