You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/09 18:26:42 UTC

svn commit: r792598 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Thu Jul  9 16:26:42 2009
New Revision: 792598

URL: http://svn.apache.org/viewvc?rev=792598&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2322 - test and correction

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=792598&r1=792597&r2=792598&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jul  9 16:26:42 2009
@@ -205,7 +205,9 @@
                         // Message could have expired while it was being
                         // loaded..
                         if (broker.isExpired(message)) {
-                            messageExpired(createConnectionContext(), message);
+                            messageExpired(createConnectionContext(), createMessageReference(message));
+                            // drop message will decrement so counter balance here
+                            destinationStatistics.getMessages().increment();
                             return true;
                         }
                         if (hasSpace()) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=792598&r1=792597&r2=792598&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Thu Jul  9 16:26:42 2009
@@ -16,9 +16,12 @@
  */
 package org.apache.activemq.usecases;
 
+import java.io.File;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -34,8 +37,10 @@
 import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -49,7 +54,9 @@
 	MessageProducer producer;
 	MessageConsumer consumer;
 	public ActiveMQDestination destination = new ActiveMQQueue("test");
-	
+    public boolean useTextMessage = true;
+    public boolean useVMCursor = true;
+    
     public static Test suite() {
         return suite(ExpiredMessagesTest.class);
     }
@@ -59,21 +66,8 @@
     }
 	
 	protected void setUp() throws Exception {
-        broker = new BrokerService();
-        broker.setBrokerName("localhost");
-        broker.setDataDirectory("data/");
-        broker.setUseJmx(true);
-        broker.deleteAllMessages();
-
-        PolicyEntry defaultPolicy = new PolicyEntry();
-        defaultPolicy.setExpireMessagesPeriod(100);
-        PolicyMap policyMap = new PolicyMap();
-        policyMap.setDefaultEntry(defaultPolicy);
-        broker.setDestinationPolicy(policyMap);
-
-        broker.addConnector("tcp://localhost:61616");
-        broker.start();
-        broker.waitUntilStarted();
+        final boolean deleteAllMessages = true;
+        broker = createBroker(deleteAllMessages, 100);
     }
 	
 	public void testExpiredMessages() throws Exception {
@@ -129,8 +123,8 @@
         producingThread.join();
         session.close();
         
-        Thread.sleep(5000);
-        
+        Thread.sleep(2000);
+                
         DestinationViewMBean view = createView(destination);
         LOG.info("Stats: received: "  + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
                 + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
@@ -145,8 +139,107 @@
                 + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
         assertEquals("Wrong inFlightCount: ", 0, view.getInFlightCount());
 	}
+
+	
+	public void initCombosForTestRecoverExpiredMessages() { // NOTE(review): presumably discovered by name by the combination-test base class so testRecoverExpiredMessages runs once per useVMCursor value - confirm
+	    addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE, Boolean.FALSE});
+	}
 	
-	protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
+	public void testRecoverExpiredMessages() throws Exception {
+        // Send 1000 persistent messages with a 2s TTL, restart the broker, then expect the queue to empty via expiry.
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                "failover://tcp://localhost:61616");
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer = session.createProducer(destination);
+        producer.setTimeToLive(2000);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        Thread producingThread = new Thread("Producing Thread") {
+            public void run() {
+                try {
+                    int i = 0;
+                    while (i++ < 1000) {
+                        Message message = useTextMessage ? session
+                                .createTextMessage("test") : session
+                                .createObjectMessage("test");
+                        producer.send(message);
+                    }
+                    producer.close();
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        producingThread.start();
+        producingThread.join();
+
+        DestinationViewMBean view = createView(destination);
+        LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+                + view.getEnqueueCount() + ", dequeues: "
+                + view.getDequeueCount() + ", dispatched: "
+                + view.getDispatchCount() + ", inflight: "
+                + view.getInFlightCount() + ", expiries: "
+                + view.getExpiredCount());
+
+        LOG.info("stopping broker");
+        broker.stop();
+        broker.waitUntilStopped();
+        // Stay down for longer than the 2000ms time-to-live set on the producer above.
+        Thread.sleep(5000);
+
+        LOG.info("recovering broker");
+        final boolean deleteAllMessages = false;
+        broker = createBroker(deleteAllMessages, 5000);
+        
+        view = createView(destination);
+        LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+                + view.getEnqueueCount() + ", dequeues: "
+                + view.getDequeueCount() + ", dispatched: "
+                + view.getDispatchCount() + ", inflight: "
+                + view.getInFlightCount() + ", expiries: "
+                + view.getExpiredCount());
+        // Poll every 500ms, for at most 30s, until the recovered queue reports size 0.
+        long expiry = System.currentTimeMillis() + 30000;
+        while (view.getQueueSize() > 0 && System.currentTimeMillis() < expiry) {
+            Thread.sleep(500);
+        }
+        LOG.info("Stats: size: " + view.getQueueSize() + ", enqueues: "
+                + view.getEnqueueCount() + ", dequeues: "
+                + view.getDequeueCount() + ", dispatched: "
+                + view.getDispatchCount() + ", inflight: "
+                + view.getInFlightCount() + ", expiries: "
+                + view.getExpiredCount());
+        assertEquals("Wrong QueueSize: ", 0, view.getQueueSize());
+        assertEquals("all dequeues were expired", view.getDequeueCount(), view.getExpiredCount());
+    }
+
+	private BrokerService createBroker(boolean deleteAllMessages, long expireMessagesPeriod) throws Exception { // configures, starts and returns a broker listening on tcp://localhost:61616
+	    BrokerService broker = new BrokerService(); // local variable; callers assign the returned instance to the broker field
+        broker.setBrokerName("localhost");
+        AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
+        adaptor.setDirectory(new File("data/"));
+        adaptor.setForceRecoverReferenceStore(true); // NOTE(review): presumably forces the reference store to be rebuilt on startup - confirm
+        broker.setPersistenceAdapter(adaptor);
+        // Default destination policy: optional VM pending-queue policy plus the requested expiry period.
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        if (useVMCursor) {
+            defaultPolicy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        }
+        defaultPolicy.setExpireMessagesPeriod(expireMessagesPeriod);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages); // setUp passes true; the restart in testRecoverExpiredMessages passes false
+        broker.addConnector("tcp://localhost:61616");
+        broker.start();
+        broker.waitUntilStarted(); // return only once the broker reports it has started
+        return broker;
+    }
+
+    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
 		 MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
 		 String domain = "org.apache.activemq";
 		 ObjectName name;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=792598&r1=792597&r2=792598&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Thu Jul  9 16:26:42 2009
@@ -137,7 +137,7 @@
         
 		assertTrue("producer completed within time ", !producingThread.isAlive());
 		
-		Thread.sleep(2*expiryPeriod);
+		Thread.sleep(3*expiryPeriod);
         DestinationViewMBean view = createView(destination);
         assertEquals("All sent have expired ", sendCount, view.getExpiredCount());
 	}