You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/28 18:27:00 UTC
svn commit: r1366705 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/test/scala/ apollo-openwire/
apollo-openwire/src/test/resources/
apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/
Author: chirino
Date: Sat Jul 28 16:26:59 2012
New Revision: 1366705
URL: http://svn.apache.org/viewvc?rev=1366705&view=rev
Log:
Converted the openwire tests so that they can be run in parallel.
Added:
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala
Removed:
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/DestinationWildcardTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/DurableSubscriberTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/ExclusiveConsumerTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/QueueTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TempDestinationTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TopicTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Sat Jul 28 16:26:59 2012
@@ -119,8 +119,9 @@ trait BrokerParallelTestExecution extend
override def newInstance = {
val rc = super.newInstance.asInstanceOf[BrokerFunSuiteSupport]
- rc.broker = broker
- rc.port = port
+ rc.before_and_after_all_object = self
+ rc.broker = self.broker
+ rc.port = self.port
rc
}
@@ -133,6 +134,7 @@ trait BrokerParallelTestExecution extend
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class BrokerFunSuiteSupport extends FunSuiteSupport with Logging { // with ShouldMatchers with BeforeAndAfterEach with Logging {
+ var before_and_after_all_object:BrokerFunSuiteSupport = _
var broker: Broker = null
var port = 0
@@ -146,19 +148,26 @@ class BrokerFunSuiteSupport extends FunS
override def beforeAll() = {
super.beforeAll()
- try {
- broker = createBroker
- broker.setTmp(test_data_dir / "tmp")
- broker.getTmp().mkdirs()
- ServiceControl.start(broker)
- port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
- } catch {
- case e: Throwable => e.printStackTrace
+ if( before_and_after_all_object==null ) {
+ try {
+ broker = createBroker
+ broker.setTmp(test_data_dir / "tmp")
+ broker.getTmp().mkdirs()
+ ServiceControl.start(broker)
+ port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
+ } catch {
+ case e: Throwable => e.printStackTrace
+ }
+ } else {
+ broker = before_and_after_all_object.broker
+ port = before_and_after_all_object.port
}
}
override def afterAll() = {
- ServiceControl.stop(broker)
+ if( before_and_after_all_object==null ) {
+ ServiceControl.stop(broker)
+ }
super.afterAll()
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Sat Jul 28 16:26:59 2012
@@ -195,16 +195,6 @@
</execution>
</executions>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>${maven-surefire-plugin-version}</version>
- <configuration>
- <parallel>classes</parallel>
- <perCoreThreadCount>false</perCoreThreadCount>
- <threadCount>1</threadCount>
- </configuration>
- </plugin>
</plugins>
</build>
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-bdb.xml Sat Jul 28 16:26:59 2012
@@ -23,7 +23,7 @@
<queue id="mirrored.**" mirrored="true"/>
- <bdb_store directory="${basedir}/target/test-data"/>
+ <bdb_store directory="${testdatadir}"/>
</virtual_host>
<web_admin bind="http://0.0.0.0:0"/>
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-leveldb.xml Sat Jul 28 16:26:59 2012
@@ -23,7 +23,7 @@
<queue id="mirrored.**" mirrored="true"/>
- <leveldb_store directory="${basedir}/target/test-data"/>
+ <leveldb_store directory="${testdatadir}"/>
</virtual_host>
<web_admin bind="http://0.0.0.0:0"/>
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire-secure.xml Sat Jul 28 16:26:59 2012
@@ -19,46 +19,36 @@
<authentication domain="OpenwireSecurityTest"/>
+ <access_rule allow="admin" action="*"/>
+ <access_rule allow="*" kind="topic" id="ActiveMQ.Advisory.**" action="receive create"/>
+
+ <access_rule allow="connect_group" action="connect"/>
+ <access_rule allow="can_send_create_consume_queue" kind="queue" action="send create consume"/>
+ <access_rule allow="can_send_create_queue" kind="queue" action="send create"/>
+ <access_rule allow="can_send_queue" kind="queue" action="send"/>
+ <access_rule allow="can_receive_queue" kind="queue" action="receive"/>
+ <access_rule allow="can_consume_queue" kind="queue" action="consume"/>
+ <access_rule allow="can_send_create_topic" kind="topic" action="send create"/>
+ <access_rule allow="can_send_topic" kind="topic" action="send"/>
+ <access_rule allow="can_recieve_topic" kind="topic" action="receive"/>
+ <access_rule allow="can_consume_create_ds" kind="dsub" action="consume create"/>
+ <access_rule allow="can_consume_ds" kind="dsub" action="consume"/>
+ <access_rule allow="can_recieve_topic" kind="dsub" action="receive"/>
+
+ <access_rule allow="guest" action="connect"/>
+ <access_rule allow="guest" action="create destroy send receive consume" kind="topic queue dsub" id_regex="test.*"/>
+
+ <!-- only allow connects over the tcp2 connector -->
+ <access_rule allow="connector_restricted" action="connect" connector="tcp2"/>
+
<virtual_host id="default">
<host_name>localhost</host_name>
-
- <acl>
- <connect allow="connect_group"/>
- </acl>
-
- <!-- queue security -->
- <queue id="**" kind="ptp">
- <acl>
- <create allow="can_send_create_queue"/>
- <send allow="can_send_create_queue"/>
- <send allow="can_send_queue"/>
- <receive allow="can_receive_queue"/>
- <consume allow="can_consume_queue"/>
- </acl>
- </queue>
-
- <!-- topic security -->
- <destination id="**">
- <acl>
- <create allow="can_send_create_topic"/>
- <send allow="can_send_create_topic"/>
- <send allow="can_send_topic"/>
- <receive allow="can_recieve_topic"/>
- </acl>
- </destination>
-
- <!-- durable sub security -->
- <queue id="**" kind="ds">
- <acl>
- <create allow="can_consume_create_ds"/>
- <consume allow="can_consume_create_ds"/>
- <consume allow="can_consume_ds"/>
- </acl>
- </queue>
</virtual_host>
- <web_admin bind="http://0.0.0.0:0"/>
+ <!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0">
</connector>
+ <connector id="tcp2" bind="tcp://0.0.0.0:0">
+ </connector>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/users.properties Sat Jul 28 16:26:59 2012
@@ -15,6 +15,7 @@
# limitations under the License.
#
+admin=admin
connect_group=CN=ssl_user|can_only_connect|can_send_create_queue|can_send_queue|can_receive_queue|can_consume_queue|can_send_create_topic|can_send_topic|can_recieve_topic|can_consume_create_ds|can_consume_ds
can_not_connect=can_not_connect
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala?rev=1366705&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireBDBParallelTest.scala Sat Jul 28 16:26:59 2012
@@ -0,0 +1,8 @@
+package org.apache.activemq.apollo.openwire.test
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OpenwireBDBParallelTest extends OpenwireParallelTest {
+ override def broker_config_uri = "xml:classpath:apollo-openwire-bdb.xml"
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala?rev=1366705&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireLevelDBParallelTest.scala Sat Jul 28 16:26:59 2012
@@ -0,0 +1,55 @@
+package org.apache.activemq.apollo.openwire.test
+
+import javax.jms.{DeliveryMode, Session}
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+
+class OpenwireLevelDBParallelTest extends OpenwireParallelTest {
+ override def broker_config_uri = "xml:classpath:apollo-openwire-leveldb.xml"
+
+ test("Queue Prefetch and Client Ack") {
+
+ connect("?jms.useAsyncSend=true")
+ var dest = queue(next_id("prefetch"))
+
+ val session = default_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
+ val producer = session.createProducer(dest)
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
+ def put(id: Int) {
+ val msg = session.createBytesMessage()
+ msg.writeBytes(new Array[Byte](1024 * 4))
+ producer.send(msg)
+ }
+
+ for (i <- 1 to 1000) {
+ put(i)
+ }
+
+ val consumer = session.createConsumer(dest)
+ def get(id: Int) {
+ val m = consumer.receive()
+ expect(true, "Did not get message: " + id)(m != null)
+ }
+ for (i <- 1 to 1000) {
+ get(i)
+ }
+ default_connection.close()
+ default_connection = null
+
+ // All those messages should get redelivered since they were not previously
+ // acked.
+ connect()
+ val session2 = default_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
+ val consumer2 = session2.createConsumer(dest)
+ def get2(id: Int) {
+ val m = consumer2.receive()
+ expect(true, "Did not get message: " + id)(m != null)
+ }
+ for (i <- 1 to 1000) {
+ get2(i)
+ }
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala?rev=1366705&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireParallelTest.scala Sat Jul 28 16:26:59 2012
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire.test
+
+import javax.jms._
+import org.apache.activemq.apollo.broker.BrokerParallelTestExecution
+
+class OpenwireParallelTest extends OpenwireTestSupport with BrokerParallelTestExecution {
+
+ def path_separator = "."
+
+ test("Topic /w Durable sub retains messages.") {
+
+ val dest = topic(next_id("example"))
+
+ def create_durable_sub() {
+ val connection = connect("", false)
+ connection.setClientID("test")
+ connection.start()
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val subscriber = session.createDurableSubscriber(dest, "test")
+ session.close()
+ connection.close()
+ if (default_connection == connection) {
+ default_connection = null
+ }
+ }
+
+ create_durable_sub()
+
+ connect("", false)
+ default_connection.setClientID("test")
+ default_connection.start()
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(dest)
+ def put(id: Int) {
+ producer.send(session.createTextMessage("message:" + id))
+ }
+
+ List(1, 2, 3).foreach(put _)
+
+ val subscriber = session.createDurableSubscriber(dest, "test")
+
+ def get(id: Int) {
+ val m = subscriber.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(dest)
+ m.getText should equal("message:" + id)
+ }
+
+ List(1, 2, 3).foreach(get _)
+ }
+
+
+ test("Wildcard subscription") {
+ connect()
+
+ val common_prefix = next_id()
+ val prefix = common_prefix + path_separator
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer1 = session.createProducer(queue(prefix + "A"))
+ val producer2 = session.createProducer(queue(prefix + "B"))
+ val producer3 = session.createProducer(queue(common_prefix+ "bar.A"))
+ def put(producer: MessageProducer, id: Int) {
+ producer.send(session.createTextMessage("message:" + id))
+ }
+
+ val consumer = session.createConsumer(queue(prefix + "*"))
+ def get(id: Int) {
+ receive_text(consumer) should equal("message:" + id)
+ }
+
+ List(1, 2, 3).foreach(put(producer1, _))
+ List(1, 2, 3).foreach(get _)
+
+ producer3.send(session.createTextMessage("This one shouldn't get consumed."))
+
+ List(1, 2, 3).foreach(put(producer2, _))
+ List(1, 2, 3).foreach(get _)
+
+ put(producer1, -1)
+ get(-1)
+ }
+
+ test("Wildcard subscription with multiple path sections") {
+ connect()
+
+ val common_prefix = next_id()
+ val prefix1 = common_prefix+"foo" + path_separator + "bar" + path_separator
+ val prefix2 = common_prefix+"foo" + path_separator + "bar" + path_separator + "cheeze" + path_separator
+ val prefix3 = common_prefix+"foo" + path_separator + "bar" + path_separator + "cracker" + path_separator
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer1 = session.createProducer(topic(prefix1 + "A"))
+ val producer2 = session.createProducer(topic(prefix2 + "B"))
+ val producer3 = session.createProducer(topic(prefix3 + "C"))
+
+ def put(producer: MessageProducer, id: Int) {
+ producer.send(session.createTextMessage("message:" + id))
+ }
+ def get(consumer: MessageConsumer, id: Int) {
+ receive_text(consumer) should equal("message:" + id)
+ }
+
+ val consumer1 = session.createConsumer(topic(prefix1 + "*"))
+ val consumer2 = session.createConsumer(topic(prefix2 + "*"))
+ val consumer3 = session.createConsumer(topic(prefix3 + "*"))
+
+
+ put(producer1, 1)
+ List(producer1, producer2, producer3).foreach(put(_, -1))
+
+ get(consumer1, 1)
+ List(consumer1, consumer2, consumer3).foreach(get(_, -1))
+
+
+ put(producer2, 2)
+ List(producer1, producer2, producer3).foreach(put(_, -1))
+
+ get(consumer2, 2)
+ List(consumer1, consumer2, consumer3).foreach(get(_, -1))
+
+
+ put(producer3, 3)
+ List(producer1, producer2, producer3).foreach(put(_, -1))
+
+ get(consumer3, 3)
+ List(consumer1, consumer2, consumer3).foreach(get(_, -1))
+
+ }
+
+ test("Exclusive Consumer Selected when created first") {
+
+ val dest_name = next_id("TEST.QUEUE")
+
+ connect()
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+ val producer = senderSession.createProducer(queue(dest_name))
+
+ producer.send(senderSession.createTextMessage("Exclusive Consumer Selected when created first - 1"))
+
+ Thread.sleep(100)
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be (null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Exclusive Consumer Selected when Created After Non-Exclusive Consumer") {
+ connect()
+
+ val dest_name = next_id("TEST.QUEUE")
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+ val exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val producer = senderSession.createProducer(queue(dest_name))
+
+ val msg = senderSession.createTextMessage("test")
+ producer.send(msg);
+
+ Thread.sleep(100)
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be (null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Failover To Another Exclusive Consumer Created First") {
+
+ connect()
+
+ val dest_name = next_id("TEST.QUEUE")
+
+ val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+ val producer = senderSession.createProducer(queue(dest_name))
+
+ producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created First - 1"))
+
+ Thread.sleep(100)
+
+ // Verify that only the first exclusive consumer receives the message.
+ exclusiveConsumer1.receive(100) should not be (null)
+ exclusiveConsumer2.receive(100) should be(null)
+ fallbackConsumer.receive(100) should be(null)
+
+ // Close the first exclusive consumer to verify that the second exclusive
+ // consumer (not the non-exclusive fallback consumer) takes over
+ exclusiveConsumer1.close()
+
+ producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created First - 2"))
+ producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created First - 3"))
+
+ exclusiveConsumer2.receive(100) should not be (null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Failover To Another Exclusive Consumer Created After a non-exclusive Consumer") {
+
+ connect()
+
+ val dest_name = next_id("TEST.QUEUE")
+
+ val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+ val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val producer = senderSession.createProducer(queue(dest_name))
+
+ producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created After - 1"));
+
+ Thread.sleep(100)
+
+ // Verify that only the first exclusive consumer receives the message.
+ exclusiveConsumer1.receive(100) should not be (null)
+ exclusiveConsumer2.receive(100) should be(null)
+ fallbackConsumer.receive(100) should be(null)
+
+ // Close the first exclusive consumer to verify that the second exclusive
+ // consumer (not the non-exclusive fallback consumer) takes over
+ exclusiveConsumer1.close()
+
+ producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created After - 2"))
+ producer.send(senderSession.createTextMessage("Failover To Another Exclusive Consumer Created After - 3"))
+
+ exclusiveConsumer2.receive(100) should not be (null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Failover To NonExclusive Consumer") {
+
+ info("*** Running Test: Failover To NonExclusive Consumer")
+
+ connect()
+ val dest_name = next_id("TEST.QUEUE")
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+ val producer = senderSession.createProducer(queue(dest_name))
+
+ producer.send(senderSession.createTextMessage("Failover To NonExclusive Consumer - 1"))
+
+ Thread.sleep(100)
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be (null)
+ fallbackConsumer.receive(100) should be(null)
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer.close()
+
+ producer.send(senderSession.createTextMessage("Failover To NonExclusive Consumer - 2"))
+ fallbackConsumer.receive(100) should not be (null)
+ fallbackConsumer.close()
+ }
+
+ test("Fallback To Exclusive Consumer") {
+ connect()
+ val dest_name = next_id("TEST.QUEUE")
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ var exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue(dest_name))
+ val producer = senderSession.createProducer(queue(dest_name))
+
+ producer.send(senderSession.createTextMessage("Fallback To Exclusive Consumer - 1"))
+
+ Thread.sleep(100)
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(200) should not be (null)
+ fallbackConsumer.receive(200) should be(null)
+
+ Thread.sleep(100)
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer.close()
+
+ producer.send(senderSession.createTextMessage("Fallback To Exclusive Consumer - 2"))
+ fallbackConsumer.receive(100) should not be (null)
+
+ // Create exclusive consumer to determine if it will start receiving
+ // the messages.
+ exclusiveConsumer = exclusiveSession.createConsumer(queue(dest_name+"?consumer.exclusive=true"))
+
+ producer.send(senderSession.createTextMessage("Fallback To Exclusive Consumer - 3"))
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be (null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Temp Queue Destinations") {
+ test_temp_destination((session: Session) => session.createTemporaryQueue())
+ }
+
+ test("Temp Topic Destinations") {
+ test_temp_destination((session: Session) => session.createTemporaryTopic())
+ }
+
+ def test_temp_destination(func: (Session) => Destination) = {
+ connect()
+
+ val connection2 = connect("", true)
+ val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val dest = func(session);
+ val consumer = session.createConsumer(dest)
+
+ val producer2 = session2.createProducer(dest)
+
+ def put(id: Int) = producer2.send(session.createTextMessage("message:" + id))
+ def get(id: Int) = {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(dest)
+ m.getText should equal("message:" + id)
+ }
+
+ Thread.sleep(1000);
+ List(1, 2, 3).foreach(put _)
+ List(1, 2, 3).foreach(get _)
+
+ // A different connection should not be able to consume from it.
+ try {
+ session2.createConsumer(dest)
+ fail("expected jms exception")
+ } catch {
+ case e: JMSException => println(e)
+ }
+
+ // delete the temporary destination.
+ consumer.close()
+ dest match {
+ case dest: TemporaryQueue => dest.delete()
+ case dest: TemporaryTopic => dest.delete()
+ }
+
+ // The producer should no longer be able to send to it.
+ Thread.sleep(1000);
+ try {
+ put(4)
+ fail("expected jms exception")
+ } catch {
+ case e: JMSException => println(e)
+ }
+
+ }
+
+ test("Topic drops messages sent before before subscription is established") {
+ connect()
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val dest = topic(next_id("example"))
+
+ val producer = session.createProducer(dest)
+ def put(id: Int) {
+ producer.send(session.createTextMessage("message:" + id))
+ }
+
+ put(1)
+
+ val consumer = session.createConsumer(dest)
+
+ put(2)
+ put(3)
+
+ def get(id: Int) {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(dest)
+ m.getText should equal("message:" + id)
+ }
+
+ List(2, 3).foreach(get _)
+ }
+
+ test("Queue Message Cached") {
+
+ connect("?wireFormat.cacheEnabled=false&wireFormat.tightEncodingEnabled=false")
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val dest = queue(next_id("example"))
+ val producer = session.createProducer(dest)
+ def put(id: Int) {
+ producer.send(session.createTextMessage("message:" + id))
+ }
+
+ List(1, 2, 3).foreach(put _)
+
+ val consumer = session.createConsumer(dest)
+
+ def get(id: Int) {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(dest)
+ m.getText should equal("message:" + id)
+ }
+
+ List(1, 2, 3).foreach(get _)
+ }
+
+ test("Queue order preserved") {
+
+ connect()
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val dest = queue(next_id("example"))
+ val producer = session.createProducer(dest)
+ def put(id: Int) {
+ producer.send(session.createTextMessage("message:" + id))
+ }
+
+ List(1, 2, 3).foreach(put _)
+
+ val consumer = session.createConsumer(dest)
+
+ def get(id: Int) {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(dest)
+ m.getText should equal("message:" + id)
+ }
+
+ List(1, 2, 3).foreach(get _)
+ }
+
+ test("Test that messages are consumed ") {
+
+ connect()
+ var session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val queue = session.createQueue(next_id("test"));
+ val producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ var consumer = session.createConsumer(queue)
+ var msg = consumer.receive(1000)
+ msg should not be (null)
+
+ Thread.sleep(1000)
+
+ session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ // Attempt to consume the message again to check that it was acknowledged
+ consumer = session.createConsumer(queue)
+ msg = consumer.receive(1000)
+ msg should be(null)
+
+ session.close()
+ }
+
+ // Sends messages 1 (red), 2 (blue) and 3 (red), then checks that a consumer
+ // created with the selector color='red' receives message 1 followed by 3.
+ test("Queue and a selector") {
+   connect()
+   val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+   val dest = queue(next_id("example"))
+   val producer = session.createProducer(dest)
+
+   def put(id: Int, color: String) {
+     val message = session.createTextMessage("message:" + id)
+     message.setStringProperty("color", color)
+     producer.send(message)
+   }
+
+   List((1, "red"), (2, "blue"), (3, "red")).foreach {
+     case (id, color) => put(id, color)
+   }
+
+   val consumer = session.createConsumer(dest, "color='red'")
+
+   def get(id: Int) {
+     // Bounded wait: if a message never arrives the test fails on the null
+     // check below instead of blocking forever inside receive().
+     val m = consumer.receive(5000).asInstanceOf[TextMessage]
+     m should not be (null)
+     m.getJMSDestination should equal(dest)
+     m.getText should equal("message:" + id)
+   }
+
+   get(1)
+   get(3)
+ }
+
+ // Receives the first of three messages, browses the remaining two, and then
+ // checks that a newly created consumer still receives those two.
+ test("Receive then Browse and then Receive again") {
+   connect()
+
+   val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+   val dest = queue(next_id("BROWSER.TEST.RB"))
+   val producer = session.createProducer(dest)
+   val first_consumer = session.createConsumer(dest)
+
+   val outbound = List("First Message", "Second Message", "Third Message").map(session.createTextMessage(_))
+
+   // Let's drain any outstanding messages from previous test runs.
+   while (first_consumer.receive(1000) != null) {
+   }
+
+   outbound.foreach(producer.send(_))
+
+   first_consumer.receive(200) should be(outbound(0))
+   first_consumer.close()
+
+   val browser = session.createBrowser(dest)
+   val remaining = browser.getEnumeration
+
+   // The browser should show the second and then the third message...
+   for (expected <- outbound.drop(1)) {
+     remaining.hasMoreElements should be(true)
+     remaining.nextElement() should be(expected)
+   }
+
+   // ...and nothing after that.
+   var tooMany = false
+   while (remaining.hasMoreElements) {
+     debug("Got extra message: %s", remaining.nextElement())
+     tooMany = true
+   }
+   tooMany should be(false)
+   browser.close()
+
+   // Re-open a consumer: it should receive the second and the third message.
+   val second_consumer = session.createConsumer(dest)
+   for (expected <- outbound.drop(1)) {
+     second_consumer.receive(200) should be(expected)
+   }
+   second_consumer.close()
+ }
+
+ // Sends one message, opens a browser and then a consumer on the same queue,
+ // and checks that browsing the message does not stop the consumer from
+ // receiving it.
+ test("Browse Queue then Receive messages") {
+   connect()
+
+   val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+   val dest = queue(next_id("BROWSER.TEST.BR"))
+   val producer = session.createProducer(dest)
+
+   // Only one message is ever sent, so only one is created.
+   val message = session.createTextMessage("First Message")
+   producer.send(message)
+
+   // create browser first
+   val browser = session.createBrowser(dest)
+   val enumeration = browser.getEnumeration
+
+   // create consumer
+   val consumer = session.createConsumer(dest)
+
+   // browse the first message
+   enumeration.hasMoreElements should be(true)
+   enumeration.nextElement() should be(message)
+
+   // Receive the first message.
+   consumer.receive(100) should be(message)
+
+   // Release the browser and consumer explicitly, as the sibling browser tests do.
+   browser.close()
+   consumer.close()
+ }
+
+ // test("Queue Browser With 2 Consumers") {
+ // val numMessages = 1000;
+ //
+ // connect()
+ //
+ // default_connection.setAlwaysSyncSend(false);
+ //
+ // val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ //
+ // val destination = queue("BROWSER.TEST")
+ //
+ // val destinationPrefetch10 = queue("TEST?jms.prefetchSize=10")
+ // val destinationPrefetch1 = queue("TEST?jms.prefetchsize=1")
+ //
+ // val connection2 = create_connection
+ // connection2.start()
+ // connections.add(connection2)
+ // val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ //
+ // val producer = session.createProducer(destination)
+ // val consumer = session.createConsumer(destinationPrefetch10)
+ //
+ // for (i <- 1 to 10) {
+ // val message = session.createTextMessage("Message: " + i)
+ // producer.send(message)
+ // }
+ //
+ // val browser = session2.createBrowser(destinationPrefetch1)
+ // val browserView = browser.getEnumeration()
+ //
+ // val messages = List[Message]
+ // for (i <- 0toInt numMessages) {
+ // val m1 = consumer.receive(5000)
+ // m1 should not be(null)
+ // messages += m1
+ // }
+ //
+ // val i = 0;
+ // for (;i < numMessages && browserView.hasMoreElements(); i++) {
+ // Message m1 = messages.get(i);
+ // Message m2 = browserView.nextElement();
+ // assertNotNull("m2 is null for index: " + i, m2);
+ // assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+ // }
+ //
+ // // currently browse max page size is ignored for a queue browser consumer
+ // // only guarantee is a page size - but a snapshot of pagedinpending is
+ // // used so it is most likely more
+ // assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE);
+ //
+ // assertFalse("nothing left in the browser", browserView.hasMoreElements());
+ // assertNull("consumer finished", consumer.receiveNoWait());
+ // }
+
+ // Browses the first two of three queued messages, closes the browser, and then
+ // checks that a consumer created afterwards still receives all three in order.
+ test("Browse Close") {
+   connect()
+   val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+   val destination = queue(next_id("BROWSER.TEST.BC"))
+
+   val outbound = List("First Message", "Second Message", "Third Message").map(session.createTextMessage(_))
+
+   val producer = session.createProducer(destination)
+   outbound.foreach(producer.send(_))
+
+   // Open the browser before any consumer exists and look at two messages.
+   val browser = session.createBrowser(destination)
+   val browsed = browser.getEnumeration
+   outbound.take(2).foreach { expected =>
+     browsed.nextElement() should equal(expected)
+   }
+
+   browser.close()
+
+   // A consumer created after the browser was closed gets every message.
+   val consumer = session.createConsumer(destination)
+   outbound.foreach { expected =>
+     consumer.receive(1000) should equal(expected)
+   }
+ }
+
+}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/OpenwireTestSupport.scala Sat Jul 28 16:26:59 2012
@@ -37,7 +37,13 @@ class OpenwireTestSupport extends Broker
override protected def afterEach() {
super.afterEach()
- connections.foreach(_.close())
+ for ( connection <- connections ) { // close every connection opened by the test
+ try {
+ connection.close()
+ } catch {
+ case e: Exception => // best-effort cleanup: ignore close failures, but let fatal Errors propagate
+ }
+ }
connections = Nil
default_connection = null
}
@@ -47,14 +53,16 @@ class OpenwireTestSupport extends Broker
def create_connection_factory(uri_options: String = "") = new ActiveMQConnectionFactory(connection_uri(uri_options))
- def create_connection(uri_options: String = ""): Connection = create_connection_factory(uri_options).createConnection
+ def create_connection(uri_options: String = "", user:String=null, password:String=null): Connection = {
+ create_connection_factory(uri_options).createConnection(user, password)
+ }
def queue(value: String) = new ActiveMQQueue(value);
def topic(value: String) = new ActiveMQTopic(value);
- def connect(uri_options: String = "", start: Boolean = true) = {
- val connection = create_connection(uri_options)
+ def connect(uri_options: String = "", start: Boolean = true, user:String=null, password:String=null) = {
+ val connection = create_connection(uri_options, user, password)
connections ::= connection
if (default_connection == null) {
default_connection = connection
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/SecurityTest.scala Sat Jul 28 16:26:59 2012
@@ -18,8 +18,11 @@
package org.apache.activemq.apollo.openwire.test
import javax.jms.{Session, JMSException, MessageProducer}
+import org.apache.activemq.apollo.broker.BrokerParallelTestExecution
-abstract class SecurityTest extends OpenwireTestSupport {
+class SecurityTest extends OpenwireTestSupport with BrokerParallelTestExecution {
+
+ override def is_parallel_test_class = false
override val broker_config_uri: String = "xml:classpath:apollo-openwire-secure.xml"
@@ -32,71 +35,38 @@ abstract class SecurityTest extends Open
}
super.beforeAll
}
-}
-
-class ConnectionFailureWithValidCredentials extends SecurityTest {
test("Connect with valid id password but can't connect") {
-
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_not_connect", "can_not_connect")
-
intercept[JMSException] {
- connection.start()
+ connect(user="can_not_connect", password="can_not_connect")
}
}
-}
-
-class CoonectionFailsWhenNoCredentialsGiven extends SecurityTest {
test("Connect with no id password") {
-
- val factory = create_connection_factory()
- val connection = factory.createConnection()
-
intercept[JMSException] {
- connection.start()
+ connect()
}
}
-}
-
-class ConnectionFailsWhenCredentialsAreInvlaid extends SecurityTest {
test("Connect with invalid id password") {
- val factory = create_connection_factory()
- val connection = factory.createConnection("foo", "bar")
-
intercept[JMSException] {
- connection.start()
+ connect(user="foo", password="bar")
}
}
-}
-class ConnectionSucceedsWithValidCredentials extends SecurityTest {
test("Connect with valid id password that can connect") {
-
- val factory = create_connection_factory("?jms.alwaysSyncSend=true")
- val connection = factory.createConnection("can_only_connect", "can_only_connect")
-
try {
- connection.start()
+ connect("?jms.alwaysSyncSend=true", user="can_only_connect", password="can_only_connect")
} catch {
- case e => fail("Should not have thrown an exception")
+ case e =>
+ e.printStackTrace()
+ fail("Should not have thrown an exception ")
}
}
-}
-class SendFailsWhenNotAuthorized extends SecurityTest {
test("Send not authorized") {
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_only_connect", "can_only_connect")
-
- try {
- connection.start()
- } catch {
- case e => fail("Should not have thrown an exception")
- }
+ val connection = connect(user="can_only_connect", password="can_only_connect")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val producer = session.createProducer(queue("secure"))
@@ -105,20 +75,10 @@ class SendFailsWhenNotAuthorized extends
producer.send(session.createTextMessage("Test Message"))
}
}
-}
-
-class SendFailsWhenNotAuthorizedToCreateQueues extends SecurityTest {
test("Send authorized but not create") {
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_send_queue", "can_send_queue")
-
- try {
- connection.start()
- } catch {
- case e => fail("Should not have thrown an exception")
- }
+ val connection = connect(user="can_send_queue", password="can_send_queue")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val producer = session.createProducer(queue("secure"))
@@ -127,20 +87,10 @@ class SendFailsWhenNotAuthorizedToCreate
producer.send(session.createTextMessage("Test Message"))
}
}
-}
-
-class ConsumeFailsWhenNotAuthroizedToCreateQueue extends SecurityTest {
test("Consume authorized but not create") {
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_consume_queue", "can_consume_queue")
-
- try {
- connection.start()
- } catch {
- case e => fail("Should not have thrown an exception")
- }
+ val connection = connect(user="can_consume_queue", password="can_consume_queue")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
@@ -149,21 +99,17 @@ class ConsumeFailsWhenNotAuthroizedToCre
consumer.receive();
}
}
-}
-class SendSucceedsWhenCreateQueueAthorized extends SecurityTest {
test("Send and create authorized") {
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_send_create_queue", "can_send_create_queue")
+ create_and_send(next_id("secure"))
+ }
- try {
- connection.start()
- } catch {
- case e => fail("Should not have thrown an exception")
- }
+
+ def create_and_send(dest:String) {
+ val connection = connect(user="can_send_create_queue", password="can_send_create_queue")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- val producer = session.createProducer(queue("secure"))
+ val producer = session.createProducer(queue(dest))
try {
producer.send(session.createTextMessage("Test Message"))
@@ -173,39 +119,35 @@ class SendSucceedsWhenCreateQueueAthoriz
}
test("Can send and once created") {
+ val dest = next_id("secure")
+ val connection = connect(user="can_send_queue", password="can_send_queue")
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_send_queue", "can_send_queue")
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue(dest))
try {
- connection.start()
+ producer.send(session.createTextMessage("Test Message"))
+ fail("Should have thrown an exception since dest is not created.")
} catch {
- case e => fail("Should not have thrown an exception")
+ case e: JMSException => // expected rejection; a catch-all here would also swallow the exception raised by fail()
}
- val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- val producer = session.createProducer(queue("secure"))
+ // Now actually create it...
+ create_and_send(dest)
try {
producer.send(session.createTextMessage("Test Message"))
} catch {
- case e => fail("Should not have thrown an exception")
+ case e =>
+ e.printStackTrace()
+ fail("Should not have thrown an exception since it was created")
}
- }
-}
-class SubscribeFailsForConnectionOnlyAuthorization extends SecurityTest {
+ }
test("Consume not authorized") {
- val factory = create_connection_factory()
- val connection = factory.createConnection("can_only_connect", "can_only_connect")
-
- try {
- connection.start()
- } catch {
- case e => fail("Should not have thrown an exception")
- }
+ val connection = connect(user="can_only_connect", password="can_only_connect")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala?rev=1366705&r1=1366704&r2=1366705&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/test/TransactionTest.scala Sat Jul 28 16:26:59 2012
@@ -26,15 +26,17 @@ class TransactionTest extends OpenwireTe
test("Simple JMS Transaction Test") {
connect()
+ val dest = queue(next_id("example"))
+
val producer_session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- val producer = producer_session.createProducer(queue("example"))
+ val producer = producer_session.createProducer(dest)
val messages = List(producer_session.createTextMessage("one"), producer_session.createTextMessage("two"), producer_session.createTextMessage("three"))
messages.foreach(producer.send(_))
val consumer_session = default_connection.createSession(true, Session.SESSION_TRANSACTED)
- val consumer = consumer_session.createConsumer(queue("example"))
+ val consumer = consumer_session.createConsumer(dest)
val m = consumer.receive(1000).asInstanceOf[TextMessage]
m should not be (null)