You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/06/03 22:46:23 UTC

svn commit: r1131226 - /activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala

Author: tabish
Date: Fri Jun  3 20:46:23 2011
New Revision: 1131226

URL: http://svn.apache.org/viewvc?rev=1131226&view=rev
Log:
https://issues.apache.org/jira/browse/APLO-30

Adds some initial tests for QueueBrowser support over OpenWire.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala?rev=1131226&r1=1131225&r2=1131226&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala Fri Jun  3 20:46:23 2011
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire
 
-import javax.jms.{TextMessage, Session}
+import javax.jms.{Message, TextMessage, Session}
 
 class QueueTest extends OpenwireTestSupport {
 
@@ -94,4 +94,172 @@ class QueueTest extends OpenwireTestSupp
     get(1)
     get(3)
   }
+
+  test("Receive then Browse and then Receive again") {
+    connect()
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val producer = session.createProducer(queue("BROWSER.TEST"))
+    var consumer = session.createConsumer(queue("BROWSER.TEST"))
+
+    val outbound = List(session.createTextMessage("First Message"),
+                        session.createTextMessage("Second Message"),
+                        session.createTextMessage("Third Message"))
+
+    // lets consume any outstanding messages from previous test runs
+    while (consumer.receive(1000) != null) {
+    }
+
+    producer.send(outbound(0));
+    producer.send(outbound(1));
+    producer.send(outbound(2));
+
+    consumer.receive(200) should be(outbound(0))
+
+    consumer.close();
+
+    val browser = session.createBrowser(queue("BROWSER.TEST"))
+    val enumeration = browser.getEnumeration
+
+    // browse the second
+    enumeration.hasMoreElements should be(true)
+    enumeration.nextElement() should be(outbound(1))
+
+    // browse the third.
+    enumeration.hasMoreElements should be(true)
+    enumeration.nextElement() should be(outbound(2))
+
+    // There should be no more.
+    var tooMany = false;
+    while (enumeration.hasMoreElements) {
+        debug("Got extra message: %s", enumeration.nextElement());
+        tooMany = true;
+    }
+    tooMany should be(false)
+    browser.close()
+
+    // Re-open the consumer.
+    consumer = session.createConsumer(queue("BROWSER.TEST"));
+    // Receive the second.
+    consumer.receive(200) should be(outbound(1))
+    // Receive the third.
+    consumer.receive(200) should be(outbound(2))
+    consumer.close()
+  }
+
+  test("Browse Queue then Receive messages") {
+    connect()
+
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+    val producer = session.createProducer(queue("BROWSER.TEST"))
+
+    val outbound = List(session.createTextMessage("First Message"),
+                        session.createTextMessage("Second Message"),
+                        session.createTextMessage("Third Message"))
+
+    producer.send(outbound(0))
+
+    // create browser first
+    val browser = session.createBrowser(queue("BROWSER.TEST"))
+    val enumeration = browser.getEnumeration
+
+    // create consumer
+    val consumer = session.createConsumer(queue("BROWSER.TEST"))
+
+    // browse the first message
+    enumeration.hasMoreElements should be(true)
+    enumeration.nextElement() should be(outbound(0))
+
+    // Receive the first message.
+    consumer.receive(100) should be(outbound(0))
+  }
+
+//  test("Queue Browser With 2 Consumers") {
+//    val numMessages = 1000;
+//
+//    connect()
+//
+//    default_connection.setAlwaysSyncSend(false);
+//
+//    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+//
+//    val destination = queue("BROWSER.TEST")
+//
+//    val destinationPrefetch10 = queue("TEST?jms.prefetchSize=10")
+//    val destinationPrefetch1 = queue("TEST?jms.prefetchsize=1")
+//
+//    val connection2 = create_connection
+//    connection2.start()
+//    connections.add(connection2)
+//    val session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE)
+//
+//    val producer = session.createProducer(destination)
+//    val consumer = session.createConsumer(destinationPrefetch10)
+//
+//    for (i <- 1 to 10) {
+//        val message = session.createTextMessage("Message: " + i)
+//        producer.send(message)
+//    }
+//
+//    val browser = session2.createBrowser(destinationPrefetch1)
+//    val browserView = browser.getEnumeration()
+//
+//    val messages = List[Message]
+//    for (i <- 0toInt numMessages) {
+//      val m1 = consumer.receive(5000)
+//      m1 shoulld not be(null)
+//      messages += m1
+//    }
+//
+//    val i = 0;
+//    for (;i < numMessages && browserView.hasMoreElements(); i++) {
+//        Message m1 = messages.get(i);
+//        Message m2 = browserView.nextElement();
+//        assertNotNull("m2 is null for index: " + i, m2);
+//        assertEquals(m1.getJMSMessageID(), m2.getJMSMessageID());
+//    }
+//
+//    // currently browse max page size is ignored for a queue browser consumer
+//    // only guarantee is a page size - but a snapshot of pagedinpending is
+//    // used so it is most likely more
+//    assertTrue("got at least our expected minimum in the browser: ", i > BaseDestination.MAX_PAGE_SIZE);
+//
+//    assertFalse("nothing left in the browser", browserView.hasMoreElements());
+//    assertNull("consumer finished", consumer.receiveNoWait());
+//  }
+
+  test("Browse Close") {
+    connect()
+    val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+    val destination = queue("BROWSER.TEST")
+
+    val outbound = List(session.createTextMessage("First Message"),
+                        session.createTextMessage("Second Message"),
+                        session.createTextMessage("Third Message"))
+
+    val producer = session.createProducer(destination)
+    producer.send(outbound(0))
+    producer.send(outbound(1))
+    producer.send(outbound(2))
+
+    // create browser first
+    val browser = session.createBrowser(destination)
+    val enumeration = browser.getEnumeration
+
+    // browse some messages
+    enumeration.nextElement() should equal(outbound(0))
+    enumeration.nextElement() should equal(outbound(1))
+
+    browser.close()
+
+    // create consumer
+    val consumer = session.createConsumer(destination)
+
+    // Receive the first message.
+    consumer.receive(1000) should equal(outbound(0))
+    consumer.receive(1000) should equal(outbound(1))
+    consumer.receive(1000) should equal(outbound(2))
+  }
 }
\ No newline at end of file