You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/01 18:45:13 UTC

svn commit: r1441554 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/s...

Author: chirino
Date: Fri Feb  1 17:45:12 2013
New Revision: 1441554

URL: http://svn.apache.org/viewvc?rev=1441554&view=rev
Log:
Simpler impl of route cleanup logic.  Avoids NPE that could ocurr /w previous impl.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Feb  1 17:45:12 2013
@@ -380,7 +380,6 @@ class Broker() extends BaseService with 
     }
     schedule_reoccurring(1, SECONDS) {
       virtualhost_maintenance
-      connection_maintenance
       roll_current_period
       tune_send_receive_buffers
     }
@@ -475,13 +474,6 @@ class Broker() extends BaseService with 
     }
   }
 
-  def connection_maintenance = {
-    val time = now
-    for ( c <- connections.values ) {
-      c.maintenance(time)
-    }
-  }
-
   protected def init_logs = {
     import OptionSupport._
     // Configure the logging categories...

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Fri Feb  1 17:45:12 2013
@@ -111,11 +111,6 @@ class BrokerConnection(var connector: Co
   var protocol_handler: ProtocolHandler = null;
 
   def session_id = Option(protocol_handler).flatMap(_.session_id)
-  def maintenance(now:Long) = {
-    if ( protocol_handler!=null ) {
-      protocol_handler.maintenance(now)
-    }
-  }
   override def toString = "id: "+id.toString
 
   protected override  def _start(on_completed:Task) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/Protocol.scala Fri Feb  1 17:45:12 2013
@@ -84,8 +84,6 @@ trait ProtocolHandler {
 
   def protocol:String
 
-  def maintenance(now:Long) = {}
-
   def session_id:Option[String]
 
   var connection:BrokerConnection = null;

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1441554&r1=1441553&r2=1441554&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Feb  1 17:45:12 2013
@@ -1156,7 +1156,26 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  override def maintenance(now:Long) = dispatchQueue {
+  var maintenance_scheduled = false
+  def schedule_maintenance:Unit = {
+    if(!maintenance_scheduled && !producerRoutes.isEmpty) {
+      maintenance_scheduled = true
+      dispatchQueue.after(2, TimeUnit.SECONDS) {
+        maintenance_scheduled = false
+        if( !producerRoutes.isEmpty ) {
+          try {
+            producer_maintenance
+          } finally {
+            schedule_maintenance
+          }
+        }
+      }
+    }
+  }
+
+  def producer_maintenance = dispatchQueue {
+    println("doing route maint...")
+    val now = Broker.now
     import collection.JavaConversions._
     val expired = ListBuffer[StompProducerRoute]()
     for( route <- producerRoutes.values() ) {
@@ -1166,6 +1185,7 @@ class StompProtocolHandler extends Proto
     }
     for( route <- expired ) {
       producerRoutes.remove(route.dest)
+      println("Expired route to: "+route.dest)
       host.dispatch_queue {
         host.router.disconnect(route.addresses, route)
       }
@@ -1199,6 +1219,7 @@ class StompProtocolHandler extends Proto
                 if (!connection.stopped) {
                   resume_read
                   producerRoutes.put(trimmed_dest, route)
+                  schedule_maintenance
                   send_via_route(route.addresses, route, frame, uow)
                 }
             }