You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/01/27 00:31:07 UTC
svn commit: r903487 - in
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs:
AMQ2571.java AMQ2571Test.java DurableConsumerTest.java
Author: rajdavies
Date: Tue Jan 26 23:31:06 2010
New Revision: 903487
URL: http://svn.apache.org/viewvc?rev=903487&view=rev
Log:
renamed to follow Test naming convention
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java
- copied, changed from r903384, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571.java
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java (from r903384, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571.java&r1=903384&r2=903487&rev=903487&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java Tue Jan 26 23:31:06 2010
@@ -27,7 +27,7 @@
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
-public class AMQ2571 extends EmbeddedBrokerTestSupport {
+public class AMQ2571Test extends EmbeddedBrokerTestSupport {
public void testTempQueueClosing() {
try {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=903487&r1=903486&r2=903487&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Tue Jan 26 23:31:06 2010
@@ -50,64 +50,63 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
-import org.apache.activemq.util.ThreadTracker;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
- * @version $Revision: 1.5 $
- * A Test case for AMQ-1479
+ * @version $Revision: 1.5 $ A Test case for AMQ-1479
*/
-public class DurableConsumerTest extends CombinationTestSupport {
+public class DurableConsumerTest extends CombinationTestSupport{
private static final Log LOG = LogFactory.getLog(DurableConsumerTest.class);
- private static int COUNT = 1024*10;
+ private static int COUNT = 1024 * 10;
private static String CONSUMER_NAME = "DURABLE_TEST";
protected BrokerService broker;
-
- protected String bindAddress="tcp://localhost:61616";
- protected byte[] payload = new byte[1024*32];
+ protected String bindAddress = "tcp://localhost:61616";
+
+ protected byte[] payload = new byte[1024 * 32];
protected ConnectionFactory factory;
protected Vector<Exception> exceptions = new Vector<Exception>();
private static final String TOPIC_NAME = "failoverTopic";
private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
public boolean useDedicatedTaskRunner = false;
-
- private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
-
+
+ private class SimpleTopicSubscriber implements MessageListener,ExceptionListener{
+
private TopicConnection topicConnection = null;
- public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
-
+ public SimpleTopicSubscriber(String connectionURL,String clientId,String topicName) {
+
ActiveMQConnectionFactory topicConnectionFactory = null;
TopicSession topicSession = null;
Topic topic = null;
TopicSubscriber topicSubscriber = null;
-
topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
try {
-
+
topic = new ActiveMQTopic(topicName);
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setClientID((clientId));
topicConnection.start();
-
+
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
topicSubscriber.setMessageListener(this);
-
+
} catch (JMSException e) {
- e.printStackTrace();
+ e.printStackTrace();
}
}
-
- public void onMessage(Message arg0) {
+
+ public void onMessage(Message arg0){
}
-
- public void closeConnection() {
+
+ public void closeConnection(){
if (topicConnection != null) {
try {
topicConnection.close();
@@ -115,39 +114,38 @@
}
}
}
-
- public void onException(JMSException exception) {
- exceptions.add(exception);
- }
- }
+
+ public void onException(JMSException exception){
+ exceptions.add(exception);
+ }
+ }
- private class MessagePublisher implements Runnable {
+ private class MessagePublisher implements Runnable{
private boolean shouldPublish = true;
-
- public void run() {
+
+ public void run(){
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicPublisher topicPublisher = null;
Message message = null;
-
-
+
topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
try {
- topic = new ActiveMQTopic(TOPIC_NAME);
- topicConnection = topicConnectionFactory.createTopicConnection();
- topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
- topicPublisher = topicSession.createPublisher(topic);
- message = topicSession.createMessage();
- } catch( Exception ex ) {
- exceptions.add(ex);
+ topic = new ActiveMQTopic(TOPIC_NAME);
+ topicConnection = topicConnectionFactory.createTopicConnection();
+ topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ topicPublisher = topicSession.createPublisher(topic);
+ message = topicSession.createMessage();
+ } catch (Exception ex) {
+ exceptions.add(ex);
}
while (shouldPublish) {
try {
topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
} catch (JMSException ex) {
- exceptions.add(ex);
+ exceptions.add(ex);
}
try {
Thread.sleep(1);
@@ -157,34 +155,34 @@
}
}
- private void configurePersistence(BrokerService broker) throws Exception {
- File dataDirFile = new File("target/"+ getName());
+ private void configurePersistence(BrokerService broker) throws Exception{
+ File dataDirFile = new File("target/" + getName());
AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
fact.setDataDirectory(dataDirFile);
fact.setForceRecoverReferenceStore(true);
- broker.setPersistenceAdapter(fact.createPersistenceAdapter());
+ broker.setPersistenceAdapter(fact.createPersistenceAdapter());
}
- public void testFailover() throws Exception {
+ public void testFailover() throws Exception{
+
+ configurePersistence(broker);
+ broker.start();
- configurePersistence(broker);
- broker.start();
-
- Thread publisherThread = new Thread( new MessagePublisher() );
+ Thread publisherThread = new Thread(new MessagePublisher());
publisherThread.start();
- for( int i = 0; i < 100; i++ ) {
+ for (int i = 0; i < 100; i++) {
final int id = i;
- Thread thread = new Thread( new Runnable() {
- public void run() {
- new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME);
+ Thread thread = new Thread(new Runnable(){
+ public void run(){
+ new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, TOPIC_NAME);
}
- } );
+ });
thread.start();
}
-
+
Thread.sleep(5000);
broker.stop();
broker = createBroker(false);
@@ -196,41 +194,38 @@
// makes heavy use of threads and can demonstrate https://issues.apache.org/activemq/browse/AMQ-2028
// with use dedicatedTaskRunner=true and produce OOM
- public void initCombosForTestConcurrentDurableConsumer() {
- addCombinationValues("useDedicatedTaskRunner", new Object[] {Boolean.TRUE, Boolean.FALSE});
+ public void initCombosForTestConcurrentDurableConsumer(){
+ addCombinationValues("useDedicatedTaskRunner", new Object[] { Boolean.TRUE, Boolean.FALSE });
}
- public void testConcurrentDurableConsumer() throws Exception {
-
- broker.start();
-
+ public void testConcurrentDurableConsumer() throws Exception{
+
+ broker.start();
+
factory = createConnectionFactory();
final String topicName = getName();
final int numMessages = 500;
- int numConsumers = 20;
+ int numConsumers = 1;
final CountDownLatch counsumerStarted = new CountDownLatch(0);
final AtomicInteger receivedCount = new AtomicInteger();
- Runnable consumer = new Runnable() {
- public void run() {
+ Runnable consumer = new Runnable(){
+ public void run(){
final String consumerName = Thread.currentThread().getName();
int acked = 0;
int received = 0;
-
try {
- while (acked < numMessages/2) {
+ while (acked < numMessages / 2) {
// take one message and close, ack on occasion
Connection consumerConnection = factory.createConnection();
- ((ActiveMQConnection)consumerConnection).setWatchTopicAdvisories(false);
+ ((ActiveMQConnection) consumerConnection).setWatchTopicAdvisories(false);
consumerConnection.setClientID(consumerName);
- Session consumerSession = consumerConnection.createSession(false,
- Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = consumerSession.createTopic(topicName);
consumerConnection.start();
- MessageConsumer consumer = consumerSession
- .createDurableSubscriber(topic, consumerName);
-
+ MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, consumerName);
+
counsumerStarted.countDown();
Message msg = null;
do {
@@ -243,7 +238,7 @@
}
}
} while (msg == null);
-
+
consumerConnection.close();
}
assertTrue(received >= acked);
@@ -255,24 +250,24 @@
};
ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
-
- for (int i=0; i<numConsumers ; i++) {
+
+ for (int i = 0; i < numConsumers; i++) {
executor.execute(consumer);
}
-
+
assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
Connection producerConnection = factory.createConnection();
- ((ActiveMQConnection)producerConnection).setWatchTopicAdvisories(false);
+ ((ActiveMQConnection) producerConnection).setWatchTopicAdvisories(false);
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = producerSession.createTopic(topicName);
MessageProducer producer = producerSession.createProducer(topic);
producerConnection.start();
- for (int i =0; i < numMessages; i++) {
+ for (int i = 0; i < numMessages; i++) {
BytesMessage msg = producerSession.createBytesMessage();
msg.writeBytes(payload);
producer.send(msg);
- if (i != 0 && i%100==0) {
+ if (i != 0 && i % 100 == 0) {
LOG.info("Sent msg " + i);
}
}
@@ -280,35 +275,35 @@
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
- Wait.waitFor(new Wait.Condition() {
- public boolean isSatisified() throws Exception {
+ Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception{
return receivedCount.get() > numMessages;
- }
- }, 60*1000);
+ }
+ }, 60 * 1000);
assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages);
assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
}
- public void testConsumerRecover() throws Exception {
- doTestConsumer(true);
+ public void testConsumerRecover() throws Exception{
+ doTestConsumer(true);
}
- public void testConsumer() throws Exception {
- doTestConsumer(false);
+ public void testConsumer() throws Exception{
+ doTestConsumer(false);
}
public void doTestConsumer(boolean forceRecover) throws Exception{
-
+
if (forceRecover) {
configurePersistence(broker);
}
broker.start();
-
+
factory = createConnectionFactory();
Connection consumerConnection = factory.createConnection();
consumerConnection.setClientID(CONSUMER_NAME);
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = consumerSession.createTopic(getClass().getName());
+ Topic topic = consumerSession.createTopic(getClass().getName());
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
consumerConnection.start();
consumerConnection.close();
@@ -320,16 +315,16 @@
broker.start();
Connection producerConnection = factory.createConnection();
-
+
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(topic);
producerConnection.start();
- for (int i =0; i < COUNT;i++) {
+ for (int i = 0; i < COUNT; i++) {
BytesMessage msg = producerSession.createBytesMessage();
msg.writeBytes(payload);
producer.send(msg);
- if (i != 0 && i%1000==0) {
+ if (i != 0 && i % 1000 == 0) {
LOG.info("Sent msg " + i);
}
}
@@ -345,73 +340,79 @@
consumerConnection.setClientID(CONSUMER_NAME);
consumerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
- for (int i =0; i < COUNT;i++) {
- Message msg = consumer.receive(5000);
- assertNotNull("Missing message: "+i, msg);
- if (i != 0 && i%1000==0) {
+ for (int i = 0; i < COUNT; i++) {
+ Message msg = consumer.receive(5000);
+ assertNotNull("Missing message: " + i, msg);
+ if (i != 0 && i % 1000 == 0) {
LOG.info("Received msg " + i);
}
}
consumerConnection.close();
-
}
- protected void setUp() throws Exception {
+ protected void setUp() throws Exception{
if (broker == null) {
broker = createBroker(true);
}
-
+
super.setUp();
}
-
- protected void tearDown() throws Exception {
- super.tearDown();
+
+ protected void tearDown() throws Exception{
+ super.tearDown();
if (broker != null) {
broker.stop();
broker = null;
}
}
-
- protected Topic creatTopic(Session s, String destinationName) throws JMSException {
+
+ protected Topic creatTopic(Session s,String destinationName) throws JMSException{
return s.createTopic(destinationName);
}
-
+
/**
* Factory method to create a new broker
*
* @throws Exception
*/
- protected BrokerService createBroker(boolean deleteStore) throws Exception {
+ protected BrokerService createBroker(boolean deleteStore) throws Exception{
BrokerService answer = new BrokerService();
- configureBroker(answer,deleteStore);
+ configureBroker(answer, deleteStore);
return answer;
}
-
- protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
+ protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception{
answer.setDeleteAllMessagesOnStartup(deleteStore);
+ KahaDBStore kaha = new KahaDBStore();
+ File directory = new File("target/activemq-data/kahadb");
+ if (deleteStore) {
+ IOHelper.deleteChildren(directory);
+ }
+ kaha.setDirectory(directory);
+
+ answer.setPersistenceAdapter(kaha);
answer.addConnector(bindAddress);
answer.setUseShutdownHook(false);
answer.setUseJmx(false);
answer.setAdvisorySupport(false);
answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
}
-
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
return factory;
}
-
- public static Test suite() {
+
+ public static Test suite(){
return suite(DurableConsumerTest.class);
}
-
- public static void main(String[] args) {
+
+ public static void main(String[] args){
junit.textui.TestRunner.run(suite());
}
}