You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/06/24 18:09:17 UTC

svn commit: r788068 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java

Author: dejanb
Date: Wed Jun 24 16:09:17 2009
New Revision: 788068

URL: http://svn.apache.org/viewvc?rev=788068&view=rev
Log:
test case for https://issues.apache.org/activemq/browse/AMQ-2303 - durable consumers recovery

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=788068&r1=788067&r2=788068&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Wed Jun 24 16:09:17 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.bugs;
 
+import java.io.File;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -26,18 +27,30 @@
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -56,7 +69,120 @@
     protected byte[] payload = new byte[1024*32];
     protected ConnectionFactory factory;
     protected Vector<Exception> exceptions = new Vector<Exception>();
+    
+    private static final String TOPIC_NAME = "failoverTopic";
+    private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
    
+    
+    private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
+
+        private TopicConnection topicConnection = null;
+        private String clientId;
+
+        public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
+
+            ActiveMQConnectionFactory topicConnectionFactory = null;
+            TopicSession topicSession = null;
+            Topic topic = null;
+            TopicSubscriber topicSubscriber = null;
+            
+
+            topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
+            try {
+
+                topic = new ActiveMQTopic(topicName);
+                topicConnection = topicConnectionFactory.createTopicConnection();
+                topicConnection.setClientID((clientId));
+                topicConnection.start();
+
+                topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
+                this.clientId = clientId;
+                topicSubscriber.setMessageListener(this);
+
+            } catch (JMSException e) {
+            	e.printStackTrace();
+            }
+        }
+
+        public void onMessage(Message arg0) {
+        }
+
+        public void closeConnection() {
+            if (topicConnection != null) {
+                try {
+                    topicConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+
+		public void onException(JMSException exception) {
+			exceptions.add(exception);
+		}
+    }   
+    
+    private class MessagePublisher implements Runnable {
+        private boolean shouldPublish = true;
+
+        public void run() {
+            TopicConnectionFactory topicConnectionFactory = null;
+            TopicConnection topicConnection = null;
+            TopicSession topicSession = null;
+            Topic topic = null;
+            TopicPublisher topicPublisher = null;
+            Message message = null;
+
+
+            topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
+            try {
+            topic = new ActiveMQTopic(TOPIC_NAME);
+            topicConnection = topicConnectionFactory.createTopicConnection();
+            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            topicPublisher = topicSession.createPublisher(topic);
+            message = topicSession.createMessage();
+            } catch( Exception ex ) {
+            	exceptions.add(ex);
+            }
+            while (shouldPublish) {
+                try {
+                    topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
+                } catch (JMSException ex) {
+                	exceptions.add(ex);
+                }
+                try {
+                    Thread.sleep(1);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+    
+    public void testFailover() throws Exception {
+        
+    	Thread publisherThread = new Thread( new MessagePublisher() );
+        publisherThread.start();
+        
+        for( int i = 0; i < 100; i++ ) {
+            
+            final int id = i;
+            Thread thread = new Thread( new Runnable() {
+                public void run() {
+                    
+                    SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME);
+                }
+            } );
+            thread.start();
+            
+            LOG.info( "subscribed " + i + " of 100" );
+        }
+
+        Thread.sleep(5000);
+        broker.stop();
+        broker = createBroker(false);
+        Thread.sleep(5000);
+        assertEquals(0, exceptions.size());
+    }
   
     public void testConcurrentDurableConsumer() throws Exception {
         factory = createConnectionFactory();
@@ -131,7 +257,7 @@
                 LOG.info("Sent msg " + i);
             }
         }
-
+        
         Thread.sleep(2000);
         executor.shutdown();
         executor.awaitTermination(30, TimeUnit.SECONDS);
@@ -220,10 +346,14 @@
         answer.start();
         return answer;
     }
-
     
 
     protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
+        File dataDirFile = new File("target/"+ getName());
+        AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
+        fact.setDataDirectory(dataDirFile);
+        fact.setForceRecoverReferenceStore(true);
+    	answer.setPersistenceAdapter(fact.createPersistenceAdapter());
         answer.setDeleteAllMessagesOnStartup(deleteStore);
         answer.addConnector(bindAddress);
         answer.setUseShutdownHook(false);