You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/10/30 11:51:42 UTC

svn commit: r831258 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/broker/region/QueuePurgeTest.java

Author: gtully
Date: Fri Oct 30 10:51:41 2009
New Revision: 831258

URL: http://svn.apache.org/viewvc?rev=831258&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2468 - limit pagedInPendingDispatch to maxPageSize and bypass dispatch for jmx queue modifications like purge and remove matching messages so they are not limited by pending messages and can page through all messages. Resolve intermittent deadlock in AMQ2102Test. Note: sparse selectors may need to increase maxPageSize as ever increasing pagedInPendingDispatch was exceeding that limit in error

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=831258&r1=831257&r2=831258&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Oct 30 10:51:41 2009
@@ -863,8 +863,8 @@
     public void purge() throws Exception {   
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
-        do {
-            pageInMessages();
+        do {        
+            doPageIn(true);
             synchronized (pagedInMessages) {
                 list = new ArrayList<MessageReference>(pagedInMessages.values());
             }
@@ -876,6 +876,7 @@
                 } catch (IOException e) {
                 }
             }
+            
         } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
         gc();
         this.destinationStatistics.getMessages().setCount(0);
@@ -919,7 +920,7 @@
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         ConnectionContext context = createConnectionContext();
         do {
-            pageInMessages();
+            doPageIn(true);
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
             }
@@ -979,7 +980,7 @@
         do {
             int oldMaxSize=getMaxPageSize();
             setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
-            pageInMessages();
+            doPageIn(true);
             setMaxPageSize(oldMaxSize);
             synchronized (pagedInMessages) {
                 set.addAll(pagedInMessages.values());
@@ -1170,7 +1171,7 @@
 	            pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
 	        } 
 	        
-	        // Perhaps we should page always into the pagedInPendingDispatch list is 
+	        // Perhaps we should page always into the pagedInPendingDispatch list if 
 	        // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
 	        // then we do a dispatch.
 	        if (pageInMoreMessages) {
@@ -1215,6 +1216,11 @@
 
     protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
         removeMessage(c, null, r);
+        synchronized(dispatchMutex) {            
+            synchronized (pagedInPendingDispatch) {
+                pagedInPendingDispatch.remove(r);
+            }
+        }
     }
     
     protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
@@ -1349,12 +1355,11 @@
                         + ", pagedInMessages.size " + pagedInMessages.size());
             }
            
-            if (isLazyDispatch()&& !force) {
+            if (isLazyDispatch() && !force) {
                 // Only page in the minimum number of messages which can be dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
             }
-            
-            if ((force || !consumers.isEmpty()) && toPageIn > 0) { 
+            if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
                 int count = 0;
                 result = new ArrayList<QueueMessageReference>(toPageIn);
                 synchronized (messages) {
@@ -1405,8 +1410,7 @@
                     // dispatched before.
                     pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
                 }
-                // and now see if we can dispatch the new stuff.. and append to
-                // the pending
+                // and now see if we can dispatch the new stuff.. and append to the pending
                 // list anything that does not actually get dispatched.
                 if (list != null && !list.isEmpty()) {
                     if (pagedInPendingDispatch.isEmpty()) {
@@ -1423,7 +1427,8 @@
             }
         } 
         if (doWakeUp) {
-            wakeup();
+            // avoid lock order contention
+            asyncWakeup();
         }
     }
     
@@ -1495,9 +1500,6 @@
         return rc;
     }
 
-    private void pageInMessages() throws Exception {
-        pageInMessages(true);
-    }
 
     protected void pageInMessages(boolean force) throws Exception {
             doDispatch(doPageIn(force));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=831258&r1=831257&r2=831258&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java Fri Oct 30 10:51:41 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.File;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -25,15 +27,25 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+
 import junit.framework.TestCase;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class QueuePurgeTest extends TestCase {
+    private static final Log LOG = LogFactory.getLog(QueuePurgeTest.class);
+    private final String MESSAGE_TEXT = new String(new byte[1024]);
     BrokerService broker;
     ConnectionFactory factory;
     Connection connection;
@@ -43,17 +55,23 @@
 
     protected void setUp() throws Exception {
         broker = new BrokerService();
+        broker.setDataDirectory("target/activemq-data");
         broker.setUseJmx(true);
-        broker.setPersistent(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/QueuePurgeTest"));
+        broker.setPersistenceAdapter(persistenceAdapter);
         broker.addConnector("tcp://localhost:0");
         broker.start();
-        factory = new ActiveMQConnectionFactory("vm://localhost");
+        factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString());
         connection = factory.createConnection();
         connection.start();
     }
 
     protected void tearDown() throws Exception {
-        consumer.close();
+        if (consumer != null) {
+            consumer.close();
+        }
         session.close();
         connection.stop();
         connection.close();
@@ -61,10 +79,45 @@
     }
 
     public void testPurgeQueueWithActiveConsumer() throws Exception {
-        createProducerAndSendMessages();
+        createProducerAndSendMessages(10000);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        createConsumer();
+        proxy.purge();
+        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
+                proxy.getQueueSize());
+    }
+    
+    
+    public void testPurgeLargeQueue() throws Exception {       
+        applyBrokerSpoolingPolicy();
+        createProducerAndSendMessages(90000);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        LOG.info("purging..");
+        proxy.purge();
+        assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
+                proxy.getQueueSize());
+    }
+
+    private void applyBrokerSpoolingPolicy() {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setProducerFlowControl(false);
+        PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
+        defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    
+    public void testPurgeLargeQueueWithConsumer() throws Exception {       
+        applyBrokerSpoolingPolicy();
+        createProducerAndSendMessages(90000);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         createConsumer();
+        long start = System.currentTimeMillis();
+        LOG.info("purging..");
         proxy.purge();
+        LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
         assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
                 proxy.getQueueSize());
     }
@@ -80,12 +133,15 @@
         return proxy;
     }
 
-    private void createProducerAndSendMessages() throws Exception {
+    private void createProducerAndSendMessages(int numToSend) throws Exception {
         session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         queue = session.createQueue("test1");
         MessageProducer producer = session.createProducer(queue);
-        for (int i = 0; i < 10000; i++) {
-            TextMessage message = session.createTextMessage("message " + i);
+        for (int i = 0; i < numToSend; i++) {
+            TextMessage message = session.createTextMessage(MESSAGE_TEXT + i);
+            if (i  != 0 && i % 50000 == 0) {
+                LOG.info("sent: " + i);
+            }
             producer.send(message);
         }
         producer.close();
@@ -95,7 +151,7 @@
         consumer = session.createConsumer(queue);
         // wait for buffer fill out
         Thread.sleep(5 * 1000);
-        for (int i = 0; i < 100; ++i) {
+        for (int i = 0; i < 500; ++i) {
             Message message = consumer.receive();
             message.acknowledge();
         }