You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/06/24 18:09:17 UTC
svn commit: r788068 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Author: dejanb
Date: Wed Jun 24 16:09:17 2009
New Revision: 788068
URL: http://svn.apache.org/viewvc?rev=788068&view=rev
Log:
test case for https://issues.apache.org/activemq/browse/AMQ-2303 - durable consumers recovery
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=788068&r1=788067&r2=788068&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Jun 24 16:09:17 2009
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.bugs;
+import java.io.File;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -26,18 +27,30 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,7 +69,120 @@
protected byte[] payload = new byte[1024*32];
protected ConnectionFactory factory;
protected Vector<Exception> exceptions = new Vector<Exception>();
+
+ private static final String TOPIC_NAME = "failoverTopic";
+ private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
+
+ private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
+
+ private TopicConnection topicConnection = null;
+ private String clientId;
+
+ public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
+
+ ActiveMQConnectionFactory topicConnectionFactory = null;
+ TopicSession topicSession = null;
+ Topic topic = null;
+ TopicSubscriber topicSubscriber = null;
+
+
+ topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
+ try {
+
+ topic = new ActiveMQTopic(topicName);
+ topicConnection = topicConnectionFactory.createTopicConnection();
+ topicConnection.setClientID((clientId));
+ topicConnection.start();
+
+ topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
+ this.clientId = clientId;
+ topicSubscriber.setMessageListener(this);
+
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void onMessage(Message arg0) {
+ }
+
+ public void closeConnection() {
+ if (topicConnection != null) {
+ try {
+ topicConnection.close();
+ } catch (JMSException e) {
+ }
+ }
+ }
+
+ public void onException(JMSException exception) {
+ exceptions.add(exception);
+ }
+ }
+
+ private class MessagePublisher implements Runnable {
+ private boolean shouldPublish = true;
+
+ public void run() {
+ TopicConnectionFactory topicConnectionFactory = null;
+ TopicConnection topicConnection = null;
+ TopicSession topicSession = null;
+ Topic topic = null;
+ TopicPublisher topicPublisher = null;
+ Message message = null;
+
+
+ topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
+ try {
+ topic = new ActiveMQTopic(TOPIC_NAME);
+ topicConnection = topicConnectionFactory.createTopicConnection();
+ topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ topicPublisher = topicSession.createPublisher(topic);
+ message = topicSession.createMessage();
+ } catch( Exception ex ) {
+ exceptions.add(ex);
+ }
+ while (shouldPublish) {
+ try {
+ topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
+ } catch (JMSException ex) {
+ exceptions.add(ex);
+ }
+ try {
+ Thread.sleep(1);
+ } catch (Exception ex) {
+ }
+ }
+ }
+ }
+
+ public void testFailover() throws Exception {
+
+ Thread publisherThread = new Thread( new MessagePublisher() );
+ publisherThread.start();
+
+ for( int i = 0; i < 100; i++ ) {
+
+ final int id = i;
+ Thread thread = new Thread( new Runnable() {
+ public void run() {
+
+ SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME);
+ }
+ } );
+ thread.start();
+
+ LOG.info( "subscribed " + i + " of 100" );
+ }
+
+ Thread.sleep(5000);
+ broker.stop();
+ broker = createBroker(false);
+ Thread.sleep(5000);
+ assertEquals(0, exceptions.size());
+ }
public void testConcurrentDurableConsumer() throws Exception {
factory = createConnectionFactory();
@@ -131,7 +257,7 @@
LOG.info("Sent msg " + i);
}
}
-
+
Thread.sleep(2000);
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
@@ -220,10 +346,14 @@
answer.start();
return answer;
}
-
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
+ File dataDirFile = new File("target/"+ getName());
+ AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
+ fact.setDataDirectory(dataDirFile);
+ fact.setForceRecoverReferenceStore(true);
+ answer.setPersistenceAdapter(fact.createPersistenceAdapter());
answer.setDeleteAllMessagesOnStartup(deleteStore);
answer.addConnector(bindAddress);
answer.setUseShutdownHook(false);