You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 14:42:29 UTC

svn commit: r786040 [2/6] - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/test/java/org/apache/activemq/legacy/broker/ activemq-all/src/test/java/org/apache/activemq/legacy/broker/advisory/ ...

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,925 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * Test cases used to test the JMS message consumer.
+ * 
+ * @version $Revision$
+ */
+public class JMSConsumerTest extends JmsTestSupport {
+
+    private static final Log LOG = LogFactory.getLog(JMSConsumerTest.class);
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public int ackMode;
+    public byte destinationType;
+    public boolean durableConsumer;
+
+    public static Test suite() { // JUnit 3 suite hook; delegates to a suite(Class) helper not defined in the visible part of this class
+        return suite(JMSConsumerTest.class);
+    }
+
+    public static void main(String[] args) { // lets the test class be run as a standalone program
+        junit.textui.TestRunner.run(suite()); // JUnit's text-mode runner executes the suite
+    }
+
+    public void initCombosForTestMessageListenerWithConsumerCanBeStopped() {
+        // Values registered for the deliveryMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE, ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE});
+    }
+
+    public void testMessageListenerWithConsumerCanBeStopped() throws Exception {
+        // Checks that consumer.stop() suspends listener delivery and consumer.start() resumes it.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done1 = new CountDownLatch(1);
+        final CountDownLatch done2 = new CountDownLatch(1);
+
+        // Attach a listener that releases done1 on the first delivery and done2 on the second.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                // Decide on the value returned by the increment instead of re-reading the counter.
+                final int delivered = counter.incrementAndGet();
+                if (delivered == 1) {
+                    done1.countDown();
+                } else if (delivered == 2) {
+                    done2.countDown();
+                }
+            }
+        });
+
+        // Send a first message to make sure that the consumer dispatcher is
+        // running
+        sendMessages(session, destination, 1);
+        assertTrue(done1.await(1, TimeUnit.SECONDS));
+        assertEquals(1, counter.get());
+
+        // Stop the consumer.
+        consumer.stop();
+
+        // Send a second message; it must not reach the listener while the consumer is stopped.
+        sendMessages(session, destination, 1);
+        assertFalse(done2.await(1, TimeUnit.SECONDS));
+        assertEquals(1, counter.get());
+
+        // Start the consumer, and the message should now get delivered.
+        consumer.start();
+        assertTrue(done2.await(1, TimeUnit.SECONDS));
+        assertEquals(2, counter.get());
+    }
+
+
+    public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception {
+        // Closes the consumer from a worker thread mid-dispatch; fails if any thread recorded an exception.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch closeDone = new CountDownLatch(1);
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+        // preload the queue
+        sendMessages(session, destination, 2000);
+        final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
+        final Map<Thread, Throwable> exceptions = java.util.Collections.synchronizedMap(new HashMap<Thread, Throwable>());
+        final ExecutorService executor = Executors.newCachedThreadPool();
+        // The default handler is JVM-wide state: remember it so it can be restored afterwards.
+        final UncaughtExceptionHandler previousHandler = Thread.getDefaultUncaughtExceptionHandler();
+        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                LOG.error("Uncaught exception:", e);
+                exceptions.put(t, e);
+            }
+        });
+
+        final class AckAndClose implements Runnable {
+            private final Message message;
+
+            public AckAndClose(Message m) {
+                this.message = m;
+            }
+
+            public void run() {
+                try {
+                    int count = counter.incrementAndGet();
+                    if (count == 590) {
+                        // close in a separate thread is ok by jms
+                        consumer.close();
+                        closeDone.countDown();
+                    }
+                    if (count % 200 == 0) {
+                        // ack only every 200th message so that some stay outstanding
+                        message.acknowledge();
+                    }
+                } catch (Exception e) {
+                    LOG.error("Exception on close or ack:", e);
+                    exceptions.put(Thread.currentThread(), e);
+                }
+            }
+        }
+
+        try {
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message m) {
+                    // ack and close eventually in separate thread
+                    executor.execute(new AckAndClose(m));
+                }
+            });
+            assertTrue(closeDone.await(20, TimeUnit.SECONDS));
+            // await possible exceptions
+            Thread.sleep(1000);
+            assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+        } finally {
+            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
+            executor.shutdownNow();
+        }
+    }
+
+    
+    public void initCombosForTestMutiReceiveWithPrefetch1() {
+        // Values registered for the deliveryMode, ackMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("ackMode", new Object[] {Session.AUTO_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE,
+                                                              ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE});
+    }
+
+    public void testMutiReceiveWithPrefetch1() throws Exception {
+        // With a prefetch of 1, all four messages must still be receivable one after another.
+        // Set prefetch to 1 through the connection's prefetch policy.
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // The session uses whichever ackMode the current combination supplies.
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Publish four messages.
+        sendMessages(session, destination, 4);
+
+        // Each of the four must arrive within a second.
+        Message lastReceived = null;
+        for (int remaining = 4; remaining > 0; remaining--) {
+            lastReceived = consumer.receive(1000);
+            assertNotNull(lastReceived);
+        }
+        assertNull(consumer.receiveNoWait());
+        lastReceived.acknowledge();
+    }
+
+    public void initCombosForTestDurableConsumerSelectorChange() { // topic only: the test casts the destination to Topic
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.TOPIC_TYPE});
+    }
+
+    public void testDurableConsumerSelectorChange() throws Exception {
+        // Re-creating a durable subscription with a different selector must apply the new selector.
+        // Durable subscription "test" initially selects only red messages.
+        connection.setClientID("test");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+        MessageConsumer consumer = session.createDurableSubscriber((Topic)destination, "test", "color='red'", false);
+
+        // A red message matches the first selector.
+        TextMessage outgoing = session.createTextMessage("1st");
+        outgoing.setStringProperty("color", "red");
+        producer.send(outgoing);
+
+        Message received = consumer.receive(1000);
+        assertNotNull(received);
+        assertEquals("1st", ((TextMessage)received).getText());
+
+        // Close and re-subscribe under the same name with a blue-only selector.
+        consumer.close();
+        consumer = session.createDurableSubscriber((Topic)destination, "test", "color='blue'", false);
+
+        outgoing = session.createTextMessage("2nd");
+        outgoing.setStringProperty("color", "red");
+        producer.send(outgoing);
+        outgoing = session.createTextMessage("3rd");
+        outgoing.setStringProperty("color", "blue");
+        producer.send(outgoing);
+
+        // Selector should skip the 2nd message.
+        received = consumer.receive(1000);
+        assertNotNull(received);
+        assertEquals("3rd", ((TextMessage)received).getText());
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void initCombosForTestSendReceiveBytesMessage() {
+        // Values registered for the deliveryMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE, ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE});
+    }
+
+    public void testSendReceiveBytesMessage() throws Exception {
+        // Round-trips a BytesMessage holding two booleans and checks they are read back in order.
+        // Consumer and producer share one auto-acknowledge session.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        BytesMessage outgoing = session.createBytesMessage();
+        outgoing.writeBoolean(true);
+        outgoing.writeBoolean(false);
+        producer.send(outgoing);
+
+        // Exactly one message is expected back, carrying the same two values.
+        BytesMessage received = (BytesMessage)consumer.receive(1000);
+        assertNotNull(received);
+        assertTrue(received.readBoolean());
+        assertFalse(received.readBoolean());
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+    public void initCombosForTestSetMessageListenerAfterStart() {
+        // Values registered for the deliveryMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE, ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE});
+    }
+
+    public void testSetMessageListenerAfterStart() throws Exception {
+        // A listener attached after the messages were sent must still receive all four of them.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Start the connection and create the consumer without a listener.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        // See if the messages get sent to the listener
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                final int delivered = counter.incrementAndGet(); // use the returned value, not a second read
+                if (delivered == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200); // grace period so that an unexpected extra delivery would be counted
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    public void initCombosForTestPassMessageListenerIntoCreateConsumer() { // queue and topic destinations only
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE});
+    }
+
+    public void testPassMessageListenerIntoCreateConsumer() throws Exception {
+        // A listener handed to ActiveMQSession.createConsumer(...) must receive all four messages.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Create the consumer with its listener in a single call.
+        connection.start();
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination, new MessageListener() {
+            public void onMessage(Message m) {
+                final int delivered = counter.incrementAndGet(); // use the returned value, not a second read
+                if (delivered == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200); // grace period so that an unexpected extra delivery would be counted
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() { // DUPS_OK sessions on a queue, both delivery modes
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("ackMode", new Object[] {Session.DUPS_OK_ACKNOWLEDGE});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE});
+    }
+
+    public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
+        // Closes the connection from inside onMessage() and expects five deliveries in total afterwards.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        final CountDownLatch got2Done = new CountDownLatch(1);
+
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        // This test case does not work if optimized message dispatch is used, as
+        // the main thread's send would block until the consumer receives the
+        // message. This test depends on thread decoupling so that the main
+        // thread can stop the consumer thread.
+        connection.setOptimizedMessageDispatch(false);
+        connection.start();
+
+        // The session uses the ackMode of the current combination (only DUPS_OK_ACKNOWLEDGE is registered).
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in first listener: " + tm.getText());
+                    assertEquals("" + counter.get(), tm.getText());
+                    final int delivered = counter.incrementAndGet();
+                    if (delivered == 2) {
+                        sendDone.await();
+                        connection.close();
+                        got2Done.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+        sendDone.countDown();
+
+        // Wait for first 2 messages to arrive.
+        assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+        // Re-start connection.
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Pickup the remaining messages.
+        final CountDownLatch done2 = new CountDownLatch(1);
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in second listener: " + tm.getText());
+                    // order is not guaranteed as the connection is started before the listener is set,
+                    // so the message text is deliberately not compared against the counter here.
+                    final int delivered = counter.incrementAndGet();
+                    if (delivered == 4) {
+                        done2.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack mode
+        assertEquals(5, counter.get());
+    }
+
+    public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() { // AUTO and CLIENT ack sessions on a queue
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("ackMode", new Object[] {Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE});
+    }
+
+    public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
+        // Closes the connection from inside onMessage() after acknowledging; expects exactly four deliveries.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch sendDone = new CountDownLatch(1);
+        final CountDownLatch got2Done = new CountDownLatch(1);
+
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        // This test case does not work if optimized message dispatch is used, as
+        // the main thread's send would block until the consumer receives the
+        // message. This test depends on thread decoupling so that the main
+        // thread can stop the consumer thread.
+        connection.setOptimizedMessageDispatch(false);
+        connection.start();
+
+        // The session uses the ackMode of the current combination (AUTO or CLIENT acknowledge are registered).
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in first listener: " + tm.getText());
+                    assertEquals("" + counter.get(), tm.getText());
+                    final int delivered = counter.incrementAndGet();
+                    m.acknowledge();
+                    if (delivered == 2) {
+                        sendDone.await();
+                        connection.close();
+                        got2Done.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+        sendDone.countDown();
+
+        // Wait for first 2 messages to arrive.
+        assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+
+        // Re-start connection.
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Pickup the remaining messages.
+        final CountDownLatch done2 = new CountDownLatch(1);
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                try {
+                    TextMessage tm = (TextMessage)m;
+                    LOG.info("Got in second listener: " + tm.getText());
+                    final int delivered = counter.incrementAndGet();
+                    if (delivered == 4) {
+                        done2.countDown();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("unexpected ex onMessage: ", e);
+                }
+            }
+        });
+
+        assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200);
+
+        // close from onMessage with Auto_ack will ack
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {
+        // Values registered for the deliveryMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE, ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE});
+    }
+
+    public void testMessageListenerWithConsumerWithPrefetch1() throws Exception {
+        // A listener on a prefetch-1 connection must still receive all four messages.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Set prefetch to 1 before starting the connection.
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                final int delivered = counter.incrementAndGet(); // use the returned value, not a second read
+                if (delivered == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200); // grace period so that an unexpected extra delivery would be counted
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    public void initCombosForTestMessageListenerWithConsumer() {
+        // Values registered for the deliveryMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE, ActiveMQDestination.TEMP_QUEUE_TYPE, ActiveMQDestination.TEMP_TOPIC_TYPE});
+    }
+
+    public void testMessageListenerWithConsumer() throws Exception {
+        // A listener set on a plain consumer must receive all four messages sent afterwards.
+        final AtomicInteger counter = new AtomicInteger(0);
+        final CountDownLatch done = new CountDownLatch(1);
+
+        // Start the connection and attach the counting listener.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(new MessageListener() {
+            public void onMessage(Message m) {
+                final int delivered = counter.incrementAndGet(); // use the returned value, not a second read
+                if (delivered == 4) {
+                    done.countDown();
+                }
+            }
+        });
+
+        // Send the messages
+        sendMessages(session, destination, 4);
+
+        assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
+        Thread.sleep(200); // grace period so that an unexpected extra delivery would be counted
+
+        // Make sure only 4 messages were delivered.
+        assertEquals(4, counter.get());
+    }
+
+    public void initCombosForTestUnackedWithPrefetch1StayInQueue() {
+        // Values registered for the deliveryMode, ackMode and destinationType fields (boxed implicitly).
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+        addCombinationValues("ackMode", new Object[] {Session.AUTO_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE});
+    }
+
+    public void testUnackedWithPrefetch1StayInQueue() throws Exception {
+        // Messages not received by the first connection must remain available to a later consumer.
+        // Set prefetch to 1 through the connection's prefetch policy.
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // The session uses whichever ackMode the current combination supplies.
+        Session session = connection.createSession(false, ackMode);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Publish four messages.
+        sendMessages(session, destination, 4);
+
+        // Only pick up the first 2 messages.
+        Message lastReceived = null;
+        for (int remaining = 2; remaining > 0; remaining--) {
+            lastReceived = consumer.receive(1000);
+            assertNotNull(lastReceived);
+        }
+        lastReceived.acknowledge();
+
+        connection.close();
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        // Same ackMode on the replacement connection.
+        session = connection.createSession(false, ackMode);
+        consumer = session.createConsumer(destination);
+
+        // The two messages left behind must now be delivered, and nothing more.
+        for (int remaining = 2; remaining > 0; remaining--) {
+            lastReceived = consumer.receive(1000);
+            assertNotNull(lastReceived);
+        }
+        lastReceived.acknowledge();
+        assertNull(consumer.receiveNoWait());
+
+    }
+
+    public void initCombosForTestPrefetch1MessageNotDispatched() { // only the delivery mode varies
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT});
+    }
+
+    public void testPrefetch1MessageNotDispatched() throws Exception {
+        // With prefetch 1, the second of two queued messages must go to a second consumer.
+        // Set prefetch to 1
+        connection.getPrefetchPolicy().setAll(1);
+        connection.start();
+
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = new ActiveMQQueue("TEST");
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Send 2 messages to the destination.
+        sendMessages(session, destination, 2);
+        session.commit();
+
+        // The first consumer's prefetch should fill up with 1 message.
+        // Since that prefetch is then full, the 2nd message should get dispatched
+        // to another consumer: create a second consumer on its own connection
+        // and check that it receives the 2nd message.
+        ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection2);
+        connection2.start();
+        Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer2 = session2.createConsumer(destination);
+
+        // Pick up the first message.
+        Message message1 = consumer.receive(1000);
+        assertNotNull(message1);
+
+        // Pick up the 2nd message.
+        Message message2 = consumer2.receive(1000);
+        assertNotNull(message2);
+
+        session.commit();
+        session2.commit();
+
+        assertNull(consumer.receiveNoWait());
+
+    }
+
+    public void initCombosForTestDontStart() { // non-persistent only, on a queue and a topic
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE});
+    }
+
+    public void testDontStart() throws Exception {
+        // connection.start() is deliberately never called in this test.
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // Publish a single message.
+        sendMessages(session, destination, 1);
+        // The receive must time out without handing back a message.
+        final Message unexpected = consumer.receive(1000);
+        assertNull(unexpected);
+    }
+
+    public void initCombosForTestStartAfterSend() { // non-persistent only, on a queue and a topic
+        addCombinationValues("deliveryMode", new Object[] {DeliveryMode.NON_PERSISTENT});
+        addCombinationValues("destinationType", new Object[] {ActiveMQDestination.QUEUE_TYPE, ActiveMQDestination.TOPIC_TYPE});
+    }
+
+    /**
+     * Sends one message before this method calls {@code connection.start()},
+     * then starts the connection and checks that exactly one message is
+     * received.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testStartAfterSend() throws Exception {
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer lateConsumer = jmsSession.createConsumer(destination);
+
+        // The message is sent first ...
+        sendMessages(jmsSession, destination, 1);
+
+        // ... and the connection is only started afterwards.
+        connection.start();
+
+        // Exactly one message must come through.
+        Message first = lateConsumer.receive(1000);
+        assertNotNull(first);
+        Message extra = lateConsumer.receiveNoWait();
+        assertNull(extra);
+    }
+
+    public void initCombosForTestReceiveMessageWithConsumer() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    /**
+     * Sends one message on a started connection and checks that the consumer
+     * receives exactly one text message whose body is {@code "0"}.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testReceiveMessageWithConsumer() throws Exception {
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+
+        sendMessages(jmsSession, destination, 1);
+
+        // One message arrives, carrying the expected text ...
+        Message received = receiver.receive(1000);
+        assertNotNull(received);
+        TextMessage receivedText = (TextMessage)received;
+        assertEquals("0", receivedText.getText());
+
+        // ... and nothing else is pending.
+        assertNull(receiver.receiveNoWait());
+    }
+
+    
+    /**
+     * Consumes four queue messages through a {@code DUPS_OK_ACKNOWLEDGE}
+     * session, then closes the consumer and checks that a fresh consumer on
+     * the same queue receives nothing.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testDupsOkConsumer() throws Exception {
+        final int messageCount = 4;
+
+        connection.start();
+        Session dupsOkSession = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+        destination = createDestination(dupsOkSession, ActiveMQDestination.QUEUE_TYPE);
+        MessageConsumer queueConsumer = dupsOkSession.createConsumer(destination);
+
+        sendMessages(dupsOkSession, destination, messageCount);
+
+        // Every message that was sent must arrive, and no more than that.
+        for (int received = 0; received != messageCount; received++) {
+            Message next = queueConsumer.receive(1000);
+            assertNotNull(next);
+        }
+        assertNull(queueConsumer.receive(1000));
+
+        // After closing the consumer, no message should be left on the queue.
+        queueConsumer.close();
+
+        MessageConsumer freshConsumer = dupsOkSession.createConsumer(destination);
+        assertNull(freshConsumer.receive(1000));
+    }
+
+    /**
+     * Receives two messages in a transacted session, closes that session
+     * without committing, and checks that a second transacted consumer gets
+     * both messages flagged as redelivered with a delivery count of 2.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testRedispatchOfUncommittedTx() throws Exception {
+        final int messageCount = 2;
+
+        connection.start();
+        Session firstSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(firstSession, ActiveMQDestination.QUEUE_TYPE);
+
+        sendMessages(connection, destination, messageCount);
+
+        MessageConsumer firstConsumer = firstSession.createConsumer(destination);
+        for (int i = 0; i < messageCount; i++) {
+            assertNotNull(firstConsumer.receive(1000));
+        }
+
+        // Attach a second consumer while the dispatched messages are still
+        // unacked/uncommitted.
+        Session secondSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer secondConsumer = secondSession.createConsumer(destination);
+
+        // Closing without a commit is expected to roll the transaction back,
+        // so the messages get re-dispatched to the second consumer.
+        firstSession.close();
+
+        for (int i = 0; i < messageCount; i++) {
+            Message redelivered = secondConsumer.receive(1000);
+            assertNotNull(redelivered);
+            assertTrue(redelivered.getJMSRedelivered());
+            assertEquals(2, redelivered.getLongProperty("JMSXDeliveryCount"));
+        }
+        secondSession.commit();
+
+        assertNull(secondConsumer.receive(500));
+        secondSession.close();
+    }
+
+    
+    /**
+     * Receives two messages in a transacted session, rolls the session back
+     * and closes it, and checks that a second transacted consumer gets both
+     * messages flagged as redelivered with a delivery count of 2.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testRedispatchOfRolledbackTx() throws Exception {
+        final int messageCount = 2;
+
+        connection.start();
+        Session rolledBackSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(rolledBackSession, ActiveMQDestination.QUEUE_TYPE);
+
+        sendMessages(connection, destination, messageCount);
+
+        MessageConsumer originalConsumer = rolledBackSession.createConsumer(destination);
+        for (int i = 0; i < messageCount; i++) {
+            assertNotNull(originalConsumer.receive(1000));
+        }
+
+        // Attach a second consumer while the dispatched messages are still
+        // unacked/uncommitted.
+        Session takeoverSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer takeoverConsumer = takeoverSession.createConsumer(destination);
+
+        // Explicit rollback, then release the first session.
+        rolledBackSession.rollback();
+        rolledBackSession.close();
+
+        for (int i = 0; i < messageCount; i++) {
+            Message redelivered = takeoverConsumer.receive(1000);
+            assertNotNull(redelivered);
+            assertTrue(redelivered.getJMSRedelivered());
+            assertEquals(2, redelivered.getLongProperty("JMSXDeliveryCount"));
+        }
+        takeoverSession.commit();
+
+        assertNull(takeoverConsumer.receive(500));
+        takeoverSession.close();
+    }
+    
+    
+    public void initCombosForTestAckOfExpired() {
+        addCombinationValues("destinationType", 
+                new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+        
+    /**
+     * Sends a first batch of messages with a one second time-to-live, waits
+     * for them to expire, sends a second batch without expiry, and checks
+     * that the consumer only hands out the second batch while its statistics
+     * report the first batch as expired.
+     *
+     * @throws Exception if any JMS operation fails or the sleep is interrupted
+     */
+    public void testAckOfExpired() throws Exception {
+
+        ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false");
+        connection = (ActiveMQConnection) fact.createConnection();
+        // Register the replacement connection the same way other tests in this
+        // file register extra connections, so that it is not leaked.
+        // NOTE(review): presumably tearDown closes everything in 'connections' - confirm.
+        connections.add(connection);
+
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ?
+                session.createQueue("test") : session.createTopic("test"));
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.setStatsEnabled(true);
+
+        Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = sendSession.createProducer(destination);
+        producer.setTimeToLive(1000);
+        final int count = 4;
+        for (int i = 0; i < count; i++) {
+            TextMessage message = sendSession.createTextMessage("" + i);
+            producer.send(message);
+        }
+
+        // let first bunch in queue expire
+        Thread.sleep(1000);
+
+        producer.setTimeToLive(0);
+        for (int i = 0; i < count; i++) {
+            TextMessage message = sendSession.createTextMessage("no expiry" + i);
+            producer.send(message);
+        }
+
+        ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer;
+
+        for (int i = 0; i < count; i++) {
+            // Bounded wait: with an unbounded receive() the assertNotNull below
+            // could never fail and a missing message would hang the whole suite.
+            TextMessage msg = (TextMessage) amqConsumer.receive(5000);
+            assertNotNull(msg);
+            assertTrue(msg.getText().contains("no expiry"));
+
+            // force an ack when there are expired messages
+            amqConsumer.acknowledge();
+        }
+        assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount());
+
+//        TODO: 
+//        DestinationViewMBean view = createView(destination);
+//        
+//        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
+//        assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
+//        assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
+    }
+    
+//    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+//         MBeanServer mbeanServer = broker.getMBeanServer();
+//         String domain = "org.apache.activemq";
+//         ObjectName name;
+//        if (destination.isQueue()) {
+//            name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
+//        } else {
+//            name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=test");
+//        }
+//        return (DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, DestinationViewMBean.class, true);
+//    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSConsumerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Test cases used to test the JMS message exclusive consumers.
+ * 
+ * @version $Revision$
+ */
+public class JMSExclusiveConsumerTest extends JmsTestSupport {
+
+    public int deliveryMode;
+
+    public static Test suite() {
+        return suite(JMSExclusiveConsumerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public void initCombosForTestRoundRobinDispatchOnNonExclusive() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+    }
+
+    /**
+     * Shows that by default messages are round robined across a set of
+     * consumers.
+     * 
+     * @throws Exception
+     */
+    public void testRoundRobinDispatchOnNonExclusive() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageConsumer consumer2 = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+
+        Message m;
+        m = consumer2.receive(1000);
+        assertNotNull(m);
+
+        m = consumer1.receive(1000);
+        assertNotNull(m);
+
+        assertNull(consumer1.receiveNoWait());
+        assertNull(consumer2.receiveNoWait());
+    }
+
+    public void initCombosForTestDispatchExclusive() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+    }
+
+    /**
+     * Shows that if the "?consumer.exclusive=true" option is added to
+     * destination, then all messages are routed to 1 consumer.
+     * 
+     * @throws Exception
+     */
+    public void testDispatchExclusive() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST?consumer.exclusive=true");
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(deliveryMode);
+
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageConsumer consumer2 = session.createConsumer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        producer.send(session.createTextMessage("2nd"));
+        producer.send(session.createTextMessage("3nd"));
+
+        Message m;
+        m = consumer2.receive(1000);
+        if (m != null) {
+            // Consumer 2 should get all the messages.
+            for (int i = 0; i < 2; i++) {
+                m = consumer2.receive(1000);
+                assertNotNull(m);
+            }
+        } else {
+            // Consumer 1 should get all the messages.
+            for (int i = 0; i < 3; i++) {
+                m = consumer1.receive(1000);
+                assertNotNull(m);
+            }
+        }
+
+        assertNull(consumer1.receiveNoWait());
+        assertNull(consumer2.receiveNoWait());
+    }
+
+    public void testMixExclusiveWithNonExclusive() throws Exception {
+        ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=true");
+        ActiveMQQueue nonExclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=false");
+
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer nonExCon = session.createConsumer(nonExclusiveQueue);
+        MessageConsumer exCon = session.createConsumer(exclusiveQueue);
+
+        MessageProducer prod = session.createProducer(exclusiveQueue);
+        prod.send(session.createMessage());
+        prod.send(session.createMessage());
+        prod.send(session.createMessage());
+
+        Message m;
+        for (int i = 0; i < 3; i++) {
+            m = exCon.receive(1000);
+            assertNotNull(m);
+            m = nonExCon.receive(1000);
+            assertNull(m);
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSExclusiveConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.net.URISyntaxException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Vector;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Test cases used to test the JMS message consumer.
+ * 
+ * @version $Revision$
+ */
+public class JMSMessageTest extends JmsTestSupport {
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public int ackMode;
+    public byte destinationType;
+    public boolean durableConsumer;
+    public String connectURL;
+
+    /**
+     * Run all these tests in both marshaling and non-marshaling mode.
+     */
+    public void initCombos() {
+        addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false",
+                                                         "vm://localhost?marshal=true"});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
+    }
+
+    /**
+     * Sends a {@link TextMessage} and checks that its text arrives unchanged
+     * and that no second message follows.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testTextMessage() throws Exception {
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+        MessageProducer sender = jmsSession.createProducer(destination);
+
+        // Outbound message.
+        TextMessage outbound = jmsSession.createTextMessage();
+        outbound.setText("Hi");
+        sender.send(outbound);
+
+        // Inbound message must carry the same text.
+        TextMessage inbound = (TextMessage)receiver.receive(1000);
+        assertNotNull(inbound);
+        assertEquals("Hi", inbound.getText());
+
+        assertNull(receiver.receiveNoWait());
+    }
+
+    /**
+     * Builds the suite for this class via the {@code suite(Class)} helper.
+     */
+    public static Test suite() {
+        final Class<JMSMessageTest> testClass = JMSMessageTest.class;
+        return suite(testClass);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * Creates the connection factory for the current {@code connectURL}
+     * combination value.
+     *
+     * @return a new {@link ActiveMQConnectionFactory} for {@code connectURL}
+     * @throws URISyntaxException declared for callers; not thrown explicitly here
+     */
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
+        return new ActiveMQConnectionFactory(connectURL);
+    }
+
+    /**
+     * Writes four ints into a {@link BytesMessage} and checks that the
+     * received message reports a body length of 16 bytes.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testBytesMessageLength() throws Exception {
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+        MessageProducer sender = jmsSession.createProducer(destination);
+
+        // Four ints of four bytes each.
+        BytesMessage outbound = jmsSession.createBytesMessage();
+        for (int value = 1; value <= 4; value++) {
+            outbound.writeInt(value);
+        }
+        sender.send(outbound);
+
+        BytesMessage inbound = (BytesMessage)receiver.receive(1000);
+        assertNotNull(inbound);
+        assertEquals(16, inbound.getBodyLength());
+
+        assertNull(receiver.receiveNoWait());
+    }
+
+    /**
+     * Sends an {@link ObjectMessage} holding a string and checks that an
+     * equal object is received and that no second message follows.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testObjectMessage() throws Exception {
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+        MessageProducer sender = jmsSession.createProducer(destination);
+
+        // Outbound message.
+        ObjectMessage outbound = jmsSession.createObjectMessage();
+        outbound.setObject("Hi");
+        sender.send(outbound);
+
+        // Inbound message must carry an equal payload.
+        ObjectMessage inbound = (ObjectMessage)receiver.receive(1000);
+        assertNotNull(inbound);
+        assertEquals("Hi", inbound.getObject());
+
+        assertNull(receiver.receiveNoWait());
+    }
+
+    /**
+     * Sends a {@link BytesMessage} containing a single boolean and checks
+     * that it reads back as {@code true} and that reading any further
+     * raises {@link MessageEOFException}.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testBytesMessage() throws Exception {
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+        MessageProducer sender = jmsSession.createProducer(destination);
+
+        // Outbound message: a body consisting of one boolean.
+        BytesMessage outbound = jmsSession.createBytesMessage();
+        outbound.writeBoolean(true);
+        sender.send(outbound);
+
+        BytesMessage inbound = (BytesMessage)receiver.receive(1000);
+        assertNotNull(inbound);
+        assertTrue(inbound.readBoolean());
+
+        boolean readPastEnd;
+        try {
+            inbound.readByte();
+            readPastEnd = true;
+        } catch (MessageEOFException expected) {
+            // The body is exhausted after the boolean, so this is the wanted outcome.
+            readPastEnd = false;
+        }
+        if (readPastEnd) {
+            fail("Expected exception not thrown.");
+        }
+
+        assertNull(receiver.receiveNoWait());
+    }
+
+    /**
+     * Sends a {@link StreamMessage} containing one string and checks that
+     * reading it as a byte fails without consuming it, that the string then
+     * reads back unchanged, and that a further read hits the end of the
+     * stream.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testStreamMessage() throws Exception {
+        final String payload = "This is a test to see how it works.";
+
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+        MessageProducer sender = jmsSession.createProducer(destination);
+
+        // Outbound message.
+        StreamMessage outbound = jmsSession.createStreamMessage();
+        outbound.writeString(payload);
+        sender.send(outbound);
+
+        StreamMessage inbound = (StreamMessage)receiver.receive(1000);
+        assertNotNull(inbound);
+
+        // An invalid conversion must throw and leave the stream position
+        // where it was.
+        try {
+            inbound.readByte();
+            fail("Should have received NumberFormatException");
+        } catch (NumberFormatException expected) {
+            // wanted: the string is not readable as a byte
+        }
+
+        // The string is therefore still the next item in the stream.
+        assertEquals(payload, inbound.readString());
+
+        // Nothing is left now, so another read must report end of stream.
+        try {
+            inbound.readByte();
+            fail("Should have received MessageEOFException");
+        } catch (MessageEOFException expected) {
+            // wanted: the stream is exhausted
+        }
+
+        assertNull(receiver.receiveNoWait());
+    }
+
+    /**
+     * Sends a {@link MapMessage} with one boolean entry and checks that the
+     * entry arrives as {@code true} and that no second message follows.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testMapMessage() throws Exception {
+        final String key = "boolKey";
+
+        connection.start();
+        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(jmsSession, destinationType);
+        MessageConsumer receiver = jmsSession.createConsumer(destination);
+        MessageProducer sender = jmsSession.createProducer(destination);
+
+        // Outbound message.
+        MapMessage outbound = jmsSession.createMapMessage();
+        outbound.setBoolean(key, true);
+        sender.send(outbound);
+
+        // Inbound message must contain the same entry.
+        MapMessage inbound = (MapMessage)receiver.receive(1000);
+        assertNotNull(inbound);
+        assertTrue(inbound.getBoolean(key));
+
+        assertNull(receiver.receiveNoWait());
+    }
+
+    /**
+     * Minimal hand-written {@link TextMessage} used to exercise sending a
+     * message object that was not created by the session under test.
+     * Header values live in plain fields; properties live in a single map so
+     * that the setters, getters, {@link #propertyExists(String)} and
+     * {@link #clearProperties()} all agree with each other.
+     */
+    static class ForeignMessage implements TextMessage {
+
+        public int deliveryMode;
+
+        private String messageId;
+        private long timestamp;
+        private String correlationId;
+        private Destination replyTo;
+        private Destination destination;
+        private boolean redelivered;
+        private String type;
+        private long expiration;
+        private int priority;
+        private String text;
+        private final HashMap<String, Object> props = new HashMap<String, Object>();
+
+        public String getJMSMessageID() throws JMSException {
+            return messageId;
+        }
+
+        public void setJMSMessageID(String arg0) throws JMSException {
+            messageId = arg0;
+        }
+
+        public long getJMSTimestamp() throws JMSException {
+            return timestamp;
+        }
+
+        public void setJMSTimestamp(long arg0) throws JMSException {
+            timestamp = arg0;
+        }
+
+        // The byte[] form of the correlation id is not supported by this stub.
+        public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+            return null;
+        }
+
+        // The byte[] form of the correlation id is not supported by this stub.
+        public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
+        }
+
+        public void setJMSCorrelationID(String arg0) throws JMSException {
+            correlationId = arg0;
+        }
+
+        public String getJMSCorrelationID() throws JMSException {
+            return correlationId;
+        }
+
+        public Destination getJMSReplyTo() throws JMSException {
+            return replyTo;
+        }
+
+        public void setJMSReplyTo(Destination arg0) throws JMSException {
+            replyTo = arg0;
+        }
+
+        public Destination getJMSDestination() throws JMSException {
+            return destination;
+        }
+
+        public void setJMSDestination(Destination arg0) throws JMSException {
+            destination = arg0;
+        }
+
+        public int getJMSDeliveryMode() throws JMSException {
+            return deliveryMode;
+        }
+
+        public void setJMSDeliveryMode(int arg0) throws JMSException {
+            deliveryMode = arg0;
+        }
+
+        public boolean getJMSRedelivered() throws JMSException {
+            return redelivered;
+        }
+
+        public void setJMSRedelivered(boolean arg0) throws JMSException {
+            redelivered = arg0;
+        }
+
+        public String getJMSType() throws JMSException {
+            return type;
+        }
+
+        public void setJMSType(String arg0) throws JMSException {
+            type = arg0;
+        }
+
+        public long getJMSExpiration() throws JMSException {
+            return expiration;
+        }
+
+        public void setJMSExpiration(long arg0) throws JMSException {
+            expiration = arg0;
+        }
+
+        public int getJMSPriority() throws JMSException {
+            return priority;
+        }
+
+        public void setJMSPriority(int arg0) throws JMSException {
+            priority = arg0;
+        }
+
+        /** Removes every property that was set on this message. */
+        public void clearProperties() throws JMSException {
+            props.clear();
+        }
+
+        /** Reports whether a property with the given name has been set. */
+        public boolean propertyExists(String arg0) throws JMSException {
+            return props.containsKey(arg0);
+        }
+
+        /** Returns the stored value if it is a Boolean, otherwise {@code false}. */
+        public boolean getBooleanProperty(String arg0) throws JMSException {
+            Object value = props.get(arg0);
+            return value instanceof Boolean && ((Boolean)value).booleanValue();
+        }
+
+        // The numeric getters return the stored value when it is a Number and
+        // fall back to 0 otherwise.
+
+        public byte getByteProperty(String arg0) throws JMSException {
+            Number value = numericProperty(arg0);
+            return value == null ? 0 : value.byteValue();
+        }
+
+        public short getShortProperty(String arg0) throws JMSException {
+            Number value = numericProperty(arg0);
+            return value == null ? 0 : value.shortValue();
+        }
+
+        public int getIntProperty(String arg0) throws JMSException {
+            Number value = numericProperty(arg0);
+            return value == null ? 0 : value.intValue();
+        }
+
+        public long getLongProperty(String arg0) throws JMSException {
+            Number value = numericProperty(arg0);
+            return value == null ? 0 : value.longValue();
+        }
+
+        public float getFloatProperty(String arg0) throws JMSException {
+            Number value = numericProperty(arg0);
+            return value == null ? 0 : value.floatValue();
+        }
+
+        public double getDoubleProperty(String arg0) throws JMSException {
+            Number value = numericProperty(arg0);
+            return value == null ? 0 : value.doubleValue();
+        }
+
+        /**
+         * Returns the string form of the stored value, or {@code null} when
+         * the property is absent. Using toString() instead of a cast avoids a
+         * ClassCastException for values stored through another setter.
+         */
+        public String getStringProperty(String arg0) throws JMSException {
+            Object value = props.get(arg0);
+            return value == null ? null : value.toString();
+        }
+
+        public Object getObjectProperty(String arg0) throws JMSException {
+            return props.get(arg0);
+        }
+
+        /** Returns a snapshot of the names of all properties currently set. */
+        public Enumeration getPropertyNames() throws JMSException {
+            return new Vector<String>(props.keySet()).elements();
+        }
+
+        // The typed setters store the boxed value so that it is visible
+        // through propertyExists, getPropertyNames and the getters.
+
+        public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
+            props.put(arg0, Boolean.valueOf(arg1));
+        }
+
+        public void setByteProperty(String arg0, byte arg1) throws JMSException {
+            props.put(arg0, Byte.valueOf(arg1));
+        }
+
+        public void setShortProperty(String arg0, short arg1) throws JMSException {
+            props.put(arg0, Short.valueOf(arg1));
+        }
+
+        public void setIntProperty(String arg0, int arg1) throws JMSException {
+            props.put(arg0, Integer.valueOf(arg1));
+        }
+
+        public void setLongProperty(String arg0, long arg1) throws JMSException {
+            props.put(arg0, Long.valueOf(arg1));
+        }
+
+        public void setFloatProperty(String arg0, float arg1) throws JMSException {
+            props.put(arg0, Float.valueOf(arg1));
+        }
+
+        public void setDoubleProperty(String arg0, double arg1) throws JMSException {
+            props.put(arg0, Double.valueOf(arg1));
+        }
+
+        public void setStringProperty(String arg0, String arg1) throws JMSException {
+            props.put(arg0, arg1);
+        }
+
+        public void setObjectProperty(String arg0, Object arg1) throws JMSException {
+            props.put(arg0, arg1);
+        }
+
+        // Acknowledgement is a no-op for this stub.
+        public void acknowledge() throws JMSException {
+        }
+
+        /** Discards the text body. */
+        public void clearBody() throws JMSException {
+            text = null;
+        }
+
+        public void setText(String arg0) throws JMSException {
+            text = arg0;
+        }
+
+        public String getText() throws JMSException {
+            return text;
+        }
+
+        /** Looks up a property and returns it only if it is numeric. */
+        private Number numericProperty(String name) {
+            Object value = props.get(name);
+            return value instanceof Number ? (Number)value : null;
+        }
+    }
+
+    /**
+     * Sends a {@link ForeignMessage} and checks two things: that the send
+     * call filled in the JMS headers on the foreign message object itself,
+     * and that the consumer receives a text message with the same text and
+     * string property.
+     *
+     * @throws Exception if any JMS operation fails
+     */
+    public void testForeignMessage() throws Exception {
+
+        // Receive a message with the JMS API
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the message.
+        {
+            ForeignMessage message = new ForeignMessage();
+            message.setText("Hello");
+            message.setStringProperty("test", "value");
+            long timeToLive = 10000L;
+            long start = System.currentTimeMillis();
+            // The second argument of send() is a delivery mode, so name it as
+            // one. The original passed Session.AUTO_ACKNOWLEDGE, which only
+            // worked because it has the same numeric value.
+            producer.send(message, DeliveryMode.NON_PERSISTENT, 7, timeToLive);
+            long end = System.currentTimeMillis();
+
+
+            //validate jms spec 1.1 section 3.4.11 table 3.1
+            // JMSDestination, JMSDeliveryMode,  JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp
+            //must be set by sending a message.
+            assertEquals(destination, message.getJMSDestination());
+            assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+            // Expiration and timestamp must fall inside the window in which
+            // the send call ran.
+            assertTrue(start  + timeToLive <= message.getJMSExpiration());
+            assertTrue(end + timeToLive >= message.getJMSExpiration());
+            assertEquals(7, message.getJMSPriority());
+            assertNotNull(message.getJMSMessageID());
+            assertTrue(start <= message.getJMSTimestamp());
+            assertTrue(end >= message.getJMSTimestamp());
+        }
+
+        // Validate message is OK.
+        {
+            TextMessage message = (TextMessage)consumer.receive(1000);
+            assertNotNull(message);
+            assertEquals("Hello", message.getText());
+            assertEquals("value", message.getStringProperty("test"));
+        }
+
+        assertNull(consumer.receiveNoWait());
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSMessageTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.Enumeration;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
+/**
+ * Basic JMS use cases: browsing a queue, plain send/receive and transacted
+ * send/receive. Each test is parameterised by the value combinations that its
+ * matching <code>initCombosForTest...</code> method registers.
+ */
+public class JMSUsecaseTest extends JmsTestSupport {
+
+    // Public fields referenced by name in the addCombinationValues(...) calls
+    // below (presumably assigned by the combination support in the parent
+    // class before each run - confirm against JmsTestSupport).
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public byte destinationType;
+    public boolean durableConsumer;
+
+    public static Test suite() {
+        return suite(JMSUsecaseTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /**
+     * Registers the delivery modes and queue-like destination types used by
+     * {@link #testQueueBrowser()}.
+     */
+    public void initCombosForTestQueueBrowser() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+    }
+
+    /**
+     * Sends five messages and verifies that a queue browser sees exactly
+     * those five, in order, with bodies "0" to "4".
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testQueueBrowser() throws Exception {
+
+        // Send the messages to the broker.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        // NOTE(review): sendMessages is inherited and its producer is not
+        // visible here, so the deliveryMode combination is presumably not
+        // applied to these messages - confirm against JmsTestSupport.
+        sendMessages(session, destination, 5);
+
+        QueueBrowser browser = session.createBrowser((Queue)destination);
+        try {
+            Enumeration enumeration = browser.getEnumeration();
+            for (int i = 0; i < 5; i++) {
+                // Pause before each check so the browser has time to be fed.
+                Thread.sleep(100);
+                assertTrue(enumeration.hasMoreElements());
+                Message m = (Message)enumeration.nextElement();
+                assertNotNull(m);
+                assertEquals("" + i, ((TextMessage)m).getText());
+            }
+            assertFalse(enumeration.hasMoreElements());
+        } finally {
+            // Release the browser explicitly instead of relying on test tear-down.
+            browser.close();
+        }
+    }
+
+    /**
+     * Registers the delivery modes and all four destination types used by
+     * {@link #testSendReceive()}.
+     */
+    public void initCombosForTestSendReceive() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    /**
+     * Sends one message with the configured delivery mode and verifies that
+     * exactly one message is delivered to the consumer.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testSendReceive() throws Exception {
+        // Send a message to the broker.
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        // Apply the combination value; without this every run used the
+        // producer's default delivery mode.
+        producer.setDeliveryMode(deliveryMode);
+        MessageConsumer consumer = session.createConsumer(destination);
+        ActiveMQMessage message = new ActiveMQMessage();
+        producer.send(message);
+
+        // Make sure only 1 message was delivered.
+        assertNotNull(consumer.receive(1000));
+        assertNull(consumer.receiveNoWait());
+    }
+
+    /**
+     * Registers the delivery modes and all four destination types used by
+     * {@link #testSendReceiveTransacted()}.
+     */
+    public void initCombosForTestSendReceiveTransacted() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    /**
+     * Verifies transacted semantics on a single session: a sent message is
+     * not delivered before commit, is redelivered (flagged as such) after a
+     * rollback, and is not delivered again once the receive is committed.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testSendReceiveTransacted() throws Exception {
+        // Send a message to the broker.
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(session, destinationType);
+        MessageProducer producer = session.createProducer(destination);
+        // Apply the combination value; without this every run used the
+        // producer's default delivery mode.
+        producer.setDeliveryMode(deliveryMode);
+        MessageConsumer consumer = session.createConsumer(destination);
+        producer.send(session.createTextMessage("test"));
+
+        // Message should not be delivered until commit.
+        assertNull(consumer.receiveNoWait());
+        session.commit();
+
+        // Make sure only 1 message was delivered.
+        Message message = consumer.receive(1000);
+        assertNotNull(message);
+        assertFalse(message.getJMSRedelivered());
+        assertNull(consumer.receiveNoWait());
+
+        // Message should be redelivered if rollback is used.
+        session.rollback();
+
+        // Make sure only 1 message was redelivered.
+        message = consumer.receive(2000);
+        assertNotNull(message);
+        assertTrue(message.getJMSRedelivered());
+        assertNull(consumer.receiveNoWait());
+
+        // If we commit now, the message should not be redelivered.
+        session.commit();
+        assertNull(consumer.receiveNoWait());
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JMSUsecaseTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.legacy.broker.BrokerFactory;
+import org.apache.activemq.legacy.broker.BrokerService;
+import org.apache.activemq.legacy.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Benchmarks the broker by starting many consumer and producers against the
+ * same destination. Make sure you run with jvm option -server (makes a big
+ * difference). The tests simulate storing 1000 1k jms messages to see the rate
+ * of processing msg/sec.
+ * 
+ * @version $Revision$
+ */
+public class JmsBenchmark extends JmsTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(JmsBenchmark.class);
+
+    private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5));
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10"));
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 60));
+    private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10"));
+    private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10"));
+
+    public ActiveMQDestination destination;
+
+    public static Test suite() {
+        return suite(JmsBenchmark.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(JmsBenchmark.class);
+    }
+
+    public void initCombos() {
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?persistent=false"));
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException {
+        return new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)).getServer().getConnectURI());
+    }
+
+    /**
+     * @throws Throwable
+     */
+    public void testConcurrentSendReceive() throws Throwable {
+
+        final Semaphore connectionsEstablished = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final Semaphore workerDone = new Semaphore(1 - (CONSUMER_COUNT + PRODUCER_COUNT));
+        final CountDownLatch sampleTimeDone = new CountDownLatch(1);
+
+        final AtomicInteger producedMessages = new AtomicInteger(0);
+        final AtomicInteger receivedMessages = new AtomicInteger(0);
+
+        final Callable producer = new Callable() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(destination);
+                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                BytesMessage message = session.createBytesMessage();
+                message.writeBytes(new byte[1024]);
+                connection.start();
+                connectionsEstablished.release();
+
+                while (!sampleTimeDone.await(0, TimeUnit.MILLISECONDS)) {
+                    producer.send(message);
+                    producedMessages.incrementAndGet();
+                }
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Callable consumer = new Callable() {
+            public Object call() throws JMSException, InterruptedException {
+                Connection connection = factory.createConnection();
+                connections.add(connection);
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                consumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message msg) {
+                        receivedMessages.incrementAndGet();
+                    }
+                });
+                connection.start();
+
+                connectionsEstablished.release();
+                sampleTimeDone.await();
+
+                connection.close();
+                workerDone.release();
+                return null;
+            }
+        };
+
+        final Throwable workerError[] = new Throwable[1];
+        for (int i = 0; i < PRODUCER_COUNT; i++) {
+            new Thread("Producer:" + i) {
+                public void run() {
+                    try {
+                        producer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        for (int i = 0; i < CONSUMER_COUNT; i++) {
+            new Thread("Consumer:" + i) {
+                public void run() {
+                    try {
+                        consumer.call();
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                        workerError[0] = e;
+                    }
+                }
+            }.start();
+        }
+
+        LOG.info(getName() + ": Waiting for Producers and Consumers to startup.");
+        connectionsEstablished.acquire();
+        LOG.info("Producers and Consumers are now running.  Waiting for system to reach steady state: " + (SAMPLE_DELAY / 1000.0f) + " seconds");
+        Thread.sleep(1000 * 10);
+
+        LOG.info("Starting sample: " + SAMPLES + " each lasting " + (SAMPLE_DURATION / 1000.0f) + " seconds");
+
+        for (int i = 0; i < SAMPLES; i++) {
+
+            long start = System.currentTimeMillis();
+            producedMessages.set(0);
+            receivedMessages.set(0);
+
+            Thread.sleep(SAMPLE_DURATION);
+
+            long end = System.currentTimeMillis();
+            int r = receivedMessages.get();
+            int p = producedMessages.get();
+
+            LOG.info("published: " + p + " msgs at " + (p * 1000f / (end - start)) + " msgs/sec, " + "consumed: " + r + " msgs at " + (r * 1000f / (end - start)) + " msgs/sec");
+        }
+
+        LOG.info("Sample done.");
+        sampleTimeDone.countDown();
+
+        workerDone.acquire();
+        if (workerError[0] != null) {
+            throw workerError[0];
+        }
+
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsBenchmark.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test1;
+
+import java.util.Enumeration;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * Exercises {@link QueueBrowser} together with a regular consumer on the
+ * queue "TEST".
+ *
+ * @version $Revision: 1.4 $
+ */
+public class JmsQueueBrowserTest extends JmsTestSupport {
+    private static final org.apache.commons.logging.Log LOG =
+            org.apache.commons.logging.LogFactory.getLog(JmsQueueBrowserTest.class);
+
+    /**
+     * Consumes the first of three messages, browses the remaining two and
+     * then checks that a new consumer still receives those two: browsing must
+     * not remove messages from the queue.
+     *
+     * @throws Exception
+     */
+    public void testReceiveBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+        MessageProducer sender = session.createProducer(queue);
+        MessageConsumer receiver = session.createConsumer(queue);
+        connection.start();
+
+        Message[] sent = new Message[] {
+            session.createTextMessage("First Message"),
+            session.createTextMessage("Second Message"),
+            session.createTextMessage("Third Message")
+        };
+
+        // Drain whatever earlier test runs may have left on the queue.
+        Message stale = receiver.receive(1000);
+        while (stale != null) {
+            stale = receiver.receive(1000);
+        }
+
+        for (int i = 0; i < sent.length; i++) {
+            sender.send(sent[i]);
+        }
+
+        // The consumer takes the first message, then goes away.
+        Message head = receiver.receive(1000);
+        assertEquals(sent[0], head);
+        receiver.close();
+
+        QueueBrowser queueBrowser = session.createBrowser((Queue) queue);
+        Enumeration browsed = queueBrowser.getEnumeration();
+
+        // The browser must see the second message ...
+        assertTrue("should have received the second message", browsed.hasMoreElements());
+        Message browsedSecond = (Message) browsed.nextElement();
+        assertEquals(sent[1], browsedSecond);
+
+        // ... and the third ...
+        assertTrue("Should have received the third message", browsed.hasMoreElements());
+        Message browsedThird = (Message) browsed.nextElement();
+        assertEquals(sent[2], browsedThird);
+
+        // ... and nothing beyond that; log any surplus before failing.
+        int surplus = 0;
+        while (browsed.hasMoreElements()) {
+            TextMessage extra = (TextMessage) browsed.nextElement();
+            LOG.info("Got extra message: " + extra.getText());
+            surplus++;
+        }
+        assertFalse(surplus != 0);
+        queueBrowser.close();
+
+        // A fresh consumer must still get the two browsed messages, in order.
+        receiver = session.createConsumer(queue);
+        Message second = receiver.receive(1000);
+        assertEquals(sent[1], second);
+        Message third = receiver.receive(1000);
+        assertEquals(sent[2], third);
+        receiver.close();
+    }
+
+    /**
+     * Opens a browser before a consumer on a queue holding one message and
+     * checks that the message is both browsed and received.
+     *
+     * @throws Exception
+     */
+    public void testBrowseReceive() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQQueue queue = new ActiveMQQueue("TEST");
+
+        connection.start();
+
+        // Three messages are created, but only the first one is sent.
+        Message[] sent = new Message[] {
+            session.createTextMessage("First Message"),
+            session.createTextMessage("Second Message"),
+            session.createTextMessage("Third Message")
+        };
+
+        MessageProducer sender = session.createProducer(queue);
+        sender.send(sent[0]);
+
+        // The browser is opened first ...
+        QueueBrowser queueBrowser = session.createBrowser((Queue) queue);
+        Enumeration browsed = queueBrowser.getEnumeration();
+
+        // ... and the consumer second.
+        MessageConsumer receiver = session.createConsumer(queue);
+
+        // The browser sees the message.
+        assertTrue("should have received the first message", browsed.hasMoreElements());
+        Message browsedFirst = (Message) browsed.nextElement();
+        assertEquals(sent[0], browsedFirst);
+
+        // The consumer receives the same message.
+        Message received = receiver.receive(1000);
+        assertEquals(sent[0], received);
+
+        receiver.close();
+        queueBrowser.close();
+        sender.close();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test1/JmsQueueBrowserTest.java
------------------------------------------------------------------------------
    svn:executable = *