You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 14:42:29 UTC
svn commit: r786040 [2/6] - in /activemq/sandbox/activemq-flow:
activemq-all/src/test/java/org/apache/activemq/legacy/
activemq-all/src/test/java/org/apache/activemq/legacy/broker/
activemq-all/src/test/java/org/apache/activemq/legacy/broker/advisory/ ...
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,925 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Test cases used to test the JMS message consumer.
+ *
+ * @version $Revision$
+ */
+public class JMSConsumerTest extends JmsTestSupport {
+
+ private static final Log LOG = LogFactory.getLog(JMSConsumerTest.class);
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public int prefetch;
+ public int ackMode;
+ public byte destinationType;
+ public boolean durableConsumer;
+
+ /**
+ * Builds the JUnit suite for this class by delegating to {@code suite(Class)}.
+ *
+ * @return the suite created for {@code JMSConsumerTest}
+ */
+ public static Test suite() {
+ final Test consumerSuite = suite(JMSConsumerTest.class);
+ return consumerSuite;
+ }
+
+ /**
+ * Command-line entry point: hands {@link #suite()} to the JUnit text runner.
+ *
+ * @param args not read
+ */
+ public static void main(String[] args) {
+ final Test everything = suite();
+ junit.textui.TestRunner.run(everything);
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch done1 = new CountDownLatch(1);
+ final CountDownLatch done2 = new CountDownLatch(1);
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ counter.incrementAndGet();
+ if (counter.get() == 1) {
+ done1.countDown();
+ }
+ if (counter.get() == 2) {
+ done2.countDown();
+ }
+ }
+ });
+
+ // Send a first message to make sure that the consumer dispatcher is
+ // running
+ sendMessages(session, destination, 1);
+ assertTrue(done1.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+
+ // Stop the consumer.
+ consumer.stop();
+
+ // Send a message, but should not get delivered.
+ sendMessages(session, destination, 1);
+ assertFalse(done2.await(1, TimeUnit.SECONDS));
+ assertEquals(1, counter.get());
+
+ // Start the consumer, and the message should now get delivered.
+ consumer.start();
+ assertTrue(done2.await(1, TimeUnit.SECONDS));
+ assertEquals(2, counter.get());
+ }
+
+
+ /**
+ * Closes a consumer from a worker thread while its listener keeps handing messages
+ * to an executor, and checks that no exception is recorded.
+ * <p>
+ * The queue is preloaded with 2000 messages. Each delivery becomes an executor task;
+ * the 590th task closes the consumer and every 200th task acknowledges its message.
+ * The default uncaught-exception handler is replaced for the duration of the test
+ * and restored afterwards, and the executor is shut down, so neither outlives the test.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch closeDone = new CountDownLatch(1);
+
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ // preload the queue
+ sendMessages(session, destination, 2000);
+
+
+ final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+
+ // Collects every failure seen on a worker thread, keyed by that thread.
+ final Map<Thread, Throwable> exceptions =
+ java.util.Collections.synchronizedMap(new HashMap<Thread, Throwable>());
+
+ final class AckAndClose implements Runnable {
+ private final Message message;
+
+ public AckAndClose(Message m) {
+ this.message = m;
+ }
+
+ public void run() {
+ try {
+ int count = counter.incrementAndGet();
+ if (count == 590) {
+ // close in a separate thread is ok by jms
+ consumer.close();
+ closeDone.countDown();
+ }
+ if (count % 200 == 0) {
+ // ensure there are some outstanding messages
+ // ack every 200
+ message.acknowledge();
+ }
+ } catch (Exception e) {
+ LOG.error("Exception on close or ack:", e);
+ exceptions.put(Thread.currentThread(), e);
+ }
+ }
+ }
+
+ // The default handler is JVM-wide state: remember the current one so it can be put back.
+ final UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+ final ExecutorService executor = Executors.newCachedThreadPool();
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Uncaught exception:", e);
+ exceptions.put(t, e);
+ }
+ });
+ try {
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ // ack and close eventually in separate thread
+ executor.execute(new AckAndClose(m));
+ }
+ });
+
+ assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+ // await possible exceptions
+ Thread.sleep(1000);
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ } finally {
+ // Let already-submitted tasks finish, but do not leave pool threads behind.
+ executor.shutdown();
+ Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+ }
+ }
+
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode},
+ * {@code ackMode} and {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestMutiReceiveWithPrefetch1() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] ackModes = {
+ Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
+ Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+ Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("ackMode", ackModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ public void testMutiReceiveWithPrefetch1() throws Exception {
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Use all the ack modes
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Make sure 4 messages were delivered.
+ Message message = null;
+ for (int i = 0; i < 4; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+ assertNull(consumer.receiveNoWait());
+ message.acknowledge();
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} values and the
+ * single topic {@code destinationType} for the test this method is named after.
+ */
+ public void initCombosForTestDurableConsumerSelectorChange() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Re-creates a durable subscription named "test" with a different selector and checks
+ * that only messages matching the new selector are received afterwards.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testDurableConsumerSelectorChange() throws Exception {
+
+ // A client id is set before the durable subscriber is created.
+ connection.setClientID("test");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ final Topic topic = (Topic)destination;
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(deliveryMode);
+ MessageConsumer consumer = session.createDurableSubscriber(topic, "test", "color='red'", false);
+
+ // A red message matches the initial selector.
+ TextMessage outgoing = session.createTextMessage("1st");
+ outgoing.setStringProperty("color", "red");
+ producer.send(outgoing);
+
+ Message incoming = consumer.receive(1000);
+ assertNotNull(incoming);
+ assertEquals("1st", ((TextMessage)incoming).getText());
+
+ // Re-subscribe under the same name, now selecting blue.
+ consumer.close();
+ consumer = session.createDurableSubscriber(topic, "test", "color='blue'", false);
+
+ outgoing = session.createTextMessage("2nd");
+ outgoing.setStringProperty("color", "red");
+ producer.send(outgoing);
+ outgoing = session.createTextMessage("3rd");
+ outgoing.setStringProperty("color", "blue");
+ producer.send(outgoing);
+
+ // The red "2nd" message must be filtered out, leaving "3rd" as the next one.
+ incoming = consumer.receive(1000);
+ assertNotNull(incoming);
+ assertEquals("3rd", ((TextMessage)incoming).getText());
+
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestSendReceiveBytesMessage() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Sends a {@link BytesMessage} carrying two booleans and checks that the same two
+ * values can be read back from the single received message.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testSendReceiveBytesMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ // Payload: true followed by false.
+ BytesMessage outgoing = session.createBytesMessage();
+ outgoing.writeBoolean(true);
+ outgoing.writeBoolean(false);
+ producer.send(outgoing);
+
+ // The payload must come back in the order it was written.
+ BytesMessage incoming = (BytesMessage)consumer.receive(1000);
+ assertNotNull(incoming);
+ final boolean firstValue = incoming.readBoolean();
+ assertTrue(firstValue);
+ final boolean secondValue = incoming.readBoolean();
+ assertFalse(secondValue);
+
+ // And there must be no second message.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestSetMessageListenerAfterStart() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Installs a listener only after four messages have been sent and checks that the
+ * listener is invoked exactly four times.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testSetMessageListenerAfterStart() throws Exception {
+
+ final AtomicInteger seen = new AtomicInteger(0);
+ final CountDownLatch allSeen = new CountDownLatch(1);
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // The messages go out before any listener exists.
+ sendMessages(session, destination, 4);
+
+ // Now attach the listener and see whether the messages reach it.
+ final MessageListener countingListener = new MessageListener() {
+ public void onMessage(Message m) {
+ if (seen.incrementAndGet() == 4) {
+ allSeen.countDown();
+ }
+ }
+ };
+ consumer.setMessageListener(countingListener);
+
+ assertTrue(allSeen.await(1000, TimeUnit.MILLISECONDS));
+ // Leave a little time for any surplus delivery to show up.
+ Thread.sleep(200);
+
+ assertEquals(4, seen.get());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the queue and topic
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestPassMessageListenerIntoCreateConsumer() {
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+ };
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Passes the listener directly to {@code ActiveMQSession.createConsumer} and checks
+ * that it is invoked exactly four times for four sent messages.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testPassMessageListenerIntoCreateConsumer() throws Exception {
+
+ final AtomicInteger seen = new AtomicInteger(0);
+ final CountDownLatch allSeen = new CountDownLatch(1);
+
+ connection.start();
+ ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+
+ // The listener is supplied at consumer creation instead of via setMessageListener.
+ final MessageListener countingListener = new MessageListener() {
+ public void onMessage(Message m) {
+ if (seen.incrementAndGet() == 4) {
+ allSeen.countDown();
+ }
+ }
+ };
+ MessageConsumer consumer = session.createConsumer(destination, countingListener);
+
+ sendMessages(session, destination, 4);
+
+ assertTrue(allSeen.await(1000, TimeUnit.MILLISECONDS));
+ // Leave a little time for any surplus delivery to show up.
+ Thread.sleep(200);
+
+ assertEquals(4, seen.get());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} values, the
+ * DUPS_OK {@code ackMode} and the queue {@code destinationType} for the test this
+ * method is named after.
+ */
+ public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] ackModes = {
+ Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("ackMode", ackModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Closes the connection from inside a listener after two deliveries and checks how
+ * many deliveries a second connection then sees.
+ * <p>
+ * Four messages are sent with the prefetch policy set to 1. The first listener
+ * expects the texts "0" and "1" in order and closes the connection on the second
+ * one. A listener on a fresh connection counts the rest. The expected total is 5,
+ * one more than was sent, because of the redelivery noted at the final assertion.
+ * <p>
+ * Failures inside either listener are caught and logged through {@code LOG};
+ * they surface to the test only through the latch and counter assertions.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch sendDone = new CountDownLatch(1);
+ final CountDownLatch got2Done = new CountDownLatch(1);
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ // This test case does not work if optimized message dispatch is used as
+ // the main thread send block until the consumer receives the
+ // message. This test depends on thread decoupling so that the main
+ // thread can stop the consumer thread.
+ connection.setOptimizedMessageDispatch(false);
+ connection.start();
+
+ // The ack mode comes from the ackMode field.
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in first listener: " + tm.getText());
+ assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 2) {
+ // Wait until the main thread has sent everything, then close from within onMessage.
+ sendDone.await();
+ connection.close();
+ got2Done.countDown();
+ }
+ } catch (Throwable e) {
+ // Report through the class logger, matching the second listener below.
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+ sendDone.countDown();
+
+ // Wait for first 2 messages to arrive.
+ assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+ // Re-start connection.
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Pickup the remaining messages.
+ final CountDownLatch done2 = new CountDownLatch(1);
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in second listener: " + tm.getText());
+ // order is not guaranteed as the connection is started before the listener is set.
+ // assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done2.countDown();
+ }
+ } catch (Throwable e) {
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+ // Give the remaining delivery time to be counted before the final check.
+ Thread.sleep(200);
+
+ // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack mode
+ assertEquals(5, counter.get());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} values, the
+ * AUTO and CLIENT {@code ackMode} values and the queue {@code destinationType} for the
+ * test this method is named after.
+ */
+ public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] ackModes = {
+ Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
+ Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("ackMode", ackModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Closes the connection from inside a listener that acknowledges every message, and
+ * checks that a second connection sees only the messages not yet delivered.
+ * <p>
+ * Four messages are sent with the prefetch policy set to 1. The first listener
+ * expects the texts "0" and "1" in order, acknowledges each message and closes the
+ * connection on the second one. A listener on a fresh connection counts the rest;
+ * the expected total is exactly 4.
+ * <p>
+ * Failures inside either listener are caught and logged through {@code LOG};
+ * they surface to the test only through the latch and counter assertions.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch sendDone = new CountDownLatch(1);
+ final CountDownLatch got2Done = new CountDownLatch(1);
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ // This test case does not work if optimized message dispatch is used as
+ // the main thread send block until the consumer receives the
+ // message. This test depends on thread decoupling so that the main
+ // thread can stop the consumer thread.
+ connection.setOptimizedMessageDispatch(false);
+ connection.start();
+
+ // The ack mode comes from the ackMode field.
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in first listener: " + tm.getText());
+ assertEquals("" + counter.get(), tm.getText());
+ counter.incrementAndGet();
+ m.acknowledge();
+ if (counter.get() == 2) {
+ // Wait until the main thread has sent everything, then close from within onMessage.
+ sendDone.await();
+ connection.close();
+ got2Done.countDown();
+ }
+ } catch (Throwable e) {
+ // Report through the class logger, matching the second listener below.
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+ sendDone.countDown();
+
+ // Wait for first 2 messages to arrive.
+ assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+ // Re-start connection.
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Pickup the remaining messages.
+ final CountDownLatch done2 = new CountDownLatch(1);
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message m) {
+ try {
+ TextMessage tm = (TextMessage)m;
+ LOG.info("Got in second listener: " + tm.getText());
+ counter.incrementAndGet();
+ if (counter.get() == 4) {
+ done2.countDown();
+ }
+ } catch (Throwable e) {
+ LOG.error("unexpected ex onMessage: ", e);
+ }
+ }
+ });
+
+ assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+ // Leave a little time for any surplus delivery to show up.
+ Thread.sleep(200);
+
+ // close from onMessage with Auto_ack will ack
+ // Make sure only 4 messages were delivered.
+ assertEquals(4, counter.get());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * With the prefetch policy set to 1, checks that a listener is invoked exactly four
+ * times for four sent messages.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
+
+ final AtomicInteger seen = new AtomicInteger(0);
+ final CountDownLatch allSeen = new CountDownLatch(1);
+
+ // The prefetch policy is set to 1 before the connection is started.
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ final MessageListener countingListener = new MessageListener() {
+ public void onMessage(Message m) {
+ if (seen.incrementAndGet() == 4) {
+ allSeen.countDown();
+ }
+ }
+ };
+ consumer.setMessageListener(countingListener);
+
+ sendMessages(session, destination, 4);
+
+ assertTrue(allSeen.await(1000, TimeUnit.MILLISECONDS));
+ // Leave a little time for any surplus delivery to show up.
+ Thread.sleep(200);
+
+ assertEquals(4, seen.get());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestMessageListenerWithConsumer() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Checks that a listener set on a consumer is invoked exactly four times for four
+ * sent messages.
+ *
+ * @throws Exception if a JMS call or a wait fails
+ */
+ public void testMessageListenerWithConsumer() throws Exception {
+
+ final AtomicInteger seen = new AtomicInteger(0);
+ final CountDownLatch allSeen = new CountDownLatch(1);
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ final MessageListener countingListener = new MessageListener() {
+ public void onMessage(Message m) {
+ if (seen.incrementAndGet() == 4) {
+ allSeen.countDown();
+ }
+ }
+ };
+ consumer.setMessageListener(countingListener);
+
+ sendMessages(session, destination, 4);
+
+ assertTrue(allSeen.await(1000, TimeUnit.MILLISECONDS));
+ // Leave a little time for any surplus delivery to show up.
+ Thread.sleep(200);
+
+ assertEquals(4, seen.get());
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code ackMode} values and the queue {@code destinationType} for the test this
+ * method is named after.
+ */
+ public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] ackModes = {
+ Integer.valueOf(Session.AUTO_ACKNOWLEDGE),
+ Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+ Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("ackMode", ackModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ public void testUnackedWithPrefetch1StayInQueue() throws Exception {
+
+ // Set prefetch to 1
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Use all the ack modes
+ Session session = connection.createSession(false, ackMode);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Only pick up the first 2 messages.
+ Message message = null;
+ for (int i = 0; i < 2; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+ message.acknowledge();
+
+ connection.close();
+ connection = (ActiveMQConnection)factory.createConnection();
+ connections.add(connection);
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ // Use all the ack modes
+ session = connection.createSession(false, ackMode);
+ consumer = session.createConsumer(destination);
+
+ // Pickup the rest of the messages.
+ for (int i = 0; i < 2; i++) {
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ }
+ message.acknowledge();
+ assertNull(consumer.receiveNoWait());
+
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} values
+ * for the test this method is named after.
+ */
+ public void initCombosForTestPrefetch1MessageNotDispatched() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ }
+
+ /**
+ * With the prefetch policy set to 1 and two messages on the queue "TEST", checks that
+ * a consumer on a second connection receives one message while the first consumer
+ * receives the other.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testPrefetch1MessageNotDispatched() throws Exception {
+
+ // The prefetch policy is set to 1 before the connection is started.
+ connection.getPrefetchPolicy().setAll(1);
+ connection.start();
+
+ Session session = connection.createSession(true, 0);
+ destination = new ActiveMQQueue("TEST");
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Put 2 messages on the destination and commit the send.
+ sendMessages(session, destination, 2);
+ session.commit();
+
+ // The first consumer's prefetch should fill up with 1 message. While it
+ // stays full, the 2nd message should be dispatched to another consumer,
+ // so create that second consumer on its own connection and check.
+ ActiveMQConnection secondConnection = (ActiveMQConnection)factory.createConnection();
+ secondConnection.start();
+ connections.add(secondConnection);
+ Session secondSession = secondConnection.createSession(true, 0);
+ MessageConsumer secondConsumer = secondSession.createConsumer(destination);
+
+ // One message for the first consumer ...
+ Message fromFirst = consumer.receive(1000);
+ assertNotNull(fromFirst);
+
+ // ... and one for the second.
+ Message fromSecond = secondConsumer.receive(1000);
+ assertNotNull(fromSecond);
+
+ session.commit();
+ secondSession.commit();
+
+ assertNull(consumer.receiveNoWait());
+
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the non-persistent
+ * {@code deliveryMode} and the queue and topic {@code destinationType} values for the
+ * test this method is named after.
+ */
+ public void initCombosForTestDontStart() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Checks that a consumer receives nothing when this test never calls
+ * {@code connection.start()}.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testDontStart() throws Exception {
+
+ // Note: connection.start() is not called anywhere in this test.
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ sendMessages(session, destination, 1);
+
+ // Nothing may be delivered within the one-second timeout.
+ final Message unexpected = consumer.receive(1000);
+ assertNull(unexpected);
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the non-persistent
+ * {@code deliveryMode} and the queue and topic {@code destinationType} values for the
+ * test this method is named after.
+ */
+ public void initCombosForTestStartAfterSend() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Sends one message before calling {@code connection.start()} and checks that the
+ * consumer then receives exactly that one message.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testStartAfterSend() throws Exception {
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ sendMessages(session, destination, 1);
+
+ // The connection is started only after the message has been sent.
+ connection.start();
+
+ // Exactly one message: the first receive succeeds, the next finds nothing.
+ final Message delivered = consumer.receive(1000);
+ assertNotNull(delivered);
+ final Message surplus = consumer.receiveNoWait();
+ assertNull(surplus);
+ }
+
+ /**
+ * Declares, through {@code addCombinationValues}, the {@code deliveryMode} and
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestReceiveMessageWithConsumer() {
+ final Object[] deliveryModes = {
+ Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)
+ };
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)
+ };
+ addCombinationValues("deliveryMode", deliveryModes);
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Sends one message and checks that a synchronous {@code receive} returns it, with
+ * the text "0", and that nothing else follows.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testReceiveMessageWithConsumer() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ sendMessages(session, destination, 1);
+
+ // Exactly one message is expected, and its text must be "0".
+ final Message delivered = consumer.receive(1000);
+ assertNotNull(delivered);
+ final String text = ((TextMessage)delivered).getText();
+ assertEquals("0", text);
+ assertNull(consumer.receiveNoWait());
+ }
+
+
+ public void testDupsOkConsumer() throws Exception {
+
+ // Receive a message with the JMS API
+ connection.start();
+ Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // Send the messages
+ sendMessages(session, destination, 4);
+
+ // Make sure only 4 message are delivered.
+ for( int i=0; i < 4; i++){
+ Message m = consumer.receive(1000);
+ assertNotNull(m);
+ }
+ assertNull(consumer.receive(1000));
+
+ // Close out the consumer.. no other messages should be left on the queue.
+ consumer.close();
+
+ consumer = session.createConsumer(destination);
+ assertNull(consumer.receive(1000));
+ }
+
+ /**
+ * Receives two messages in a transacted session, closes that session without
+ * committing, and checks that a second transacted consumer receives both messages
+ * marked as redelivered with a {@code JMSXDeliveryCount} of 2.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testRedispatchOfUncommittedTx() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ sendMessages(connection, destination, 2);
+
+ // Receive both messages inside the transaction, which is never committed.
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i = 0; i < 2; i++) {
+ assertNotNull(consumer.receive(1000));
+ }
+
+ // Add a second consumer while the first dispatch is still unacked/uncommitted.
+ Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+ // Closing without a commit should roll back, so the messages get
+ // re-dispatched to redispatchConsumer.
+ session.close();
+
+ // Both messages must arrive flagged as second deliveries.
+ for (int i = 0; i < 2; i++) {
+ Message redelivered = redispatchConsumer.receive(1000);
+ assertNotNull(redelivered);
+ assertTrue(redelivered.getJMSRedelivered());
+ assertEquals(2, redelivered.getLongProperty("JMSXDeliveryCount"));
+ }
+ redispatchSession.commit();
+
+ assertNull(redispatchConsumer.receive(500));
+ redispatchSession.close();
+ }
+
+
+ /**
+ * Receives two messages in a transacted session, rolls back and closes that session,
+ * and checks that a second transacted consumer receives both messages marked as
+ * redelivered with a {@code JMSXDeliveryCount} of 2.
+ *
+ * @throws Exception if a JMS call fails
+ */
+ public void testRedispatchOfRolledbackTx() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ sendMessages(connection, destination, 2);
+
+ // Receive both messages inside the transaction that will be rolled back.
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i = 0; i < 2; i++) {
+ assertNotNull(consumer.receive(1000));
+ }
+
+ // Add a second consumer while the first dispatch is still unacked/uncommitted.
+ Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+ // Explicit rollback, followed by closing the first session.
+ session.rollback();
+ session.close();
+
+ // Both messages must arrive flagged as second deliveries.
+ for (int i = 0; i < 2; i++) {
+ Message redelivered = redispatchConsumer.receive(1000);
+ assertNotNull(redelivered);
+ assertTrue(redelivered.getJMSRedelivered());
+ assertEquals(2, redelivered.getLongProperty("JMSXDeliveryCount"));
+ }
+ redispatchSession.commit();
+
+ assertNull(redispatchConsumer.receive(500));
+ redispatchSession.close();
+ }
+
+
+ /**
+ * Declares, through {@code addCombinationValues}, the queue and topic
+ * {@code destinationType} values for the test this method is named after.
+ */
+ public void initCombosForTestAckOfExpired() {
+ final Object[] destinationTypes = {
+ Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)
+ };
+ addCombinationValues("destinationType", destinationTypes);
+ }
+
+ /**
+ * Checks that a consumer reports expired messages in its stats while still
+ * receiving the non-expiring messages sent after them.
+ * <p>
+ * A dedicated connection is created from a {@code vm://localhost} URL with
+ * {@code jms.prefetchPolicy.all=4} and {@code jms.sendAcksAsync=false}. Four messages
+ * are sent with a one-second time-to-live and left to expire, then four without
+ * expiry. The consumer must receive only the "no expiry" messages, and its expired
+ * message count must equal the number of expiring messages sent.
+ *
+ * @throws Exception if a JMS call or the sleep fails
+ */
+ public void testAckOfExpired() throws Exception {
+
+ ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
+ connection = (ActiveMQConnection) fact.createConnection();
+ // Register the replacement connection in 'connections', as every other test in
+ // this class does for a connection it creates itself.
+ connections.add(connection);
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ?
+ session.createQueue("test") : session.createTopic("test"));
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.setStatsEnabled(true);
+
+ Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sendSession.createProducer(destination);
+ final long timeToLive = 1000;
+ producer.setTimeToLive(timeToLive);
+ final int count = 4;
+ for (int i = 0; i < count; i++) {
+ TextMessage message = sendSession.createTextMessage("" + i);
+ producer.send(message);
+ }
+
+ // let first bunch in queue expire; sleep past the TTL with a margin so the
+ // last message sent is not sitting exactly on its expiry boundary
+ Thread.sleep(timeToLive + 500);
+
+ producer.setTimeToLive(0);
+ for (int i = 0; i < count; i++) {
+ TextMessage message = sendSession.createTextMessage("no expiry" + i);
+ producer.send(message);
+ }
+
+ ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+
+ for(int i=0; i<count; i++) {
+ TextMessage msg = (TextMessage) amqConsumer.receive();
+ assertNotNull(msg);
+ assertTrue(msg.getText().contains("no expiry"));
+
+ // force an ack when there are expired messages
+ amqConsumer.acknowledge();
+ }
+ assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
+
+// TODO:
+// DestinationViewMBean view = createView(destination);
+//
+// assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
+// assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
+// assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
+ }
+
+// protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+// MBeanServer mbeanServer = broker.getMBeanServer();
+// String domain = "org.apache.activemq";
+// ObjectName name;
+// if (destination.isQueue()) {
+// name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+// } else {
+// name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+// }
+// return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
+// }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Test cases for exclusive JMS message consumers.
+ *
+ * @version $Revision$
+ */
+public class JMSExclusiveConsumerTest extends JmsTestSupport {
+
+ public int deliveryMode;
+
+ /**
+ * Builds the JUnit suite for this class by passing it to the
+ * suite(Class) helper.
+ *
+ * @return the suite built for JMSExclusiveConsumerTest
+ */
+ public static Test suite() {
+ return suite(JMSExclusiveConsumerTest.class);
+ }
+
+ /**
+ * Command-line entry point: runs {@link #suite()} with the JUnit text runner.
+ *
+ * @param args not used
+ */
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ public void initCombosForTestRoundRobinDispatchOnNonExclusive() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ /**
+ * Checks the default (non-exclusive) dispatch: with two consumers on the
+ * same queue and two messages sent, each consumer receives exactly one.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testRoundRobinDispatchOnNonExclusive() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue queue = new ActiveMQQueue("TEST");
+
+ MessageProducer sender = session.createProducer(queue);
+ sender.setDeliveryMode(deliveryMode);
+
+ // Both consumers are registered before anything is sent.
+ MessageConsumer first = session.createConsumer(queue);
+ MessageConsumer second = session.createConsumer(queue);
+
+ sender.send(session.createTextMessage("1st"));
+ sender.send(session.createTextMessage("2nd"));
+
+ // Each consumer must be handed one message within a second...
+ assertNotNull(second.receive(1000));
+ assertNotNull(first.receive(1000));
+
+ // ...and nothing beyond that.
+ assertNull(first.receiveNoWait());
+ assertNull(second.receiveNoWait());
+ }
+
+ public void initCombosForTestDispatchExclusive() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ /**
+ * Checks that with the "?consumer.exclusive=true" destination option all
+ * three messages end up at a single consumer, whichever of the two that
+ * turns out to be.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testDispatchExclusive() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue queue = new ActiveMQQueue("TEST?consumer.exclusive=true");
+
+ MessageProducer sender = session.createProducer(queue);
+ sender.setDeliveryMode(deliveryMode);
+
+ MessageConsumer first = session.createConsumer(queue);
+ MessageConsumer second = session.createConsumer(queue);
+
+ sender.send(session.createTextMessage("1st"));
+ sender.send(session.createTextMessage("2nd"));
+ sender.send(session.createTextMessage("3nd"));
+
+ // Probe the second consumer: if it gets a message it is the exclusive
+ // one and two more must follow; otherwise the first consumer must
+ // receive all three.
+ MessageConsumer exclusive;
+ int outstanding;
+ if (second.receive(1000) != null) {
+ exclusive = second;
+ outstanding = 2;
+ } else {
+ exclusive = first;
+ outstanding = 3;
+ }
+ while (outstanding-- > 0) {
+ assertNotNull(exclusive.receive(1000));
+ }
+
+ // Nothing may be left over for either consumer.
+ assertNull(first.receiveNoWait());
+ assertNull(second.receiveNoWait());
+ }
+
+ /**
+ * Registers one non-exclusive and one exclusive consumer on TEST.FOO and
+ * checks that all three messages go to the exclusive consumer while the
+ * non-exclusive one receives nothing.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testMixExclusiveWithNonExclusive() throws Exception {
+ ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=true");
+ ActiveMQQueue nonExclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=false");
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // The non-exclusive consumer is created first.
+ MessageConsumer plainConsumer = session.createConsumer(nonExclusiveQueue);
+ MessageConsumer exclusiveConsumer = session.createConsumer(exclusiveQueue);
+
+ MessageProducer sender = session.createProducer(exclusiveQueue);
+ for (int sent = 0; sent < 3; sent++) {
+ sender.send(session.createMessage());
+ }
+
+ for (int round = 0; round < 3; round++) {
+ assertNotNull(exclusiveConsumer.receive(1000));
+ assertNull(plainConsumer.receive(1000));
+ }
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.net.URISyntaxException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Vector;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Test cases that send and receive each of the JMS message types.
+ *
+ * @version $Revision$
+ */
+public class JMSMessageTest extends JmsTestSupport {
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public int prefetch;
+ public int ackMode;
+ public byte destinationType;
+ public boolean durableConsumer;
+ public String connectURL;
+
+ /**
+ * Run all these tests in both marshaling and non-marshaling mode.
+ */
+ public void initCombos() {
+ addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false",
+ "vm://localhost?marshal=true"});
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+ }
+
+ /**
+ * Sends a TextMessage and checks that exactly one message with the same
+ * text is received.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testTextMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+
+ // Send the message.
+ {
+ TextMessage message = session.createTextMessage();
+ message.setText("Hi");
+ producer.send(message);
+ }
+
+ // Check the message.
+ {
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals("Hi", message.getText());
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Builds the JUnit suite for this class by passing it to the
+ * suite(Class) helper.
+ *
+ * @return the suite built for JMSMessageTest
+ */
+ public static Test suite() {
+ return suite(JMSMessageTest.class);
+ }
+
+ /**
+ * Command-line entry point: runs {@link #suite()} with the JUnit text runner.
+ *
+ * @param args not used
+ */
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ /**
+ * Creates the connection factory from the current value of connectURL.
+ *
+ * @return a new ActiveMQConnectionFactory for connectURL
+ * @throws URISyntaxException declared by the signature; not thrown
+ * explicitly in this method
+ */
+ protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
+ return new ActiveMQConnectionFactory(connectURL);
+ }
+
+ /**
+ * Sends a BytesMessage holding four ints and checks that the received
+ * message reports a body length of 16 bytes.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testBytesMessageLength() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+
+ // Send the message.
+ {
+ BytesMessage message = session.createBytesMessage();
+ message.writeInt(1);
+ message.writeInt(2);
+ message.writeInt(3);
+ message.writeInt(4);
+ producer.send(message);
+ }
+
+ // Check the message: four ints of four bytes each.
+ {
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(16, message.getBodyLength());
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Sends an ObjectMessage carrying the string "Hi" and checks that exactly
+ * one message with an equal payload is received.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testObjectMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+
+ // Send the message.
+ {
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject("Hi");
+ producer.send(message);
+ }
+
+ // Check the message.
+ {
+ ObjectMessage message = (ObjectMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals("Hi", message.getObject());
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Sends a BytesMessage holding a single boolean and checks that the value
+ * is read back and that reading further raises MessageEOFException.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testBytesMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+
+ // Send the message.
+ {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBoolean(true);
+ producer.send(message);
+ }
+
+ // Check the message.
+ {
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertTrue(message.readBoolean());
+
+ try {
+ message.readByte();
+ fail("Expected exception not thrown.");
+ } catch (MessageEOFException expected) {
+ // Expected: the body holds nothing after the single boolean.
+ }
+
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Sends a StreamMessage holding one string and checks that a failed type
+ * conversion leaves the read position untouched, that the string is read
+ * back intact, and that reading past the end raises MessageEOFException.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testStreamMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+
+ // Send the message.
+ {
+ StreamMessage message = session.createStreamMessage();
+ message.writeString("This is a test to see how it works.");
+ producer.send(message);
+ }
+
+ // Check the message.
+ {
+ StreamMessage message = (StreamMessage)consumer.receive(1000);
+ assertNotNull(message);
+
+ // Invalid conversion should throw exception and not move the stream
+ // position.
+ try {
+ message.readByte();
+ fail("Should have received NumberFormatException");
+ } catch (NumberFormatException expected) {
+ // Expected: the string cannot be converted to a byte.
+ }
+
+ // The failed read above must not have consumed the string.
+ assertEquals("This is a test to see how it works.", message.readString());
+
+ // The stream is now exhausted, so a further read must signal
+ // end-of-message.
+ try {
+ message.readByte();
+ fail("Should have received MessageEOFException");
+ } catch (MessageEOFException expected) {
+ // Expected: nothing is left in the stream.
+ }
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Sends a MapMessage with one boolean entry and checks that exactly one
+ * message carrying that entry is received.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testMapMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+
+ // Send the message.
+ {
+ MapMessage message = session.createMapMessage();
+ message.setBoolean("boolKey", true);
+ producer.send(message);
+ }
+
+ // Get the message.
+ {
+ MapMessage message = (MapMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertTrue(message.getBoolean("boolKey"));
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+ /**
+ * Minimal hand-written {@link TextMessage} implementation, used by
+ * testForeignMessage as a message that was not created by the session.
+ * <p>
+ * Header fields, the text body and String/Object properties are stored.
+ * The correlation-id-as-bytes accessors, the primitive typed property
+ * accessors and acknowledge() are deliberate stubs: setters discard their
+ * argument and getters return null, false or zero.
+ */
+ static class ForeignMessage implements TextMessage {
+
+ public int deliveryMode;
+
+ private String messageId;
+ private long timestamp;
+ private String correlationId;
+ private Destination replyTo;
+ private Destination destination;
+ private boolean redelivered;
+ private String type;
+ private long expiration;
+ private int priority;
+ private String text;
+ // Backing store for String and Object properties only.
+ private HashMap<String, Object> props = new HashMap<String, Object>();
+
+ public String getJMSMessageID() throws JMSException {
+ return messageId;
+ }
+
+ public void setJMSMessageID(String id) throws JMSException {
+ messageId = id;
+ }
+
+ public long getJMSTimestamp() throws JMSException {
+ return timestamp;
+ }
+
+ public void setJMSTimestamp(long value) throws JMSException {
+ timestamp = value;
+ }
+
+ /** Stub: the byte form of the correlation id is not stored. */
+ public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return null;
+ }
+
+ /** Stub: the byte form of the correlation id is not stored. */
+ public void setJMSCorrelationIDAsBytes(byte[] value) throws JMSException {
+ }
+
+ public void setJMSCorrelationID(String value) throws JMSException {
+ correlationId = value;
+ }
+
+ public String getJMSCorrelationID() throws JMSException {
+ return correlationId;
+ }
+
+ public Destination getJMSReplyTo() throws JMSException {
+ return replyTo;
+ }
+
+ public void setJMSReplyTo(Destination value) throws JMSException {
+ replyTo = value;
+ }
+
+ public Destination getJMSDestination() throws JMSException {
+ return destination;
+ }
+
+ public void setJMSDestination(Destination value) throws JMSException {
+ destination = value;
+ }
+
+ public int getJMSDeliveryMode() throws JMSException {
+ return deliveryMode;
+ }
+
+ public void setJMSDeliveryMode(int value) throws JMSException {
+ deliveryMode = value;
+ }
+
+ public boolean getJMSRedelivered() throws JMSException {
+ return redelivered;
+ }
+
+ public void setJMSRedelivered(boolean value) throws JMSException {
+ redelivered = value;
+ }
+
+ public String getJMSType() throws JMSException {
+ return type;
+ }
+
+ public void setJMSType(String value) throws JMSException {
+ type = value;
+ }
+
+ public long getJMSExpiration() throws JMSException {
+ return expiration;
+ }
+
+ public void setJMSExpiration(long value) throws JMSException {
+ expiration = value;
+ }
+
+ public int getJMSPriority() throws JMSException {
+ return priority;
+ }
+
+ public void setJMSPriority(int value) throws JMSException {
+ priority = value;
+ }
+
+ /** Removes every stored String/Object property. */
+ public void clearProperties() throws JMSException {
+ props.clear();
+ }
+
+ /** Reports whether a String/Object property of that name is stored. */
+ public boolean propertyExists(String name) throws JMSException {
+ return props.containsKey(name);
+ }
+
+ // The primitive typed getters below are stubs; the matching setters
+ // store nothing, so there is never a value to return.
+
+ public boolean getBooleanProperty(String name) throws JMSException {
+ return false;
+ }
+
+ public byte getByteProperty(String name) throws JMSException {
+ return 0;
+ }
+
+ public short getShortProperty(String name) throws JMSException {
+ return 0;
+ }
+
+ public int getIntProperty(String name) throws JMSException {
+ return 0;
+ }
+
+ public long getLongProperty(String name) throws JMSException {
+ return 0;
+ }
+
+ public float getFloatProperty(String name) throws JMSException {
+ return 0;
+ }
+
+ public double getDoubleProperty(String name) throws JMSException {
+ return 0;
+ }
+
+ public String getStringProperty(String name) throws JMSException {
+ return (String)props.get(name);
+ }
+
+ public Object getObjectProperty(String name) throws JMSException {
+ return props.get(name);
+ }
+
+ /** Returns a snapshot of the names of the stored properties. */
+ public Enumeration getPropertyNames() throws JMSException {
+ return new Vector<String>(props.keySet()).elements();
+ }
+
+ // The primitive typed setters below are stubs and discard the value.
+
+ public void setBooleanProperty(String name, boolean value) throws JMSException {
+ }
+
+ public void setByteProperty(String name, byte value) throws JMSException {
+ }
+
+ public void setShortProperty(String name, short value) throws JMSException {
+ }
+
+ public void setIntProperty(String name, int value) throws JMSException {
+ }
+
+ public void setLongProperty(String name, long value) throws JMSException {
+ }
+
+ public void setFloatProperty(String name, float value) throws JMSException {
+ }
+
+ public void setDoubleProperty(String name, double value) throws JMSException {
+ }
+
+ public void setStringProperty(String name, String value) throws JMSException {
+ props.put(name, value);
+ }
+
+ public void setObjectProperty(String name, Object value) throws JMSException {
+ props.put(name, value);
+ }
+
+ /** Stub: this message is not tied to a session, so there is nothing to acknowledge. */
+ public void acknowledge() throws JMSException {
+ }
+
+ /** Discards the text body. */
+ public void clearBody() throws JMSException {
+ text = null;
+ }
+
+ public void setText(String value) throws JMSException {
+ text = value;
+ }
+
+ public String getText() throws JMSException {
+ return text;
+ }
+ }
+
+ /**
+ * Sends a ForeignMessage and checks that the send call fills in the
+ * headers required by JMS 1.1 section 3.4.11, and that the text and the
+ * string property arrive at the consumer.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testForeignMessage() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ // Send the message.
+ {
+ ForeignMessage message = new ForeignMessage();
+ message.text = "Hello";
+ message.setStringProperty("test", "value");
+ long timeToLive = 10000L;
+ long start = System.currentTimeMillis();
+ // The second argument of send() is a delivery mode, so a
+ // DeliveryMode constant is used rather than an acknowledge mode
+ // that merely shares the same numeric value.
+ producer.send(message, DeliveryMode.NON_PERSISTENT, 7, timeToLive);
+ long end = System.currentTimeMillis();
+
+
+ // Validate JMS spec 1.1 section 3.4.11 table 3.1:
+ // JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority,
+ // JMSMessageID and JMSTimestamp must be set by sending a message.
+ assertEquals(destination, message.getJMSDestination());
+ assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+ // Expiration and timestamp must fall inside the window spanned by
+ // the clock readings taken around the send call.
+ assertTrue(start + timeToLive <= message.getJMSExpiration());
+ assertTrue(end + timeToLive >= message.getJMSExpiration());
+ assertEquals(7, message.getJMSPriority());
+ assertNotNull(message.getJMSMessageID());
+ assertTrue(start <= message.getJMSTimestamp());
+ assertTrue(end >= message.getJMSTimestamp());
+ }
+
+ // Validate message is OK.
+ {
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals("Hello", message.getText());
+ assertEquals("value", message.getStringProperty("test"));
+ }
+
+ // Only one message was sent, so nothing else may be pending.
+ assertNull(consumer.receiveNoWait());
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.Enumeration;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
+public class JMSUsecaseTest extends JmsTestSupport {
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public int prefetch;
+ public byte destinationType;
+ public boolean durableConsumer;
+
+ /**
+ * Builds the JUnit suite for this class by passing it to the
+ * suite(Class) helper.
+ *
+ * @return the suite built for JMSUsecaseTest
+ */
+ public static Test suite() {
+ return suite(JMSUsecaseTest.class);
+ }
+
+ /**
+ * Command-line entry point: runs {@link #suite()} with the JUnit text runner.
+ *
+ * @param args not used
+ */
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+ public void initCombosForTestQueueBrowser() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+ }
+
+ public void testQueueBrowser() throws Exception {
+
+ // Send a message to the broker.
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ sendMessages(session, destination, 5);
+
+ QueueBrowser browser = session.createBrowser((Queue)destination);
+ Enumeration enumeration = browser.getEnumeration();
+ for (int i = 0; i < 5; i++) {
+ Thread.sleep(100);
+ assertTrue(enumeration.hasMoreElements());
+ Message m = (Message)enumeration.nextElement();
+ assertNotNull(m);
+ assertEquals("" + i, ((TextMessage)m).getText());
+ }
+ assertFalse(enumeration.hasMoreElements());
+ }
+
+ public void initCombosForTestSendReceive() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ /**
+ * Sends one message and checks that the consumer receives exactly one.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testSendReceive() throws Exception {
+ // Send a message to the broker.
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = createDestination(session, destinationType);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+ MessageConsumer consumer = session.createConsumer(destination);
+ ActiveMQMessage message = new ActiveMQMessage();
+ producer.send(message);
+
+ // Make sure only 1 message was delivered.
+ assertNotNull(consumer.receive(1000));
+ assertNull(consumer.receiveNoWait());
+ }
+
+ public void initCombosForTestSendReceiveTransacted() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ /**
+ * Exercises a transacted session: a sent message is only delivered after
+ * commit, is redelivered (flagged as such) after rollback, and is not
+ * delivered again once its receipt has been committed.
+ *
+ * @throws Exception if any JMS operation fails
+ */
+ public void testSendReceiveTransacted() throws Exception {
+ // Send a message to the broker.
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ destination = createDestination(session, destinationType);
+ MessageProducer producer = session.createProducer(destination);
+ // Apply the combination value; otherwise both delivery-mode
+ // combinations would exercise the identical scenario.
+ producer.setDeliveryMode(deliveryMode);
+ MessageConsumer consumer = session.createConsumer(destination);
+ producer.send(session.createTextMessage("test"));
+
+ // Message should not be delivered until commit.
+ assertNull(consumer.receiveNoWait());
+ session.commit();
+
+ // Make sure only 1 message was delivered.
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ assertFalse(message.getJMSRedelivered());
+ assertNull(consumer.receiveNoWait());
+
+ // Message should be redelivered if rollback is used.
+ session.rollback();
+
+ // Make sure the message was redelivered exactly once.
+ message = consumer.receive(2000);
+ assertNotNull(message);
+ assertTrue(message.getJMSRedelivered());
+ assertNull(consumer.receiveNoWait());
+
+ // If we commit now, the message should not be redelivered.
+ session.commit();
+ assertNull(consumer.receiveNoWait());
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.legacy.broker.BrokerFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Benchmarks the broker by starting many consumer and producers against the
+ * same destination. Make sure you run with jvm option -server (makes a big
+ * difference). The tests simulate storing 1000 1k jms messages to see the rate
+ * of processing msg/sec.
+ *
+ * @version $Revision$
+ */
+public class JmsBenchmark extends JmsTestSupport {
+ private static final transient Log LOG = LogFactory.getLog(JmsBenchmark.class);
+
+ private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
+ private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
+ private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 60));
+ private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
+ private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
+
+ public ActiveMQDestination destination;
+
+ public static Test suite() {
+ return suite(JmsBenchmark.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(JmsBenchmark.class);
+ }
+
+ public void initCombos() {
+ addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?persistent=false"));
+ }
+
+ protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+ return new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+ }
+
+ /**
+ * @throws Throwable
+ */
+ public void testConcurrentSendReceive() throws Throwable {
+
+ final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+ final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+ final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+ final AtomicInteger producedMessages = new AtomicInteger(0);
+ final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+ final Callable producer = new Callable() {
+ public Object call() throws JMSException, InterruptedException {
+ Connection connection = factory.createConnection();
+ connections.add(connection);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[1024]);
+ connection.start();
+ connectionsEstablished.release();
+
+ while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+ producer.send(message);
+ producedMessages.incrementAndGet();
+ }
+
+ connection.close();
+ workerDone.release();
+ return null;
+ }
+ };
+
+ final Callable consumer = new Callable() {
+ public Object call() throws JMSException, InterruptedException {
+ Connection connection = factory.createConnection();
+ connections.add(connection);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message msg) {
+ receivedMessages.incrementAndGet();
+ }
+ });
+ connection.start();
+
+ connectionsEstablished.release();
+ sampleTimeDone.await();
+
+ connection.close();
+ workerDone.release();
+ return null;
+ }
+ };
+
+ final Throwable workerError[] = new Throwable[1];
+ for (int i = 0; i < PRODUCER_COUNT; i++) {
+ new Thread("Producer:" + i) {
+ public void run() {
+ try {
+ producer.call();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ workerError[0] = e;
+ }
+ }
+ }.start();
+ }
+
+ for (int i = 0; i < CONSUMER_COUNT; i++) {
+ new Thread("Consumer:" + i) {
+ public void run() {
+ try {
+ consumer.call();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ workerError[0] = e;
+ }
+ }
+ }.start();
+ }
+
+ LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+ connectionsEstablished.acquire();
+ LOG.info("Producers and Consumers are now running. Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+ Thread.sleep(1000 * 10);
+
+ LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+ for (int i = 0; i < SAMPLES; i++) {
+
+ long start = System.currentTimeMillis();
+ producedMessages.set(0);
+ receivedMessages.set(0);
+
+ Thread.sleep(SAMPLE_DURATION);
+
+ long end = System.currentTimeMillis();
+ int r = receivedMessages.get();
+ int p = producedMessages.get();
+
+ LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
+ }
+
+ LOG.info("Sample done.");
+ sampleTimeDone.countDown();
+
+ workerDone.acquire();
+ if (workerError[0] != null) {
+ throw workerError[0];
+ }
+
+ }
+
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.Enumeration;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsQueueBrowserTest extends JmsTestSupport {
+ private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+ .getLog(JmsQueueBrowserTest.class);
+
+
+ /**
+ * Tests the queue browser. Browses the messages then the consumer tries to receive them. The messages should still
+ * be in the queue even when it was browsed.
+ *
+ * @throws Exception
+ */
+ public void testReceiveBrowseReceive() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ MessageProducer producer = session.createProducer(destination);
+ MessageConsumer consumer = session.createConsumer(destination);
+ connection.start();
+
+ Message[] outbound = new Message[]{session.createTextMessage("First Message"),
+ session.createTextMessage("Second Message"),
+ session.createTextMessage("Third Message")};
+
+ // lets consume any outstanding messages from previous test runs
+ while (consumer.receive(1000) != null) {
+ }
+
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ producer.send(outbound[2]);
+
+ // Get the first.
+ assertEquals(outbound[0], consumer.receive(1000));
+ consumer.close();
+ //Thread.sleep(200);
+
+ QueueBrowser browser = session.createBrowser((Queue) destination);
+ Enumeration enumeration = browser.getEnumeration();
+
+ // browse the second
+ assertTrue("should have received the second message", enumeration.hasMoreElements());
+ assertEquals(outbound[1], (Message) enumeration.nextElement());
+
+ // browse the third.
+ assertTrue("Should have received the third message", enumeration.hasMoreElements());
+ assertEquals(outbound[2], (Message) enumeration.nextElement());
+
+ // There should be no more.
+ boolean tooMany = false;
+ while (enumeration.hasMoreElements()) {
+ LOG.info("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText());
+ tooMany = true;
+ }
+ assertFalse(tooMany);
+ browser.close();
+
+ // Re-open the consumer.
+ consumer = session.createConsumer(destination);
+ // Receive the second.
+ assertEquals(outbound[1], consumer.receive(1000));
+ // Receive the third.
+ assertEquals(outbound[2], consumer.receive(1000));
+ consumer.close();
+
+ }
+
+ public void testBrowseReceive() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+
+ connection.start();
+
+ Message[] outbound = new Message[]{session.createTextMessage("First Message"),
+ session.createTextMessage("Second Message"),
+ session.createTextMessage("Third Message")};
+
+
+ MessageProducer producer = session.createProducer(destination);
+ producer.send(outbound[0]);
+
+ // create browser first
+ QueueBrowser browser = session.createBrowser((Queue) destination);
+ Enumeration enumeration = browser.getEnumeration();
+
+ // create consumer
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // browse the first message
+ assertTrue("should have received the first message", enumeration.hasMoreElements());
+ assertEquals(outbound[0], (Message) enumeration.nextElement());
+
+ // Receive the first message.
+ assertEquals(outbound[0], consumer.receive(1000));
+ consumer.close();
+ browser.close();
+ producer.close();
+
+ }
+}
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java
------------------------------------------------------------------------------
svn:executable = *