You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/27 18:04:02 UTC
svn commit: r1366431 [1/2] - in /activemq/activemq-apollo/trunk: ./
apollo-amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-broker/src/test/scala/ apol...
Author: chirino
Date: Fri Jul 27 16:04:01 2012
New Revision: 1366431
URL: http://svn.apache.org/viewvc?rev=1366431&view=rev
Log:
Changes needed to get most of the stomp tests running in parallel.
Added:
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/ParallelBeforeAndAfterAll.scala
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/junit/
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/junit/ParallelJUnitRunner.scala
Modified:
activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-itests/pom.xml
activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala
activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/SessionResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Fri Jul 27 16:04:01 2012
@@ -191,6 +191,17 @@
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin-version}</version>
+ <configuration>
+ <parallel>classes</parallel>
+ <perCoreThreadCount>false</perCoreThreadCount>
+ <threadCount>1</threadCount>
+ </configuration>
+ </plugin>
+
</plugins>
</build>
<profiles>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Jul 27 16:04:01 2012
@@ -309,6 +309,8 @@ class VirtualHost(val broker: Broker, va
})
}
+ import FutureResult._
+
def get_queue_metrics:FutureResult[AggregateDestMetricsDTO] = {
val queues:Iterable[Queue] = local_router.local_queue_domain.destinations
val metrics = sync_all (queues) { queue =>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala Fri Jul 27 16:04:01 2012
@@ -83,7 +83,7 @@ case class RawMessage(payload:Buffer) ex
} else if( toType == classOf[ByteBuffer] ) {
toType.cast(payload.toByteBuffer)
} else {
- null
+ null.asInstanceOf[T]
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Fri Jul 27 16:04:01 2012
@@ -22,8 +22,12 @@ import org.apache.activemq.apollo.util._
import FileSupport._
import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
+import collection.immutable.HashMap
+import java.io.File
+import org.scalatest.{ParallelTestExecution, OneInstancePerTest}
object BrokerTestSupport {
+ import FutureResult._
def connector_port(broker:Broker, connector: String): Option[Int] = Option(connector).map {
id => broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(0)
@@ -106,7 +110,21 @@ object BrokerTestSupport {
def webadmin_uri(broker:Broker, scheme:String) = {
Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get
}
+
}
+
+trait BrokerParallelTestExecution extends ParallelTestExecution {
+ self: BrokerFunSuiteSupport =>
+
+ override def newInstance = {
+ val rc = super.newInstance.asInstanceOf[BrokerFunSuiteSupport]
+ rc.broker = broker
+ rc.port = port
+ rc
+ }
+
+}
+
/**
* <p>
* </p>
@@ -118,19 +136,14 @@ class BrokerFunSuiteSupport extends FunS
var port = 0
def broker_config_uri = "xml:classpath:apollo.xml"
-
- def createBroker: Broker = {
- info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
- var broker = BrokerFactory.createBroker(broker_config_uri)
- broker.setTmp(basedir / "target" / "tmp")
- broker.getTmp().mkdirs()
- broker
- }
+ def createBroker = BrokerFactory.createBroker(broker_config_uri)
override def beforeAll() = {
super.beforeAll()
try {
broker = createBroker
+ broker.setTmp(test_data_dir / "tmp")
+ broker.getTmp().mkdirs()
ServiceControl.start(broker)
port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
} catch {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Jul 27 16:04:01 2012
@@ -45,7 +45,7 @@ abstract class StoreFunSuiteSupport exte
*/
- def data_directory = basedir / "target" / "apollo-data"
+ def data_directory = test_data_dir / "store"
override protected def beforeAll() = {
super.beforeAll()
Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Fri Jul 27 16:04:01 2012
@@ -161,21 +161,15 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin-version}</version>
-
<configuration>
- <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
- <useSystemClassLoader>false</useSystemClassLoader>
- <forkMode>pertest</forkMode>
- <childDelegation>false</childDelegation>
- <useFile>true</useFile>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <failIfNoTests>false</failIfNoTests>
-
<excludes>
<!--
<exclude>**/JmsTopicTransactionTest.*</exclude>
-->
</excludes>
+ <parallel>classes</parallel>
+ <perCoreThreadCount>false</perCoreThreadCount>
+ <threadCount>1</threadCount>
</configuration>
</plugin>
Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala Fri Jul 27 16:04:01 2012
@@ -10,7 +10,8 @@ import org.apache.activemq.apollo.dto.{D
import org.apache.activemq.apollo.broker.LocalRouter
import org.fusesource.hawtdispatch.Future
import scala.Predef._
-import org.apache.activemq.apollo.util.Success
+import org.apache.activemq.apollo.util.{FutureResult, Success}
+import FutureResult._
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
Modified: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Fri Jul 27 16:04:01 2012
@@ -195,7 +195,16 @@
</execution>
</executions>
</plugin>
-
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin-version}</version>
+ <configuration>
+ <parallel>classes</parallel>
+ <perCoreThreadCount>false</perCoreThreadCount>
+ <threadCount>1</threadCount>
+ </configuration>
+ </plugin>
</plugins>
</build>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Fri Jul 27 16:04:01 2012
@@ -21,13 +21,27 @@
<virtual_host id="default">
<host_name>localhost</host_name>
+ <queue id="quota.**" quota="10k"/>
+ <topic id="quota.**" slow_consumer_policy="queue">
+ <subscription quota="10k"/>
+ </topic>
+
<queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
+ <topic id="queued.**" slow_consumer_policy="queue">
+ <subscription tail_buffer="4k"/>
+ </topic>
+
+ <queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
+ <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
+ <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
+ <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
<bdb_store directory="${testdatadir}"/>
</virtual_host>
<!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
+ <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Fri Jul 27 16:04:01 2012
@@ -21,18 +21,27 @@
<virtual_host id="default">
<host_name>localhost</host_name>
+ <queue id="quota.**" quota="10k"/>
+ <topic id="quota.**" slow_consumer_policy="queue">
+ <subscription quota="10k"/>
+ </topic>
+
+ <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+ <queue id="mirrored.**" mirrored="true"/>
+ <topic id="queued.**" slow_consumer_policy="queue">
+ <subscription tail_buffer="4k"/>
+ </topic>
+
<queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
<queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
<queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
<queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
- <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
- <queue id="mirrored.**" mirrored="true"/>
-
<leveldb_store directory="${testdatadir}"/>
</virtual_host>
<!--<web_admin bind="http://0.0.0.0:61680"/>-->
<connector id="tcp" bind="tcp://0.0.0.0:0"/>
+ <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jul 27 16:04:01 2012
@@ -17,17 +17,20 @@
package org.apache.activemq.apollo.stomp
import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest._
import java.lang.String
import java.util.concurrent.TimeUnit._
+import scala.Some
import org.apache.activemq.apollo.util._
import java.util.concurrent.atomic.AtomicLong
import FileSupport._
import java.nio.channels.DatagramChannel
import org.fusesource.hawtbuf.AsciiBuffer
import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO}
+import org.apache.activemq.apollo.dto.KeyStorageDTO
import java.net.{SocketTimeoutException, InetSocketAddress}
+import org.junit.runner.RunWith
+import scala.Some
class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
@@ -179,7 +182,8 @@ class StompTestSupport extends BrokerFun
/**
* These test cases check to make sure the broker stats are consistent with what
- * would be expected.
+ * would be expected. These tests can't be run in parallell since they look at
+ * agreggate destination metrics.
*/
class StompMetricsTest extends StompTestSupport {
@@ -423,16 +427,328 @@ class StompMetricsTest extends StompTest
}
+class StompSslTest extends StompTestSupport with BrokerParallelTestExecution {
-class Stomp10ConnectTest extends StompTestSupport {
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl.xml"
- test("Stomp 1.0 CONNECT") {
- connect("1.0")
+ val config = new KeyStorageDTO
+ config.file = basedir/"src"/"test"/"resources"/"client.ks"
+ config.password = "password"
+ config.key_password = "password"
+
+ client.key_storeage = new KeyStorage(config)
+
+ test("Connect over SSL") {
+ connect("1.1")
+ }
+}
+
+/**
+ * These tests seem to have trouble being run in Parallel
+ */
+class StompSerialTest extends StompTestSupport with BrokerParallelTestExecution {
+
+ // This is the test case for https://issues.apache.org/jira/browse/APLO-88
+ test("ACK then socket close with/without DISCONNECT, should still ACK") {
+ for(i <- 1 until 3) {
+ connect("1.1")
+
+ def send(id:Int) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/from-seq-end\n" +
+ "message-id:id-"+i+"-"+id+"\n"+
+ "receipt:0\n"+
+ "\n")
+ wait_for_receipt("0")
+ }
+
+ def get(seq:Long) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should include("message-id:id-"+i+"-"+seq+"\n")
+ client.write(
+ "ACK\n" +
+ "subscription:0\n" +
+ "message-id:id-"+i+"-"+seq+"\n" +
+ "\n")
+ }
+
+ send(1)
+ send(2)
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/from-seq-end\n" +
+ "id:0\n" +
+ "ack:client\n"+
+ "\n")
+ get(1)
+ client.write(
+ "DISCONNECT\n" +
+ "\n")
+ client.close
+
+ connect("1.1")
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/from-seq-end\n" +
+ "id:0\n" +
+ "ack:client\n"+
+ "\n")
+ get(2)
+ client.close
+ }
+ }
+
+}
+class StompSecurityTest extends StompTestSupport {
+
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
+
+ override def is_parallel_test_class: Boolean = false
+
+ override def beforeAll = {
+ try {
+ println("before: "+testName)
+ val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
+ System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
+ } catch {
+ case x:Throwable => x.printStackTrace
+ }
+ super.beforeAll
+ }
+
+ test("Connect with valid id password but can't connect") {
+
+ val frame = connect_request("1.1", client,
+ "login:can_not_connect\n" +
+ "passcode:can_not_connect\n")
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to connect")
+
+ }
+
+ test("Connect with no id password") {
+ val frame = connect_request("1.1", client)
+ frame should startWith("ERROR\n")
+ frame should include("message:Authentication failed.")
+ }
+
+ test("Connect with invalid id password") {
+ val frame = connect_request("1.1", client,
+ "login:foo\n" +
+ "passcode:bar\n")
+ frame should startWith("ERROR\n")
+ frame should include("message:Authentication failed.")
+
+ }
+
+ test("Connect with valid id password that can connect") {
+ connect("1.1", client,
+ "login:can_only_connect\n" +
+ "passcode:can_only_connect\n")
+
+ }
+
+ test("Connector restricted user on the right connector") {
+ connect("1.1", client,
+ "login:connector_restricted\n" +
+ "passcode:connector_restricted\n", "tcp2")
+ }
+
+ test("Connector restricted user on the wrong connector") {
+ val frame = connect_request("1.1", client,
+ "login:connector_restricted\n" +
+ "passcode:connector_restricted\n", "tcp")
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to connect to connector 'tcp'.")
+ }
+
+ test("Send not authorized") {
+ connect("1.1", client,
+ "login:can_only_connect\n" +
+ "passcode:can_only_connect\n")
+
+ client.write(
+ "SEND\n" +
+ "destination:/queue/secure\n" +
+ "receipt:0\n" +
+ "\n" +
+ "Hello Wolrd\n")
+
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to create the queue")
+ }
+
+ test("Send authorized but not create") {
+ connect("1.1", client,
+ "login:can_send_queue\n" +
+ "passcode:can_send_queue\n")
+
+ client.write(
+ "SEND\n" +
+ "destination:/queue/secure\n" +
+ "receipt:0\n" +
+ "\n" +
+ "Hello Wolrd\n")
+
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to create the queue")
+
+ }
+
+ test("Consume authorized but not create") {
+ connect("1.1", client,
+ "login:can_consume_queue\n" +
+ "passcode:can_consume_queue\n")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/secure\n" +
+ "id:0\n" +
+ "receipt:0\n" +
+ "\n")
+
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to create the queue")
+ }
+
+ test("Send and create authorized") {
+ connect("1.1", client,
+ "login:can_send_create_queue\n" +
+ "passcode:can_send_create_queue\n")
+
+ client.write(
+ "SEND\n" +
+ "destination:/queue/secure\n" +
+ "receipt:0\n" +
+ "\n" +
+ "Hello Wolrd\n")
+
+ wait_for_receipt("0")
+
+ }
+
+ test("Send and create authorized via id_regex") {
+ connect("1.1", client,
+ "login:guest\n" +
+ "passcode:guest\n")
+
+ client.write(
+ "SEND\n" +
+ "destination:/queue/testblah\n" +
+ "receipt:0\n" +
+ "\n" +
+ "Hello Wolrd\n")
+
+ wait_for_receipt("0")
+
+ client.write(
+ "SEND\n" +
+ "destination:/queue/notmatch\n" +
+ "receipt:1\n" +
+ "\n" +
+ "Hello Wolrd\n")
+
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to create the queue")
+ }
+
+ test("Can send and once created") {
+
+ // Now try sending with the lower access id.
+ connect("1.1", client,
+ "login:can_send_queue\n" +
+ "passcode:can_send_queue\n")
+
+ client.write(
+ "SEND\n" +
+ "destination:/queue/secure\n" +
+ "receipt:0\n" +
+ "\n" +
+ "Hello Wolrd\n")
+
+ wait_for_receipt("0")
+
+ }
+
+ test("Consume not authorized") {
+ connect("1.1", client,
+ "login:can_only_connect\n" +
+ "passcode:can_only_connect\n")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/secure\n" +
+ "id:0\n" +
+ "receipt:0\n" +
+ "\n")
+
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include("message:Not authorized to consume from the queue")
+ }
+
+ test("Consume authorized and JMSXUserID is set on message") {
+ connect("1.1", client,
+ "login:can_send_create_consume_queue\n" +
+ "passcode:can_send_create_consume_queue\n")
+
+ subscribe("0","/queue/sendsid")
+ async_send("/queue/sendsid", "hello")
+
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should include("JMSXUserID:can_send_create_consume_queue\n")
+ frame should include("sender-ip:127.0.0.1\n")
+ }
+}
+class StompSslSecurityTest extends StompTestSupport {
+
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl-secure.xml"
+ override def is_parallel_test_class: Boolean = false
+
+ override def beforeAll = {
+ // System.setProperty("javax.net.debug", "all")
+ try {
+ val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
+ System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
+ } catch {
+ case x:Throwable => x.printStackTrace
+ }
+ super.beforeAll
+ }
+
+ def use_client_cert = {
+ val config = new KeyStorageDTO
+ config.file = basedir/"src"/"test"/"resources"/"client.ks"
+ config.password = "password"
+ config.key_password = "password"
+ client.key_storeage = new KeyStorage(config)
+ }
+
+ test("Connect with cert and no id password") {
+ use_client_cert
+ connect("1.1", client)
}
}
-class Stomp11ConnectTest extends StompTestSupport {
+/**
+ * These tests can be run in parallel against a single Apollo broker.
+ */
+class StompParallelTest extends StompTestSupport with BrokerParallelTestExecution {
+
+ def skip_if_using_store = skip(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml"))
+
+ test("Stomp 1.0 CONNECT") {
+ connect("1.0")
+ }
test("Stomp 1.1 CONNECT") {
connect("1.1")
@@ -497,10 +813,6 @@ class Stomp11ConnectTest extends StompTe
frame should include regex("""message:.+?\n""")
}
-}
-
-class Stomp11HeartBeatTest extends StompTestSupport {
-
test("Stomp 1.1 Broker sends heart-beat") {
client.open("localhost", port)
@@ -559,54 +871,24 @@ class Stomp11HeartBeatTest extends Stomp
}
}
-}
-
-class StompPersistentQueueTest extends StompTestSupport {
-
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
-
- test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
- connect("1.1")
- for( i <- 0 until 10000 ) {
- async_send("/queue/BIGQUEUE", "message #"+i)
- }
- sync_send("/queue/BIGQUEUE", "END")
- client.close
-
- var counter = 0
- for( i <- 0 until 100 ) {
- connect("1.1")
- subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
- for( j <- 0 until 100 ) {
- assert_received("message #"+counter)(true)
- counter+=1
- }
- client.write(
- "DISCONNECT\n" +
- "receipt:disco\n" +
- "\n")
- wait_for_receipt("disco", client, true)
- client.close
- within(2, SECONDS) {
- val status = queue_status("BIGQUEUE")
- status.consumers.size() should be(0)
- }
- }
+ test("UDP to STOMP interop") {
connect("1.1")
- subscribe("1", "/queue/BIGQUEUE", "client")
- assert_received("END")(true)
+ subscribe("0", "/topic/udp")
- }
+ val udp_port:Int = connector_port("udp").get
+ val channel = DatagramChannel.open();
-}
+ val target = new InetSocketAddress("127.0.0.1", udp_port)
+ channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
-/**
- * These disconnect tests assure that we don't drop message deliviers that are in flight
- * if a client disconnects before those deliveries are accepted by the target destination.
- */
-class StompDisconnectTest extends StompTestSupport {
+ assert_received("Hello")
+ }
+ /**
+ * These disconnect tests assure that we don't drop message deliviers that are in flight
+ * if a client disconnects before those deliveries are accepted by the target destination.
+ */
test("Messages delivery assured to a queued once a disconnect receipt is received") {
// figure out at what point a quota'ed queue stops accepting more messages.
@@ -685,9 +967,6 @@ class StompDisconnectTest extends StompT
}
}
-}
-
-class StompDestinationTest extends StompTestSupport {
test("APLO-206 - Load balance of job queues using small consumer credit windows") {
connect("1.1")
@@ -711,6 +990,7 @@ class StompDestinationTest extends Stomp
}
test("Browsing queues does not cause AssertionError. Reported in APLO-156") {
+ skip_if_using_store
connect("1.1")
subscribe("0", "/queue/TOOL.DEFAULT")
async_send("/queue/TOOL.DEFAULT", "1")
@@ -734,74 +1014,22 @@ class StompDestinationTest extends Stomp
sync_send("/topic/retained-example", 3)
subscribe("0", "/topic/retained-example")
assert_received(2)
- async_send("/topic/retained-example", 4)
- assert_received(4)
- }
-
- test("retain:remove makes a topic forget the message") {
- connect("1.1")
- async_send("/topic/retained-example2", 1)
- async_send("/topic/retained-example2", 2, "retain:set\n")
- async_send("/topic/retained-example2", 3, "retain:remove\n")
- subscribe("0", "/topic/retained-example2")
- async_send("/topic/retained-example2", 4)
- assert_received(4)
- }
-
- // This is the test case for https://issues.apache.org/jira/browse/APLO-88
- test("ACK then socket close with/without DISCONNECT, should still ACK") {
- for(i <- 1 until 3) {
- connect("1.1")
-
- def send(id:Int) = {
- client.write(
- "SEND\n" +
- "destination:/queue/from-seq-end\n" +
- "message-id:id-"+i+"-"+id+"\n"+
- "receipt:0\n"+
- "\n")
- wait_for_receipt("0")
- }
-
- def get(seq:Long) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should include("message-id:id-"+i+"-"+seq+"\n")
- client.write(
- "ACK\n" +
- "subscription:0\n" +
- "message-id:id-"+i+"-"+seq+"\n" +
- "\n")
- }
-
- send(1)
- send(2)
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/from-seq-end\n" +
- "id:0\n" +
- "ack:client\n"+
- "\n")
- get(1)
- client.write(
- "DISCONNECT\n" +
- "\n")
- client.close
-
- connect("1.1")
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/from-seq-end\n" +
- "id:0\n" +
- "ack:client\n"+
- "\n")
- get(2)
- client.close
- }
+ async_send("/topic/retained-example", 4)
+ assert_received(4)
+ }
+
+ test("retain:remove makes a topic forget the message") {
+ connect("1.1")
+ async_send("/topic/retained-example2", 1)
+ async_send("/topic/retained-example2", 2, "retain:set\n")
+ async_send("/topic/retained-example2", 3, "retain:remove\n")
+ subscribe("0", "/topic/retained-example2")
+ async_send("/topic/retained-example2", 4)
+ assert_received(4)
}
test("Setting `from-seq` header to -1 results in subscription starting at end of the queue.") {
+ skip_if_using_store
connect("1.1")
def send(id:Int) = {
@@ -899,6 +1127,7 @@ class StompDestinationTest extends Stomp
}
test("The `from-seq` header can be used to resume delivery from a given point in a queue.") {
+ skip_if_using_store
connect("1.1")
def send(id:Int) = {
@@ -1120,6 +1349,7 @@ class StompDestinationTest extends Stomp
}
test("Queue browsers don't consume the messages") {
+ skip_if_using_store
connect("1.1")
def put(id:Int) = {
@@ -1212,7 +1442,7 @@ class StompDestinationTest extends Stomp
def put(id:Int) = {
client.write(
"SEND\n" +
- "destination:/topic/updates\n" +
+ "destination:/topic/updates1\n" +
"\n" +
"message:"+id+"\n")
}
@@ -1220,7 +1450,7 @@ class StompDestinationTest extends Stomp
client.write(
"SUBSCRIBE\n" +
- "destination:/topic/updates\n" +
+ "destination:/topic/updates1\n" +
"id:0\n" +
"receipt:0\n" +
"\n")
@@ -1247,366 +1477,113 @@ class StompDestinationTest extends Stomp
def put(id:Int) = {
client.write(
"SEND\n" +
- "destination:/topic/updates\n" +
+ "destination:/topic/updates2\n" +
"\n" +
"message:"+id+"\n")
}
client.write(
"SUBSCRIBE\n" +
- "destination:/topic/updates\n" +
- "id:my-sub-name\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
- client.close
-
- // Close him out.. since persistent:true then
- // the topic subscription will be persistent accross client
- // connections.
-
- connect("1.1")
- put(1)
- put(2)
- put(3)
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/updates\n" +
+ "destination:/topic/updates2\n" +
"id:my-sub-name\n" +
"persistent:true\n" +
- "\n")
-
- def get(id:Int) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should include ("subscription:my-sub-name\n")
- frame should endWith regex("\n\nmessage:"+id+"\n")
- }
-
- get(1)
- get(2)
- get(3)
- }
-
- test("Queue and a selector") {
- connect("1.1")
-
- def put(id:Int, color:String) = {
- client.write(
- "SEND\n" +
- "destination:/queue/selected\n" +
- "color:"+color+"\n" +
- "\n" +
- "message:"+id+"\n")
- }
- put(1, "red")
- put(2, "blue")
- put(3, "red")
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/selected\n" +
- "selector:color='red'\n" +
- "id:0\n" +
- "\n")
-
- def get(id:Int) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith regex("\n\nmessage:"+id+"\n")
- }
- get(1)
- get(3)
- }
-
- test("Topic and a selector") {
- connect("1.1")
-
- def put(id:Int, color:String) = {
- client.write(
- "SEND\n" +
- "destination:/topic/selected\n" +
- "color:"+color+"\n" +
- "\n" +
- "message:"+id+"\n")
- }
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/selected\n" +
- "selector:color='red'\n" +
- "id:0\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- put(1, "red")
- put(2, "blue")
- put(3, "red")
-
- def get(id:Int) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith regex("\n\nmessage:"+id+"\n")
- }
- get(1)
- get(3)
- }
-
-
-}
-
-class DurableSubscriptionOnLevelDBTest extends StompTestSupport {
-
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
-
- test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
-
- val sub_count = 3
- val message_count = 1000
-
- // establish 3 durable subs..
- connect("1.1")
- for( sub <- 1 to sub_count ) {
- subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true)
- }
- close()
-
- connect("1.1")
-
- val filler = ":"+("x"*(1024*10))
-
- // Now send a bunch of messages....
- for( i <- 1 to message_count ) {
- async_send(dest="/topic/sometopic", headers="persistent:true\n", body=i+filler)
- }
-
- // Empty out the durable durable sub
- for( sub <- 1 to sub_count ) {
- subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true, sync=false)
- for( i <- 1 to message_count ) {
- assert_received(body=i+filler, sub="sub"+sub)
- }
- }
-
- }
-
- test("Can directly send an recieve from a durable sub") {
- connect("1.1")
-
- // establish 2 durable subs..
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub1\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub2\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- client.close
- connect("1.1")
-
- // Now send a bunch of messages....
- // Send only to sub 1
- client.write(
- "SEND\n" +
- "destination:/dsub/sub1\n" +
- "\n" +
- "sub1 msg\n")
-
- // Send to all subs
- client.write(
- "SEND\n" +
- "destination:/topic/sometopic\n" +
- "\n" +
- "LAST\n")
-
-
- // Now try to get all the previously sent messages.
- def get(expected:String) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith("\n\n"+expected)
- }
-
- // Empty out the first durable sub
- client.write(
- "SUBSCRIBE\n" +
- "destination:/dsub/sub1\n" +
- "id:1\n" +
- "\n")
-
- get("sub1 msg\n")
- get("LAST\n")
-
- // Empty out the 2nd durable sub
- client.write(
- "SUBSCRIBE\n" +
- "destination:/dsub/sub2\n" +
- "id:2\n" +
- "\n")
-
- get("LAST\n")
- }
- test("You can connect and then unsubscribe from existing durable sub (APLO-157)") {
- connect("1.1")
- subscribe("APLO-157", "/topic/APLO-157", "auto", true)
- client.close()
-
- // Make sure the durable sub exists.
- connect("1.1")
- sync_send("/topic/APLO-157", "1")
- subscribe("APLO-157", "/topic/APLO-157", "client", true)
- assert_received("1")
- client.close()
-
- // Delete the durable sub..
- connect("1.1")
- unsubscribe("APLO-157", "persistent:true\n")
- client.close()
-
- // Make sure the durable sub does not exists.
- connect("1.1")
- subscribe("APLO-157", "/topic/APLO-157", "client", true)
- async_send("/topic/APLO-157", "2")
- assert_received("2")
- unsubscribe("APLO-157", "persistent:true\n")
-
- }
-
- test("Can create dsubs with dots in them") {
- connect("1.1")
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub.1\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- client.write(
- "SEND\n" +
- "destination:/dsub/sub.1\n" +
"receipt:0\n" +
- "\n" +
- "content\n")
+ "\n")
wait_for_receipt("0")
+ client.close
- }
+ // Close him out.. since persistent:true then
+ // the topic subscription will be persistent accross client
+ // connections.
- test("Duplicate SUBSCRIBE updates durable subscription bindings") {
connect("1.1")
+ put(1)
+ put(2)
+ put(3)
client.write(
"SUBSCRIBE\n" +
- "destination:/topic/a\n" +
- "id:sub1\n" +
+ "destination:/topic/updates2\n" +
+ "id:my-sub-name\n" +
"persistent:true\n" +
- "receipt:0\n" +
"\n")
- wait_for_receipt("0")
- def get(expected:String) = {
+ def get(id:Int) = {
val frame = client.receive()
frame should startWith("MESSAGE\n")
- frame should endWith("\n\n"+expected)
+ frame should include ("subscription:my-sub-name\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
}
- // Validate that the durable sub is bound to /topic/a
- client.write(
- "SEND\n" +
- "destination:/topic/a\n" +
- "\n" +
- "1\n")
- get("1\n")
-
- client.write(
- "UNSUBSCRIBE\n" +
- "id:sub1\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
+ get(1)
+ get(2)
+ get(3)
+ }
- // Switch the durable sub to /topic/b
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/b\n" +
- "id:sub1\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
+ test("Queue and a selector") {
+ connect("1.1")
- // all these should get dropped
- for ( i <- 1 to 500 ) {
+ def put(id:Int, color:String) = {
client.write(
"SEND\n" +
- "destination:/topic/a\n" +
+ "destination:/queue/selected\n" +
+ "color:"+color+"\n" +
"\n" +
- "DROPPED\n")
+ "message:"+id+"\n")
}
+ put(1, "red")
+ put(2, "blue")
+ put(3, "red")
- // Not this one.. it's on the updated topic
client.write(
- "SEND\n" +
- "destination:/topic/b\n" +
- "\n" +
- "2\n")
- get("2\n")
+ "SUBSCRIBE\n" +
+ "destination:/queue/selected\n" +
+ "selector:color='red'\n" +
+ "id:0\n" +
+ "\n")
+ def get(id:Int) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+ get(1)
+ get(3)
}
- test("Direct send to a non-existant a durable sub fails") {
+ test("Topic and a selector") {
connect("1.1")
- client.write(
- "SEND\n" +
- "destination:/dsub/doesnotexist\n" +
- "receipt:0\n" +
- "\n" +
- "content\n")
-
- val frame = client.receive()
- frame should startWith("ERROR\n")
- frame should include("message:The destination does not exist")
- }
-
- test("Direct subscribe to a non-existant a durable sub fails") {
- connect("1.1")
+ def put(id:Int, color:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/selected\n" +
+ "color:"+color+"\n" +
+ "\n" +
+ "message:"+id+"\n")
+ }
client.write(
"SUBSCRIBE\n" +
- "destination:/dsub/doesnotexist\n" +
- "id:1\n" +
+ "destination:/topic/selected\n" +
+ "selector:color='red'\n" +
+ "id:0\n" +
"receipt:0\n" +
"\n")
+ wait_for_receipt("0")
- val frame = client.receive()
- frame should startWith("ERROR\n")
- frame should include("message:Durable subscription does not exist")
+ put(1, "red")
+ put(2, "blue")
+ put(3, "red")
+ def get(id:Int) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith regex("\n\nmessage:"+id+"\n")
+ }
+ get(1)
+ get(3)
}
-}
-
-class DurableSubscriptionOnBDBTest extends DurableSubscriptionOnLevelDBTest {
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
-}
-
-class StompMirroredQueueTest extends StompTestSupport {
test("Topic gets copy of message sent to queue") {
connect("1.1")
@@ -1678,22 +1655,45 @@ class StompMirroredQueueTest extends Sto
get(2)
}
+ def path_separator = "."
-}
+ test("Messages Expire") {
+ connect("1.1")
-class StompSslDestinationTest extends StompDestinationTest {
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl.xml"
+ def put(msg:String, ttl:Option[Long]=None) = {
+ val expires_header = ttl.map(t=> "expires:"+(System.currentTimeMillis()+t)+"\n").getOrElse("")
+ client.write(
+ "SEND\n" +
+ expires_header +
+ "destination:/queue/exp\n" +
+ "\n" +
+ "message:"+msg+"\n")
+ }
- val config = new KeyStorageDTO
- config.file = basedir/"src"/"test"/"resources"/"client.ks"
- config.password = "password"
- config.key_password = "password"
+ put("1")
+ put("2", Some(1000L))
+ put("3")
- client.key_storeage = new KeyStorage(config)
+ Thread.sleep(2000)
-}
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/exp\n" +
+ "id:1\n" +
+ "receipt:0\n"+
+ "\n")
+ wait_for_receipt("0")
+
+
+ def get(dest:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
+ }
-class StompReceiptTest extends StompTestSupport {
+ get("1")
+ get("3")
+ }
test("Receipts on SEND to unconsummed topic") {
connect("1.1")
@@ -1742,11 +1742,9 @@ class StompReceiptTest extends StompTest
put(2)
wait_for_receipt("1")
wait_for_receipt("2")
-
+
}
-}
-class StompTransactionTest extends StompTestSupport {
-
+
test("Transacted commit after unsubscribe"){
val producer = new StompClient
val consumer = new StompClient
@@ -1872,11 +1870,6 @@ class StompTransactionTest extends Stomp
}
-}
-
-
-class StompAckModeTest extends StompTestSupport {
-
test("ack:client redelivers on client disconnect") {
connect("1.1")
@@ -1936,8 +1929,8 @@ class StompAckModeTest extends StompTest
"id:0\n" +
"\n")
get(3)
-
-
+
+
}
@@ -2002,593 +1995,526 @@ class StompAckModeTest extends StompTest
get(1)
get(3)
-
}
-}
-
-class StompSecurityTest extends StompTestSupport {
-
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
+ test("Temp Queue Send Receive") {
+ connect("1.1")
- override def beforeAll = {
- try {
- val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
- System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
- } catch {
- case x:Throwable => x.printStackTrace
+ def put(msg:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/temp-queue/test\n" +
+ "reply-to:/temp-queue/test\n" +
+ "receipt:0\n" +
+ "\n" +
+ "message:"+msg+"\n")
+ wait_for_receipt("0")
}
- super.beforeAll
- }
-
- test("Connect with valid id password but can't connect") {
-
- val frame = connect_request("1.1", client,
- "login:can_not_connect\n" +
- "passcode:can_not_connect\n")
- frame should startWith("ERROR\n")
- frame should include("message:Not authorized to connect")
-
- }
-
- test("Connect with no id password") {
- val frame = connect_request("1.1", client)
- frame should startWith("ERROR\n")
- frame should include("message:Authentication failed.")
- }
-
- test("Connect with invalid id password") {
- val frame = connect_request("1.1", client,
- "login:foo\n" +
- "passcode:bar\n")
- frame should startWith("ERROR\n")
- frame should include("message:Authentication failed.")
-
- }
-
- test("Connect with valid id password that can connect") {
- connect("1.1", client,
- "login:can_only_connect\n" +
- "passcode:can_only_connect\n")
-
- }
-
- test("Connector restricted user on the right connector") {
- connect("1.1", client,
- "login:connector_restricted\n" +
- "passcode:connector_restricted\n", "tcp2")
- }
-
- test("Connector restricted user on the wrong connector") {
- val frame = connect_request("1.1", client,
- "login:connector_restricted\n" +
- "passcode:connector_restricted\n", "tcp")
- frame should startWith("ERROR\n")
- frame should include("message:Not authorized to connect to connector 'tcp'.")
- }
-
- test("Send not authorized") {
- connect("1.1", client,
- "login:can_only_connect\n" +
- "passcode:can_only_connect\n")
+
+ put("1")
client.write(
- "SEND\n" +
- "destination:/queue/secure\n" +
- "receipt:0\n" +
- "\n" +
- "Hello Wolrd\n")
+ "SUBSCRIBE\n" +
+ "destination:/temp-queue/test\n" +
+ "id:1\n" +
+ "\n")
- val frame = client.receive()
- frame should startWith("ERROR\n")
- frame should include("message:Not authorized to create the queue")
- }
+ def get(dest:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
- test("Send authorized but not create") {
- connect("1.1", client,
- "login:can_send_queue\n" +
- "passcode:can_send_queue\n")
+ // extract headers as a map of values.
+ Map((frame.split("\n").reverse.flatMap { line =>
+ if( line.contains(":") ) {
+ val parts = line.split(":", 2)
+ Some((parts(0), parts(1)))
+ } else {
+ None
+ }
+ }):_*)
+ }
- client.write(
+ // The destination and reply-to headers should get updated with actual
+ // Queue names
+ val message = get("1")
+ val actual_temp_dest_name = message.get("destination").get
+ actual_temp_dest_name should startWith("/queue/temp.default.")
+ message.get("reply-to") should be === ( message.get("destination") )
+
+ // Different connection should be able to send a message to the temp destination..
+ var other = new StompClient
+ connect("1.1", other)
+ other.write(
"SEND\n" +
- "destination:/queue/secure\n" +
+ "destination:"+actual_temp_dest_name+"\n" +
"receipt:0\n" +
- "\n" +
- "Hello Wolrd\n")
+ "\n")
+ wait_for_receipt("0", other)
- val frame = client.receive()
+ // First client chould get the message.
+ var frame = client.receive()
+ frame should startWith("MESSAGE\n")
+
+ // But not consume from it.
+ other.write(
+ "SUBSCRIBE\n" +
+ "destination:"+actual_temp_dest_name+"\n" +
+ "id:1\n" +
+ "receipt:0\n" +
+ "\n")
+ frame = other.receive()
frame should startWith("ERROR\n")
- frame should include("message:Not authorized to create the queue")
+ frame should include regex("""message:Not authorized to receive from the temporary destination""")
+ other.close()
+
+ // Check that temp queue is deleted once the client disconnects
+ put("2")
+ expect(true)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
+ client.close();
+ within(10, SECONDS) {
+ expect(false)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
+ }
}
- test("Consume authorized but not create") {
- connect("1.1", client,
- "login:can_consume_queue\n" +
- "passcode:can_consume_queue\n")
+ test("Temp Topic Send Receive") {
+ connect("1.1")
client.write(
"SUBSCRIBE\n" +
- "destination:/queue/secure\n" +
- "id:0\n" +
- "receipt:0\n" +
+ "destination:/temp-topic/test\n" +
+ "id:1\n" +
"\n")
- val frame = client.receive()
- frame should startWith("ERROR\n")
- frame should include("message:Not authorized to create the queue")
- }
+ def get(dest:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
- test("Send and create authorized") {
- connect("1.1", client,
- "login:can_send_create_queue\n" +
- "passcode:can_send_create_queue\n")
+ // extract headers as a map of values.
+ Map((frame.split("\n").reverse.flatMap { line =>
+ if( line.contains(":") ) {
+ val parts = line.split(":", 2)
+ Some((parts(0), parts(1)))
+ } else {
+ None
+ }
+ }):_*)
+ }
- client.write(
- "SEND\n" +
- "destination:/queue/secure\n" +
- "receipt:0\n" +
- "\n" +
- "Hello Wolrd\n")
+ def put(msg:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/temp-topic/test\n" +
+ "reply-to:/temp-topic/test\n" +
+ "receipt:0\n" +
+ "\n" +
+ "message:"+msg+"\n")
+ wait_for_receipt("0", client)
+ }
+ put("1")
- wait_for_receipt("0")
+ // The destination and reply-to headers should get updated with actual
+ // Queue names
+ val message = get("1")
+ val actual_temp_dest_name = message.get("destination").get
+ actual_temp_dest_name should startWith("/topic/temp.default.")
+ message.get("reply-to") should be === ( message.get("destination") )
- }
+ // Different connection should be able to send a message to the temp destination..
+ var other = new StompClient
+ connect("1.1", other)
+ other.write(
+ "SEND\n" +
+ "destination:"+actual_temp_dest_name+"\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0", other)
- test("Send and create authorized via id_regex") {
- connect("1.1", client,
- "login:guest\n" +
- "passcode:guest\n")
+ // First client chould get the message.
+ var frame = client.receive()
+ frame should startWith("MESSAGE\n")
- client.write(
- "SEND\n" +
- "destination:/queue/testblah\n" +
+ // But not consume from it.
+ other.write(
+ "SUBSCRIBE\n" +
+ "destination:"+actual_temp_dest_name+"\n" +
+ "id:1\n" +
"receipt:0\n" +
- "\n" +
- "Hello Wolrd\n")
+ "\n")
+ frame = other.receive()
+ frame should startWith("ERROR\n")
+ frame should include regex("""message:Not authorized to receive from the temporary destination""")
+ other.close()
- wait_for_receipt("0")
+ // Check that temp queue is deleted once the client disconnects
+ put("2")
+ expect(true)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+ client.close();
- client.write(
- "SEND\n" +
- "destination:/queue/notmatch\n" +
- "receipt:1\n" +
- "\n" +
- "Hello Wolrd\n")
+ within(10, SECONDS) {
+ expect(false)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+ }
- val frame = client.receive()
- frame should startWith("ERROR\n")
- frame should include("message:Not authorized to create the queue")
- }
- test("Can send and once created") {
+ }
- // Now try sending with the lower access id.
- connect("1.1", client,
- "login:can_send_queue\n" +
- "passcode:can_send_queue\n")
+ test("Odd reply-to headers do not cause errors") {
+ connect("1.1")
client.write(
"SEND\n" +
- "destination:/queue/secure\n" +
+ "destination:/queue/oddrepyto\n" +
+ "reply-to:sms:8139993334444\n" +
"receipt:0\n" +
- "\n" +
- "Hello Wolrd\n")
-
+ "\n")
wait_for_receipt("0")
- }
-
- test("Consume not authorized") {
- connect("1.1", client,
- "login:can_only_connect\n" +
- "passcode:can_only_connect\n")
-
client.write(
"SUBSCRIBE\n" +
- "destination:/queue/secure\n" +
- "id:0\n" +
- "receipt:0\n" +
+ "destination:/queue/oddrepyto\n" +
+ "id:1\n" +
"\n")
val frame = client.receive()
- frame should startWith("ERROR\n")
- frame should include("message:Not authorized to consume from the queue")
- }
-
- test("Consume authorized and JMSXUserID is set on message") {
- connect("1.1", client,
- "login:can_send_create_consume_queue\n" +
- "passcode:can_send_create_consume_queue\n")
-
- subscribe("0","/queue/sendsid")
- async_send("/queue/sendsid", "hello")
-
- val frame = client.receive()
frame should startWith("MESSAGE\n")
- frame should include("JMSXUserID:can_send_create_consume_queue\n")
- frame should include("sender-ip:127.0.0.1\n")
+ frame should include("reply-to:sms:8139993334444\n")
}
-}
-
-class StompSslSecurityTest extends StompTestSupport {
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl-secure.xml"
+ test("NACKing moves messages to DLQ (non-persistent)") {
+ connect("1.1")
+ sync_send("/queue/nacker.a", "this msg is not persistent")
- override def beforeAll = {
- // System.setProperty("javax.net.debug", "all")
- try {
- val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
- System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
- } catch {
- case x:Throwable => x.printStackTrace
- }
- super.beforeAll
- }
+ subscribe("0", "/queue/nacker.a", "client", false, "", false)
+ subscribe("dlq", "/queue/dlq.nacker.a", "auto", false, "", false)
+ var ack = assert_received("this msg is not persistent", "0")
+ ack(false)
+ ack = assert_received("this msg is not persistent", "0")
+ ack(false)
- def use_client_cert = {
- val config = new KeyStorageDTO
- config.file = basedir/"src"/"test"/"resources"/"client.ks"
- config.password = "password"
- config.key_password = "password"
- client.key_storeage = new KeyStorage(config)
+ // It should be sent to the DLQ after the 2nd nak
+ assert_received("this msg is not persistent", "dlq")
}
- test("Connect with cert and no id password") {
- use_client_cert
- connect("1.1", client)
+ test("NACKing moves messages to DLQ (persistent)") {
+ connect("1.1")
+ sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
+
+ subscribe("0", "/queue/nacker.b", "client", false, "", false)
+ subscribe("dlq", "/queue/dlq.nacker.b", "auto", false, "", false)
+ var ack = assert_received("this msg is persistent", "0")
+ ack(false)
+ ack = assert_received("this msg is persistent", "0")
+ ack(false)
+
+ // It should be sent to the DLQ after the 2nd nak
+ assert_received("this msg is persistent", "dlq")
}
-}
+ test("NACKing without DLQ consumer (persistent)"){
+ connect("1.1")
+ sync_send("/queue/nacker.c", "this msg is persistent", "persistent:true\n")
-class StompWildcardTest extends StompTestSupport {
+ subscribe("0", "/queue/nacker.c", "client", false, "", false)
- def path_separator = "."
+ var ack = assert_received("this msg is persistent", "0")
+ ack(false)
+ ack = assert_received("this msg is persistent", "0")
+ ack(false)
+ Thread.sleep(1000)
+ }
- test("Wildcard subscription") {
- connect("1.1")
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/foo"+path_separator+"*\n" +
- "id:1\n" +
- "receipt:0\n"+
- "\n")
+}
+class StompLevelDBParallelTest extends StompParallelTest with BrokerParallelTestExecution {
- wait_for_receipt("0")
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
- def put(dest:String) = {
- client.write(
- "SEND\n" +
- "destination:/queue/"+dest+"\n" +
- "\n" +
- "message:"+dest+"\n")
+ test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
+ skip_if_using_store
+ connect("1.1")
+ for( i <- 0 until 10000 ) {
+ async_send("/queue/BIGQUEUE", "message #"+i)
}
+ sync_send("/queue/BIGQUEUE", "END")
+ client.close
- def get(dest:String) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith("\n\nmessage:%s\n".format(dest))
+ var counter = 0
+ for( i <- 0 until 100 ) {
+ connect("1.1")
+ subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
+ for( j <- 0 until 100 ) {
+ assert_received("message #"+counter)(true)
+ counter+=1
+ }
+ client.write(
+ "DISCONNECT\n" +
+ "receipt:disco\n" +
+ "\n")
+ wait_for_receipt("disco", client, true)
+ client.close
+ within(2, SECONDS) {
+ val status = queue_status("BIGQUEUE")
+ status.consumers.size() should be(0)
+ }
}
- // We should not get this one..
- put("bar"+path_separator+"a")
-
- put("foo"+path_separator+"a")
- get("foo"+path_separator+"a")
+ connect("1.1")
+ subscribe("1", "/queue/BIGQUEUE", "client")
+ assert_received("END")(true)
- put("foo"+path_separator+"b")
- get("foo"+path_separator+"b")
}
-}
-class CustomStompWildcardTest extends StompWildcardTest {
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-custom-dest-delimiters.xml"
- override def path_separator = "/"
-}
+ test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
+ skip_if_using_store
-class StompExpirationTest extends StompTestSupport {
+ val sub_count = 3
+ val message_count = 1000
- def path_separator = "."
+ // establish 3 durable subs..
+ connect("1.1")
+ for( sub <- 1 to sub_count ) {
+ subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true)
+ }
+ close()
- test("Messages Expire") {
connect("1.1")
- def put(msg:String, ttl:Option[Long]=None) = {
- val expires_header = ttl.map(t=> "expires:"+(System.currentTimeMillis()+t)+"\n").getOrElse("")
- client.write(
- "SEND\n" +
- expires_header +
- "destination:/queue/exp\n" +
- "\n" +
- "message:"+msg+"\n")
+ val filler = ":"+("x"*(1024*10))
+
+ // Now send a bunch of messages....
+ for( i <- 1 to message_count ) {
+ async_send(dest="/topic/sometopic", headers="persistent:true\n", body=i+filler)
}
- put("1")
- put("2", Some(1000L))
- put("3")
+ // Empty out the durable durable sub
+ for( sub <- 1 to sub_count ) {
+ subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true, sync=false)
+ for( i <- 1 to message_count ) {
+ assert_received(body=i+filler, sub="sub"+sub)
+ }
+ }
- Thread.sleep(2000)
+ }
+
+ test("Can directly send an recieve from a durable sub") {
+ skip_if_using_store
+ connect("1.1")
+ // establish 2 durable subs..
client.write(
"SUBSCRIBE\n" +
- "destination:/queue/exp\n" +
- "id:1\n" +
- "receipt:0\n"+
+ "destination:/topic/sometopic\n" +
+ "id:sub1\n" +
+ "persistent:true\n" +
+ "receipt:0\n" +
"\n")
wait_for_receipt("0")
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/topic/sometopic\n" +
+ "id:sub2\n" +
+ "persistent:true\n" +
+ "receipt:0\n" +
+ "\n")
+ wait_for_receipt("0")
- def get(dest:String) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith("\n\nmessage:%s\n".format(dest))
- }
-
- get("1")
- get("3")
- }
-}
+ client.close
+ connect("1.1")
-class StompTempDestinationTest extends StompTestSupport {
+ // Now send a bunch of messages....
+ // Send only to sub 1
+ client.write(
+ "SEND\n" +
+ "destination:/dsub/sub1\n" +
+ "\n" +
+ "sub1 msg\n")
- def path_separator = "."
+ // Send to all subs
+ client.write(
+ "SEND\n" +
+ "destination:/topic/sometopic\n" +
+ "\n" +
+ "LAST\n")
- test("Temp Queue Send Receive") {
- connect("1.1")
- def put(msg:String) = {
- client.write(
- "SEND\n" +
- "destination:/temp-queue/test\n" +
- "reply-to:/temp-queue/test\n" +
- "receipt:0\n" +
- "\n" +
- "message:"+msg+"\n")
- wait_for_receipt("0")
+ // Now try to get all the previously sent messages.
+ def get(expected:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\n"+expected)
}
- put("1")
-
+ // Empty out the first durable sub
client.write(
"SUBSCRIBE\n" +
- "destination:/temp-queue/test\n" +
+ "destination:/dsub/sub1\n" +
"id:1\n" +
"\n")
- def get(dest:String) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith("\n\nmessage:%s\n".format(dest))
+ get("sub1 msg\n")
+ get("LAST\n")
- // extract headers as a map of values.
- Map((frame.split("\n").reverse.flatMap { line =>
- if( line.contains(":") ) {
- val parts = line.split(":", 2)
- Some((parts(0), parts(1)))
- } else {
- None
- }
- }):_*)
- }
+ // Empty out the 2nd durable sub
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/dsub/sub2\n" +
+ "id:2\n" +
+ "\n")
- // The destination and reply-to headers should get updated with actual
- // Queue names
- val message = get("1")
- val actual_temp_dest_name = message.get("destination").get
- actual_temp_dest_name should startWith("/queue/temp.default.")
- message.get("reply-to") should be === ( message.get("destination") )
+ get("LAST\n")
+ }
+ test("You can connect and then unsubscribe from existing durable sub (APLO-157)") {
+ skip_if_using_store
+ connect("1.1")
+ subscribe("APLO-157", "/topic/APLO-157", "auto", true)
+ client.close()
- // Different connection should be able to send a message to the temp destination..
- var other = new StompClient
- connect("1.1", other)
- other.write(
- "SEND\n" +
- "destination:"+actual_temp_dest_name+"\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0", other)
+ // Make sure the durable sub exists.
+ connect("1.1")
+ sync_send("/topic/APLO-157", "1")
+ subscribe("APLO-157", "/topic/APLO-157", "client", true)
+ assert_received("1")
+ client.close()
- // First client chould get the message.
- var frame = client.receive()
- frame should startWith("MESSAGE\n")
+ // Delete the durable sub..
+ connect("1.1")
+ unsubscribe("APLO-157", "persistent:true\n")
+ client.close()
- // But not consume from it.
- other.write(
+ // Make sure the durable sub does not exists.
+ connect("1.1")
+ subscribe("APLO-157", "/topic/APLO-157", "client", true)
+ async_send("/topic/APLO-157", "2")
+ assert_received("2")
+ unsubscribe("APLO-157", "persistent:true\n")
+
+ }
+
+ test("Can create dsubs with dots in them") {
+ connect("1.1")
+
+ client.write(
"SUBSCRIBE\n" +
- "destination:"+actual_temp_dest_name+"\n" +
- "id:1\n" +
+ "destination:/topic/sometopic\n" +
+ "id:sub.1\n" +
+ "persistent:true\n" +
"receipt:0\n" +
"\n")
- frame = other.receive()
- frame should startWith("ERROR\n")
- frame should include regex("""message:Not authorized to receive from the temporary destination""")
- other.close()
+ wait_for_receipt("0")
- // Check that temp queue is deleted once the client disconnects
- put("2")
- expect(true)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
- client.close();
+ client.write(
+ "SEND\n" +
+ "destination:/dsub/sub.1\n" +
+ "receipt:0\n" +
+ "\n" +
+ "content\n")
+ wait_for_receipt("0")
- within(10, SECONDS) {
- expect(false)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
- }
}
- test("Temp Topic Send Receive") {
+ test("Duplicate SUBSCRIBE updates durable subscription bindings") {
+ skip_if_using_store
connect("1.1")
client.write(
"SUBSCRIBE\n" +
- "destination:/temp-topic/test\n" +
- "id:1\n" +
+ "destination:/topic/a\n" +
+ "id:sub1\n" +
+ "persistent:true\n" +
+ "receipt:0\n" +
"\n")
+ wait_for_receipt("0")
- def get(dest:String) = {
+ def get(expected:String) = {
val frame = client.receive()
frame should startWith("MESSAGE\n")
- frame should endWith("\n\nmessage:%s\n".format(dest))
-
- // extract headers as a map of values.
- Map((frame.split("\n").reverse.flatMap { line =>
- if( line.contains(":") ) {
- val parts = line.split(":", 2)
- Some((parts(0), parts(1)))
- } else {
- None
- }
- }):_*)
- }
-
- def put(msg:String) = {
- client.write(
- "SEND\n" +
- "destination:/temp-topic/test\n" +
- "reply-to:/temp-topic/test\n" +
- "receipt:0\n" +
- "\n" +
- "message:"+msg+"\n")
- wait_for_receipt("0", client)
+ frame should endWith("\n\n"+expected)
}
- put("1")
-
- // The destination and reply-to headers should get updated with actual
- // Queue names
- val message = get("1")
- val actual_temp_dest_name = message.get("destination").get
- actual_temp_dest_name should startWith("/topic/temp.default.")
- message.get("reply-to") should be === ( message.get("destination") )
- // Different connection should be able to send a message to the temp destination..
- var other = new StompClient
- connect("1.1", other)
- other.write(
+ // Validate that the durable sub is bound to /topic/a
+ client.write(
"SEND\n" +
- "destination:"+actual_temp_dest_name+"\n" +
+ "destination:/topic/a\n" +
+ "\n" +
+ "1\n")
+ get("1\n")
+
+ client.write(
+ "UNSUBSCRIBE\n" +
+ "id:sub1\n" +
"receipt:0\n" +
"\n")
- wait_for_receipt("0", other)
-
- // First client chould get the message.
- var frame = client.receive()
- frame should startWith("MESSAGE\n")
+ wait_for_receipt("0")
- // But not consume from it.
- other.write(
+ // Switch the durable sub to /topic/b
+ client.write(
"SUBSCRIBE\n" +
- "destination:"+actual_temp_dest_name+"\n" +
- "id:1\n" +
+ "destination:/topic/b\n" +
+ "id:sub1\n" +
+ "persistent:true\n" +
"receipt:0\n" +
"\n")
- frame = other.receive()
- frame should startWith("ERROR\n")
- frame should include regex("""message:Not authorized to receive from the temporary destination""")
- other.close()
-
- // Check that temp queue is deleted once the client disconnects
- put("2")
- expect(true)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
- client.close();
+ wait_for_receipt("0")
- within(10, SECONDS) {
- expect(false)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+ // all these should get dropped
+ for ( i <- 1 to 500 ) {
+ client.write(
+ "SEND\n" +
+ "destination:/topic/a\n" +
+ "\n" +
+ "DROPPED\n")
}
+ // Not this one.. it's on the updated topic
+ client.write(
+ "SEND\n" +
+ "destination:/topic/b\n" +
+ "\n" +
+ "2\n")
+ get("2\n")
}
-
- test("Odd reply-to headers do not cause errors") {
+ test("Direct send to a non-existant a durable sub fails") {
connect("1.1")
client.write(
"SEND\n" +
- "destination:/queue/oddrepyto\n" +
- "reply-to:sms:8139993334444\n" +
+ "destination:/dsub/doesnotexist\n" +
"receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/queue/oddrepyto\n" +
- "id:1\n" +
- "\n")
+ "\n" +
+ "content\n")
val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should include("reply-to:sms:8139993334444\n")
+ frame should startWith("ERROR\n")
+ frame should include("message:The destination does not exist")
}
-}
-
-class StompUdpInteropTest extends StompTestSupport {
- test("UDP to STOMP interop") {
-
+ test("Direct subscribe to a non-existant a durable sub fails") {
connect("1.1")
- subscribe("0", "/topic/udp")
-
- val udp_port:Int = connector_port("udp").get
- val channel = DatagramChannel.open();
-
- val target = new InetSocketAddress("127.0.0.1", udp_port)
- channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
-
- assert_received("Hello")
- }
-}
-class StompNackTest extends StompTestSupport {
-
- test("NACKing moves messages to DLQ (non-persistent)") {
- connect("1.1")
- sync_send("/queue/nacker.a", "this msg is not persistent")
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/dsub/doesnotexist\n" +
+ "id:1\n" +
+ "receipt:0\n" +
+ "\n")
- subscribe("0", "/queue/nacker.a", "client", false, "", false)
- subscribe("dlq", "/queue/dlq.nacker.a", "auto", false, "", false)
- var ack = assert_received("this msg is not persistent", "0")
- ack(false)
- ack = assert_received("this msg is not persistent", "0")
- ack(false)
+ val frame = client.receive()
+ frame should startWith("ERROR\n")
+ frame should include("message:Durable subscription does not exist")
- // It should be sent to the DLQ after the 2nd nak
- assert_received("this msg is not persistent", "dlq")
}
- test("NACKing moves messages to DLQ (persistent)") {
- connect("1.1")
- sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
-
- subscribe("0", "/queue/nacker.b", "client", false, "", false)
- subscribe("dlq", "/queue/dlq.nacker.b", "auto", false, "", false)
- var ack = assert_received("this msg is persistent", "0")
- ack(false)
- ack = assert_received("this msg is persistent", "0")
- ack(false)
-
- // It should be sent to the DLQ after the 2nd nak
- assert_received("this msg is persistent", "dlq")
- }
}
-
-class StompNackTestOnLevelDBTest extends StompNackTest {
- override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
-
- test("NACKing without DLQ consumer (persistent)"){
- connect("1.1")
- sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
-
- subscribe("0", "/queue/nacker.b", "client", false, "", false)
-
- var ack = assert_received("this msg is persistent", "0")
- ack(false)
- ack = assert_received("this msg is persistent", "0")
- ack(false)
- Thread.sleep(1000)
- }
+class StompBDBParallelTest extends StompLevelDBParallelTest {
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
}
-class StompDropPolicyTest extends StompTestSupport {
+class StompDropPolicyTest extends StompTestSupport with BrokerParallelTestExecution {
override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
@@ -2599,7 +2525,10 @@ class StompDropPolicyTest extends StompT
sync_send("/queue/drop.head.persistent", "%0100d".format(i))
}
subscribe("0", "/queue/drop.head.persistent")
- for(i <- 446 until 1000) {
+
+ val initial = client.receive().split("\n").last.toInt
+ initial should be > ( 100 )
+ for(i <- (initial+1) until 1000) {
assert_received("%0100d".format(i))
}
}
@@ -2611,7 +2540,9 @@ class StompDropPolicyTest extends StompT
sync_send("/queue/drop.head.non", "%0100d".format(i))
}
subscribe("0", "/queue/drop.head.non")
- for(i <- 427 until 1000) {
+ val initial = client.receive().split("\n").last.toInt
+ initial should be > ( 100 )
+ for(i <- (initial+1) until 1000) {
assert_received("%0100d".format(i))
}
}
@@ -2654,4 +2585,49 @@ class StompDropPolicyTest extends StompT
async_send("/queue/drop.tail.non", "end")
assert_received("end")
}
-}
\ No newline at end of file
+}
+
+class StompWildcardParallelTest extends StompTestSupport with BrokerParallelTestExecution {
+
+ def path_separator = "."
+
+ test("Wildcard subscription") {
+ connect("1.1")
+
+ client.write(
+ "SUBSCRIBE\n" +
+ "destination:/queue/foo"+path_separator+"*\n" +
+ "id:1\n" +
+ "receipt:0\n"+
+ "\n")
+
+ wait_for_receipt("0")
+
+ def put(dest:String) = {
+ client.write(
+ "SEND\n" +
+ "destination:/queue/"+dest+"\n" +
+ "\n" +
+ "message:"+dest+"\n")
+ }
+
+ def get(dest:String) = {
+ val frame = client.receive()
+ frame should startWith("MESSAGE\n")
+ frame should endWith("\n\nmessage:%s\n".format(dest))
+ }
+
+ // We should not get this one..
+ put("bar"+path_separator+"a")
+
+ put("foo"+path_separator+"a")
+ get("foo"+path_separator+"a")
+
+ put("foo"+path_separator+"b")
+ get("foo"+path_separator+"b")
+ }
+}
+class StompWildcardCustomParallelTest extends StompWildcardParallelTest {
+ override def broker_config_uri: String = "xml:classpath:apollo-stomp-custom-dest-delimiters.xml"
+ override def path_separator = "/"
+}
Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Fri Jul 27 16:04:01 2012
@@ -28,6 +28,22 @@ package object util {
type FutureResult[T] = Future[Result[T, Throwable]]
+ object FutureResult {
+
+ implicit def wrap_future_result[T](value:T):FutureResult[T] = {
+ val rc = FutureResult[T]()
+ rc.apply(Success(value))
+ rc
+ }
+
+ implicit def unwrap_future_result[T](value:FutureResult[T]):T = {
+ value.await() match {
+ case Success(value) => value
+ case Failure(value) => throw value
+ }
+ }
+ }
+
def FutureResult[T]() = Future[Result[T, Throwable]]()
def FutureResult[T](value:Result[T, Throwable]) = {
@@ -58,19 +74,6 @@ package object util {
}
}
- implicit def wrap_future_result[T](value:T):FutureResult[T] = {
- val rc = FutureResult[T]()
- rc.apply(Success(value))
- rc
- }
-
- implicit def unwrap_future_result[T](value:FutureResult[T]):T = {
- value.await() match {
- case Success(value) => value
- case Failure(value) => throw value
- }
- }
-
def sync_cb[T](func: (T=>Unit)=>Unit ) = {
var rc:Option[T] = null
val cd = new CountDownLatch(1)
Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala Fri Jul 27 16:04:01 2012
@@ -17,20 +17,28 @@
package org.apache.activemq.apollo.util
-import _root_.org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
import java.io.File
import java.lang.String
import collection.immutable.Map
import org.scalatest._
import java.util.concurrent.TimeUnit
import FileSupport._
+import scala.Some
+import org.apache.activemq.apollo.util.FunSuiteSupport._
+import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
+
+object FunSuiteSupport {
+ class SkipTestException extends RuntimeException
+ val parallel_test_class_lock = new ReentrantReadWriteLock()
+}
/**
* @version $Revision : 1.1 $
*/
-@RunWith(classOf[JUnitRunner])
-abstract class FunSuiteSupport extends FunSuite with Logging with BeforeAndAfterAll {
+@RunWith(classOf[org.scalatest.junit.ParallelJUnitRunner])
+abstract class FunSuiteSupport extends FunSuite with Logging with ParallelBeforeAndAfterAll {
+
protected var _basedir = try {
var file = new File(getClass.getProtectionDomain.getCodeSource.getLocation.getFile)
file = (file / ".." / "..").getCanonicalFile
@@ -44,6 +52,18 @@ abstract class FunSuiteSupport extends F
"."
}
+ def skip(check:Boolean=true):Unit = if(check) throw new SkipTestException()
+
+ override protected def test(testName: String, testTags: Tag*)(testFun: => Unit) {
+ super.test(testName, testTags:_*) {
+ try {
+ testFun
+ } catch {
+ case e:SkipTestException =>
+ }
+ }
+ }
+
/**
* Returns the base directory of the current project
*/
@@ -52,9 +72,22 @@ abstract class FunSuiteSupport extends F
/**
* Returns ${basedir}/target/test-data
*/
- def test_data_dir = basedir / "target"/ "test-data"
+ def test_data_dir = basedir / "target"/ "test-data" / (getClass.getName)
+
+ /**
+ * Can this test class run in parallel with other
+ * test classes.
+ * @return
+ */
+ def is_parallel_test_class = true
override protected def beforeAll(map: Map[String, Any]): Unit = {
+ if ( is_parallel_test_class ) {
+ parallel_test_class_lock.readLock().lock()
+ } else {
+ parallel_test_class_lock.writeLock().lock()
+ }
+
_basedir = map.get("basedir") match {
case Some(basedir) =>
basedir.toString
@@ -67,10 +100,37 @@ abstract class FunSuiteSupport extends F
super.beforeAll(map)
}
+ override protected def afterAll(configMap: Map[String, Any]) {
+ try {
+ super.afterAll(configMap)
+ } finally {
+ if ( is_parallel_test_class ) {
+ parallel_test_class_lock.readLock().unlock()
+ } else {
+ parallel_test_class_lock.writeLock().unlock()
+ }
+ }
+ }
+
+
//
// Allows us to get the current test name.
//
+ /**
+ * Defines a method (that takes a <code>configMap</code>) to be run after
+ * all of this suite's tests and nested suites have been run.
+ *
+ * <p>
+ * This trait's implementation
+ * of <code>run</code> invokes this method after executing all tests
+ * and nested suites (passing in the <code>configMap</code> passed to it), thus this
+ * method can be used to tear down a test fixture
+ * needed by the entire suite. This trait's implementation of this method invokes the
+ * overloaded form of <code>afterAll</code> that takes no <code>configMap</code>.
+ * </p>
+ */
+
val _testName = new ThreadLocal[String]();
def testName = _testName.get
@@ -81,6 +141,7 @@ abstract class FunSuiteSupport extends F
super.runTest(testName, reporter, stopper, configMap, tracker)
} finally {
_testName.remove
+
}
}