You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/17 17:17:18 UTC

svn commit: r1158764 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/ apollo-openwire/src/main/scala/org/apache/active...

Author: chirino
Date: Wed Aug 17 15:17:17 2011
New Revision: 1158764

URL: http://svn.apache.org/viewvc?rev=1158764&view=rev
Log:
Openwire Temp Destinations now working. Topics can now be deleted.

Added:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Aug 17 15:17:17 2011
@@ -36,8 +36,15 @@ import collection.{Iterable, JavaConvers
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait RouterListener {
-  def on_create(path:Path, destination:DestinationDTO, security:SecurityContext)
-  def on_destroy(path:Path, destination:DestinationDTO, security:SecurityContext)
+  def on_create(destination:DomainDestination, security:SecurityContext) // fired by LocalRouter once a topic or queue has been created
+  def on_destroy(destination:DomainDestination, security:SecurityContext) // fired by LocalRouter before it starts tearing the destination down
+  // Producer side: fired right after LocalRouter connects / disconnects a producer on a destination.
+  def on_connect(destination:DomainDestination, producer:BindableDeliveryProducer, security:SecurityContext)
+  def on_disconnect(destination:DomainDestination, producer:BindableDeliveryProducer)
+  // Consumer side: fired right after LocalRouter binds / unbinds a consumer on a destination.
+  def on_bind(destination:DomainDestination, consumer:DeliveryConsumer, security:SecurityContext)
+  def on_unbind(destination:DomainDestination, consumer:DeliveryConsumer, persistent:Boolean)
+
   def close
 }
 /**
@@ -217,11 +224,14 @@ class LocalRouter(val virtual_host:Virtu
 
     def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
     def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
+      if( security==null ) {
+        return None
+      }
 
       // Only allow the owner to bind.
       if( destination.temp_owner != null ) {
-        for( connection <- consumer.connection) {
-          if( connection.id != destination.temp_owner.longValue() ) {
+        for( connection <- security.connection_id) {
+          if( connection != destination.temp_owner.longValue() ) {
             return Some("Not authorized to receive from the temporary destination.")
           }
         }
@@ -257,6 +267,9 @@ class LocalRouter(val virtual_host:Virtu
       matches.foreach { dest=>
         if( can_bind_one(path, destination, consumer, security) ) {
           dest.bind(destination, consumer)
+          for( l <- router_listeners) {
+            l.on_bind(dest, consumer, security)
+          }
         }
       }
       consumer.retain
@@ -268,16 +281,12 @@ class LocalRouter(val virtual_host:Virtu
       if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
         get_destination_matches(path).foreach{ dest=>
           dest.unbind(consumer, persistent)
+          for( l <- router_listeners) {
+            l.on_unbind(dest, consumer, persistent)
+          }
         }
         consumer.release
       }
-
-//      if( persistent ) {
-//          destroy_queue(consumer.binding, security_context).failure_option.foreach{ reason=>
-//            async_die(reason)
-//          }
-//      }
-
     }
 
     def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
@@ -322,6 +331,9 @@ class LocalRouter(val virtual_host:Virtu
       get_destination_matches(path).foreach { dest=>
         if( can_connect_one(path, destination, producer, security) ) {
           dest.connect(destination, producer)
+          for( l <- router_listeners) {
+            l.on_connect(dest, producer, security)
+          }
         }
       }
       producers_by_path.put(path, new ProducerContext(destination, producer, security))
@@ -332,6 +344,9 @@ class LocalRouter(val virtual_host:Virtu
       producers_by_path.remove(path, new ProducerContext(destination, producer, null))
       get_destination_matches(path).foreach { dest=>
         dest.disconnect(producer)
+        for( l <- router_listeners) {
+          l.on_disconnect(dest, producer)
+        }
       }
     }
 
@@ -380,6 +395,9 @@ class LocalRouter(val virtual_host:Virtu
           // Connects a producer directly to a durable subscription..
           durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
             dest.connect(destination, producer)
+            for( l <- router_listeners) {
+              l.on_connect(dest, producer, security)
+            }
           }
 
         case _ => super.connect(path, destination, producer, security)
@@ -391,14 +409,29 @@ class LocalRouter(val virtual_host:Virtu
         case destination:DurableSubscriptionDestinationDTO =>
           durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
             dest.disconnect(producer)
+            for( l <- router_listeners) {
+              l.on_disconnect(dest, producer)
+            }
           }
         case _ => super.disconnect(destination, producer)
       }
     }
 
     def can_destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Option[String] = {
+      if( security == null ) {
+        return None
+      }
+
+      if( destination.temp_owner != null ) {
+        for( connection <- security.connection_id) {
+          if( connection != destination.temp_owner.longValue() ) {
+            return Some("Not authorized to destroy the temporary destination.")
+          }
+        }
+      }
+
       val matches = get_destination_matches(path)
-      val rc = matches.foldLeft(None:Option[String]) { case (rc,dest) =>
+      matches.foldLeft(None:Option[String]) { case (rc,dest) =>
         rc.orElse {
           if( virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_destroy(security, virtual_host, dest.config)) {
             Some("Not authorized to destroy topic: %s".format(dest.id))
@@ -407,19 +440,36 @@ class LocalRouter(val virtual_host:Virtu
           }
         }
       }
-
-      // TODO: destroy not yet supported on topics..  Need to disconnect all
-      // clients and destroy remove any durable subs on the topic.
-      Some("Topic destroy not yet implemented.")
     }
 
     def destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Unit = {
       val matches = get_destination_matches(path)
       matches.foreach { dest =>
         for( l <- router_listeners) {
-          l.on_destroy(path, destination, security)
+          l.on_destroy(dest, security)
         }
-//        remove_destination(dest.path, dest)
+
+        // Disconnect the producers.
+        dest.disconnect_producers
+
+        // Delete the durable subs which
+        for( queue <- dest.durable_subscriptions ) {
+          // we delete the durable sub if it's not wildcard'ed
+          if( !PathParser.containsWildCards(queue.binding.destination) ) {
+            _destroy_queue(queue.id, null)
+          }
+        }
+
+        for( consumer <- dest.consumers ) {
+          consumer match {
+            case queue:Queue =>
+              // Delete any attached queue consumers..
+              _destroy_queue(queue.id, null)
+          }
+        }
+
+        // Un-register the topic.
+        remove_destination(path, dest)
       }
     }
 
@@ -451,7 +501,7 @@ class LocalRouter(val virtual_host:Virtu
       add_destination(path, topic)
 
       for( l <- router_listeners) {
-        l.on_create(path, destination, security)
+        l.on_create(topic, security)
       }
       Success(topic)
     }
@@ -545,6 +595,10 @@ class LocalRouter(val virtual_host:Virtu
           queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
           queue.bind(destination, consumer)
 
+          for( l <- router_listeners) {
+            l.on_bind(queue, consumer, security)
+          }
+
         case _ =>
           super.bind(path, destination, consumer, security)
       }
@@ -558,6 +612,9 @@ class LocalRouter(val virtual_host:Virtu
             if( persistent ) {
               _destroy_queue(queue, security)
             }
+            for( l <- router_listeners) {
+              l.on_unbind(queue, consumer, persistent)
+            }
           }
         case _ =>
           super.unbind( destination, consumer, persistent, security)
@@ -711,7 +768,7 @@ class LocalRouter(val virtual_host:Virtu
       val matches = get_destination_matches(path)
       matches.foreach { queue =>
         for( l <- router_listeners) {
-          l.on_destroy(queue.binding.destination, queue.binding.binding_dto, security)
+          l.on_destroy(queue, security)
         }
         _destroy_queue(queue)
       }
@@ -736,7 +793,11 @@ class LocalRouter(val virtual_host:Virtu
       val binding = QueueDomainQueueBinding.create(dto)
       val config = binding.config(virtual_host)
       if( can_create_queue(config, security) ) {
-        Success(_create_queue(binding))
+        var queue = _create_queue(binding)
+        for( l <- router_listeners) {
+          l.on_create(queue, security)
+        }
+        Success(queue)
       } else {
         Failure("Not authorized to create the queue")
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Aug 17 15:17:17 2011
@@ -47,7 +47,7 @@ class Queue(val router: LocalRouter, val
 
   def id = binding.id
 
-  override def toString: String =  id
+  override def toString: String = binding.destination.toString // prints the binding's destination
 
   def virtual_host = router.virtual_host
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Aug 17 15:17:17 2011
@@ -49,6 +49,8 @@ class Topic(val router:LocalRouter, val 
 
   import OptionSupport._
 
+  override def toString: String = destination_dto.toString // prints the topic's destination DTO
+
   def virtual_host: VirtualHost = router.virtual_host
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
@@ -207,4 +209,12 @@ class Topic(val router:LocalRouter, val 
     check_idle
   }
 
+  // Unbinds every connected producer from this topic's consumers and durable
+  // subscriptions, forgets the producers, then re-runs the idle check.
+  def disconnect_producers:Unit ={
+    producers.foreach(_.unbind(consumers.toList ::: durable_subscriptions.toList))
+    producers.clear
+    check_idle
+  }
+
 }

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index?rev=1158764&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index Wed Aug 17 15:17:17 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.openwire.DestinationAdvisoryRouterListenerFactory
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1158764&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Wed Aug 17 15:17:17 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import command._
+import org.apache.activemq.apollo.dto.DestinationDTO
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import collection.mutable.HashMap
+import DestinationConverter._
+import support.advisory.AdvisorySupport
+import scala.util.continuations._
+import org.apache.activemq.apollo.util._
+import java.util.Map.Entry
+import org.apache.activemq.apollo.broker._
+import org.fusesource.hawtdispatch._
+
+/**
+ * Creates a [[DestinationAdvisoryRouterListener]] for a router. The factory is
+ * listed in the `router-listener-factory.index` services file of this module.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object DestinationAdvisoryRouterListenerFactory extends RouterListenerFactory.Provider {
+  def create(router: Router): DestinationAdvisoryRouterListener = new DestinationAdvisoryRouterListener(router)
+}
+
+object DestinationAdvisoryRouterListener extends Log {
+  final val ID_GENERATOR = new IdGenerator // supplies the connection id set on each listener's advisory ProducerId
+}
+
+/**
+ * A [[RouterListener]] which implements the OpenWire destination advisories.
+ * It publishes a `DestinationInfo` advisory message when a destination is
+ * created or destroyed, and replays the advisories of the known destinations
+ * to any consumer which binds to a destination advisory topic.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DestinationAdvisoryRouterListener(router: Router) extends RouterListener {
+
+  import DestinationAdvisoryRouterListener._
+
+  // The ADD advisory of each destination created and not yet destroyed; on_bind replays them.
+  final val destination_advisories = HashMap[ActiveMQDestination, Delivery]()
+  final val advisoryProducerId = new ProducerId
+  final val messageIdGenerator = new LongSequenceGenerator
+
+  advisoryProducerId.setConnectionId(ID_GENERATOR.generateId)
+
+  // Route used to publish advisories: deliveries are offered to `overflow_sink`, whose downstream is attached in on_connected.
+  class ProducerRoute extends DeliveryProducerRoute(router) {
+    val sink_switcher = new MutableSink[Delivery]
+    val overflow_sink = new OverflowSink(sink_switcher)
+
+    override protected def on_connected = {
+      sink_switcher.downstream = Some(this)
+    }
+
+    override def dispatch_queue = router.virtual_host.dispatch_queue
+  }
+
+  // Producer routes keyed by advisory destination; a route evicted from the LRU cache is disconnected from the router.
+  var producerRoutes = new LRUCache[List[DestinationDTO], ProducerRoute](10) {
+    override def onCacheEviction(eldest: Entry[List[DestinationDTO], ProducerRoute]) = {
+      router.disconnect(eldest.getKey.toArray, eldest.getValue)
+    }
+  }
+
+  /** Sends and records an ADD advisory the first time a non-advisory destination is created. */
+  def on_create(dest: DomainDestination, security: SecurityContext): Unit = {
+    val ow_destination = to_activemq_destination(Array(dest.destination_dto))
+    if (!AdvisorySupport.isAdvisoryTopic(ow_destination)) {
+      destination_advisories.getOrElseUpdate(ow_destination, {
+        val info = new DestinationInfo(null, DestinationInfo.ADD_OPERATION_TYPE, ow_destination)
+        val topic = AdvisorySupport.getDestinationAdvisoryTopic(ow_destination)
+        val advisory = create_advisory_delivery(topic, info)
+        send(advisory)
+        advisory
+      })
+    }
+  }
+
+  /** Forgets the destination's ADD advisory and, if one had been recorded, sends a REMOVE advisory. */
+  def on_destroy(dest: DomainDestination, security: SecurityContext): Unit = {
+    val destination = to_activemq_destination(Array(dest.destination_dto))
+    if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+      // The recorded ADD delivery itself is not needed here, only the fact that it existed.
+      destination_advisories.remove(destination).foreach { _ =>
+        val info = new DestinationInfo(null, DestinationInfo.REMOVE_OPERATION_TYPE, destination)
+        val topic = AdvisorySupport.getDestinationAdvisoryTopic(destination)
+        send(create_advisory_delivery(topic, info))
+      }
+    }
+  }
+
+  /** Replays the recorded destination advisories to a consumer which binds to a destination advisory topic. */
+  def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext): Unit = {
+    val destination = to_activemq_destination(Array(dest.destination_dto))
+    if (AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
+      // replay the destination advisories..
+      val producer = new ProducerRoute {
+        override def on_connected = {
+          overflow_sink.refiller = ^{
+            // once the sink is not overflowed.. then we can disconnect
+            if(!overflow_sink.overflowed) {
+              unbind(consumer::Nil)
+              overflow_sink.refiller = NOOP
+            }
+          }
+          overflow_sink.refiller.run()
+          super.on_connected
+        }
+      }
+      producer.bind(consumer::Nil)
+      producer.connected()
+      for( info <- destination_advisories.values ) {
+        producer.overflow_sink.offer(info)
+      }
+    }
+  }
+
+  // Unbind, connect and disconnect events do not produce any advisory.
+  def on_unbind(dest: DomainDestination, consumer: DeliveryConsumer, persistent: Boolean): Unit = {}
+  def on_connect(dest: DomainDestination, producer: BindableDeliveryProducer, security: SecurityContext): Unit = {}
+  def on_disconnect(dest: DomainDestination, producer: BindableDeliveryProducer): Unit = {}
+
+  /** Disconnects every cached advisory producer route from the router and empties the cache. */
+  def close: Unit = {
+    import collection.JavaConversions._
+    for (entry <- producerRoutes.entrySet()) {
+      router.disconnect(entry.getKey.toArray, entry.getValue)
+    }
+    producerRoutes.clear
+  }
+
+  /**
+   * Wraps `command` in a non-persistent OpenWire advisory message for `topic`, returned as a [[Delivery]].
+   */
+  def create_advisory_delivery(topic: ActiveMQTopic, command: Command): Delivery = {
+    // TODO: also set MSG_PROPERTY_ORIGIN_BROKER_NAME and MSG_PROPERTY_ORIGIN_BROKER_URL,
+    // and replace the "NOT_SET" broker id below with the real one.
+    val message = new ActiveMQMessage()
+    message.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, "NOT_SET")
+
+    //set the data structure
+    message.setDataStructure(command)
+    message.setPersistent(false)
+    message.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE)
+    message.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()))
+//    message.setTargetConsumerId(targetConsumerId);
+    message.setDestination(topic)
+    message.setResponseRequired(false)
+    message.setProducerId(advisoryProducerId)
+
+    val delivery = new Delivery
+    delivery.message = new OpenwireMessage(message)
+    delivery.size = message.getSize
+    delivery
+  }
+
+  /** Offers `delivery` to the producer route of its advisory topic, connecting that route on first use. */
+  def send(delivery:Delivery): Unit = {
+    val message = delivery.message.asInstanceOf[OpenwireMessage].message
+    val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination)
+    val key = dest.toList
+
+    val route = producerRoutes.get(key) match {
+      case null =>
+        // create the producer route...
+        val route = new ProducerRoute
+        producerRoutes.put(key, route)
+        reset {
+          val rc = router.connect(dest, route, null)
+          rc match {
+            case Some(failure) =>
+              warn("Could not connect to advisory topic: " + message.getDestination + " (" + failure + ")")
+            case None =>
+          }
+        }
+        route
+
+      case route => route
+    }
+    route.overflow_sink.offer(delivery)
+  }
+
+}
+

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Aug 17 15:17:17 2011
@@ -724,6 +724,28 @@ class OpenwireProtocolHandler extends Pr
 
   class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends BaseRetained with DeliveryConsumer {
 
+//  The following comes in handy if we need to debug the
+//  reference counts of the consumers.
+//    val r = new BaseRetained
+//
+//    def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
+//    def retained: Int =r.retained
+//
+//    def printST(name:String) = {
+//      val e = new Exception
+//      println(name+": "+connection.map(_.id))
+//      println("  "+e.getStackTrace.drop(1).take(4).mkString("\n  "))
+//    }
+//
+//    def retain: Unit = {
+//      printST("retain")
+//      r.retain
+//    }
+//    def release: Unit = {
+//      printST("release")
+//      r.release
+//    }
+
     var selector_expression:BooleanExpression = _
     var destination:Array[DestinationDTO] = _
 

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala?rev=1158764&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala Wed Aug 17 15:17:17 2011
@@ -0,0 +1,80 @@
+package org.apache.activemq.apollo.openwire
+
+import javax.jms._
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+class TempDestinationTest extends OpenwireTestSupport {
+
+  test("Temp Queue Destinations") {
+    test_temp_destination(_.createTemporaryQueue())
+  }
+
+  test("Temp Topic Destinations") {
+    test_temp_destination(_.createTemporaryTopic())
+  }
+
+  // Shared scenario: the owner connection consumes from a temporary destination
+  // fed by a second connection, which can neither consume from it nor keep
+  // sending to it once it has been deleted.
+  def test_temp_destination(func:(Session)=>Destination) = {
+    connect()
+    val other_connection = connect(true)
+    val other_session = other_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val dest = func(session)
+    val consumer = session.createConsumer(dest)
+    val other_producer = other_session.createProducer(dest)
+
+    def put(id:Int) = other_producer.send(session.createTextMessage("message:"+id))
+    def get(id:Int) = {
+      val received = consumer.receive().asInstanceOf[TextMessage]
+      received.getJMSDestination should equal(dest)
+      received.getText should equal ("message:"+id)
+    }
+
+    // Runs `action`, failing the test unless it throws a JMSException.
+    def expect_jms_exception(action: =>Any) = {
+      try {
+        action
+        fail("expected jms exception")
+      } catch {
+        case e:JMSException => println(e)
+      }
+    }
+
+    Thread.sleep(1000)
+    (1 to 3).foreach(put)
+    (1 to 3).foreach(get)
+
+    // A different connection should not be able to consume from it.
+    expect_jms_exception(other_session.createConsumer(dest))
+
+    // delete the temporary destination.
+    consumer.close()
+    dest match {
+      case queue:TemporaryQueue=> queue.delete()
+      case topic:TemporaryTopic=> topic.delete()
+    }
+
+    // The producer should no longer be able to send to it.
+    Thread.sleep(1000)
+    expect_jms_exception(put(4))
+  }
+
+}
\ No newline at end of file