You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/14 15:56:17 UTC

svn commit: r964051 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java

Author: gtully
Date: Wed Jul 14 13:56:17 2010
New Revision: 964051

URL: http://svn.apache.org/viewvc?rev=964051&view=rev
Log:
add test that validates https://issues.apache.org/activemq/browse/AMQ-1893 is resolved on trunk

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java   (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java?rev=964051&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java Wed Jul 14 13:56:17 2010
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AMQ1893Test extends TestCase {
+
+    private static final Log log = LogFactory.getLog(AMQ1893Test.class);
+
+    static final String QUEUE_NAME = "TEST";
+
+    static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000;
+
+    static final int[] PRIORITIES = new int[]{0, 5, 10};
+
+    static final boolean debug = false;
+
+    private BrokerService brokerService;
+
+    private ActiveMQQueue destination;
+
+    @Override
+    protected void setUp() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        destination = new ActiveMQQueue(QUEUE_NAME);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Stop any running threads.
+        brokerService.stop();
+    }
+
+
+    public void testProduceConsumeWithSelector() throws Exception {
+        new TestProducer().produceMessages();
+        new TestConsumer().consume();
+    }
+
+    
+    class TestProducer {
+
+        public void produceMessages() throws Exception {
+            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                    brokerService.getTransportConnectors().get(0).getConnectUri().toString()
+            );
+            Connection connection = connectionFactory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createQueue(QUEUE_NAME);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            long start = System.currentTimeMillis();
+
+            for (int priority : PRIORITIES) {
+
+                String name = null;
+                if (priority == 10) {
+                    name = "high";
+                } else if (priority == 5) {
+                    name = "mid";
+                } else {
+                    name = "low";
+                }
+
+                for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) {
+
+                    TextMessage message = session.createTextMessage(name + "_" + i);
+                    message.setIntProperty("priority", priority);
+
+                    producer.send(message);
+                }
+            }
+
+            long end = System.currentTimeMillis();
+
+            log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * 3) + " messages in " + (end - start) + " ms");
+
+            producer.close();
+            session.close();
+            connection.close();
+        }
+    }
+
+    class TestConsumer {
+
+        private CountDownLatch finishLatch = new CountDownLatch(1);
+
+
+
+        public void consume() throws Exception {
+            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                    brokerService.getTransportConnectors().get(0).getConnectUri().toString()
+            );
+
+
+            final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length;
+            final AtomicInteger counter = new AtomicInteger();
+            final MessageListener listener = new MessageListener() {
+                public void onMessage(Message message) {
+
+                    if (debug) {
+                        try {
+                            log.info(((TextMessage) message).getText());
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                    }
+
+                    if (counter.incrementAndGet() == totalMessageCount) {
+
+                            finishLatch.countDown();
+
+                    }
+                }
+            };
+
+            int consumerCount = PRIORITIES.length;
+            Connection[] connections = new Connection[consumerCount];
+            Session[] sessions = new Session[consumerCount];
+            MessageConsumer[] consumers = new MessageConsumer[consumerCount];
+
+            for (int i = 0; i < consumerCount; i++) {
+                String selector = "priority = " + PRIORITIES[i];
+
+                connections[i] = connectionFactory.createConnection();
+                sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                consumers[i] = sessions[i].createConsumer(destination, selector);
+                consumers[i].setMessageListener(listener);
+            }
+
+            for (Connection connection : connections) {
+                connection.start();
+            }
+
+            log.info("received " + counter.get() + " messages");
+
+            assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS));
+
+            log.info("received " + counter.get() + " messages");
+
+            for (MessageConsumer consumer : consumers) {
+                consumer.close();
+            }
+
+            for (Session session : sessions) {
+                session.close();
+            }
+
+            for (Connection connection : connections) {
+                connection.close();
+            }
+        }
+
+    }
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date