You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/02 23:34:22 UTC
svn commit: r1130827 - in
/activemq/activemq-apollo/trunk/apollo-openwire/src:
main/scala/org/apache/activemq/apollo/openwire/ test/resources/
test/scala/org/apache/activemq/apollo/openwire/
Author: tabish
Date: Thu Jun 2 21:34:21 2011
New Revision: 1130827
URL: http://svn.apache.org/viewvc?rev=1130827&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30
Enable exclusive consumers, adds new tests and updates the openwire test resources a bit.
Added:
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala
Removed:
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1130827&r1=1130826&r2=1130827&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jun 2 21:34:21 2011
@@ -157,7 +157,7 @@ class OpenwireProtocolHandler extends Pr
outbound_sessions = new SinkMux[Command](connection.transport_sink.map {
x:Command =>
x.setCommandId(next_command_id)
- info("sending frame: %s", x)
+ debug("sending frame: %s", x.toString)
x
}, dispatchQueue, OpenwireCodec)
connection_session = new OverflowSink(outbound_sessions.open(dispatchQueue));
@@ -214,7 +214,7 @@ class OpenwireProtocolHandler extends Pr
}
try {
current_command = command
- println("received: %s", command)
+ trace("received: %s", command)
if (wire_format == null) {
command match {
case codec: OpenwireCodec =>
@@ -444,6 +444,9 @@ class OpenwireProtocolHandler extends Pr
var selector_expression:BooleanExpression = _
var destination:Array[DestinationDTO] = _
+ override def exclusive = info.isExclusive
+ override def browser = info.isBrowser
+
def attach = {
if( info.getDestination == null ) fail("destination was not set")
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1130827&r1=1130826&r2=1130827&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Thu Jun 2 21:34:21 2011
@@ -23,6 +23,6 @@
<host_name>localhost</host_name>
</virtual_host>
- <connector id="tcp" protocol="openwire" bind="tcp://[::]:0"/>
+ <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0"/>
</broker>
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties?rev=1130827&r1=1130826&r2=1130827&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Thu Jun 2 21:34:21 2011
@@ -16,20 +16,31 @@
## ---------------------------------------------------------------------------
#
-# The logging properties used during tests..
+# Setup the default logging levels
#
-log4j.rootLogger=WARN, console, file
-log4j.logger.org.apache.activemq=TRACE
+log4j.rootLogger=WARN, console, logfile
+log4j.logger.org.apache.activemq.apollo=INFO
-# Console will only display warnnings
+#
+# Uncomment one of the following to enable debug logging
+#
+# log4j.logger.org.apache.activemq.apollo=DEBUG
+# log4j.logger.org.apache.activemq.apollo.broker=DEBUG
+# log4j.logger.org.apache.activemq.apollo.web=DEBUG
+# log4j.logger.org.apache.activemq.apollo.cli=DEBUG
+# log4j.logger.org.apache.activemq.apollo.broker.store.hawtdb=DEBUG
+
+# Console Settings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
-log4j.appender.console.threshold=TRACE
+log4j.appender.console.layout.ConversionPattern=%-5p | %m%n
+log4j.appender.console.threshold=INFO
-# File appender will contain all info messages
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
-log4j.appender.file.file=target/test.log
-log4j.appender.file.append=true
+# File Settings
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+log4j.appender.logfile.file=${apollo.base}/log/apollo.log
+log4j.appender.logfile.maxFileSize=5MB
+log4j.appender.logfile.maxBackupIndex=5
+log4j.appender.logfile.append=true
+log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala Thu Jun 2 21:34:21 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+
+class DurableSubscriberTest extends OpenwireTestSupport {
+
+ test("Topic /w Durable sub retains messages.") {
+
+ def create_durable_sub() {
+ val connection = connect(false)
+ connection.setClientID("test")
+ connection.start()
+ val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val subscriber = session.createDurableSubscriber(topic("example"), "test")
+ session.close()
+ connection.close()
+ if (default_connection == connection) {
+ default_connection = null
+ }
+ }
+
+ create_durable_sub()
+
+ connect(false)
+ default_connection.setClientID("test")
+ default_connection.start()
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(topic("example"))
+ def put(id:Int) {
+ producer.send(session.createTextMessage("message:"+id))
+ }
+
+ List(1,2,3).foreach(put _)
+
+ val subscriber = session.createDurableSubscriber(topic("example"), "test")
+
+ def get(id:Int) {
+ val m = subscriber.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(topic("example"))
+ m.getText should equal ("message:"+id)
+ }
+
+ List(1,2,3).foreach(get _)
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala Thu Jun 2 21:34:21 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+import org.apache.activemq.apollo.openwire.command.ActiveMQQueue
+
+class ExclusiveConsumerTest extends OpenwireTestSupport {
+
+ test("Exclusive Consumer Selected when created first") {
+
+ connect();
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+ val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+ val msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be(null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Exclusive Consumer Selected when Created After Non-Exclusive Consumer") {
+
+ connect();
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+ val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+ val msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be(null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Failover To Another Exclusive Consumer Created First") {
+ connect();
+
+ val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+ val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+ val msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer1.receive(100) should not be(null)
+ exclusiveConsumer2.receive(100) should be(null)
+ fallbackConsumer.receive(100) should be(null)
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer1.close()
+
+ producer.send(msg);
+ producer.send(msg);
+
+ exclusiveConsumer2.receive(100) should not be(null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Failover To Another Exclusive Consumer Created After a non-exclusive Consumer") {
+
+ connect();
+
+ val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+ val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+ val msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer1.receive(100) should not be(null)
+ exclusiveConsumer2.receive(100) should be(null)
+ fallbackConsumer.receive(100) should be(null)
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer1.close()
+
+ producer.send(msg);
+ producer.send(msg);
+
+ exclusiveConsumer2.receive(100) should not be(null)
+ fallbackConsumer.receive(100) should be(null)
+ }
+
+ test("Failover To NonExclusive Consumer") {
+
+ connect();
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+ val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+ val msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be(null)
+ fallbackConsumer.receive(100) should be(null)
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer.close()
+
+ producer.send(msg);
+ fallbackConsumer.receive(100) should not be(null)
+ }
+
+ test("Fallback To Exclusive Consumer") {
+ connect();
+
+ val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ var exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+ val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+ val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+ val msg = senderSession.createTextMessage("test");
+ producer.send(msg);
+
+ Thread.sleep(100);
+
+ // Verify exclusive consumer receives the message.
+ assert(exclusiveConsumer.receive(100) != null, "The exclusive consumer should have got a Message");
+ assert(fallbackConsumer.receive(100) == null, "The non-exclusive consumer shouldn't have a message");
+
+ // Close the exclusive consumer to verify the non-exclusive consumer
+ // takes over
+ exclusiveConsumer.close()
+
+ producer.send(msg)
+ assert(fallbackConsumer.receive(100) != null, "The non-exclusive consumer should have a message");
+
+ // Create exclusive consumer to determine if it will start receiving
+ // the messages.
+ exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+
+ producer.send(msg)
+
+ // Verify exclusive consumer receives the message.
+ exclusiveConsumer.receive(100) should not be(null)
+// fallbackConsumer.receive(100) should be(null)
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala Thu Jun 2 21:34:21 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import java.lang.String
+import org.apache.activemq.apollo.broker.{KeyStorage, Broker, BrokerFactory}
+import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl}
+import FileSupport._
+import javax.jms.Connection
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
+
+class OpenwireTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging {
+ var broker: Broker = null
+ var port = 0
+
+ val broker_config_uri = "xml:classpath:apollo-openwire.xml"
+
+ override protected def beforeAll() {
+ info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+ broker = BrokerFactory.createBroker(broker_config_uri)
+ ServiceControl.start(broker, "Starting broker")
+ port = broker.get_socket_address.getPort
+ }
+
+ var default_connection:Connection = _
+ var connections = List[Connection]()
+
+ override protected def afterAll() {
+ broker.stop()
+ }
+
+ override protected def afterEach() {
+ super.afterEach()
+ connections.foreach(_.close())
+ connections = Nil
+ default_connection = null
+ }
+
+ def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port))
+ def create_connection: Connection = create_connection_factory.createConnection
+ def queue(value:String) = new ActiveMQQueue(value);
+ def topic(value:String) = new ActiveMQTopic(value);
+
+ def connect(start:Boolean=true) = {
+ val connection = create_connection
+ connections ::= connection
+ if(default_connection==null) {
+ default_connection = connection
+ }
+ if( start ) {
+ connection.start()
+ }
+ connection
+ }
+
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala Thu Jun 2 21:34:21 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+
+class QueueTest extends OpenwireTestSupport {
+
+ test("Queue order preserved") {
+
+ connect()
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue("example"))
+ def put(id:Int) {
+ producer.send(session.createTextMessage("message:"+id))
+ }
+
+ List(1,2,3).foreach(put _)
+
+ val consumer = session.createConsumer(queue("example"))
+
+ def get(id:Int) {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(queue("example"))
+ m.getText should equal ("message:"+id)
+ }
+
+ List(1,2,3).foreach(get _)
+ }
+
+ test("Test that messages are consumed ") {
+
+ connect()
+ var session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val queue = session.createQueue("test");
+ val producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ var consumer = session.createConsumer(queue)
+ var msg = consumer.receive(1000)
+ assert(msg != null)
+
+ Thread.sleep(1000)
+
+ session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ // Attempt to Consume the message...check if message was acknowledge
+ consumer = session.createConsumer(queue)
+ msg = consumer.receive(1000)
+ assert(msg == null)
+
+ session.close()
+ }
+
+ test("Queue and a selector") {
+ connect()
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(queue("example"))
+
+ def put(id:Int, color:String) {
+ val message = session.createTextMessage("message:" + id)
+ message.setStringProperty("color", color)
+ producer.send(message)
+ }
+
+ List((1, "red"), (2, "blue"), (3, "red")).foreach {
+ case (id, color) => put(id, color)
+ }
+
+ val consumer = session.createConsumer(queue("example"), "color='red'")
+
+ def get(id:Int) {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(queue("example"))
+ m.getText should equal ("message:"+id)
+ }
+
+ get(1)
+ get(3)
+ }
+}
\ No newline at end of file
Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala Thu Jun 2 21:34:21 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+
+class TopicTest extends OpenwireTestSupport {
+
+ test("Topic drops messages sent before before subscription is established") {
+ connect()
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val producer = session.createProducer(topic("example"))
+ def put(id:Int) {
+ producer.send(session.createTextMessage("message:"+id))
+ }
+
+ put(1)
+
+ val consumer = session.createConsumer(topic("example"))
+
+ put(2)
+ put(3)
+
+ def get(id:Int) {
+ val m = consumer.receive().asInstanceOf[TextMessage]
+ m.getJMSDestination should equal(topic("example"))
+ m.getText should equal ("message:"+id)
+ }
+
+ List(2,3).foreach(get _)
+ }
+
+}
\ No newline at end of file