You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/10/24 16:31:01 UTC

svn commit: r707644 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory: MasterSlaveTempQueueMemoryTest.java TempQueueMemoryTest.java

Author: gtully
Date: Fri Oct 24 07:31:01 2008
New Revision: 707644

URL: http://svn.apache.org/viewvc?rev=707644&view=rev
Log:
more transactional and concurrent tests for master slave to try and reproduce AMQ-1983

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=707644&r1=707643&r2=707644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Fri Oct 24 07:31:01 2008
@@ -19,6 +19,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -156,5 +157,29 @@
         messagesToSend = 10;
         testLoadRequestReply();
     }
+    
+    public void testLoadRequestReplyWithTransactions() throws Exception {
+        serverTransactional = clientTransactional = true;
+        messagesToSend = 100;
+        reInitialiseSessions();
+        testLoadRequestReply();
+    }
+    
+    public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception {
+        serverTransactional = true;
+        numConsumers = numProducers = 10;
+        messagesToSend = 100;
+        reInitialiseSessions();
+        testLoadRequestReply();
+    }
 
+    protected void reInitialiseSessions() throws Exception {
+        // reinitialize so they can respect the transactional flags 
+        serverSession.close();
+        clientSession.close();
+        serverSession = serverConnection.createSession(serverTransactional, 
+                serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+        clientSession = clientConnection.createSession(clientTransactional,
+                clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java?rev=707644&r1=707643&r2=707644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java Fri Oct 24 07:31:01 2008
@@ -16,8 +16,11 @@
  */
 package org.apache.activemq.advisory;
 
+import java.util.Vector;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -39,38 +42,73 @@
     protected Destination serverDestination;
     protected int messagesToSend = 2000;
     protected boolean deleteTempQueue = true;
+    protected boolean serverTransactional = false;
+    protected boolean clientTransactional = false;
+    protected int numConsumers = 1;
+    protected int numProducers = 1;
+    
 
     public void testLoadRequestReply() throws Exception {
-        MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
-        serverConsumer.setMessageListener(new MessageListener() {
-            public void onMessage(Message msg) {
+        for (int i=0; i< numConsumers; i++) {
+            serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() {
+                public void onMessage(Message msg) {
+                    try {
+                        Destination replyTo = msg.getJMSReplyTo();
+                        MessageProducer producer = serverSession.createProducer(replyTo);
+                        producer.send(replyTo, msg);
+                        if (serverTransactional) {
+                            serverSession.commit();
+                        }
+                        producer.close();
+                    } catch (Exception e) {
+                        // TODO Auto-generated catch block
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+        
+        class Producer extends Thread {
+            private int numToSend;
+            public Producer(int numToSend) {
+                this.numToSend = numToSend;
+            }
+            public void run() {
+                MessageProducer producer;
                 try {
-                    Destination replyTo = msg.getJMSReplyTo();
-                    MessageProducer producer = serverSession.createProducer(replyTo);
-                    producer.send(replyTo, msg);
-                    producer.close();
-                } catch (Exception e) {
+                    producer = clientSession.createProducer(serverDestination);
+               
+                    for (int i =0; i< numToSend; i++) {
+                        TemporaryQueue replyTo = clientSession.createTemporaryQueue();
+                        MessageConsumer consumer = clientSession.createConsumer(replyTo);
+                        Message msg = clientSession.createMessage();
+                        msg.setJMSReplyTo(replyTo);
+                        producer.send(msg);
+                        if (clientTransactional) {
+                            clientSession.commit();
+                        }
+                        Message reply = consumer.receive();
+                        if (clientTransactional) {
+                            clientSession.commit();
+                        }
+                        consumer.close();
+                        if (deleteTempQueue) {
+                            replyTo.delete();
+                        } else {
+                            // temp queue will be cleaned up on clientConnection.close
+                        }
+                    }
+                } catch (JMSException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }
-        });
-        
-        MessageProducer producer = clientSession.createProducer(serverDestination);
-        for (int i =0; i< messagesToSend; i++) {
-            TemporaryQueue replyTo = clientSession.createTemporaryQueue();
-            MessageConsumer consumer = clientSession.createConsumer(replyTo);
-            Message msg = clientSession.createMessage();
-            msg.setJMSReplyTo(replyTo);
-            producer.send(msg);
-            Message reply = consumer.receive();
-            consumer.close();
-            if (deleteTempQueue) {
-                replyTo.delete();
-            } else {
-                // temp queue will be cleaned up on clientConnection.close
-            }
         }
+        Vector<Thread> threads = new Vector<Thread>(numProducers);
+        for (int i=0; i<numProducers ; i++) {
+            threads.add(new Producer(messagesToSend/numProducers));
+        }
+        startAndJoinThreads(threads);
         
         clientSession.close();
         serverSession.close();
@@ -91,7 +129,16 @@
         
                
         //serverDestination + 
-        assertTrue(rb.getDestinationMap().size()==6);          
+        assertEquals(6, rb.getDestinationMap().size());          
+    }
+
+    private void startAndJoinThreads(Vector<Thread> threads) throws Exception {
+        for (Thread thread: threads) {
+            thread.start();
+        }
+        for (Thread thread: threads) {
+            thread.join();
+        }
     }
 
     protected void setUp() throws Exception {
@@ -108,9 +155,13 @@
     protected void tearDown() throws Exception {
         
         super.tearDown();
+        serverTransactional = clientTransactional = false;
+        numConsumers = numProducers = 1;
+        messagesToSend = 2000;
     }
     
     protected Destination createDestination() {
         return new ActiveMQQueue(getClass().getName());
     }
+    
 }