You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/01 18:45:13 UTC
svn commit: r1441554 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/s...
Author: chirino
Date: Fri Feb 1 17:45:12 2013
New Revision: 1441554
URL: http://svn.apache.org/viewvc?rev=1441554&view=rev
Log:
Simpler impl of route cleanup logic. Avoids NPE that could occur with the previous impl.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Feb 1 17:45:12 2013
@@ -380,7 +380,6 @@ class Broker() extends BaseService with
}
schedule_reoccurring(1, SECONDS) {
virtualhost_maintenance
- connection_maintenance
roll_current_period
tune_send_receive_buffers
}
@@ -475,13 +474,6 @@ class Broker() extends BaseService with
}
}
- def connection_maintenance = {
- val time = now
- for ( c <- connections.values ) {
- c.maintenance(time)
- }
- }
-
protected def init_logs = {
import OptionSupport._
// Configure the logging categories...
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Fri Feb 1 17:45:12 2013
@@ -111,11 +111,6 @@ class BrokerConnection(var connector: Co
var protocol_handler: ProtocolHandler = null;
def session_id = Option(protocol_handler).flatMap(_.session_id)
- def maintenance(now:Long) = {
- if ( protocol_handler!=null ) {
- protocol_handler.maintenance(now)
- }
- }
override def toString = "id: "+id.toString
protected override def _start(on_completed:Task) = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Fri Feb 1 17:45:12 2013
@@ -84,8 +84,6 @@ trait ProtocolHandler {
def protocol:String
- def maintenance(now:Long) = {}
-
def session_id:Option[String]
var connection:BrokerConnection = null;
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Feb 1 17:45:12 2013
@@ -1156,7 +1156,26 @@ class StompProtocolHandler extends Proto
}
}
- override def maintenance(now:Long) = dispatchQueue {
+ var maintenance_scheduled = false
+ def schedule_maintenance:Unit = {
+ if(!maintenance_scheduled && !producerRoutes.isEmpty) {
+ maintenance_scheduled = true
+ dispatchQueue.after(2, TimeUnit.SECONDS) {
+ maintenance_scheduled = false
+ if( !producerRoutes.isEmpty ) {
+ try {
+ producer_maintenance
+ } finally {
+ schedule_maintenance
+ }
+ }
+ }
+ }
+ }
+
+ def producer_maintenance = dispatchQueue {
+ println("doing route maint...")
+ val now = Broker.now
import collection.JavaConversions._
val expired = ListBuffer[StompProducerRoute]()
for( route <- producerRoutes.values() ) {
@@ -1166,6 +1185,7 @@ class StompProtocolHandler extends Proto
}
for( route <- expired ) {
producerRoutes.remove(route.dest)
+ println("Expired route to: "+route.dest)
host.dispatch_queue {
host.router.disconnect(route.addresses, route)
}
@@ -1199,6 +1219,7 @@ class StompProtocolHandler extends Proto
if (!connection.stopped) {
resume_read
producerRoutes.put(trimmed_dest, route)
+ schedule_maintenance
send_via_route(route.addresses, route, frame, uow)
}
}