You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/20 15:06:46 UTC

svn commit: r756469 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Author: gtully
Date: Fri Mar 20 14:06:45 2009
New Revision: 756469

URL: http://svn.apache.org/viewvc?rev=756469&view=rev
Log:
further tests for AMQ-2149

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=756469&r1=756468&r2=756469&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Fri Mar 20 14:06:45 2009
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.bugs;
 
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.Vector;
 
 import junit.framework.TestCase;
@@ -33,45 +35,54 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+interface Configurer {
+    public void configure(BrokerService broker) throws Exception;
+}
+
 public class AMQ2149Test extends TestCase {
 
-    private static final Log log = LogFactory.getLog(AMQ2149Test.class);
+    private static final Log LOG = LogFactory.getLog(AMQ2149Test.class);
+
+    private static final long BROKER_STOP_PERIOD = 15 * 1000;
 
-    private String BROKER_URL;
+    private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
+    private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR
+        +")?maxReconnectDelay=1000&useExponentialBackOff=false";
+        
     private final String SEQ_NUM_PROPERTY = "seqNum";
 
     final int MESSAGE_LENGTH_BYTES = 75000;
     final int MAX_TO_SEND  = 2000;
     final long SLEEP_BETWEEN_SEND_MS = 5;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
+    final Object brokerLock = new Object();
     
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
     
-    public void setUp() throws Exception {
+    public void createBroker(Configurer configurer) throws Exception {
         broker = new BrokerService();
-        broker.addConnector("tcp://localhost:0");
-        broker.deleteAllMessages();
-        
-        SystemUsage usage = new SystemUsage();
-        MemoryUsage memoryUsage = new MemoryUsage();
-        memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
-        usage.setMemoryUsage(memoryUsage);
-        broker.setSystemUsage(usage);
+        broker.setDataDirectory("target/amq-data/" + getName());
+        broker.addConnector(BROKER_CONNECTOR);        
+        if (configurer != null) {
+            configurer.configure(broker);
+        }
+        broker.setBrokerName(getName());
         broker.start();
-
-        BROKER_URL = "failover:("
-            + broker.getTransportConnectors().get(0).getUri()
-            +")?maxReconnectDelay=1000&useExponentialBackOff=false";
     }
     
     public void tearDown() throws Exception {
-        broker.stop();
+        synchronized(brokerLock) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        exceptions.clear();
     }
     
     private String buildLongString() {
@@ -94,6 +105,8 @@
         private final MessageConsumer messageConsumer;
 
         private volatile long nextExpectedSeqNum = 0;
+        
+        private String lastId = null;
 
         public Receiver(String queueName) throws JMSException {
             this.queueName = queueName;
@@ -106,21 +119,33 @@
             connection.start();
         }
 
+        public void close() throws JMSException {
+            connection.close();
+        }
+        
+        public long getNextExpectedSeqNo() {
+            return nextExpectedSeqNum;
+        }
+        
         public void onMessage(Message message) {
             try {
                 final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
                 if ((seqNum % 100) == 0) {
-                    log.info(queueName + " received " + seqNum);
+                    LOG.info(queueName + " received " + seqNum);
                 }
                 if (seqNum != nextExpectedSeqNum) {
-                    log.warn(queueName + " received " + seqNum + " expected "
-                            + nextExpectedSeqNum);
+                    LOG.warn(queueName + " received " + seqNum
+                            + " in msg: " + message.getJMSMessageID()
+                            + " expected "
+                            + nextExpectedSeqNum
+                            + ", lastId: " + lastId);
                     fail(queueName + " received " + seqNum + " expected "
                             + nextExpectedSeqNum);
                 }
                 ++nextExpectedSeqNum;
+                lastId = message.getJMSMessageID();
             } catch (Throwable e) {
-                log.error(queueName + " onMessage error", e);
+                LOG.error(queueName + " onMessage error", e);
                 exceptions.add(e);
             }
         }
@@ -161,38 +186,150 @@
                     ++nextSequenceNumber;
                     messageProducer.send(message);
                 } catch (Exception e) {
-                    log.error(queueName + " send error", e);
+                    LOG.error(queueName + " send error", e);
                     exceptions.add(e);
                 }
                 try {
                     Thread.sleep(SLEEP_BETWEEN_SEND_MS);
                 } catch (InterruptedException e) {
-                    log.warn(queueName + " sleep interrupted", e);
+                    LOG.warn(queueName + " sleep interrupted", e);
                 }
             }
+            try {
+                connection.close();
+            } catch (JMSException ignored) {
+            }
         }
     }
 
