You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/10/24 16:31:01 UTC
svn commit: r707644 - in /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory: MasterSlaveTempQueueMemoryTest.java TempQueueMemoryTest.java
Author: gtully
Date: Fri Oct 24 07:31:01 2008
New Revision: 707644
URL: http://svn.apache.org/viewvc?rev=707644&view=rev
Log:
More transactional and concurrent tests for master/slave, to try to reproduce AMQ-1983.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=707644&r1=707643&r2=707644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Fri Oct 24 07:31:01 2008
@@ -19,6 +19,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -156,5 +157,29 @@
messagesToSend = 10;
testLoadRequestReply();
}
+
+ public void testLoadRequestReplyWithTransactions() throws Exception {
+ serverTransactional = clientTransactional = true;
+ messagesToSend = 100;
+ reInitialiseSessions();
+ testLoadRequestReply();
+ }
+
+ public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception {
+ serverTransactional = true;
+ numConsumers = numProducers = 10;
+ messagesToSend = 100;
+ reInitialiseSessions();
+ testLoadRequestReply();
+ }
+ protected void reInitialiseSessions() throws Exception {
+ // reinitialize so they can respect the transactional flags
+ serverSession.close();
+ clientSession.close();
+ serverSession = serverConnection.createSession(serverTransactional,
+ serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ clientSession = clientConnection.createSession(clientTransactional,
+ clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java?rev=707644&r1=707643&r2=707644&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java Fri Oct 24 07:31:01 2008
@@ -16,8 +16,11 @@
*/
package org.apache.activemq.advisory;
+import java.util.Vector;
+
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -39,38 +42,73 @@
protected Destination serverDestination;
protected int messagesToSend = 2000;
protected boolean deleteTempQueue = true;
+ protected boolean serverTransactional = false;
+ protected boolean clientTransactional = false;
+ protected int numConsumers = 1;
+ protected int numProducers = 1;
+
public void testLoadRequestReply() throws Exception {
- MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
- serverConsumer.setMessageListener(new MessageListener() {
- public void onMessage(Message msg) {
+ for (int i=0; i< numConsumers; i++) {
+ serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() {
+ public void onMessage(Message msg) {
+ try {
+ Destination replyTo = msg.getJMSReplyTo();
+ MessageProducer producer = serverSession.createProducer(replyTo);
+ producer.send(replyTo, msg);
+ if (serverTransactional) {
+ serverSession.commit();
+ }
+ producer.close();
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ class Producer extends Thread {
+ private int numToSend;
+ public Producer(int numToSend) {
+ this.numToSend = numToSend;
+ }
+ public void run() {
+ MessageProducer producer;
try {
- Destination replyTo = msg.getJMSReplyTo();
- MessageProducer producer = serverSession.createProducer(replyTo);
- producer.send(replyTo, msg);
- producer.close();
- } catch (Exception e) {
+ producer = clientSession.createProducer(serverDestination);
+
+ for (int i =0; i< numToSend; i++) {
+ TemporaryQueue replyTo = clientSession.createTemporaryQueue();
+ MessageConsumer consumer = clientSession.createConsumer(replyTo);
+ Message msg = clientSession.createMessage();
+ msg.setJMSReplyTo(replyTo);
+ producer.send(msg);
+ if (clientTransactional) {
+ clientSession.commit();
+ }
+ Message reply = consumer.receive();
+ if (clientTransactional) {
+ clientSession.commit();
+ }
+ consumer.close();
+ if (deleteTempQueue) {
+ replyTo.delete();
+ } else {
+ // temp queue will be cleaned up on clientConnection.close
+ }
+ }
+ } catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
- });
-
- MessageProducer producer = clientSession.createProducer(serverDestination);
- for (int i =0; i< messagesToSend; i++) {
- TemporaryQueue replyTo = clientSession.createTemporaryQueue();
- MessageConsumer consumer = clientSession.createConsumer(replyTo);
- Message msg = clientSession.createMessage();
- msg.setJMSReplyTo(replyTo);
- producer.send(msg);
- Message reply = consumer.receive();
- consumer.close();
- if (deleteTempQueue) {
- replyTo.delete();
- } else {
- // temp queue will be cleaned up on clientConnection.close
- }
}
+ Vector<Thread> threads = new Vector<Thread>(numProducers);
+ for (int i=0; i<numProducers ; i++) {
+ threads.add(new Producer(messagesToSend/numProducers));
+ }
+ startAndJoinThreads(threads);
clientSession.close();
serverSession.close();
@@ -91,7 +129,16 @@
//serverDestination +
- assertTrue(rb.getDestinationMap().size()==6);
+ assertEquals(6, rb.getDestinationMap().size());
+ }
+
+ private void startAndJoinThreads(Vector<Thread> threads) throws Exception {
+ for (Thread thread: threads) {
+ thread.start();
+ }
+ for (Thread thread: threads) {
+ thread.join();
+ }
}
protected void setUp() throws Exception {
@@ -108,9 +155,13 @@
protected void tearDown() throws Exception {
super.tearDown();
+ serverTransactional = clientTransactional = false;
+ numConsumers = numProducers = 1;
+ messagesToSend = 2000;
}
protected Destination createDestination() {
return new ActiveMQQueue(getClass().getName());
}
+
}