You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/28 18:27:00 UTC

svn commit: r1366705 - in /activemq/activemq-apollo/trunk: apollo-broker/src/test/scala/ apollo-openwire/ apollo-openwire/src/test/resources/ apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/

Author: chirino
Date: Sat Jul 28 16:26:59 2012
New Revision: 1366705

URL: http://svn.apache.org/viewvc?rev=1366705&view=rev
Log:
Converted the openwire tests so that they can be run in parallel.

Added:
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/DestinationWildcardTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/DurableSubscriberTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/ExclusiveConsumerTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/QueueTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TempDestinationTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TopicTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Sat Jul 28 16:26:59 2012
@@ -119,8 +119,9 @@ trait BrokerParallelTestExecution extend
 
   override def newInstance = {
     val rc = super.newInstance.asInstanceOf[BrokerFunSuiteSupport]
-    rc.broker = broker
-    rc.port = port
+    rc.before_and_after_all_object = self
+    rc.broker = self.broker
+    rc.port = self.port
     rc
   }
 
@@ -133,6 +134,7 @@ trait BrokerParallelTestExecution extend
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class BrokerFunSuiteSupport extends FunSuiteSupport with Logging { // with ShouldMatchers with BeforeAndAfterEach with Logging {
+  var before_and_after_all_object:BrokerFunSuiteSupport = _
   var broker: Broker = null
   var port = 0
 
@@ -146,19 +148,26 @@ class BrokerFunSuiteSupport extends FunS
 
   override def beforeAll() = {
     super.beforeAll()
-    try {
-      broker = createBroker
-      broker.setTmp(test_data_dir / "tmp")
-      broker.getTmp().mkdirs()
-      ServiceControl.start(broker)
-      port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
-    } catch {
-      case e: Throwable => e.printStackTrace
+    if( before_and_after_all_object==null ) {
+      try {
+        broker = createBroker
+        broker.setTmp(test_data_dir / "tmp")
+        broker.getTmp().mkdirs()
+        ServiceControl.start(broker)
+        port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
+      } catch {
+        case e: Throwable => e.printStackTrace
+      }
+    } else {
+      broker = before_and_after_all_object.broker
+      port = before_and_after_all_object.port
     }
   }
 
   override def afterAll() = {
-    ServiceControl.stop(broker)
+    if( before_and_after_all_object==null ) {
+      ServiceControl.stop(broker)
+    }
     super.afterAll()
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Sat Jul 28 16:26:59 2012
@@ -195,16 +195,6 @@
           </execution>
         </executions>
       </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>${maven-surefire-plugin-version}</version>
-        <configuration>
-          <parallel>classes</parallel>
-          <perCoreThreadCount>false</perCoreThreadCount>
-          <threadCount>1</threadCount> 
-        </configuration>
-      </plugin>
     </plugins>
   </build>
   

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml Sat Jul 28 16:26:59 2012
@@ -23,7 +23,7 @@
 
     <queue id="mirrored.**" mirrored="true"/>
 
-    <bdb_store directory="${basedir}/target/test-data"/>
+    <bdb_store directory="${testdatadir}"/>
   </virtual_host>
 
   <web_admin bind="http://0.0.0.0:0"/>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml Sat Jul 28 16:26:59 2012
@@ -23,7 +23,7 @@
 
     <queue id="mirrored.**" mirrored="true"/>
 
-    <leveldb_store directory="${basedir}/target/test-data"/>
+    <leveldb_store directory="${testdatadir}"/>
   </virtual_host>
 
   <web_admin bind="http://0.0.0.0:0"/>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml Sat Jul 28 16:26:59 2012
@@ -19,46 +19,36 @@
 
   <authentication domain="OpenwireSecurityTest"/>
 
+  <access_rule allow="admin" action="*"/>
+  <access_rule allow="*" kind="topic" id="ActiveMQ.Advisory.**" action="receive create"/>
+
+  <access_rule allow="connect_group" action="connect"/>
+  <access_rule allow="can_send_create_consume_queue" kind="queue" action="send create consume"/>
+  <access_rule allow="can_send_create_queue" kind="queue" action="send create"/>
+  <access_rule allow="can_send_queue"        kind="queue" action="send"/>
+  <access_rule allow="can_receive_queue"     kind="queue" action="receive"/>
+  <access_rule allow="can_consume_queue"     kind="queue" action="consume"/>
+  <access_rule allow="can_send_create_topic" kind="topic" action="send create"/>
+  <access_rule allow="can_send_topic"        kind="topic" action="send"/>
+  <access_rule allow="can_recieve_topic"     kind="topic" action="receive"/>
+  <access_rule allow="can_consume_create_ds" kind="dsub"  action="consume create"/>
+  <access_rule allow="can_consume_ds"        kind="dsub"  action="consume"/>
+  <access_rule allow="can_recieve_topic"     kind="dsub"  action="receive"/>
+
+  <access_rule allow="guest" action="connect"/>
+  <access_rule allow="guest" action="create destroy send receive consume" kind="topic queue dsub" id_regex="test.*"/>
+
+  <!-- only allow connects over the tcp2 connector -->
+  <access_rule allow="connector_restricted" action="connect" connector="tcp2"/>
+
   <virtual_host id="default">
     <host_name>localhost</host_name>
-
-    <acl>
-      <connect allow="connect_group"/>
-    </acl>
-
-    <!-- queue security -->
-    <queue id="**" kind="ptp">
-      <acl>
-        <create  allow="can_send_create_queue"/>
-        <send    allow="can_send_create_queue"/>
-        <send    allow="can_send_queue"/>
-        <receive allow="can_receive_queue"/>
-        <consume allow="can_consume_queue"/>
-      </acl>
-    </queue>
-
-    <!-- topic security -->
-    <destination id="**">
-      <acl>
-        <create  allow="can_send_create_topic"/>
-        <send    allow="can_send_create_topic"/>
-        <send    allow="can_send_topic"/>
-        <receive allow="can_recieve_topic"/>
-      </acl>
-    </destination>
-
-    <!-- durable sub security -->
-    <queue id="**" kind="ds">
-      <acl>
-        <create  allow="can_consume_create_ds"/>
-        <consume allow="can_consume_create_ds"/>
-        <consume allow="can_consume_ds"/>
-      </acl>
-    </queue>
   </virtual_host>
 
-  <web_admin bind="http://0.0.0.0:0"/>
+  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0">
   </connector>
+  <connector id="tcp2" bind="tcp://0.0.0.0:0">
+  </connector>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties Sat Jul 28 16:26:59 2012
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+admin=admin
 connect_group=CN=ssl_user|can_only_connect|can_send_create_queue|can_send_queue|can_receive_queue|can_consume_queue|can_send_create_topic|can_send_topic|can_recieve_topic|can_consume_create_ds|can_consume_ds
 
 can_not_connect=can_not_connect

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala?rev=1366705&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala Sat Jul 28 16:26:59 2012
@@ -0,0 +1,8 @@
+package org.apache.activemq.apollo.openwire.test
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OpenwireBDBParallelTest extends OpenwireParallelTest {
+  override def broker_config_uri = "xml:classpath:apollo-openwire-bdb.xml"
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala?rev=1366705&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala Sat Jul 28 16:26:59 2012
@@ -0,0 +1,55 @@
+package org.apache.activemq.apollo.openwire.test
+
+import javax.jms.{DeliveryMode, Session}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+class OpenwireLevelDBParallelTest extends OpenwireParallelTest {
+  override def broker_config_uri = "xml:classpath:apollo-openwire-leveldb.xml"
+
+  test("Queue Prefetch and Client Ack") {
+
+    connect("?jms.useAsyncSend=true")
+    var dest = queue(next_id("prefetch"))
+
+    val session = default_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
+    val producer = session.createProducer(dest)
+    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
+    def put(id: Int) {
+      val msg = session.createBytesMessage()
+      msg.writeBytes(new Array[Byte](1024 * 4))
+      producer.send(msg)
+    }
+
+    for (i <- 1 to 1000) {
+      put(i)
+    }
+
+    val consumer = session.createConsumer(dest)
+    def get(id: Int) {
+      val m = consumer.receive()
+      expect(true, "Did not get message: " + id)(m != null)
+    }
+    for (i <- 1 to 1000) {
+      get(i)
+    }
+    default_connection.close()
+    default_connection = null
+
+    // All those messages should get redelivered since they were not previously
+    // acked.
+    connect()
+    val session2 = default_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
+    val consumer2 = session2.createConsumer(dest)
+    def get2(id: Int) {
+      val m = consumer2.receive()
+      expect(true, "Did not get message: " + id)(m != null)
+    }
+    for (i <- 1 to 1000) {
+      get2(i)
+    }
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala?rev=1366705&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala Sat Jul 28 16:26:59 2012
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.test
+
+import javax.jms._
+import org.apache.activemq.apollo.broker.BrokerParallelTestExecution
+
+class OpenwireParallelTest extends OpenwireTestSupport with BrokerParallelTestExecution {
+
+  def path_separator = "."
+
+  test("Topic /w Durable sub retains messages.") {
+
+    val dest = topic(next_id("example"))
+
+    def create_durable_sub() {
+      val connection = connect("", false)
+      connection.setClientID("test")
+      connection.start()
+      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val subscriber = session.createDurableSubscriber(dest, "test")
+      session.close()
+      connection.close()
+      if (default_connection == connection) {
+        default_connection = null
+      }
+    }
+
+    create_durable_sub()
+
+    connect("", false)
+    default_connection.setClientID("test")
+    default_connection.start()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(dest)
+    def put(id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+
+    List(1, 2, 3).foreach(put _)
+
+    val subscriber = session.createDurableSubscriber(dest, "test")
+
+    def get(id: Int) {
+      val m = subscriber.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(dest)
+      m.getText should equal("message:" + id)
+    }
+
+    List(1, 2, 3).foreach(get _)
+  }
+
+
+  test("Wildcard subscription") {
+    connect()
+
+    val common_prefix = next_id()
+    val prefix = common_prefix + path_separator
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer1 = session.createProducer(queue(prefix + "A"))
+    val producer2 = session.createProducer(queue(prefix + "B"))
+    val producer3 = session.createProducer(queue(common_prefix+ "bar.A"))
+    def put(producer: MessageProducer, id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+
+    val consumer = session.createConsumer(queue(prefix + "*"))
+    def get(id: Int) {
+      receive_text(consumer) should equal("message:" + id)
+    }
+
+    List(1, 2, 3).foreach(put(producer1, _))
+    List(1, 2, 3).foreach(get _)
+
+    producer3.send(session.createTextMessage("This one shouldn't get consumed."))
+
+    List(1, 2, 3).foreach(put(producer2, _))
+    List(1, 2, 3).foreach(get _)
+
+    put(producer1, -1)
+    get(-1)
+  }
+
+  test("Wildcard subscription with multiple path sections") {
+    connect()
+
+    val common_prefix = next_id()
+    val prefix1 = common_prefix+"foo" + path_separator + "bar" + path_separator
+    val prefix2 = common_prefix+"foo" + path_separator + "bar" + path_separator + "cheeze" + path_separator
+    val prefix3 = common_prefix+"foo" + path_separator + "bar" + path_separator + "cracker" + path_separator
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer1 = session.createProducer(topic(prefix1 + "A"))
+    val producer2 = session.createProducer(topic(prefix2 + "B"))
+    val producer3 = session.createProducer(topic(prefix3 + "C"))
+
+    def put(producer: MessageProducer, id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+    def get(consumer: MessageConsumer, id: Int) {
+      receive_text(consumer) should equal("message:" + id)
+    }
+
+    val consumer1 = session.createConsumer(topic(prefix1 + "*"))
+    val consumer2 = session.createConsumer(topic(prefix2 + "*"))
+    val consumer3 = session.createConsumer(topic(prefix3 + "*"))
+
+
+    put(producer1, 1)
+    List(producer1, producer2, producer3).foreach(put(_, -1))
+
+    get(consumer1, 1)
+    List(consumer1, consumer2, consumer3).foreach(get(_, -1))
+
+
+    put(producer2, 2)
+    List(producer1, producer2, producer3).foreach(put(_, -1))
+
+    get(consumer2, 2)
+    List(consumer1, consumer2, consumer3).foreach(get(_, -1))
+
+
+    put(producer3, 3)
+    List(producer1, producer2, producer3).foreach(put(_, -1))
+
+    get(consumer3, 3)
+    List(consumer1, consumer2, consumer3).foreach(get(_, -1))
+
+  }
+
+  test("Exclusive Consumer Selected when created first") {
+
+    val dest_name = next_id("TEST.QUEUE")
+
+    connect()
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+    val producer = senderSession.createProducer(queue(dest_name))
+
+    producer.send(senderSession.createTextMessage("Exclusive Consumer Selected when created first - 1"))
+
+    Thread.sleep(100)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be (null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Exclusive Consumer Selected when Created After Non-Exclusive Consumer") {
+    connect()
+
+    val dest_name = next_id("TEST.QUEUE")
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+    val exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val producer = senderSession.createProducer(queue(dest_name))
+
+    val msg = senderSession.createTextMessage("test")
+    producer.send(msg);
+
+    Thread.sleep(100)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be (null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Failover To Another Exclusive Consumer Created First") {
+
+    connect()
+
+    val dest_name = next_id("TEST.QUEUE")
+
+    val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+    val producer = senderSession.createProducer(queue(dest_name))
+
+    producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created First - 1"))
+
+    Thread.sleep(100)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer1.receive(100) should not be (null)
+    exclusiveConsumer2.receive(100) should be(null)
+    fallbackConsumer.receive(100) should be(null)
+
+    // Close the exclusive consumer to verify the non-exclusive consumer
+    // takes over
+    exclusiveConsumer1.close()
+
+    producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created First - 2"))
+    producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created First - 3"))
+
+    exclusiveConsumer2.receive(100) should not be (null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Failover To Another Exclusive Consumer Created After a non-exclusive Consumer") {
+
+    connect()
+
+    val dest_name = next_id("TEST.QUEUE")
+
+    val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+    val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val producer = senderSession.createProducer(queue(dest_name))
+
+    producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created After - 1"));
+
+    Thread.sleep(100)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer1.receive(100) should not be (null)
+    exclusiveConsumer2.receive(100) should be(null)
+    fallbackConsumer.receive(100) should be(null)
+
+    // Close the exclusive consumer to verify the non-exclusive consumer
+    // takes over
+    exclusiveConsumer1.close()
+
+    producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created After - 2"))
+    producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created After - 3"))
+
+    exclusiveConsumer2.receive(100) should not be (null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Failover To NonExclusive Consumer") {
+
+    info("*** Running Test: Failover To NonExclusive Consumer")
+
+    connect()
+    val dest_name = next_id("TEST.QUEUE")
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+    val producer = senderSession.createProducer(queue(dest_name))
+
+    producer.send(senderSession.createTextMessage("Failover To NonExclusive Consumer - 1"))
+
+    Thread.sleep(100)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be (null)
+    fallbackConsumer.receive(100) should be(null)
+
+    // Close the exclusive consumer to verify the non-exclusive consumer
+    // takes over
+    exclusiveConsumer.close()
+
+    producer.send(senderSession.createTextMessage("Failover To NonExclusive Consumer - 2"))
+    fallbackConsumer.receive(100) should not be (null)
+    fallbackConsumer.close()
+  }
+
+  test("Fallback To Exclusive Consumer") {
+    connect()
+    val dest_name = next_id("TEST.QUEUE")
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    var exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+    val producer = senderSession.createProducer(queue(dest_name))
+
+    producer.send(senderSession.createTextMessage("Fallback To Exclusive Consumer - 1"))
+
+    Thread.sleep(100)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(200) should not be (null)
+    fallbackConsumer.receive(200) should be(null)
+
+    Thread.sleep(100)
+
+    // Close the exclusive consumer to verify the non-exclusive consumer
+    // takes over
+    exclusiveConsumer.close()
+
+    producer.send(senderSession.createTextMessage("Fallback To Exclusive Consumer - 2"))
+    fallbackConsumer.receive(100) should not be (null)
+
+    // Create exclusive consumer to determine if it will start receiving
+    // the messages.
+    exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+
+    producer.send(senderSession.createTextMessage("Fallback To Exclusive Consumer - 3"))
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be (null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Temp Queue Destinations") {
+    test_temp_destination((session: Session) => session.createTemporaryQueue())
+  }
+
+  test("Temp Topic Destinations") {
+    test_temp_destination((session: Session) => session.createTemporaryTopic())
+  }
+
+  def test_temp_destination(func: (Session) => Destination) = {
+    connect()
+
+    val connection2 = connect("", true)
+    val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val dest = func(session);
+    val consumer = session.createConsumer(dest)
+
+    val producer2 = session2.createProducer(dest)
+
+    def put(id: Int) = producer2.send(session.createTextMessage("message:" + id))
+    def get(id: Int) = {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(dest)
+      m.getText should equal("message:" + id)
+    }
+
+    Thread.sleep(1000);
+    List(1, 2, 3).foreach(put _)
+    List(1, 2, 3).foreach(get _)
+
+    // A different connection should not be able to consume from it.
+    try {
+      session2.createConsumer(dest)
+      fail("expected jms exception")
+    } catch {
+      case e: JMSException => println(e)
+    }
+
+    // delete the temporary destination.
+    consumer.close()
+    dest match {
+      case dest: TemporaryQueue => dest.delete()
+      case dest: TemporaryTopic => dest.delete()
+    }
+
+    // The producer should no longer be able to send to it.
+    Thread.sleep(1000);
+    try {
+      put(4)
+      fail("expected jms exception")
+    } catch {
+      case e: JMSException => println(e)
+    }
+
+  }
+
+  test("Topic drops messages sent before before subscription is established") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val dest = topic(next_id("example"))
+
+    val producer = session.createProducer(dest)
+    def put(id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+
+    put(1)
+
+    val consumer = session.createConsumer(dest)
+
+    put(2)
+    put(3)
+
+    def get(id: Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(dest)
+      m.getText should equal("message:" + id)
+    }
+
+    List(2, 3).foreach(get _)
+  }
+
+  test("Queue Message Cached") {
+
+    connect("?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false")
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val dest = queue(next_id("example"))
+    val producer = session.createProducer(dest)
+    def put(id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+
+    List(1, 2, 3).foreach(put _)
+
+    val consumer = session.createConsumer(dest)
+
+    def get(id: Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(dest)
+      m.getText should equal("message:" + id)
+    }
+
+    List(1, 2, 3).foreach(get _)
+  }
+
+  test("Queue order preserved") {
+
+    connect()
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val dest = queue(next_id("example"))
+    val producer = session.createProducer(dest)
+    def put(id: Int) {
+      producer.send(session.createTextMessage("message:" + id))
+    }
+
+    List(1, 2, 3).foreach(put _)
+
+    val consumer = session.createConsumer(dest)
+
+    def get(id: Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(dest)
+      m.getText should equal("message:" + id)
+    }
+
+    List(1, 2, 3).foreach(get _)
+  }
+
+  test("Test that messages are consumed ") {
+
+    connect()
+    var session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val queue = session.createQueue(next_id("test"));
+    val producer = session.createProducer(queue);
+    producer.send(session.createTextMessage("Hello"));
+
+    // Consume the message...
+    var consumer = session.createConsumer(queue)
+    var msg = consumer.receive(1000)
+    msg should not be (null)
+
+    Thread.sleep(1000)
+
+    session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    // Attempt to Consume the message...check if message was acknowledge
+    consumer = session.createConsumer(queue)
+    msg = consumer.receive(1000)
+    msg should be(null)
+
+    session.close()
+  }
+
+  test("Queue and a selector") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    var dest = queue(next_id("example"))
+    val producer = session.createProducer(dest)
+
+    def put(id: Int, color: String) {
+      val message = session.createTextMessage("message:" + id)
+      message.setStringProperty("color", color)
+      producer.send(message)
+    }
+
+    List((1, "red"), (2, "blue"), (3, "red")).foreach {
+      case (id, color) => put(id, color)
+    }
+
+    val consumer = session.createConsumer(dest, "color='red'")
+
+    def get(id: Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(dest)
+      m.getText should equal("message:" + id)
+    }
+
+    get(1)
+    get(3)
+  }
+
+  test("Receive then Browse and then Receive again") {
+    connect()
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val dest = queue(next_id("BROWSER.TEST.RB"))
+    val producer = session.createProducer(dest)
+    var consumer = session.createConsumer(dest)
+
+    val outbound = List(session.createTextMessage("First Message"),
+      session.createTextMessage("Second Message"),
+      session.createTextMessage("Third Message"))
+
+    // lets consume any outstanding messages from previous test runs
+    while (consumer.receive(1000) != null) {
+    }
+
+    producer.send(outbound(0));
+    producer.send(outbound(1));
+    producer.send(outbound(2));
+
+    consumer.receive(200) should be(outbound(0))
+
+    consumer.close();
+
+    val browser = session.createBrowser(dest)
+    val enumeration = browser.getEnumeration
+
+    // browse the second
+    enumeration.hasMoreElements should be(true)
+    enumeration.nextElement() should be(outbound(1))
+
+    // browse the third.
+    enumeration.hasMoreElements should be(true)
+    enumeration.nextElement() should be(outbound(2))
+
+    // There should be no more.
+    var tooMany = false;
+    while (enumeration.hasMoreElements) {
+      debug("Got extra message: %s", enumeration.nextElement());
+      tooMany = true;
+    }
+    tooMany should be(false)
+    browser.close()
+
+    // Re-open the consumer.
+    consumer = session.createConsumer(dest);
+    // Receive the second.
+    consumer.receive(200) should be(outbound(1))
+    // Receive the third.
+    consumer.receive(200) should be(outbound(2))
+    consumer.close()
+  }
+
+  test("Browse Queue then Receive messages") {
+    connect()
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val dest = queue(next_id("BROWSER.TEST.BR"))
+    val producer = session.createProducer(dest)
+
+    val outbound = List(session.createTextMessage("First Message"),
+      session.createTextMessage("Second Message"),
+      session.createTextMessage("Third Message"))
+
+    producer.send(outbound(0))
+
+    // create browser first
+    val browser = session.createBrowser(dest)
+    val enumeration = browser.getEnumeration
+
+    // create consumer
+    val consumer = session.createConsumer(dest)
+
+    // browse the first message
+    enumeration.hasMoreElements should be(true)
+    enumeration.nextElement() should be(outbound(0))
+
+    // Receive the first message.
+    consumer.receive(100) should be(outbound(0))
+  }
+
+  //  test("Queue Browser With 2 Consumers") {
+  //    val numMessages = 1000;
+  //
+  //    connect()
+  //
+  //    default_connection.setAlwaysSyncSend(false);
+  //
+  //    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+  //
+  //    val destination = queue("BROWSER.TEST")
+  //
+  //    val destinationPrefetch10 = queue("TEST?jms.prefetchSize=10")
+  //    val destinationPrefetch1 = queue("TEST?jms.prefetchsize=1")
+  //
+  //    val connection2 = create_connection
+  //    connection2.start()
+  //    connections.add(connection2)
+  //    val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+  //
+  //    val producer = session.createProducer(destination)
+  //    val consumer = session.createConsumer(destinationPrefetch10)
+  //
+  //    for (i <- 1 to 10) {
+  //        val message = session.createTextMessage("Message: " + i)
+  //        producer.send(message)
+  //    }
+  //
+  //    val browser = session2.createBrowser(destinationPrefetch1)
+  //    val browserView = browser.getEnumeration()
+  //
+  //    val messages = List[Message]
+  //    for (i <- 0toInt numMessages) {
+  //      val m1 = consumer.receive(5000)
+  //      m1 shoulld not be(null)
+  //      messages += m1
+  //    }
+  //
+  //    val i = 0;
+  //    for (;i < numMessages && browserView.hasMoreElements(); i++) {
+  //        Message m1 = messages.get(i);
+  //        Message m2 = browserView.nextElement();
+  //        assertNotNull("m2 is null for index: " + i, m2);
+  //        assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+  //    }
+  //
+  //    // currently browse max page size is ignored for a queue browser consumer
+  //    // only guarantee is a page size - but a snapshot of pagedinpending is
+  //    // used so it is most likely more
+  //    assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE);
+  //
+  //    assertFalse("nothing left in the browser", browserView.hasMoreElements());
+  //    assertNull("consumer finished", consumer.receiveNoWait());
+  //  }
+
+  test("Browse Close") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val destination = queue(next_id("BROWSER.TEST.BC"))
+
+    val outbound = List(session.createTextMessage("First Message"),
+      session.createTextMessage("Second Message"),
+      session.createTextMessage("Third Message"))
+
+    val producer = session.createProducer(destination)
+    producer.send(outbound(0))
+    producer.send(outbound(1))
+    producer.send(outbound(2))
+
+    // create browser first
+    val browser = session.createBrowser(destination)
+    val enumeration = browser.getEnumeration
+
+    // browse some messages
+    enumeration.nextElement() should equal(outbound(0))
+    enumeration.nextElement() should equal(outbound(1))
+
+    browser.close()
+
+    // create consumer
+    val consumer = session.createConsumer(destination)
+
+    // Receive the first message.
+    consumer.receive(1000) should equal(outbound(0))
+    consumer.receive(1000) should equal(outbound(1))
+    consumer.receive(1000) should equal(outbound(2))
+  }
+
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala Sat Jul 28 16:26:59 2012
@@ -37,7 +37,13 @@ class OpenwireTestSupport extends Broker
 
   override protected def afterEach() {
     super.afterEach()
-    connections.foreach(_.close())
+    for ( connection <- connections ) {
+      try {
+        connection.close()
+      } catch {
+        case e =>
+      }
+    }
     connections = Nil
     default_connection = null
   }
@@ -47,14 +53,16 @@ class OpenwireTestSupport extends Broker
 
   def create_connection_factory(uri_options: String = "") = new ActiveMQConnectionFactory(connection_uri(uri_options))
 
-  def create_connection(uri_options: String = ""): Connection = create_connection_factory(uri_options).createConnection
+  def create_connection(uri_options: String = "", user:String=null, password:String=null): Connection = {
+    create_connection_factory(uri_options).createConnection(user, password)
+  }
 
   def queue(value: String) = new ActiveMQQueue(value);
 
   def topic(value: String) = new ActiveMQTopic(value);
 
-  def connect(uri_options: String = "", start: Boolean = true) = {
-    val connection = create_connection(uri_options)
+  def connect(uri_options: String = "", start: Boolean = true, user:String=null, password:String=null) = {
+    val connection = create_connection(uri_options, user, password)
     connections ::= connection
     if (default_connection == null) {
       default_connection = connection

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala Sat Jul 28 16:26:59 2012
@@ -18,8 +18,11 @@
 package org.apache.activemq.apollo.openwire.test
 
 import javax.jms.{Session, JMSException, MessageProducer}
+import org.apache.activemq.apollo.broker.BrokerParallelTestExecution
 
-abstract class SecurityTest extends OpenwireTestSupport {
+class SecurityTest extends OpenwireTestSupport with BrokerParallelTestExecution {
+
+  override def is_parallel_test_class = false
 
   override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
 
@@ -32,71 +35,38 @@ abstract class SecurityTest extends Open
     }
     super.beforeAll
   }
-}
-
-class ConnectionFailureWithValidCredentials extends SecurityTest {
 
   test("Connect with valid id password but can't connect") {
-
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_not_connect", "can_not_connect")
-
     intercept[JMSException] {
-      connection.start()
+      connect(user="can_not_connect", password="can_not_connect")
     }
   }
-}
-
-class CoonectionFailsWhenNoCredentialsGiven extends SecurityTest {
 
   test("Connect with no id password") {
-
-    val factory = create_connection_factory()
-    val connection = factory.createConnection()
-
     intercept[JMSException] {
-      connection.start()
+      connect()
     }
   }
-}
-
-class ConnectionFailsWhenCredentialsAreInvlaid extends SecurityTest {
 
   test("Connect with invalid id password") {
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("foo", "bar")
-
     intercept[JMSException] {
-      connection.start()
+      connect(user="foo", password="bar")
     }
   }
-}
 
-class ConnectionSucceedsWithValidCredentials extends SecurityTest {
   test("Connect with valid id password that can connect") {
-
-    val factory = create_connection_factory("?jms.alwaysSyncSend=true")
-    val connection = factory.createConnection("can_only_connect", "can_only_connect")
-
     try {
-      connection.start()
+      connect("?jms.alwaysSyncSend=true", user="can_only_connect", password="can_only_connect")
     } catch {
-      case e => fail("Should not have thrown an exception")
+      case e =>
+        e.printStackTrace()
+        fail("Should not have thrown an exception ")
     }
 
   }
-}
 
-class SendFailsWhenNotAuthorized extends SecurityTest {
   test("Send not authorized") {
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_only_connect", "can_only_connect")
-
-    try {
-      connection.start()
-    } catch {
-      case e => fail("Should not have thrown an exception")
-    }
+    val connection = connect(user="can_only_connect", password="can_only_connect")
 
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
     val producer = session.createProducer(queue("secure"))
@@ -105,20 +75,10 @@ class SendFailsWhenNotAuthorized extends
       producer.send(session.createTextMessage("Test Message"))
     }
   }
-}
-
-class SendFailsWhenNotAuthorizedToCreateQueues extends SecurityTest {
 
   test("Send authorized but not create") {
 
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_send_queue", "can_send_queue")
-
-    try {
-      connection.start()
-    } catch {
-      case e => fail("Should not have thrown an exception")
-    }
+    val connection = connect(user="can_send_queue", password="can_send_queue")
 
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
     val producer = session.createProducer(queue("secure"))
@@ -127,20 +87,10 @@ class SendFailsWhenNotAuthorizedToCreate
       producer.send(session.createTextMessage("Test Message"))
     }
   }
-}
-
-class ConsumeFailsWhenNotAuthroizedToCreateQueue extends SecurityTest {
 
   test("Consume authorized but not create") {
 
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_consume_queue", "can_consume_queue")
-
-    try {
-      connection.start()
-    } catch {
-      case e => fail("Should not have thrown an exception")
-    }
+    val connection = connect(user="can_consume_queue", password="can_consume_queue")
 
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
 
@@ -149,21 +99,17 @@ class ConsumeFailsWhenNotAuthroizedToCre
       consumer.receive();
     }
   }
-}
 
-class SendSucceedsWhenCreateQueueAthorized extends SecurityTest {
   test("Send and create authorized") {
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_send_create_queue", "can_send_create_queue")
+    create_and_send(next_id("secure"))
+  }
 
-    try {
-      connection.start()
-    } catch {
-      case e => fail("Should not have thrown an exception")
-    }
+
+  def create_and_send(dest:String) {
+    val connection = connect(user="can_send_create_queue", password="can_send_create_queue")
 
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-    val producer = session.createProducer(queue("secure"))
+    val producer = session.createProducer(queue(dest))
 
     try {
       producer.send(session.createTextMessage("Test Message"))
@@ -173,39 +119,35 @@ class SendSucceedsWhenCreateQueueAthoriz
   }
 
   test("Can send and once created") {
+    val dest = next_id("secure")
+    val connection = connect(user="can_send_queue", password="can_send_queue")
 
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_send_queue", "can_send_queue")
+    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue(dest))
 
     try {
-      connection.start()
+      producer.send(session.createTextMessage("Test Message"))
+      fail("Should have thrown an exception since dest is not created.")
     } catch {
-      case e => fail("Should not have thrown an exception")
+      case e =>
     }
 
-    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-    val producer = session.createProducer(queue("secure"))
+    // Now actually create it...
+    create_and_send(dest)
 
     try {
       producer.send(session.createTextMessage("Test Message"))
     } catch {
-      case e => fail("Should not have thrown an exception")
+      case e =>
+        e.printStackTrace()
+        fail("Should not have thrown an exception since it was created")
     }
-  }
-}
 
-class SubscribeFailsForConnectionOnlyAuthorization extends SecurityTest {
+  }
 
   test("Consume not authorized") {
 
-    val factory = create_connection_factory()
-    val connection = factory.createConnection("can_only_connect", "can_only_connect")
-
-    try {
-      connection.start()
-    } catch {
-      case e => fail("Should not have thrown an exception")
-    }
+    val connection = connect(user="can_only_connect", password="can_only_connect")
 
     val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala Sat Jul 28 16:26:59 2012
@@ -26,15 +26,17 @@ class TransactionTest extends OpenwireTe
 
   test("Simple JMS Transaction Test") {
     connect()
+    val dest = queue(next_id("example"))
+
     val producer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-    val producer = producer_session.createProducer(queue("example"))
+    val producer = producer_session.createProducer(dest)
 
     val messages = List(producer_session.createTextMessage("one"), producer_session.createTextMessage("two"), producer_session.createTextMessage("three"))
 
     messages.foreach(producer.send(_))
 
     val consumer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
-    val consumer = consumer_session.createConsumer(queue("example"))
+    val consumer = consumer_session.createConsumer(dest)
 
     val m = consumer.receive(1000).asInstanceOf[TextMessage]
     m should not be (null)