-    public void testOutOfOrderWithMemeUsageLimit() throws Exception {
+    public void testOrderWithMemeUsageLimit() throws Exception {
+        
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                SystemUsage usage = new SystemUsage();
+                MemoryUsage memoryUsage = new MemoryUsage();
+                memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
+                usage.setMemoryUsage(memoryUsage);
+                broker.setSystemUsage(usage);
+                
+                broker.deleteAllMessages();            
+            }
+        });
+        
+        verifyOrderedMessageReceipt();
+    }
+
+    public void testOrderWithRestartVMIndex() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                persistenceFactory.setPersistentIndex(false);
+                broker.setPersistenceFactory(persistenceFactory);
+                broker.deleteAllMessages();     
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                persistenceFactory.setPersistentIndex(false);
+                broker.setPersistenceFactory(persistenceFactory);
+            }
+        });
+        
+        try {
+            verifyOrderedMessageReceipt();
+        } finally {
+            timer.cancel();
+        }
+    }
+
+    
+    public void x_testOrderWithRestartWithForceRecover() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                persistenceFactory.setForceRecoverReferenceStore(true);
+                broker.setPersistenceFactory(persistenceFactory);
+                broker.deleteAllMessages();     
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                persistenceFactory.setForceRecoverReferenceStore(true);
+                broker.setPersistenceFactory(persistenceFactory);
+            }
+        });
+        
+        try {
+            verifyOrderedMessageReceipt();
+        } finally {
+            timer.cancel();
+        }
+    }
+
+    private void schedualRestartTask(Timer timer) {
+        schedualRestartTask(timer, null);   
+    }
+
+    private void schedualRestartTask(final Timer timer, final Configurer configurer) {
+        timer.schedule(new TimerTask() {
+            public void run() {
+                synchronized (brokerLock) {
+                    LOG.info("stopping broker..");
+                    try {
+                        broker.stop();
+                    } catch (Exception e) {
+                        LOG.error("ex on broker stop", e);
+                        exceptions.add(e);
+                    }
+                    LOG.info("restarting broker");
+                    try {
+                        createBroker(configurer);
+                    } catch (Exception e) {
+                        LOG.error("ex on broker restart", e);
+                        exceptions.add(e);
+                    }
+                }
+                // do once
+                // timer.cancel();
+            }            
+        }, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD);
+    }
+    
+    private void verifyOrderedMessageReceipt() throws Exception {
+        
         Vector<Thread> threads = new Vector<Thread>();
+        Vector<Receiver> receivers = new Vector<Receiver>();
         
         for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
             final String queueName = "test.queue." + i;
-            new Receiver(queueName);
+            receivers.add(new Receiver(queueName));
             Thread thread = new Thread(new Sender(queueName));
             thread.start();
             threads.add(thread);
         }
         
         final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
-        while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+        while(!threads.isEmpty() && !receivers.isEmpty() 
+                && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
             Thread sendThread = threads.firstElement();
             sendThread.join(1000*10);
             if (!sendThread.isAlive()) {
                 threads.remove(sendThread);
             }
+            
+            Receiver receiver = receivers.firstElement();
+            if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) {
+                receiver.close();
+                receivers.remove(receiver);
+            }
         }
-        assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry);
+        assertTrue("No timeout waiting for senders/receivers to complete", System.currentTimeMillis() < expiry);
         assertTrue("No exceptions", exceptions.isEmpty());
     }