You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/03 22:31:56 UTC
svn commit: r1153649 [1/2] - in
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq: ./
advisory/ broker/jmx/ bugs/ usecases/
Author: tabish
Date: Wed Aug 3 20:31:53 2011
New Revision: 1153649
URL: http://svn.apache.org/viewvc?rev=1153649&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2411
remove some of the dependencies on port 61616 in current tests.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SlowConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DispatchMultipleConsumersTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupCloseTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java Wed Aug 3 20:31:53 2011
@@ -27,8 +27,8 @@ import javax.jms.Destination;
/**
* A useful base class which creates and closes an embedded broker
- *
- *
+ *
+ *
*/
public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
@@ -39,7 +39,7 @@ public abstract class EmbeddedBrokerTest
protected boolean useTopic;
protected ActiveMQDestination destination;
protected JmsTemplate template;
-
+
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
@@ -64,7 +64,7 @@ public abstract class EmbeddedBrokerTest
/**
* Factory method to create a new {@link JmsTemplate}
- *
+ *
* @return a newly created JmsTemplate
*/
protected JmsTemplate createJmsTemplate() {
@@ -73,7 +73,7 @@ public abstract class EmbeddedBrokerTest
/**
* Factory method to create a new {@link Destination}
- *
+ *
* @return newly created Destinaiton
*/
protected ActiveMQDestination createDestination() {
@@ -101,7 +101,7 @@ public abstract class EmbeddedBrokerTest
/**
* Factory method to create a new {@link ConnectionFactory} instance
- *
+ *
* @return a newly created connection factory
*/
protected ConnectionFactory createConnectionFactory() throws Exception {
@@ -110,7 +110,7 @@ public abstract class EmbeddedBrokerTest
/**
* Factory method to create a new broker
- *
+ *
* @throws Exception
*/
protected BrokerService createBroker() throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java Wed Aug 3 20:31:53 2011
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -61,9 +62,9 @@ public class OnePrefetchAsyncConsumerTes
// Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from
// an exhausted ServerSessionPool due to the (incorrectly?) incremented prefetchExtension in the PrefetchSubscription
producer.send(session.createTextMessage("Msg3"));
-
+
session.commit();
-
+
// wait for test to complete and the test result to get set
// this happens asynchronously since the messages are delivered asynchronously
synchronized (testMutex) {
@@ -71,13 +72,18 @@ public class OnePrefetchAsyncConsumerTes
testMutex.wait();
}
}
-
+
//test completed, result is ready
assertTrue("Attempted to retrieve more than one ServerSession at a time", testMutex.testSuccessful);
}
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:61616";
+ bindAddress = "tcp://localhost:0";
super.setUp();
testMutex = new TestMutex();
@@ -105,7 +111,7 @@ public class OnePrefetchAsyncConsumerTes
answer.setDestinationPolicy(policyMap);
return answer;
}
-
+
protected Queue createQueue() {
return new ActiveMQQueue(getDestinationString());
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java Wed Aug 3 20:31:53 2011
@@ -17,18 +17,14 @@
package org.apache.activemq;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Session;
-import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- *
- */
public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ReconnectWithSameClientIDTest.class);
@@ -59,8 +55,13 @@ public class ReconnectWithSameClientIDTe
useConnection(connection);
}
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:61616";
+ bindAddress = "tcp://localhost:0";
super.setUp();
}
@@ -75,9 +76,5 @@ public class ReconnectWithSameClientIDTe
protected void useConnection(Connection connection) throws JMSException {
connection.setClientID("foo");
connection.start();
- /**
- * Session session = connection.createSession(transacted, authMode);
- * return session;
- */
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java Wed Aug 3 20:31:53 2011
@@ -29,19 +29,15 @@ import org.apache.activemq.ActiveMQSessi
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
-
+
private static final transient Logger LOG = LoggerFactory.getLogger(MasterSlaveTempQueueMemoryTest.class);
-
- String masterBindAddress = "tcp://localhost:61616";
- String slaveBindAddress = "tcp://localhost:62616";
+
BrokerService slave;
/*
@@ -51,17 +47,16 @@ public class MasterSlaveTempQueueMemoryT
@Override
protected BrokerService createBroker() throws Exception {
// bindAddress is used by super.createBroker
- bindAddress = masterBindAddress;
+ bindAddress = "tcp://localhost:0";
BrokerService master = super.createBroker();
master.setBrokerName("master");
configureBroker(master);
- bindAddress = slaveBindAddress;
slave = super.createBroker();
slave.setBrokerName("slave");
- slave.setMasterConnectorURI(masterBindAddress);
-
+ slave.setMasterConnectorURI(master.getTransportConnectors().get(0).getPublishableConnectString());
+
configureBroker(slave);
- bindAddress = masterBindAddress;
+ bindAddress = master.getTransportConnectors().get(0).getPublishableConnectString();
return master;
}
@@ -74,15 +69,15 @@ public class MasterSlaveTempQueueMemoryT
// optimized dispatch does not effect the determinism of inflight between
// master and slave in this test
//broker.setDestinationPolicy(policyMap);
-
+
}
@Override
protected void startBroker() throws Exception {
-
- // because master will wait for slave to connect it needs
+
+ // because master will wait for slave to connect it needs
// to be in a separate thread
- Thread starterThread = new Thread() {
+ Thread starterThread = new Thread() {
public void run() {
try {
broker.setWaitForSlave(true);
@@ -94,7 +89,7 @@ public class MasterSlaveTempQueueMemoryT
}
};
starterThread.start();
-
+
slave.start();
starterThread.join(60*1000);
assertTrue("slave is indeed a slave", slave.isSlave());
@@ -104,7 +99,7 @@ public class MasterSlaveTempQueueMemoryT
protected void tearDown() throws Exception {
slave.stop();
super.tearDown();
-
+
}
@Override
@@ -112,25 +107,25 @@ public class MasterSlaveTempQueueMemoryT
super.testLoadRequestReply();
Thread.sleep(2000);
-
+
// some checks on the slave
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
AdvisoryBroker.class);
-
+
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
-
+
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
- RegionBroker.class);
-
- //serverDestination +
- assertEquals(6, rb.getDestinationMap().size());
-
+ RegionBroker.class);
+
+ //serverDestination +
+ assertEquals(6, rb.getDestinationMap().size());
+
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
LOG.info("enqueues " + rb.getDestinationStatistics().getEnqueues().getCount());
assertEquals("enqueues match", rb.getDestinationStatistics().getEnqueues().getCount(), masterRb.getDestinationStatistics().getEnqueues().getCount());
-
+
LOG.info("dequeues " + rb.getDestinationStatistics().getDequeues().getCount());
assertEquals("dequeues match",
rb.getDestinationStatistics().getDequeues().getCount(),
@@ -145,24 +140,24 @@ public class MasterSlaveTempQueueMemoryT
// slave does not actually dispatch any messages, so no request/reply(2) pair per iteration(COUNT)
// slave estimate must be >= actual master value
// master does not always reach expected total, should be assertEquals.., why?
- assertTrue("dispatched to slave is as good as master, master="
+ assertTrue("dispatched to slave is as good as master, master="
+ masterRb.getDestinationStatistics().getDispatched().getCount(),
- rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >=
+ rb.getDestinationStatistics().getDispatched().getCount() + 2*messagesToSend >=
masterRb.getDestinationStatistics().getDispatched().getCount());
}
-
+
public void testMoreThanPageSizeUnacked() throws Exception {
-
+
final int messageCount = Queue.MAX_PAGE_SIZE + 10;
final CountDownLatch latch = new CountDownLatch(1);
-
+
serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQSession s = (ActiveMQSession) serverSession;
s.setSessionAsyncDispatch(true);
-
+
MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
serverConsumer.setMessageListener(new MessageListener() {
-
+
public void onMessage(Message msg) {
try {
latch.await(30L, TimeUnit.SECONDS);
@@ -171,41 +166,41 @@ public class MasterSlaveTempQueueMemoryT
}
}
});
-
+
MessageProducer producer = clientSession.createProducer(serverDestination);
for (int i =0; i< messageCount; i++) {
Message msg = clientSession.createMessage();
producer.send(msg);
}
Thread.sleep(5000);
-
+
RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor(
RegionBroker.class);
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
-
- assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount());
+
+ assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
-
+
latch.countDown();
Thread.sleep(5000);
- assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
+ assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
}
-
+
public void testLoadRequestReplyWithNoTempQueueDelete() throws Exception {
deleteTempQueue = false;
messagesToSend = 10;
testLoadRequestReply();
}
-
+
public void testLoadRequestReplyWithTransactions() throws Exception {
serverTransactional = clientTransactional = true;
messagesToSend = 100;
reInitialiseSessions();
testLoadRequestReply();
}
-
+
public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception {
serverTransactional = true;
numConsumers = numProducers = 10;
@@ -215,10 +210,10 @@ public class MasterSlaveTempQueueMemoryT
}
protected void reInitialiseSessions() throws Exception {
- // reinitialize so they can respect the transactional flags
+ // reinitialize so they can respect the transactional flags
serverSession.close();
clientSession.close();
- serverSession = serverConnection.createSession(serverTransactional,
+ serverSession = serverConnection.createSession(serverTransactional,
serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
clientSession = clientConnection.createSession(clientTransactional,
clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Wed Aug 3 20:31:53 2011
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -48,7 +49,6 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
-import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
@@ -90,8 +90,9 @@ public class MBeanTest extends EmbeddedB
public void testConnectors() throws Exception{
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
- assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
-
+ assertEquals("openwire URL port doesn't equal bind Address",
+ new URI(broker.getOpenWireURL()).getPort(),
+ new URI(this.broker.getTransportConnectors().get(0).getPublishableConnectString()).getPort());
}
public void testMBeans() throws Exception {
@@ -317,10 +318,9 @@ public class MBeanTest extends EmbeddedB
String newDestination = getSecondDestinationString();
long queueSize = queue.getQueueSize();
+ assertTrue(queueSize > 0);
queue.copyMatchingMessagesTo("counter > 2", newDestination);
-
-
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
@@ -334,6 +334,7 @@ public class MBeanTest extends EmbeddedB
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
}
+ @SuppressWarnings("rawtypes")
protected void assertSendViaMBean() throws Exception {
String queueName = getDestinationString() + ".SendMBBean";
@@ -353,7 +354,7 @@ public class MBeanTest extends EmbeddedB
for (int i = 0; i < count; i++) {
String body = "message:" + i;
- Map headers = new HashMap();
+ Map<String, Object> headers = new HashMap<String, Object>();
headers.put("JMSCorrelationID", "MyCorrId");
headers.put("JMSDeliveryMode", Boolean.FALSE);
headers.put("JMSXGroupID", "MyGroupID");
@@ -370,7 +371,6 @@ public class MBeanTest extends EmbeddedB
if (compdatalist.length == 0) {
fail("There is no message in the queue:");
}
- String[] messageIDs = new String[compdatalist.length];
for (int i = 0; i < compdatalist.length; i++) {
CompositeData cdata = compdatalist[i];
@@ -407,7 +407,6 @@ public class MBeanTest extends EmbeddedB
assertComplexData(i, cdata, "JMSXGroupSeq", 1234);
assertComplexData(i, cdata, "JMSXGroupID", "MyGroupID");
assertComplexData(i, cdata, "Text", "message:" + i);
-
}
}
@@ -632,7 +631,7 @@ public class MBeanTest extends EmbeddedB
}
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:61616";
+ bindAddress = "tcp://localhost:0";
useTopic = false;
super.setUp();
mbeanServer = broker.getManagementContext().getMBeanServer();
@@ -656,6 +655,11 @@ public class MBeanTest extends EmbeddedB
super.tearDown();
}
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(false);
@@ -691,7 +695,6 @@ public class MBeanTest extends EmbeddedB
Thread.sleep(1000);
}
-
protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
connection.setClientID(clientID);
connection.start();
@@ -733,7 +736,6 @@ public class MBeanTest extends EmbeddedB
LOG.info(text);
}
-
protected String getSecondDestinationString() {
return "test.new.destination." + getClass() + "." + getName();
}
@@ -815,7 +817,6 @@ public class MBeanTest extends EmbeddedB
} catch (Exception e) {
// expected!
}
-
}
// Test for AMQ-3029
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java Wed Aug 3 20:31:53 2011
@@ -25,6 +25,7 @@ import javax.management.ObjectName;
import junit.framework.Test;
import junit.textui.TestRunner;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
@@ -36,8 +37,6 @@ import org.slf4j.LoggerFactory;
/**
* A specific test of Queue.purge() functionality
- *
- *
*/
public class PurgeTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PurgeTest.class);
@@ -118,7 +117,6 @@ public class PurgeTest extends EmbeddedB
Message message = session.createTextMessage("Test Message");
producer.send(message);
-
MessageConsumer consumer = session.createConsumer(destination);
Message received = consumer.receive(1000);
@@ -128,16 +126,12 @@ public class PurgeTest extends EmbeddedB
BrokerViewMBean brokerProxy = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerViewMBeanName, BrokerViewMBean.class, true);
brokerProxy.removeQueue(getDestinationString());
-
-
producer.send(message);
received = consumer.receive(1000);
assertNotNull("Message not received", received);
assertEquals(message, received);
-
-
}
public void testDelete() throws Exception {
@@ -209,7 +203,7 @@ public class PurgeTest extends EmbeddedB
}
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:61616";
+ bindAddress = "tcp://localhost:0";
useTopic = false;
super.setUp();
mbeanServer = broker.getManagementContext().getMBeanServer();
@@ -233,6 +227,11 @@ public class PurgeTest extends EmbeddedB
return answer;
}
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
protected void echo(String text) {
LOG.info(text);
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java Wed Aug 3 20:31:53 2011
@@ -29,8 +29,8 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.spring.ConsumerBean;
/**
- *
- *
+ *
+ *
*/
public class AMQ1687Test extends EmbeddedBrokerTestSupport {
@@ -40,10 +40,8 @@ public class AMQ1687Test extends Embedde
protected ConnectionFactory createConnectionFactory() throws Exception {
//prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log.
return new ActiveMQConnectionFactory(this.bindAddress+"?jms.prefetchPolicy.all=5");
- //return super.createConnectionFactory();
- //return new ActiveMQConnectionFactory("tcp://localhost:61616");
}
-
+
public void testVirtualTopicCreation() throws Exception {
if (connection == null) {
connection = createConnection();
@@ -52,10 +50,10 @@ public class AMQ1687Test extends Embedde
ConsumerBean messageList = new ConsumerBean();
messageList.setVerbose(true);
-
+
String queueAName = getVirtualTopicConsumerName();
String queueBName = getVirtualTopicConsumerNameB();
-
+
// create consumer 'cluster'
ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
ActiveMQQueue queue2 = new ActiveMQQueue(queueBName);
@@ -76,16 +74,14 @@ public class AMQ1687Test extends Embedde
for (int i = 0; i < total; i++) {
producer.send(session.createTextMessage("message: " + i));
}
-
+
messageList.assertMessagesArrived(total*2);
}
-
protected String getVirtualTopicName() {
return "VirtualTopic.TEST";
}
-
protected String getVirtualTopicConsumerName() {
return "Consumer.A.VirtualTopic.TEST";
}
@@ -93,8 +89,7 @@ public class AMQ1687Test extends Embedde
protected String getVirtualTopicConsumerNameB() {
return "Consumer.B.VirtualTopic.TEST";
}
-
-
+
protected void setUp() throws Exception {
this.bindAddress="tcp://localhost:61616";
super.setUp();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java Wed Aug 3 20:31:53 2011
@@ -40,20 +40,20 @@ import org.slf4j.LoggerFactory;
/**
* This is a test case for the issue reported at:
* https://issues.apache.org/activemq/browse/AMQ-1866
- *
- * If you have a JMS producer sending messages to multiple fast consumers and
- * one slow consumer, eventually all consumers will run as slow as
- * the slowest consumer.
+ *
+ * If you have a JMS producer sending messages to multiple fast consumers and
+ * one slow consumer, eventually all consumers will run as slow as
+ * the slowest consumer.
*/
public class AMQ1866 extends TestCase {
private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
private BrokerService brokerService;
private ArrayList<Thread> threads = new ArrayList<Thread>();
-
- String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616";
- String ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
-
+
+ private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
+ private String ACTIVEMQ_BROKER_URI;
+
AtomicBoolean shutdown = new AtomicBoolean();
private ActiveMQQueue destination;
@@ -65,19 +65,21 @@ public class AMQ1866 extends TestCase {
adaptor.setIndexBinSize(4096);
brokerService.setPersistenceAdapter(adaptor);
brokerService.deleteAllMessages();
-
+
// A small max page size makes this issue occur faster.
PolicyMap policyMap = new PolicyMap();
PolicyEntry pe = new PolicyEntry();
pe.setMaxPageSize(1);
policyMap.put(new ActiveMQQueue(">"), pe);
brokerService.setDestinationPolicy(policyMap);
-
+
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
brokerService.start();
+
+ ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
destination = new ActiveMQQueue(getName());
}
-
+
@Override
protected void tearDown() throws Exception {
// Stop any running threads.
@@ -85,30 +87,29 @@ public class AMQ1866 extends TestCase {
for (Thread t : threads) {
t.interrupt();
t.join();
- }
+ }
brokerService.stop();
}
public void testConsumerSlowDownPrefetch0() throws Exception {
- ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0";
+ ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
doTestConsumerSlowDown();
}
public void testConsumerSlowDownPrefetch10() throws Exception {
- ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10";
+ ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
doTestConsumerSlowDown();
}
-
+
public void testConsumerSlowDownDefaultPrefetch() throws Exception {
- ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
doTestConsumerSlowDown();
}
public void doTestConsumerSlowDown() throws Exception {
-
+
// Preload the queue.
produce(20000);
-
+
Thread producer = new Thread() {
@Override
public void run() {
@@ -122,7 +123,7 @@ public class AMQ1866 extends TestCase {
};
threads.add(producer);
producer.start();
-
+
// This is the slow consumer.
ConsumerThread c1 = new ConsumerThread("Consumer-1");
threads.add(c1);
@@ -139,33 +140,33 @@ public class AMQ1866 extends TestCase {
Thread.sleep(1000);
long c1Counter = c1.counter.getAndSet(0);
long c2Counter = c2.counter.getAndSet(0);
- System.out.println("c1: "+c1Counter+", c2: "+c2Counter);
+ log.debug("c1: "+c1Counter+", c2: "+c2Counter);
totalReceived += c1Counter;
totalReceived += c2Counter;
-
+
// Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec
if( i > 10 ) {
assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
}
}
- }
-
+ }
+
public void produce(int count) throws Exception {
Connection connection=null;
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
factory.setDispatchAsync(true);
-
+
connection = factory.createConnection();
-
+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
connection.start();
-
+
for( int i=0 ; i< count; i++ ) {
producer.send(session.createTextMessage(getName()+" Message "+(++i)));
}
-
+
} finally {
try {
connection.close();
@@ -173,7 +174,7 @@ public class AMQ1866 extends TestCase {
}
}
}
-
+
public class ConsumerThread extends Thread {
final AtomicLong counter = new AtomicLong();
@@ -185,16 +186,16 @@ public class AMQ1866 extends TestCase {
Connection connection=null;
try {
log.debug(getName() + ": is running");
-
+
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
factory.setDispatchAsync(true);
-
+
connection = factory.createConnection();
-
+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
-
+
while (!shutdown.get()) {
TextMessage msg = (TextMessage)consumer.receive(1000);
if ( msg!=null ) {
@@ -202,13 +203,13 @@ public class AMQ1866 extends TestCase {
if (getName().equals("Consumer-1")) {
sleepingTime = 1000 * 1000;
} else {
- sleepingTime = 1;
+ sleepingTime = 1;
}
counter.incrementAndGet();
Thread.sleep(sleepingTime);
}
}
-
+
} catch (Exception e) {
} finally {
log.debug(getName() + ": is stopping");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java Wed Aug 3 20:31:53 2011
@@ -43,21 +43,22 @@ public class AMQ1917Test extends TestCas
private static final int NUM_MESSAGES = 4000;
private static final int NUM_THREADS = 10;
- public static final String REQUEST_QUEUE = "mock.in.queue";
- public static final String REPLY_QUEUE = "mock.out.queue";
+ private static final String REQUEST_QUEUE = "mock.in.queue";
+ private static final String REPLY_QUEUE = "mock.out.queue";
- Destination requestDestination = ActiveMQDestination.createDestination(
+ private Destination requestDestination = ActiveMQDestination.createDestination(
REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE);
- Destination replyDestination = ActiveMQDestination.createDestination(
+ private Destination replyDestination = ActiveMQDestination.createDestination(
REPLY_QUEUE, ActiveMQDestination.QUEUE_TYPE);
- CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
- CountDownLatch errorLatch = new CountDownLatch(1);
- ThreadPoolExecutor tpe;
- final String BROKER_URL = "tcp://localhost:61616";
- BrokerService broker = null;
+ private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES);
+ private CountDownLatch errorLatch = new CountDownLatch(1);
+ private ThreadPoolExecutor tpe;
+ private final String BROKER_URL = "tcp://localhost:61616";
+ private String connectionUri;
+ private BrokerService broker = null;
private boolean working = true;
-
+
// trival session/producer pool
final Session[] sessions = new Session[NUM_THREADS];
final MessageProducer[] producers = new MessageProducer[NUM_THREADS];
@@ -67,11 +68,13 @@ public class AMQ1917Test extends TestCas
broker.setPersistent(false);
broker.addConnector(BROKER_URL);
broker.start();
-
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000);
tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000,
TimeUnit.MILLISECONDS, queue);
- ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
+ ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory());
tpe.setThreadFactory(limitedthreadFactory);
}
@@ -79,29 +82,29 @@ public class AMQ1917Test extends TestCas
broker.stop();
tpe.shutdown();
}
-
- public void testLoadedSendRecieveWithCorrelationId() throws Exception {
-
+
+ public void testLoadedSendRecieveWithCorrelationId() throws Exception {
+
ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
- connectionFactory.setBrokerURL(BROKER_URL);
- Connection connection = connectionFactory.createConnection();
+ connectionFactory.setBrokerURL(connectionUri);
+ Connection connection = connectionFactory.createConnection();
setupReceiver(connection);
connection = connectionFactory.createConnection();
connection.start();
-
- // trival session/producer pool
+
+ // trival session/producer pool
for (int i=0; i<NUM_THREADS; i++) {
sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producers[i] = sessions[i].createProducer(requestDestination);
}
-
+
for (int i = 0; i < NUM_MESSAGES; i++) {
MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination,
replyDestination, "Test Message : " + i);
tpe.execute(msr);
}
-
+
while (!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) {
if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) {
fail("there was an error, check the console for thread or thread allocation failure");
@@ -129,7 +132,7 @@ public class AMQ1917Test extends TestCas
TextMessage msg = (TextMessage) consumer.receive(20000);
if (msg == null) {
errorLatch.countDown();
- fail("Response timed out."
+ fail("Response timed out."
+ " latchCount=" + roundTripLatch.getCount());
} else {
String result = msg.getText();
@@ -205,7 +208,7 @@ public class AMQ1917Test extends TestCas
}
}
}
-
+
public class LimitedThreadFactory implements ThreadFactory {
int threadCount;
private ThreadFactory factory;
@@ -217,7 +220,7 @@ public class AMQ1917Test extends TestCas
if (++threadCount > NUM_THREADS) {
errorLatch.countDown();
fail("too many threads requested");
- }
+ }
return factory.newThread(arg0);
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2314Test.java Wed Aug 3 20:31:53 2011
@@ -38,43 +38,42 @@ import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class AMQ2314Test extends CombinationTestSupport {
public boolean consumeAll = false;
public int deliveryMode = DeliveryMode.NON_PERSISTENT;
-
+
private static final Logger LOG = LoggerFactory.getLogger(AMQ2314Test.class);
private static final int MESSAGES_COUNT = 30000;
private static byte[] buf = new byte[1024];
private BrokerService broker;
-
- protected long messageReceiveTimeout = 500L;
+ private String connectionUri;
+
+ private static final long messageReceiveTimeout = 500L;
Destination destination = new ActiveMQTopic("FooTwo");
-
+
public void testRemoveSlowSubscriberWhacksTempStore() throws Exception {
runProducerWithHungConsumer();
}
-
+
public void testMemoryUsageReleasedOnAllConsumed() throws Exception {
consumeAll = true;
runProducerWithHungConsumer();
// do it again to ensure memory limits are decreased
runProducerWithHungConsumer();
}
-
-
+
public void runProducerWithHungConsumer() throws Exception {
-
+
final CountDownLatch consumerContinue = new CountDownLatch(1);
final CountDownLatch consumerReady = new CountDownLatch(1);
-
+
final long origTempUsage = broker.getSystemUsage().getTempUsage().getUsage();
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
factory.setAlwaysSyncSend(true);
-
+
// ensure messages are spooled to disk for this consumer
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setTopicPrefetch(500);
@@ -99,19 +98,19 @@ public class AMQ2314Test extends Combina
}
}
};
-
+
Thread consumingThread = new Thread("Consuming thread") {
public void run() {
try {
int count = 0;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
-
+
while (consumer.receive(messageReceiveTimeout) == null) {
consumerReady.countDown();
}
count++;
- LOG.info("Received one... waiting");
+ LOG.info("Received one... waiting");
consumerContinue.await();
if (consumeAll) {
LOG.info("Consuming the rest of the messages...");
@@ -128,27 +127,27 @@ public class AMQ2314Test extends Combina
};
consumingThread.start();
consumerReady.await();
-
+
producingThread.start();
producingThread.join();
-
+
final long tempUsageBySubscription = broker.getSystemUsage().getTempUsage().getUsage();
LOG.info("Orig Usage: " + origTempUsage + ", currentUsage: " + tempUsageBySubscription);
assertTrue("some temp store has been used", tempUsageBySubscription != origTempUsage);
consumerContinue.countDown();
consumingThread.join();
connection.close();
-
+
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());
-
+
assertTrue("temp usage decreased with removed sub", Wait.waitFor(new Wait.Condition(){
public boolean isSatisified() throws Exception {
return broker.getSystemUsage().getTempUsage().getUsage() < tempUsageBySubscription;
}
}));
}
-
+
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
@@ -159,17 +158,17 @@ public class AMQ2314Test extends Combina
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true);
- broker.addConnector("tcp://localhost:61616").setName("Default");
+ broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
-
+
public void tearDown() throws Exception {
broker.stop();
}
-
-
+
public static Test suite() {
return suite(AMQ2314Test.class);
}
-
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java Wed Aug 3 20:31:53 2011
@@ -31,53 +31,55 @@ import org.apache.activemq.broker.jmx.Ma
/**
* This unit test verifies an issue when
- * javax.management.InstanceNotFoundException is thrown after subsequent startups when
+ * javax.management.InstanceNotFoundException is thrown after subsequent startups when
* managementContext createConnector="false"
*
*/
public class AMQ2513Test extends TestCase {
- BrokerService broker;
-
- void createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+ private BrokerService broker;
+ private String connectionUri;
+
+ void createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
broker = new BrokerService();
broker.setBrokerName("localhost");
broker.setUseJmx(true);
broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
- broker.addConnector("tcp://localhost:61616");
+ broker.addConnector("tcp://localhost:0");
ManagementContext ctx = new ManagementContext();
//if createConnector == true everything is fine
ctx.setCreateConnector(false);
broker.setManagementContext(ctx);
-
- broker.start();
+ broker.start();
broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
-
- public void testJmx() throws Exception{
- createBroker(true);
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+
+ public void testJmx() throws Exception{
+ createBroker(true);
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("test"));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
-
+
producer.send(session.createTextMessage("test123"));
-
+
DestinationViewMBean dv = createView();
assertTrue(dv.getQueueSize() > 0);
-
+
connection.close();
-
+
broker.stop();
broker.waitUntilStopped();
-
+
createBroker(false);
- factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(session.createQueue("test"));
@@ -85,20 +87,20 @@ public class AMQ2513Test extends TestCas
connection.start();
producer.send(session.createTextMessage("test123"));
connection.close();
-
+
dv = createView();
assertTrue(dv.getQueueSize() > 0);
-
+
broker.stop();
broker.waitUntilStopped();
-
- }
-
- DestinationViewMBean createView() throws Exception {
+
+ }
+
+ DestinationViewMBean createView() throws Exception {
String domain = "org.apache.activemq";
ObjectName name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=test");
return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class,
true);
}
-
+
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java Wed Aug 3 20:31:53 2011
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
-import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -39,17 +38,18 @@ public class AMQ2616Test extends TestCas
private static final int NUMBER = 2000;
private BrokerService brokerService;
private final ArrayList<Thread> threads = new ArrayList<Thread>();
- String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:61616";
- AtomicBoolean shutdown = new AtomicBoolean();
-
+ private final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0";
+ private final AtomicBoolean shutdown = new AtomicBoolean();
+
+ private String connectionUri;
+
public void testQueueResourcesReleased() throws Exception{
- ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_BIND);
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri);
Connection tempConnection = fac.createConnection();
tempConnection.start();
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue tempQueue = tempSession.createTemporaryQueue();
- final MessageConsumer tempConsumer = tempSession.createConsumer(tempQueue);
-
+
Connection testConnection = fac.createConnection();
long startUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -67,8 +67,8 @@ public class AMQ2616Test extends TestCas
endUsage = brokerService.getSystemUsage().getMemoryUsage().getUsage();
assertEquals(startUsage,endUsage);
}
-
-
+
+
@Override
protected void setUp() throws Exception {
// Start an embedded broker up.
@@ -95,6 +95,10 @@ public class AMQ2616Test extends TestCas
brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
brokerService.start();
+ brokerService.waitUntilStarted();
+
+ connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
new ActiveMQQueue(getName());
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java Wed Aug 3 20:31:53 2011
@@ -18,7 +18,6 @@ package org.apache.activemq.bugs;
import javax.jms.Connection;
import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@@ -26,13 +25,12 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
-/**
- *
- */
public class CraigsBugTest extends EmbeddedBrokerTestSupport {
+ private String connectionUri;
+
public void testConnectionFactory() throws Exception {
- final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
final Connection conn = cf.createConnection();
@@ -60,8 +58,10 @@ public class CraigsBugTest extends Embed
}
protected void setUp() throws Exception {
- bindAddress = "tcp://localhost:61616";
+ bindAddress = "tcp://localhost:0";
super.setUp();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DataFileNotDeletedTest.java Wed Aug 3 20:31:53 2011
@@ -41,14 +41,15 @@ public class DataFileNotDeletedTest exte
private static final Logger LOG = LoggerFactory.getLogger(DataFileNotDeletedTest.class);
private final CountDownLatch latch = new CountDownLatch(max_messages);
- private static int max_messages = 600;
+ private final static int max_messages = 600;
private static int messageCounter;
private final String destinationName = getName()+"_Queue";
private BrokerService broker;
private Connection receiverConnection;
private Connection producerConnection;
- final boolean useTopic = false;
-
+ private final boolean useTopic = false;
+ private String connectionUri;
+
AMQPersistenceAdapter persistentAdapter;
protected static final String payload = new String(new byte[512]);
@@ -61,7 +62,7 @@ public class DataFileNotDeletedTest exte
producerConnection = createConnection();
producerConnection.start();
}
-
+
@Override
public void tearDown() throws Exception {
receiverConnection.close();
@@ -72,16 +73,16 @@ public class DataFileNotDeletedTest exte
public void testForDataFileNotDeleted() throws Exception {
doTestForDataFileNotDeleted(false);
}
-
+
public void testForDataFileNotDeletedTransacted() throws Exception {
doTestForDataFileNotDeleted(true);
}
-
+
private void doTestForDataFileNotDeleted(boolean transacted) throws Exception {
-
+
Receiver receiver = new Receiver() {
public void receive(String s) throws Exception {
- messageCounter++;
+ messageCounter++;
latch.countDown();
}
};
@@ -94,7 +95,7 @@ public class DataFileNotDeletedTest exte
latch.await();
assertEquals(max_messages, messageCounter);
LOG.info("Sent and received + " + messageCounter + ", file count " + persistentAdapter.getAsyncDataManager().getFiles().size());
- waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 60000, 2);
+ waitFordataFilesToBeCleanedUp(persistentAdapter.getAsyncDataManager(), 60000, 2);
}
private void waitFordataFilesToBeCleanedUp(
@@ -111,7 +112,7 @@ public class DataFileNotDeletedTest exte
}
private Connection createConnection() throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
return factory.createConnection();
}
@@ -120,7 +121,7 @@ public class DataFileNotDeletedTest exte
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
- broker.addConnector("tcp://localhost:61616").setName("Default");
+ broker.addConnector("tcp://localhost:0").setName("Default");
broker.setPersistenceFactory(new AMQPersistenceAdapterFactory());
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
// ensure there are a bunch of data files but multiple entries in each
@@ -129,10 +130,13 @@ public class DataFileNotDeletedTest exte
factory.setCheckpointInterval(500);
factory.setCleanupInterval(500);
factory.setSyncOnWrite(false);
-
+
persistentAdapter = (AMQPersistenceAdapter) broker.getPersistenceAdapter();
broker.start();
LOG.info("Starting broker..");
+ broker.waitUntilStarted();
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
private void buildReceiver(Connection connection, final String queueName, boolean transacted, final Receiver receiver, boolean isTopic) throws Exception {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java Wed Aug 3 20:31:53 2011
@@ -16,10 +16,9 @@
*/
package org.apache.activemq.bugs;
-
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
@@ -28,128 +27,132 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
- static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
-
- private int messageSize=1024*64;
- private int messageCount=10000;
- private final AtomicInteger exceptionCount = new AtomicInteger(0);
-
- /**
- * Test the case where the broker is blocked due to a memory limit
- * and a producer timeout is set on the connection.
- * @throws Exception
- */
- public void testBlockedProducerConnectionTimeout() throws Exception {
- final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
- final ActiveMQDestination queue = createDestination("testqueue");
-
- // we should not take longer than 10 seconds to return from send
- cx.setSendTimeout(10000);
-
- Runnable r = new Runnable() {
- public void run() {
- try {
- LOG.info("Sender thread starting");
- Session session = cx.createSession(false, 1);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- TextMessage message = session.createTextMessage(createMessageText());
- for(int count=0; count<messageCount; count++){
- producer.send(message);
- }
- LOG.info("Done sending..");
- } catch (JMSException e) {
- if (e.getCause() instanceof RequestTimedOutIOException) {
- exceptionCount.incrementAndGet();
- } else {
- e.printStackTrace();
- }
- return;
- }
-
- }
- };
- cx.start();
- Thread producerThread = new Thread(r);
- producerThread.start();
- producerThread.join(30000);
- cx.close();
- // We should have a few timeout exceptions as memory store will fill up
- assertTrue("No exception from the broker", exceptionCount.get() > 0);
- }
-
-
- /**
- * Test the case where the broker is blocked due to a memory limit
- * with a fail timeout
- * @throws Exception
- */
- public void testBlockedProducerUsageSendFailTimeout() throws Exception {
- final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
- final ActiveMQDestination queue = createDestination("testqueue");
-
- broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
- Runnable r = new Runnable() {
- public void run() {
- try {
- LOG.info("Sender thread starting");
- Session session = cx.createSession(false, 1);
- MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- TextMessage message = session.createTextMessage(createMessageText());
- for(int count=0; count<messageCount; count++){
- producer.send(message);
- }
- LOG.info("Done sending..");
- } catch (JMSException e) {
- if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
- exceptionCount.incrementAndGet();
- } else {
- e.printStackTrace();
- }
- return;
- }
-
- }
- };
- cx.start();
- Thread producerThread = new Thread(r);
- producerThread.start();
- producerThread.join(30000);
- cx.close();
- // We should have a few timeout exceptions as memory store will fill up
- assertTrue("No exception from the broker", exceptionCount.get() > 0);
- }
-
- protected void setUp() throws Exception {
- exceptionCount.set(0);
- bindAddress = "tcp://localhost:61616";
- broker = createBroker();
- broker.setDeleteAllMessagesOnStartup(true);
- broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
-
- super.setUp();
- }
-
- private String createMessageText() {
- StringBuffer buffer = new StringBuffer();
- buffer.append("<filler>");
- for (int i = buffer.length(); i < messageSize; i++) {
- buffer.append('X');
- }
- buffer.append("</filler>");
- return buffer.toString();
- }
-
- }
+ static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
+
+ private final int messageSize=1024*64;
+ private final int messageCount=10000;
+ private final AtomicInteger exceptionCount = new AtomicInteger(0);
+
+ /**
+ * Test the case where the broker is blocked due to a memory limit
+ * and a producer timeout is set on the connection.
+ * @throws Exception
+ */
+ public void testBlockedProducerConnectionTimeout() throws Exception {
+ final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+ final ActiveMQDestination queue = createDestination("testqueue");
+
+ // we should not take longer than 10 seconds to return from send
+ cx.setSendTimeout(10000);
+
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ LOG.info("Sender thread starting");
+ Session session = cx.createSession(false, 1);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage message = session.createTextMessage(createMessageText());
+ for(int count=0; count<messageCount; count++){
+ producer.send(message);
+ }
+ LOG.info("Done sending..");
+ } catch (JMSException e) {
+ if (e.getCause() instanceof RequestTimedOutIOException) {
+ exceptionCount.incrementAndGet();
+ } else {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ }
+ };
+ cx.start();
+ Thread producerThread = new Thread(r);
+ producerThread.start();
+ producerThread.join(30000);
+ cx.close();
+ // We should have a few timeout exceptions as memory store will fill up
+ assertTrue("No exception from the broker", exceptionCount.get() > 0);
+ }
+
+ /**
+ * Test the case where the broker is blocked due to a memory limit
+ * with a fail timeout
+ * @throws Exception
+ */
+ public void testBlockedProducerUsageSendFailTimeout() throws Exception {
+ final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+ final ActiveMQDestination queue = createDestination("testqueue");
+
+ broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ LOG.info("Sender thread starting");
+ Session session = cx.createSession(false, 1);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage message = session.createTextMessage(createMessageText());
+ for(int count=0; count<messageCount; count++){
+ producer.send(message);
+ }
+ LOG.info("Done sending..");
+ } catch (JMSException e) {
+ if (e instanceof ResourceAllocationException || e.getCause() instanceof RequestTimedOutIOException) {
+ exceptionCount.incrementAndGet();
+ } else {
+ e.printStackTrace();
+ }
+ return;
+ }
+ }
+ };
+ cx.start();
+ Thread producerThread = new Thread(r);
+ producerThread.start();
+ producerThread.join(30000);
+ cx.close();
+ // We should have a few timeout exceptions as memory store will fill up
+ assertTrue("No exception from the broker", exceptionCount.get() > 0);
+ }
+
+ protected void setUp() throws Exception {
+ exceptionCount.set(0);
+ bindAddress = "tcp://localhost:0";
+ broker = createBroker();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
+
+ super.setUp();
+ }
+
+ @Override
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(
+ broker.getTransportConnectors().get(0).getPublishableConnectString());
+ }
+
+ private String createMessageText() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("<filler>");
+ for (int i = buffer.length(); i < messageSize; i++) {
+ buffer.append('X');
+ }
+ buffer.append("</filler>");
+ return buffer.toString();
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java Wed Aug 3 20:31:53 2011
@@ -38,101 +38,101 @@ import javax.management.ObjectName;
/**
* Test to determine if expired messages are being reaped if there is
- * no active consumer connected to the broker.
- *
- * @author bsnyder
- *
+ * no active consumer connected to the broker.
*/
public class MessageExpirationReaperTest {
-
- protected BrokerService broker;
- protected ConnectionFactory factory;
- protected ActiveMQConnection connection;
- protected String destinationName = "TEST.Q";
- protected String brokerUrl = "tcp://localhost:61616";
- protected String brokerName = "testBroker";
-
+
+ private BrokerService broker;
+ private ConnectionFactory factory;
+ private ActiveMQConnection connection;
+ private final String destinationName = "TEST.Q";
+ private final String brokerUrl = "tcp://localhost:0";
+ private final String brokerName = "testBroker";
+ private String connectionUri;
+
@Before
public void init() throws Exception {
createBroker();
-
+
+ connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
factory = createConnectionFactory();
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
}
-
+
@After
public void cleanUp() throws Exception {
connection.close();
broker.stop();
}
-
+
protected void createBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName(brokerName);
broker.addConnector(brokerUrl);
-
+
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(500);
policyMap.setDefaultEntry(defaultEntry);
broker.setDestinationPolicy(policyMap);
-
+
broker.start();
}
-
+
protected ConnectionFactory createConnectionFactory() throws Exception {
- return new ActiveMQConnectionFactory(brokerUrl);
+ return new ActiveMQConnectionFactory(connectionUri);
}
-
+
protected Session createSession() throws Exception {
- return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
-
+
@Test
public void testExpiredMessageReaping() throws Exception {
-
+
Session producerSession = createSession();
ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName);
MessageProducer producer = producerSession.createProducer(destination);
producer.setTimeToLive(1000);
-
+
final int count = 3;
- // Send some messages with an expiration
+ // Send some messages with an expiration
for (int i = 0; i < count; i++) {
TextMessage message = producerSession.createTextMessage("" + i);
producer.send(message);
}
-
- // Let the messages expire
+
+ // Let the messages expire
Thread.sleep(2000);
-
+
DestinationViewMBean view = createView(destination);
-
+
assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
assertEquals("Incorrect queue size count", 0, view.getQueueSize());
- assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
-
- // Send more messages with an expiration
+ assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
+
+ // Send more messages with an expiration
for (int i = 0; i < count; i++) {
TextMessage message = producerSession.createTextMessage("" + i);
producer.send(message);
}
-
- // Let the messages expire
+
+ // Let the messages expire
Thread.sleep(2000);
-
- // Simply browse the queue
+
+ // Simply browse the queue
Session browserSession = createSession();
QueueBrowser browser = browserSession.createBrowser((Queue) destination);
- assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
-
- // The messages expire and should be reaped because of the presence of
- // the queue browser
+ assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
+
+ // The messages expire and should be reaped because of the presence of
+ // the queue browser
assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
}
-
+
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
String domain = "org.apache.activemq";
ObjectName name;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java?rev=1153649&r1=1153648&r2=1153649&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java Wed Aug 3 20:31:53 2011
@@ -35,44 +35,43 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OutOfOrderTestCase extends TestCase {
-
- private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class);
-
- public static final String BROKER_URL = "tcp://localhost:61616";
- private static final int PREFETCH = 10;
- private static final String CONNECTION_URL = BROKER_URL + "?jms.prefetchPolicy.all=" + PREFETCH;
-
- public static final String QUEUE_NAME = "QUEUE";
- private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
-
- BrokerService brokerService;
- Session session;
- Connection connection;
-
- int seq = 0;
-
- public void setUp() throws Exception {
- brokerService = new BrokerService();
- brokerService.setUseJmx(true);
- brokerService.addConnector(BROKER_URL);
- brokerService.deleteAllMessages();
- brokerService.start();
-
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
- connection = connectionFactory.createConnection();
- connection.start();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- }
-
-
- protected void tearDown() throws Exception {
- session.close();
- connection.close();
- brokerService.stop();
- }
+ private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class);
+ private static final String BROKER_URL = "tcp://localhost:0";
+ private static final int PREFETCH = 10;
+ private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH;
+
+ private static final String DESTINATION = "QUEUE?consumer.exclusive=true";
+
+ private BrokerService brokerService;
+ private Session session;
+ private Connection connection;
+ private String connectionUri;
+
+ private int seq = 0;
+
+ public void setUp() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setUseJmx(true);
+ brokerService.addConnector(BROKER_URL);
+ brokerService.deleteAllMessages();
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS);
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ protected void tearDown() throws Exception {
+ session.close();
+ connection.close();
+ brokerService.stop();
+ }
public void testOrder() throws Exception {
@@ -102,7 +101,7 @@ public class OutOfOrderTestCase extends
log.info("Consuming messages 20-29 . . .");
consumeBatch();
}
-
+
protected void consumeBatch() throws Exception {
Destination destination = session.createQueue(DESTINATION);
final MessageConsumer messageConsumer = session.createConsumer(destination);
@@ -118,15 +117,15 @@ public class OutOfOrderTestCase extends
}
}
- private String toString(final Message message) throws JMSException {
- String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
- if (message.getJMSRedelivered())
- ret += " (redelivered)";
- return ret;
-
- }
-
- private static String createMessageText(final int index) {
- return "message #" + index;
- }
+ private String toString(final Message message) throws JMSException {
+ String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID();
+ if (message.getJMSRedelivered())
+ ret += " (redelivered)";
+ return ret;
+
+ }
+
+ private static String createMessageText(final int index) {
+ return "message #" + index;
+ }
}