You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/25 11:17:49 UTC

svn commit: r758200 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Author: gtully
Date: Wed Mar 25 10:17:45 2009
New Revision: 758200

URL: http://svn.apache.org/viewvc?rev=758200&view=rev
Log:
share the test

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=758200&r1=758199&r2=758200&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Wed Mar 25 10:17:45 2009
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.bugs;
 
+import java.io.File;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.Vector;
@@ -34,6 +35,9 @@
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
 import org.apache.activemq.usage.MemoryUsage;
@@ -59,24 +63,34 @@
 
     final int MESSAGE_LENGTH_BYTES = 75000;
     final int MAX_TO_SEND  = 2000;
-    final long SLEEP_BETWEEN_SEND_MS = 5;
+    final long SLEEP_BETWEEN_SEND_MS = 3;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
     final Object brokerLock = new Object();
     
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
+
+    private File dataDirFile;
     
     public void createBroker(Configurer configurer) throws Exception {
         broker = new BrokerService();
-        broker.setDataDirectory("target/amq-data/" + getName());
+        AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+        persistenceFactory.setDataDirectory(dataDirFile);
+        broker.setPersistenceFactory(persistenceFactory);
+
         broker.addConnector(BROKER_CONNECTOR);        
+        broker.setBrokerName(getName());
+        broker.setDataDirectoryFile(dataDirFile);
         if (configurer != null) {
             configurer.configure(broker);
         }
-        broker.setBrokerName(getName());
         broker.start();
     }
     
+    public void setUp() throws Exception {
+        dataDirFile = new File("target/"+ getName());
+    }
+    
     public void tearDown() throws Exception {
         synchronized(brokerLock) {
             broker.stop();
@@ -138,7 +152,8 @@
                             + " in msg: " + message.getJMSMessageID()
                             + " expected "
                             + nextExpectedSeqNum
-                            + ", lastId: " + lastId);
+                            + ", lastId: " + lastId 
+                            + ", message:" + message);
                     fail(queueName + " received " + seqNum + " expected "
                             + nextExpectedSeqNum);
                 }
@@ -189,10 +204,12 @@
                     LOG.error(queueName + " send error", e);
                     exceptions.add(e);
                 }
-                try {
-                    Thread.sleep(SLEEP_BETWEEN_SEND_MS);
-                } catch (InterruptedException e) {
-                    LOG.warn(queueName + " sleep interrupted", e);
+                if (SLEEP_BETWEEN_SEND_MS > 0) {
+                    try {
+                        Thread.sleep(SLEEP_BETWEEN_SEND_MS);
+                    } catch (InterruptedException e) {
+                        LOG.warn(queueName + " sleep interrupted", e);
+                    }
                 }
             }
             try {
@@ -202,13 +219,13 @@
         }
     }
 
-    public void testOrderWithMemeUsageLimit() throws Exception {
+    public void x_testOrderWithMemeUsageLimit() throws Exception {
         
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 SystemUsage usage = new SystemUsage();
                 MemoryUsage memoryUsage = new MemoryUsage();
-                memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
+                memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 5 * NUM_SENDERS_AND_RECEIVERS);
                 usage.setMemoryUsage(memoryUsage);
                 broker.setSystemUsage(usage);
                 
@@ -222,9 +239,9 @@
     public void testOrderWithRestartVMIndex() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                AMQPersistenceAdapterFactory persistenceFactory =
+                    (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setPersistentIndex(false);
-                broker.setPersistenceFactory(persistenceFactory);
                 broker.deleteAllMessages();     
             }
         });
@@ -232,9 +249,9 @@
         final Timer timer = new Timer();
         schedualRestartTask(timer, new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                AMQPersistenceAdapterFactory persistenceFactory =
+                    (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setPersistentIndex(false);
-                broker.setPersistenceFactory(persistenceFactory);
             }
         });
         
@@ -245,13 +262,31 @@
         }
     }
 
+
+    public void x_testOrderWithRestart() throws Exception {
+        createBroker(new Configurer() {
+            public void configure(BrokerService broker) throws Exception {
+                broker.deleteAllMessages();     
+            }
+        });
+        
+        final Timer timer = new Timer();
+        schedualRestartTask(timer, null);
+        
+        try {
+            verifyOrderedMessageReceipt();
+        } finally {
+            timer.cancel();
+        }
+    }
     
+
     public void x_testOrderWithRestartWithForceRecover() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                AMQPersistenceAdapterFactory persistenceFactory =
+                    (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setForceRecoverReferenceStore(true);
-                broker.setPersistenceFactory(persistenceFactory);
                 broker.deleteAllMessages();     
             }
         });
@@ -259,9 +294,14 @@
         final Timer timer = new Timer();
         schedualRestartTask(timer, new Configurer() {
             public void configure(BrokerService broker) throws Exception {
-                AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+                AMQPersistenceAdapterFactory persistenceFactory =
+                    (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
                 persistenceFactory.setForceRecoverReferenceStore(true);
-                broker.setPersistenceFactory(persistenceFactory);
+//                PolicyEntry auditDepthPolicy = new PolicyEntry();
+//                auditDepthPolicy.setMaxAuditDepth(2000);
+//                PolicyMap policyMap = new PolicyMap();
+//                policyMap.setDefaultEntry(auditDepthPolicy);
+//                broker.setDestinationPolicy(policyMap);
             }
         });
         
@@ -272,12 +312,8 @@
         }
     }
 
-    private void schedualRestartTask(Timer timer) {
-        schedualRestartTask(timer, null);   
-    }
-
     private void schedualRestartTask(final Timer timer, final Configurer configurer) {
-        timer.schedule(new TimerTask() {
+        class RestartTask extends TimerTask {
             public void run() {
                 synchronized (brokerLock) {
                     LOG.info("stopping broker..");
@@ -290,15 +326,20 @@
                     LOG.info("restarting broker");
                     try {
                         createBroker(configurer);
+                        broker.waitUntilStarted();
                     } catch (Exception e) {
                         LOG.error("ex on broker restart", e);
                         exceptions.add(e);
                     }
                 }
-                // do once
-                // timer.cancel();
-            }            
-        }, BROKER_STOP_PERIOD, BROKER_STOP_PERIOD);
+                // do it again
+                try {
+                    timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
+                } catch (IllegalStateException ignore_alreadyCancelled) {   
+                }
+            } 
+        }
+        timer.schedule(new RestartTask(), BROKER_STOP_PERIOD);
     }
     
     private void verifyOrderedMessageReceipt() throws Exception {
@@ -314,17 +355,19 @@
             threads.add(thread);
         }
         
-        final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
-        while(!threads.isEmpty() && !receivers.isEmpty() 
-                && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+        final long expiry = System.currentTimeMillis() + 1000 * 60 * 10;
+        while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
             Thread sendThread = threads.firstElement();
             sendThread.join(1000*10);
             if (!sendThread.isAlive()) {
                 threads.remove(sendThread);
             }
-            
+        }
+        LOG.info("senders done...");
+        
+        while(!receivers.isEmpty() && System.currentTimeMillis() < expiry) {
             Receiver receiver = receivers.firstElement();
-            if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND) {
+            if (receiver.getNextExpectedSeqNo() >= MAX_TO_SEND || !exceptions.isEmpty()) {
                 receiver.close();
                 receivers.remove(receiver);
             }