You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/02 23:34:22 UTC

svn commit: r1130827 - in /activemq/activemq-apollo/trunk/apollo-openwire/src: main/scala/org/apache/activemq/apollo/openwire/ test/resources/ test/scala/org/apache/activemq/apollo/openwire/

Author: tabish
Date: Thu Jun  2 21:34:21 2011
New Revision: 1130827

URL: http://svn.apache.org/viewvc?rev=1130827&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30

Enables exclusive consumers, adds new tests, and updates the openwire test resources a bit.

Added:
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1130827&r1=1130826&r2=1130827&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jun  2 21:34:21 2011
@@ -157,7 +157,7 @@ class OpenwireProtocolHandler extends Pr
     outbound_sessions = new SinkMux[Command](connection.transport_sink.map {
       x:Command =>
         x.setCommandId(next_command_id)
-        info("sending frame: %s", x)
+        debug("sending frame: %s", x.toString)
         x
     }, dispatchQueue, OpenwireCodec)
     connection_session = new OverflowSink(outbound_sessions.open(dispatchQueue));
@@ -214,7 +214,7 @@ class OpenwireProtocolHandler extends Pr
     }
     try {
       current_command = command
-      println("received: %s", command)
+      trace("received: %s", command)
       if (wire_format == null) {
         command match {
           case codec: OpenwireCodec =>
@@ -444,6 +444,9 @@ class OpenwireProtocolHandler extends Pr
     var selector_expression:BooleanExpression = _
     var destination:Array[DestinationDTO] = _
 
+    override def exclusive = info.isExclusive
+    override def browser = info.isBrowser
+
     def attach = {
 
       if( info.getDestination == null ) fail("destination was not set")

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1130827&r1=1130826&r2=1130827&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Thu Jun  2 21:34:21 2011
@@ -23,6 +23,6 @@
         <host_name>localhost</host_name>
     </virtual_host>
 
-    <connector id="tcp" protocol="openwire" bind="tcp://[::]:0"/>
+    <connector id="tcp" protocol="openwire" bind="tcp://0.0.0.0:0"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties?rev=1130827&r1=1130826&r2=1130827&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Thu Jun  2 21:34:21 2011
@@ -16,20 +16,31 @@
 ## ---------------------------------------------------------------------------
 
 #
-# The logging properties used during tests..
+# Setup the default logging levels
 #
-log4j.rootLogger=WARN, console, file
-log4j.logger.org.apache.activemq=TRACE
+log4j.rootLogger=WARN, console, logfile
+log4j.logger.org.apache.activemq.apollo=INFO
 
-# Console will only display warnnings
+#
+# Uncomment one of the following to enable debug logging
+#
+# log4j.logger.org.apache.activemq.apollo=DEBUG
+# log4j.logger.org.apache.activemq.apollo.broker=DEBUG
+# log4j.logger.org.apache.activemq.apollo.web=DEBUG
+# log4j.logger.org.apache.activemq.apollo.cli=DEBUG
+# log4j.logger.org.apache.activemq.apollo.broker.store.hawtdb=DEBUG
+
+# Console Settings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
-log4j.appender.console.threshold=TRACE
+log4j.appender.console.layout.ConversionPattern=%-5p | %m%n
+log4j.appender.console.threshold=INFO
 
-# File appender will contain all info messages
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
-log4j.appender.file.file=target/test.log
-log4j.appender.file.append=true
+# File Settings
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+log4j.appender.logfile.file=${apollo.base}/log/apollo.log
+log4j.appender.logfile.maxFileSize=5MB
+log4j.appender.logfile.maxBackupIndex=5
+log4j.appender.logfile.append=true
+log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala Thu Jun  2 21:34:21 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+
+class DurableSubscriberTest extends OpenwireTestSupport {
+
+  test("Topic /w Durable sub retains messages.") {
+
+    def create_durable_sub() {
+      val connection = connect(false)
+      connection.setClientID("test")
+      connection.start()
+      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+      val subscriber = session.createDurableSubscriber(topic("example"), "test")
+      session.close()
+      connection.close()
+      if (default_connection == connection) {
+        default_connection = null
+      }
+    }
+
+    create_durable_sub()
+
+    connect(false)
+    default_connection.setClientID("test")
+    default_connection.start()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(topic("example"))
+    def put(id:Int) {
+      producer.send(session.createTextMessage("message:"+id))
+    }
+
+    List(1,2,3).foreach(put _)
+
+    val subscriber = session.createDurableSubscriber(topic("example"), "test")
+
+    def get(id:Int) {
+      val m = subscriber.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(topic("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    List(1,2,3).foreach(get _)
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala Thu Jun  2 21:34:21 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+import org.apache.activemq.apollo.openwire.command.ActiveMQQueue
+
+class ExclusiveConsumerTest extends OpenwireTestSupport {
+
+  test("Exclusive Consumer Selected when created first") {
+
+    connect();
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+    val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+    val msg = senderSession.createTextMessage("test");
+    producer.send(msg);
+
+    Thread.sleep(100);
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be(null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Exclusive Consumer Selected when Created After Non-Exclusive Consumer") {
+
+    connect();
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+    val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+    val msg = senderSession.createTextMessage("test");
+    producer.send(msg);
+
+    Thread.sleep(100);
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be(null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Failover To Another Exclusive Consumer Created First") {
+    connect();
+
+    val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+    val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+    val msg = senderSession.createTextMessage("test");
+    producer.send(msg);
+
+    Thread.sleep(100);
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer1.receive(100) should not be(null)
+    exclusiveConsumer2.receive(100) should be(null)
+    fallbackConsumer.receive(100) should be(null)
+
+    // Close the first exclusive consumer to verify the second exclusive
+    // consumer (not the non-exclusive one) takes over
+    exclusiveConsumer1.close()
+
+    producer.send(msg);
+    producer.send(msg);
+
+    exclusiveConsumer2.receive(100) should not be(null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Failover To Another Exclusive Consumer Created After a non-exclusive Consumer") {
+
+    connect();
+
+    val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+    val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+    val msg = senderSession.createTextMessage("test");
+    producer.send(msg);
+
+    Thread.sleep(100);
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer1.receive(100) should not be(null)
+    exclusiveConsumer2.receive(100) should be(null)
+    fallbackConsumer.receive(100) should be(null)
+
+    // Close the first exclusive consumer to verify the second exclusive
+    // consumer (not the non-exclusive one) takes over
+    exclusiveConsumer1.close()
+
+    producer.send(msg);
+    producer.send(msg);
+
+    exclusiveConsumer2.receive(100) should not be(null)
+    fallbackConsumer.receive(100) should be(null)
+  }
+
+  test("Failover To NonExclusive Consumer") {
+
+    connect();
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+    val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+    val msg = senderSession.createTextMessage("test");
+    producer.send(msg);
+
+    Thread.sleep(100);
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be(null)
+    fallbackConsumer.receive(100) should be(null)
+
+    // Close the exclusive consumer to verify the non-exclusive consumer
+    // takes over
+    exclusiveConsumer.close()
+
+    producer.send(msg);
+    fallbackConsumer.receive(100) should not be(null)
+  }
+
+  test("Fallback To Exclusive Consumer") {
+    connect();
+
+    val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    var exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+    val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1"));
+    val producer = senderSession.createProducer(queue("TEST.QUEUE1"));
+
+    val msg = senderSession.createTextMessage("test");
+    producer.send(msg);
+
+    Thread.sleep(100);
+
+    // Verify exclusive consumer receives the message.
+    assert(exclusiveConsumer.receive(100) != null, "The exclusive consumer should have got a Message");
+    assert(fallbackConsumer.receive(100) == null, "The non-exclusive consumer shouldn't have a message");
+
+    // Close the exclusive consumer to verify the non-exclusive consumer
+    // takes over
+    exclusiveConsumer.close()
+
+    producer.send(msg)
+    assert(fallbackConsumer.receive(100) != null, "The non-exclusive consumer should have a message");
+
+    // Create exclusive consumer to determine if it will start receiving
+    // the messages.
+    exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true"))
+
+    producer.send(msg)
+
+    // Verify exclusive consumer receives the message.
+    exclusiveConsumer.receive(100) should not be(null)
+//    fallbackConsumer.receive(100) should be(null)
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala Thu Jun  2 21:34:21 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.apollo.openwire
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.BeforeAndAfterEach
+import java.lang.String
+import org.apache.activemq.apollo.broker.{KeyStorage, Broker, BrokerFactory}
+import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl}
+import FileSupport._
+import javax.jms.Connection
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue}
+
+class OpenwireTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging {
+  var broker: Broker = null
+  var port = 0
+
+  val broker_config_uri = "xml:classpath:apollo-openwire.xml"
+
+  override protected def beforeAll() {
+    info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
+    broker = BrokerFactory.createBroker(broker_config_uri)
+    ServiceControl.start(broker, "Starting broker")
+    port = broker.get_socket_address.getPort
+  }
+
+  var default_connection:Connection = _
+  var connections = List[Connection]()
+
+  override protected def afterAll() {
+    broker.stop()
+  }
+
+  override protected def afterEach() {
+    super.afterEach()
+    connections.foreach(_.close())
+    connections = Nil
+    default_connection = null
+  }
+
+  def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port))
+  def create_connection: Connection = create_connection_factory.createConnection
+  def queue(value:String) = new ActiveMQQueue(value);
+  def topic(value:String) = new ActiveMQTopic(value);
+
+  def connect(start:Boolean=true) = {
+    val connection = create_connection
+    connections ::= connection
+    if(default_connection==null) {
+      default_connection = connection
+    }
+    if( start ) {
+      connection.start()
+    }
+    connection
+  }
+
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala Thu Jun  2 21:34:21 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+
+class QueueTest extends OpenwireTestSupport {
+
+  test("Queue order preserved") {
+
+    connect()
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("example"))
+    def put(id:Int) {
+      producer.send(session.createTextMessage("message:"+id))
+    }
+
+    List(1,2,3).foreach(put _)
+
+    val consumer = session.createConsumer(queue("example"))
+
+    def get(id:Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(queue("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    List(1,2,3).foreach(get _)
+  }
+
+  test("Test that messages are consumed ") {
+
+    connect()
+    var session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val queue = session.createQueue("test");
+    val producer = session.createProducer(queue);
+    producer.send(session.createTextMessage("Hello"));
+
+    // Consume the message...
+    var consumer = session.createConsumer(queue)
+    var msg = consumer.receive(1000)
+    assert(msg != null)
+
+    Thread.sleep(1000)
+
+    session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    // Attempt to consume the message again... check that the message was acknowledged
+    consumer = session.createConsumer(queue)
+    msg = consumer.receive(1000)
+    assert(msg == null)
+
+    session.close()
+  }
+
+  test("Queue and a selector") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(queue("example"))
+
+    def put(id:Int, color:String) {
+      val message = session.createTextMessage("message:" + id)
+      message.setStringProperty("color", color)
+      producer.send(message)
+    }
+
+    List((1, "red"), (2, "blue"), (3, "red")).foreach {
+      case (id, color) => put(id, color)
+    }
+
+    val consumer = session.createConsumer(queue("example"), "color='red'")
+
+    def get(id:Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(queue("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    get(1)
+    get(3)
+  }
+}
\ No newline at end of file

Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala?rev=1130827&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala Thu Jun  2 21:34:21 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.openwire
+
+import javax.jms.{TextMessage, Session}
+
+class TopicTest extends OpenwireTestSupport {
+
+  test("Topic drops messages sent before before subscription is established") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val producer = session.createProducer(topic("example"))
+    def put(id:Int) {
+      producer.send(session.createTextMessage("message:"+id))
+    }
+
+    put(1)
+
+    val consumer = session.createConsumer(topic("example"))
+
+    put(2)
+    put(3)
+
+    def get(id:Int) {
+      val m = consumer.receive().asInstanceOf[TextMessage]
+      m.getJMSDestination should equal(topic("example"))
+      m.getText should equal ("message:"+id)
+    }
+
+    List(2,3).foreach(get _)
+  }
+
+}
\ No newline at end of file