You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/17 03:58:05 UTC
svn commit: r1245304 -
/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Author: chirino
Date: Fri Feb 17 02:58:05 2012
New Revision: 1245304
URL: http://svn.apache.org/viewvc?rev=1245304&view=rev
Log:
Openwire should close consumers on disconnect and only parse producer destinations the first time a message is sent.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1245304&r1=1245303&r2=1245304&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Feb 17 02:58:05 2012
@@ -92,9 +92,9 @@ class OpenwireProtocolHandler extends Pr
def broker = connection.connector.broker
- var producerRoutes = new LRUCache[List[ConnectAddress], DeliveryProducerRoute](10) {
- override def onCacheEviction(eldest: Entry[List[ConnectAddress], DeliveryProducerRoute]) = {
- host.router.disconnect(eldest.getKey.toArray, eldest.getValue)
+ var producerRoutes = new LRUCache[ActiveMQDestination, OpenwireDeliveryProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[ActiveMQDestination, OpenwireDeliveryProducerRoute]) = {
+ host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
}
}
@@ -227,26 +227,19 @@ class OpenwireProtocolHandler extends Pr
heart_beat_monitor.stop
import collection.JavaConversions._
- producerRoutes.foreach{ case (dests, route) =>
- val addresses = dests.toArray
+ producerRoutes.values.foreach{ route =>
host.dispatch_queue {
- host.router.disconnect(addresses, route)
+ host.router.disconnect(route.addresses, route)
}
}
producerRoutes.clear
- // consumers.foreach{
- // case (_, consumer) =>
- // if (consumer.binding == null) {
- // host.router.unbind(consumer.destination, consumer)
- // } else {
- // host.router.get_queue(consumer.binding) {
- // queue =>
- // queue.foreach(_.unbind(consumer :: Nil))
- // }
- // }
- // }
- // consumers = Map()
+ all_consumers.values.foreach { consumer =>
+ host.dispatch_queue {
+ host.router.unbind(consumer.addresses, consumer, false , security_context)
+ }
+ }
+ all_consumers.clear()
trace("openwire protocol resources released")
}
}
@@ -532,7 +525,8 @@ class OpenwireProtocolHandler extends Pr
def on_remove_info(info: RemoveInfo) = {
info.getObjectId match {
- case id: ConnectionId => Option(connection_context).foreach(_.dettach)
+ case id: ConnectionId =>
+ Option(connection_context).foreach(_.dettach)
case id: SessionId => all_sessions.get(id).foreach(_.dettach)
case id: ProducerId => all_producers.get(id).foreach(_.dettach)
case id: ConsumerId => all_consumers.get(id).foreach(_.dettach )
@@ -610,9 +604,9 @@ class OpenwireProtocolHandler extends Pr
messages_received += 1
val producer = all_producers.get(msg.getProducerId).getOrElse(die("Producer associated with the message has not been registered."))
- if (msg.getOriginalDestination() == null) {
- msg.setOriginalDestination(msg.getDestination());
- }
+// if (msg.getOriginalDestination() == null) {
+// msg.setOriginalDestination(msg.getDestination());
+// }
if( msg.getTransactionId==null ) {
perform_send(msg)
@@ -623,26 +617,26 @@ class OpenwireProtocolHandler extends Pr
}
}
+ case class OpenwireDeliveryProducerRoute(addresses:Array[SimpleAddress]) extends DeliveryProducerRoute(host.router) {
+ override def connection = Some(OpenwireProtocolHandler.this.connection)
+ override def dispatch_queue = queue
+ refiller = ^ {
+ resume_read
+ }
+ }
+
def perform_send(msg:ActiveMQMessage, uow:StoreUOW=null): Unit = {
- val destiantion = to_destination_dto(msg.getDestination, this)
- val key = destiantion.toList
- producerRoutes.get(key) match {
+ producerRoutes.get(msg.getDestination) match {
case null =>
// create the producer route...
-
- val route = new DeliveryProducerRoute(host.router) {
- override def connection = Some(OpenwireProtocolHandler.this.connection)
- override def dispatch_queue = queue
- refiller = ^ {
- resume_read
- }
- }
+ val addresses = to_destination_dto(msg.getDestination, this)
+ val route = OpenwireDeliveryProducerRoute(addresses)
// don't process frames until producer is connected...
connection.transport.suspendRead
host.dispatch_queue {
- val rc = host.router.connect(destiantion, route, security_context)
+ val rc = host.router.connect(addresses, route, security_context)
dispatchQueue {
rc match {
case Some(failure) =>
@@ -650,7 +644,7 @@ class OpenwireProtocolHandler extends Pr
case None =>
if (!connection.stopped) {
resume_read
- producerRoutes.put(key, route)
+ producerRoutes.put(msg.getDestination, route)
send_via_route(route, msg, uow)
}
}