You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/08/17 17:17:18 UTC
svn commit: r1158764 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/
apollo-openwire/src/main/scala/org/apache/active...
Author: chirino
Date: Wed Aug 17 15:17:17 2011
New Revision: 1158764
URL: http://svn.apache.org/viewvc?rev=1158764&view=rev
Log:
Openwire Temp Destinations now working. Topics can now be deleted.
Added:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed Aug 17 15:17:17 2011
@@ -36,8 +36,15 @@ import collection.{Iterable, JavaConvers
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait RouterListener {
- def on_create(path:Path, destination:DestinationDTO, security:SecurityContext)
- def on_destroy(path:Path, destination:DestinationDTO, security:SecurityContext)
+ def on_create(destination:DomainDestination, security:SecurityContext)
+ def on_destroy(destination:DomainDestination, security:SecurityContext)
+
+ def on_connect(destination:DomainDestination, producer:BindableDeliveryProducer, security:SecurityContext)
+ def on_disconnect(destination:DomainDestination, producer:BindableDeliveryProducer)
+
+ def on_bind(destination:DomainDestination, consumer:DeliveryConsumer, security:SecurityContext)
+ def on_unbind(destination:DomainDestination, consumer:DeliveryConsumer, persistent:Boolean)
+
def close
}
/**
@@ -217,11 +224,14 @@ class LocalRouter(val virtual_host:Virtu
def can_bind_one(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Boolean
def can_bind_all(path:Path, destination:DestinationDTO, consumer:DeliveryConsumer, security:SecurityContext):Option[String] = {
+ if( security==null ) {
+ return None
+ }
// Only allow the owner to bind.
if( destination.temp_owner != null ) {
- for( connection <- consumer.connection) {
- if( connection.id != destination.temp_owner.longValue() ) {
+ for( connection <- security.connection_id) {
+ if( connection != destination.temp_owner.longValue() ) {
return Some("Not authorized to receive from the temporary destination.")
}
}
@@ -257,6 +267,9 @@ class LocalRouter(val virtual_host:Virtu
matches.foreach { dest=>
if( can_bind_one(path, destination, consumer, security) ) {
dest.bind(destination, consumer)
+ for( l <- router_listeners) {
+ l.on_bind(dest, consumer, security)
+ }
}
}
consumer.retain
@@ -268,16 +281,12 @@ class LocalRouter(val virtual_host:Virtu
if( consumers_by_path.remove(path, new ConsumerContext(destination, consumer, null) ) ) {
get_destination_matches(path).foreach{ dest=>
dest.unbind(consumer, persistent)
+ for( l <- router_listeners) {
+ l.on_unbind(dest, consumer, persistent)
+ }
}
consumer.release
}
-
-// if( persistent ) {
-// destroy_queue(consumer.binding, security_context).failure_option.foreach{ reason=>
-// async_die(reason)
-// }
-// }
-
}
def can_connect_one(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Boolean
@@ -322,6 +331,9 @@ class LocalRouter(val virtual_host:Virtu
get_destination_matches(path).foreach { dest=>
if( can_connect_one(path, destination, producer, security) ) {
dest.connect(destination, producer)
+ for( l <- router_listeners) {
+ l.on_connect(dest, producer, security)
+ }
}
}
producers_by_path.put(path, new ProducerContext(destination, producer, security))
@@ -332,6 +344,9 @@ class LocalRouter(val virtual_host:Virtu
producers_by_path.remove(path, new ProducerContext(destination, producer, null))
get_destination_matches(path).foreach { dest=>
dest.disconnect(producer)
+ for( l <- router_listeners) {
+ l.on_disconnect(dest, producer)
+ }
}
}
@@ -380,6 +395,9 @@ class LocalRouter(val virtual_host:Virtu
// Connects a producer directly to a durable subscription..
durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
dest.connect(destination, producer)
+ for( l <- router_listeners) {
+ l.on_connect(dest, producer, security)
+ }
}
case _ => super.connect(path, destination, producer, security)
@@ -391,14 +409,29 @@ class LocalRouter(val virtual_host:Virtu
case destination:DurableSubscriptionDestinationDTO =>
durable_subscriptions_by_id.get(destination.subscription_id).foreach { dest=>
dest.disconnect(producer)
+ for( l <- router_listeners) {
+ l.on_disconnect(dest, producer)
+ }
}
case _ => super.disconnect(destination, producer)
}
}
def can_destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Option[String] = {
+ if( security == null ) {
+ return None
+ }
+
+ if( destination.temp_owner != null ) {
+ for( connection <- security.connection_id) {
+ if( connection != destination.temp_owner.longValue() ) {
+ return Some("Not authorized to destroy the temporary destination.")
+ }
+ }
+ }
+
val matches = get_destination_matches(path)
- val rc = matches.foldLeft(None:Option[String]) { case (rc,dest) =>
+ matches.foldLeft(None:Option[String]) { case (rc,dest) =>
rc.orElse {
if( virtual_host.authorizer!=null && security!=null && !virtual_host.authorizer.can_destroy(security, virtual_host, dest.config)) {
Some("Not authorized to destroy topic: %s".format(dest.id))
@@ -407,19 +440,36 @@ class LocalRouter(val virtual_host:Virtu
}
}
}
-
- // TODO: destroy not yet supported on topics.. Need to disconnect all
- // clients and destroy remove any durable subs on the topic.
- Some("Topic destroy not yet implemented.")
}
def destroy_destination(path:Path, destination: DestinationDTO, security: SecurityContext): Unit = {
val matches = get_destination_matches(path)
matches.foreach { dest =>
for( l <- router_listeners) {
- l.on_destroy(path, destination, security)
+ l.on_destroy(dest, security)
}
-// remove_destination(dest.path, dest)
+
+ // Disconnect the producers.
+ dest.disconnect_producers
+
+ // Delete the durable subs which are attached to this topic.
+ for( queue <- dest.durable_subscriptions ) {
+ // we delete the durable sub if it's not wildcard'ed
+ if( !PathParser.containsWildCards(queue.binding.destination) ) {
+ _destroy_queue(queue.id, null)
+ }
+ }
+
+ for( consumer <- dest.consumers ) {
+ consumer match {
+ case queue:Queue =>
+ // Delete any attached queue consumers..
+ _destroy_queue(queue.id, null)
+ }
+ }
+
+ // Un-register the topic.
+ remove_destination(path, dest)
}
}
@@ -451,7 +501,7 @@ class LocalRouter(val virtual_host:Virtu
add_destination(path, topic)
for( l <- router_listeners) {
- l.on_create(path, destination, security)
+ l.on_create(topic, security)
}
Success(topic)
}
@@ -545,6 +595,10 @@ class LocalRouter(val virtual_host:Virtu
queue.dispatch_queue.setTargetQueue(consumer.dispatch_queue)
queue.bind(destination, consumer)
+ for( l <- router_listeners) {
+ l.on_bind(queue, consumer, security)
+ }
+
case _ =>
super.bind(path, destination, consumer, security)
}
@@ -558,6 +612,9 @@ class LocalRouter(val virtual_host:Virtu
if( persistent ) {
_destroy_queue(queue, security)
}
+ for( l <- router_listeners) {
+ l.on_unbind(queue, consumer, persistent)
+ }
}
case _ =>
super.unbind( destination, consumer, persistent, security)
@@ -711,7 +768,7 @@ class LocalRouter(val virtual_host:Virtu
val matches = get_destination_matches(path)
matches.foreach { queue =>
for( l <- router_listeners) {
- l.on_destroy(queue.binding.destination, queue.binding.binding_dto, security)
+ l.on_destroy(queue, security)
}
_destroy_queue(queue)
}
@@ -736,7 +793,11 @@ class LocalRouter(val virtual_host:Virtu
val binding = QueueDomainQueueBinding.create(dto)
val config = binding.config(virtual_host)
if( can_create_queue(config, security) ) {
- Success(_create_queue(binding))
+ var queue = _create_queue(binding)
+ for( l <- router_listeners) {
+ l.on_create(queue, security)
+ }
+ Success(queue)
} else {
Failure("Not authorized to create the queue")
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Aug 17 15:17:17 2011
@@ -47,7 +47,7 @@ class Queue(val router: LocalRouter, val
def id = binding.id
- override def toString: String = id
+ override def toString = binding.destination.toString
def virtual_host = router.virtual_host
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed Aug 17 15:17:17 2011
@@ -49,6 +49,8 @@ class Topic(val router:LocalRouter, val
import OptionSupport._
+ override def toString = destination_dto.toString
+
def virtual_host: VirtualHost = router.virtual_host
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
@@ -207,4 +209,12 @@ class Topic(val router:LocalRouter, val
check_idle
}
+ def disconnect_producers:Unit ={
+ for( producer <- producers ) {
+ producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+ }
+ producers.clear
+ check_idle
+ }
+
}
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index?rev=1158764&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/resources/META-INF/services/org.apache.activemq.apollo/router-listener-factory.index Wed Aug 17 15:17:17 2011
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+org.apache.activemq.apollo.openwire.DestinationAdvisoryRouterListenerFactory
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1158764&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Wed Aug 17 15:17:17 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import command._
+import org.apache.activemq.apollo.dto.DestinationDTO
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import collection.mutable.HashMap
+import DestinationConverter._
+import support.advisory.AdvisorySupport
+import scala.util.continuations._
+import org.apache.activemq.apollo.util._
+import java.util.Map.Entry
+import org.apache.activemq.apollo.broker._
+import org.fusesource.hawtdispatch._
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object DestinationAdvisoryRouterListenerFactory extends RouterListenerFactory.Provider {
+ def create(router: Router) = new DestinationAdvisoryRouterListener(router)
+}
+
+object DestinationAdvisoryRouterListener extends Log {
+ final val ID_GENERATOR = new IdGenerator
+}
+
+/**
+ * <p>
+ * A listener for router events which implements the destination advisories
+ * which are needed by the OpenWire protocol support.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DestinationAdvisoryRouterListener(router: Router) extends RouterListener {
+
+ import DestinationAdvisoryRouterListener._
+
+ final val destination_advisories = HashMap[ActiveMQDestination, Delivery]()
+ final val advisoryProducerId = new ProducerId
+ final val messageIdGenerator = new LongSequenceGenerator
+
+ advisoryProducerId.setConnectionId(ID_GENERATOR.generateId)
+
+
+ class ProducerRoute extends DeliveryProducerRoute(router) {
+ val sink_switcher = new MutableSink[Delivery]
+ val overflow_sink = new OverflowSink(sink_switcher)
+
+ override protected def on_connected = {
+ sink_switcher.downstream = Some(this)
+ }
+
+ override def dispatch_queue = router.virtual_host.dispatch_queue
+ }
+
+ var producerRoutes = new LRUCache[List[DestinationDTO], ProducerRoute](10) {
+ override def onCacheEviction(eldest: Entry[List[DestinationDTO], ProducerRoute]) = {
+ router.disconnect(eldest.getKey.toArray, eldest.getValue)
+ }
+ }
+
+ def on_create(dest: DomainDestination, security: SecurityContext) = {
+ val ow_destination = to_activemq_destination(Array(dest.destination_dto))
+ if (!AdvisorySupport.isAdvisoryTopic(ow_destination)) {
+ destination_advisories.getOrElseUpdate(ow_destination, {
+ var info = new DestinationInfo(null, DestinationInfo.ADD_OPERATION_TYPE, ow_destination)
+ val topic = AdvisorySupport.getDestinationAdvisoryTopic(ow_destination);
+ val advisory = create_advisory_delivery(topic, info)
+ send(advisory)
+ advisory
+ })
+ }
+ }
+
+ def on_destroy(dest: DomainDestination, security: SecurityContext) = {
+ val destination = to_activemq_destination(Array(dest.destination_dto))
+ if (!AdvisorySupport.isAdvisoryTopic(destination)) {
+ for (info <- destination_advisories.remove(destination)) {
+ var info = new DestinationInfo(null, DestinationInfo.REMOVE_OPERATION_TYPE, destination)
+ val topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
+ send(create_advisory_delivery(topic, info));
+ }
+ }
+ }
+
+ def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext) = {
+ val destination = to_activemq_destination(Array(dest.destination_dto))
+ if (AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
+ // replay the destination advisories..
+ val producer = new ProducerRoute {
+ override def on_connected = {
+ overflow_sink.refiller = ^{
+ // once the sink is not overflowed.. then we can disconnect
+ if(!overflow_sink.overflowed) {
+ unbind(consumer::Nil)
+ overflow_sink.refiller = NOOP
+ }
+ }
+ overflow_sink.refiller.run()
+ super.on_connected
+ }
+ }
+ producer.bind(consumer::Nil)
+ producer.connected()
+ for( info <- destination_advisories.values ) {
+ producer.overflow_sink.offer(info)
+ }
+ }
+ }
+
+ def on_unbind(dest: DomainDestination, consumer: DeliveryConsumer, persistent: Boolean) = {
+ }
+ def on_connect(dest: DomainDestination, producer: BindableDeliveryProducer, security: SecurityContext) = {
+ }
+ def on_disconnect(dest: DomainDestination, producer: BindableDeliveryProducer) = {
+ }
+
+
+ def close = {
+ import collection.JavaConversions._
+ for (entry <- producerRoutes.entrySet()) {
+ router.disconnect(entry.getKey.toArray, entry.getValue)
+ }
+ producerRoutes.clear
+ }
+
+ def create_advisory_delivery(topic: ActiveMQTopic, command: Command) = {
+
+ // advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
+ // val id = getBrokerId() != null ? getBrokerId().getValue(): "NOT_SET";
+ // advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
+ val message = new ActiveMQMessage()
+ message.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, "NOT_SET");
+
+ // val url = getBrokerService().getVmConnectorURI().toString();
+ // if (getBrokerService().getDefaultSocketURIString() != null) {
+ // url = getBrokerService().getDefaultSocketURIString();
+ // }
+ // advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
+
+ //set the data structure
+ message.setDataStructure(command);
+ message.setPersistent(false);
+ message.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+ message.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
+// message.setTargetConsumerId(targetConsumerId);
+ message.setDestination(topic);
+ message.setResponseRequired(false);
+ message.setProducerId(advisoryProducerId);
+
+ val delivery = new Delivery
+ delivery.message = new OpenwireMessage(message)
+ delivery.size = message.getSize
+ delivery
+ }
+
+ def send(delivery:Delivery): Unit = {
+ val message = delivery.message.asInstanceOf[OpenwireMessage].message
+ val dest: Array[DestinationDTO] = to_destination_dto(message.getDestination)
+ val key = dest.toList
+
+ val route = producerRoutes.get(key) match {
+ case null =>
+ // create the producer route...
+ val route = new ProducerRoute
+ producerRoutes.put(key, route)
+ reset {
+ val rc = router.connect(dest, route, null)
+ rc match {
+ case Some(failure) =>
+ warn("Could not connect to advisory topic: " + message.getDestination)
+ case None =>
+ }
+ }
+ route
+
+ case route => route
+ }
+ route.overflow_sink.offer(delivery)
+ }
+
+}
+
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1158764&r1=1158763&r2=1158764&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed Aug 17 15:17:17 2011
@@ -724,6 +724,28 @@ class OpenwireProtocolHandler extends Pr
class ConsumerContext(val parent: SessionContext, val info: ConsumerInfo) extends BaseRetained with DeliveryConsumer {
+// The following comes in handy if we need to debug the
+// reference counts of the consumers.
+// val r = new BaseRetained
+//
+// def setDisposer(p1: Runnable): Unit = r.setDisposer(p1)
+// def retained: Int =r.retained
+//
+// def printST(name:String) = {
+// val e = new Exception
+// println(name+": "+connection.map(_.id))
+// println(" "+e.getStackTrace.drop(1).take(4).mkString("\n "))
+// }
+//
+// def retain: Unit = {
+// printST("retain")
+// r.retain
+// }
+// def release: Unit = {
+// printST("release")
+// r.release
+// }
+
var selector_expression:BooleanExpression = _
var destination:Array[DestinationDTO] = _
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala?rev=1158764&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TempDestinationTest.scala Wed Aug 17 15:17:17 2011
@@ -0,0 +1,80 @@
+package org.apache.activemq.apollo.openwire
+
+import javax.jms._
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+class TempDestinationTest extends OpenwireTestSupport {
+
+ test("Temp Queue Destinations") {
+ test_temp_destination((session:Session) => session.createTemporaryQueue())
+ }
+
+ test("Temp Topic Destinations") {
+ test_temp_destination((session:Session) => session.createTemporaryTopic())
+ }
+
+ def test_temp_destination(func:(Session)=>Destination) = {
+ connect()
+
+ val connection2 = connect(true)
+ val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val dest = func(session);
+ val consumer = session.createConsumer(dest)
+
+ val producer2 = session2.createProducer(dest)
+
+ def put(id:Int) = producer2.send(session.createTextMessage("message:"+id))
+ def get(id:Int) = {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(dest)
+ m.getText should equal ("message:"+id)
+ }
+
+ Thread.sleep(1000);
+ List(1,2,3).foreach(put _)
+ List(1,2,3).foreach(get _)
+
+ // A different connection should not be able to consume from it.
+ try {
+ session2.createConsumer(dest)
+ fail("expected jms exception")
+ } catch {
+ case e:JMSException => println(e)
+ }
+
+ // delete the temporary destination.
+ consumer.close()
+ dest match {
+ case dest:TemporaryQueue=> dest.delete()
+ case dest:TemporaryTopic=> dest.delete()
+ }
+
+ // The producer should no longer be able to send to it.
+ Thread.sleep(1000);
+ try {
+ put(4)
+ fail("expected jms exception")
+ } catch {
+ case e:JMSException => println(e)
+ }
+
+ }
+
+}
\ No newline at end of file