You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/17 03:58:05 UTC

svn commit: r1245304 - /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Author: chirino
Date: Fri Feb 17 02:58:05 2012
New Revision: 1245304

URL: http://svn.apache.org/viewvc?rev=1245304&view=rev
Log:
OpenWire should close consumers on disconnect and parse a producer's destination only the first time a message is sent to it.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1245304&r1=1245303&r2=1245304&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Feb 17 02:58:05 2012
@@ -92,9 +92,9 @@ class OpenwireProtocolHandler extends Pr
 
   def broker = connection.connector.broker
 
-  var producerRoutes = new LRUCache[List[ConnectAddress], DeliveryProducerRoute](10) {
-    override def onCacheEviction(eldest: Entry[List[ConnectAddress], DeliveryProducerRoute]) = {
-      host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
+  var producerRoutes = new LRUCache[ActiveMQDestination, OpenwireDeliveryProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[ActiveMQDestination, OpenwireDeliveryProducerRoute]) = {
+      host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
     }
   }
 
@@ -227,26 +227,19 @@ class OpenwireProtocolHandler extends Pr
       heart_beat_monitor.stop
 
       import collection.JavaConversions._
-      producerRoutes.foreach{ case (dests, route) =>
-        val addresses = dests.toArray
+      producerRoutes.values.foreach{ route =>
         host.dispatch_queue {
-          host.router.disconnect(addresses, route)
+          host.router.disconnect(route.addresses, route)
         }
       }
       producerRoutes.clear
 
-      //      consumers.foreach{
-      //        case (_, consumer) =>
-      //          if (consumer.binding == null) {
-      //            host.router.unbind(consumer.destination, consumer)
-      //          } else {
-      //            host.router.get_queue(consumer.binding) {
-      //              queue =>
-      //                queue.foreach(_.unbind(consumer :: Nil))
-      //            }
-      //          }
-      //      }
-      //      consumers = Map()
+      all_consumers.values.foreach { consumer =>
+        host.dispatch_queue {
+          host.router.unbind(consumer.addresses, consumer, false , security_context)
+        }
+      }
+      all_consumers.clear()
       trace("openwire protocol resources released")
     }
   }
@@ -532,7 +525,8 @@ class OpenwireProtocolHandler extends Pr
 
   def on_remove_info(info: RemoveInfo) = {
     info.getObjectId match {
-      case id: ConnectionId => Option(connection_context).foreach(_.dettach)
+      case id: ConnectionId =>
+        Option(connection_context).foreach(_.dettach)
       case id: SessionId => all_sessions.get(id).foreach(_.dettach)
       case id: ProducerId => all_producers.get(id).foreach(_.dettach)
       case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
@@ -610,9 +604,9 @@ class OpenwireProtocolHandler extends Pr
     messages_received += 1
     val producer = all_producers.get(msg.getProducerId).getOrElse(die("Producer associated with the message has not been registered."))
 
-    if (msg.getOriginalDestination() == null) {
-      msg.setOriginalDestination(msg.getDestination());
-    }
+//    if (msg.getOriginalDestination() == null) {
+//      msg.setOriginalDestination(msg.getDestination());
+//    }
 
     if( msg.getTransactionId==null ) {
       perform_send(msg)
@@ -623,26 +617,26 @@ class OpenwireProtocolHandler extends Pr
     }
   }
 
+  case class OpenwireDeliveryProducerRoute(addresses:Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) {
+    override def connection = Some(OpenwireProtocolHandler.this.connection)
+    override def dispatch_queue = queue
+    refiller = ^ {
+      resume_read
+    }
+  }
+
   def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
 
-    val destiantion = to_destination_dto(msg.getDestination, this)
-    val key = destiantion.toList
-    producerRoutes.get(key) match {
+    producerRoutes.get(msg.getDestination) match {
       case null =>
         // create the producer route...
-
-        val route = new DeliveryProducerRoute(host.router) {
-          override def connection = Some(OpenwireProtocolHandler.this.connection)
-          override def dispatch_queue = queue
-          refiller = ^ {
-            resume_read
-          }
-        }
+        val addresses = to_destination_dto(msg.getDestination, this)
+        val route = OpenwireDeliveryProducerRoute(addresses)
 
         // don't process frames until producer is connected...
         connection.transport.suspendRead
         host.dispatch_queue {
-          val rc = host.router.connect(destiantion, route, security_context)
+          val rc = host.router.connect(addresses, route, security_context)
           dispatchQueue {
             rc match {
               case Some(failure) =>
@@ -650,7 +644,7 @@ class OpenwireProtocolHandler extends Pr
               case None =>
                 if (!connection.stopped) {
                   resume_read
-                  producerRoutes.put(key, route)
+                  producerRoutes.put(msg.getDestination, route)
                   send_via_route(route, msg, uow)
                 }
             }