You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/03 22:46:23 UTC
svn commit: r1131226 -
/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
Author: tabish
Date: Fri Jun 3 20:46:23 2011
New Revision: 1131226
URL: http://svn.apache.org/viewvc?rev=1131226&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30
Adds some initial tests for QueueBrowser support over OpenWire.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala?rev=1131226&r1=1131225&r2=1131226&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala Fri Jun 3 20:46:23 2011
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.apollo.openwire
-import javax.jms.{TextMessage, Session}
+import javax.jms.{Message, TextMessage, Session}
class QueueTest extends OpenwireTestSupport {
@@ -94,4 +94,172 @@ class QueueTest extends OpenwireTestSupp
get(1)
get(3)
}
+
+ test("Receive then Browse and then Receive again") {
+ connect()
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val producer = session.createProducer(queue("BROWSER.TEST"))
+ var consumer = session.createConsumer(queue("BROWSER.TEST"))
+
+ val outbound = List(session.createTextMessage("First Message"),
+ session.createTextMessage("Second Message"),
+ session.createTextMessage("Third Message"))
+
+ // Drain any outstanding messages left on the queue by previous test runs.
+ while (consumer.receive(1000) != null) {
+ }
+
+ producer.send(outbound(0));
+ producer.send(outbound(1));
+ producer.send(outbound(2));
+
+ consumer.receive(200) should be(outbound(0))
+
+ consumer.close();
+
+ val browser = session.createBrowser(queue("BROWSER.TEST"))
+ val enumeration = browser.getEnumeration
+
+ // Browse the second message (the first was already consumed above).
+ enumeration.hasMoreElements should be(true)
+ enumeration.nextElement() should be(outbound(1))
+
+ // browse the third.
+ enumeration.hasMoreElements should be(true)
+ enumeration.nextElement() should be(outbound(2))
+
+ // There should be no more.
+ var tooMany = false;
+ while (enumeration.hasMoreElements) {
+ debug("Got extra message: %s", enumeration.nextElement());
+ tooMany = true;
+ }
+ tooMany should be(false)
+ browser.close()
+
+ // Re-open the consumer.
+ consumer = session.createConsumer(queue("BROWSER.TEST"));
+ // Receive the second.
+ consumer.receive(200) should be(outbound(1))
+ // Receive the third.
+ consumer.receive(200) should be(outbound(2))
+ consumer.close()
+ }
+
+ test("Browse Queue then Receive messages") {
+ connect()
+
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+ val producer = session.createProducer(queue("BROWSER.TEST"))
+
+ val outbound = List(session.createTextMessage("First Message"),
+ session.createTextMessage("Second Message"),
+ session.createTextMessage("Third Message"))
+
+ producer.send(outbound(0))
+
+ // create browser first
+ val browser = session.createBrowser(queue("BROWSER.TEST"))
+ val enumeration = browser.getEnumeration
+
+ // create consumer
+ val consumer = session.createConsumer(queue("BROWSER.TEST"))
+
+ // browse the first message
+ enumeration.hasMoreElements should be(true)
+ enumeration.nextElement() should be(outbound(0))
+
+ // Receive the first message.
+ consumer.receive(100) should be(outbound(0))
+ }
+
+// test("Queue Browser With 2 Consumers") {
+// val numMessages = 1000;
+//
+// connect()
+//
+// default_connection.setAlwaysSyncSend(false);
+//
+// val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+//
+// val destination = queue("BROWSER.TEST")
+//
+// val destinationPrefetch10 = queue("TEST?jms.prefetchSize=10")
+// val destinationPrefetch1 = queue("TEST?jms.prefetchsize=1")
+//
+// val connection2 = create_connection
+// connection2.start()
+// connections.add(connection2)
+// val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+//
+// val producer = session.createProducer(destination)
+// val consumer = session.createConsumer(destinationPrefetch10)
+//
+// for (i <- 1 to 10) {
+// val message = session.createTextMessage("Message: " + i)
+// producer.send(message)
+// }
+//
+// val browser = session2.createBrowser(destinationPrefetch1)
+// val browserView = browser.getEnumeration()
+//
+// val messages = List[Message]
+// for (i <- 0toInt numMessages) {
+// val m1 = consumer.receive(5000)
+// m1 should not be(null)
+// messages += m1
+// }
+//
+// val i = 0;
+// for (;i < numMessages && browserView.hasMoreElements(); i++) {
+// Message m1 = messages.get(i);
+// Message m2 = browserView.nextElement();
+// assertNotNull("m2 is null for index: " + i, m2);
+// assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+// }
+//
+// // currently browse max page size is ignored for a queue browser consumer
+// // only guarantee is a page size - but a snapshot of pagedinpending is
+// // used so it is most likely more
+// assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE);
+//
+// assertFalse("nothing left in the browser", browserView.hasMoreElements());
+// assertNull("consumer finished", consumer.receiveNoWait());
+// }
+
+ test("Browse Close") {
+ connect()
+ val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+ val destination = queue("BROWSER.TEST")
+
+ val outbound = List(session.createTextMessage("First Message"),
+ session.createTextMessage("Second Message"),
+ session.createTextMessage("Third Message"))
+
+ val producer = session.createProducer(destination)
+ producer.send(outbound(0))
+ producer.send(outbound(1))
+ producer.send(outbound(2))
+
+ // create browser first
+ val browser = session.createBrowser(destination)
+ val enumeration = browser.getEnumeration
+
+ // browse some messages
+ enumeration.nextElement() should equal(outbound(0))
+ enumeration.nextElement() should equal(outbound(1))
+
+ browser.close()
+
+ // create consumer
+ val consumer = session.createConsumer(destination)
+
+ // Receive all three messages in order; the closed browser must not have consumed any.
+ consumer.receive(1000) should equal(outbound(0))
+ consumer.receive(1000) should equal(outbound(1))
+ consumer.receive(1000) should equal(outbound(2))
+ }
}
\ No newline at end of file