You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/09 22:36:59 UTC

svn commit: r1311428 - /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala

Author: chirino
Date: Mon Apr  9 20:36:59 2012
New Revision: 1311428

URL: http://svn.apache.org/viewvc?rev=1311428&view=rev
Log:
Don't create the advisory topics until an openwire connection needs it.. Identified in issue APLO-185.  This hides the fact that Apollo supports openwire.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1311428&r1=1311427&r2=1311428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Mon Apr  9 20:36:59 2012
@@ -58,6 +58,7 @@ class DestinationAdvisoryRouterListener(
   final val messageIdGenerator = new LongSequenceGenerator
 
   advisoryProducerId.setConnectionId(new UTF8Buffer(ID_GENERATOR.generateId))
+  var enabled = false
 
 
   class ProducerRoute extends DeliveryProducerRoute(router) {
@@ -103,25 +104,28 @@ class DestinationAdvisoryRouterListener(
 
   def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext) = {
     val destination = to_activemq_destination(Array(dest.address))
-    if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
+    if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination)) {
       // replay the destination advisories..
-      val producer = new ProducerRoute {
-        override def on_connected = {
-          overflow_sink.refiller = ^{
-            // once the sink is not overflowed.. then we can disconnect
-            if(!overflow_sink.overflowed) {
-              unbind(consumer::Nil)
-              overflow_sink.refiller = NOOP
+      enabled = true
+      if( !destination_advisories.isEmpty ) {
+        val producer = new ProducerRoute {
+          override def on_connected = {
+            overflow_sink.refiller = ^{
+              // once the sink is not overflowed.. then we can disconnect
+              if(!overflow_sink.overflowed) {
+                unbind(consumer::Nil)
+                overflow_sink.refiller = NOOP
+              }
             }
+            overflow_sink.refiller.run()
+            super.on_connected
           }
-          overflow_sink.refiller.run()
-          super.on_connected
         }
-      }
-      producer.bind(consumer::Nil)
-      producer.connected()
-      for( info <- destination_advisories.values ) {
-        producer.overflow_sink.offer(info)
+        producer.bind(consumer::Nil)
+        producer.connected()
+        for( info <- destination_advisories.values ) {
+          producer.overflow_sink.offer(info)
+        }
       }
     }
   }
@@ -176,7 +180,7 @@ class DestinationAdvisoryRouterListener(
     val message = delivery.message.asInstanceOf[OpenwireMessage].message
     val dest = to_destination_dto(message.getDestination,null)
     val key = dest.toList
-    if( router.virtual_host.service_state.is_started ) {
+    if( enabled && router.virtual_host.service_state.is_started ) {
       val route = producerRoutes.get(key) match {
         case null =>
           // create the producer route...