You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/11/10 18:26:39 UTC

svn commit: r834557 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/ft/ test/java/org/apache/activemq/broker/region/ test/java...

Author: gtully
Date: Tue Nov 10 17:26:38 2009
New Revision: 834557

URL: http://svn.apache.org/viewvc?rev=834557&view=rev
Log:
merge -c 834543 - resolve https://issues.apache.org/activemq/browse/AMQ-2481 - no need to force a page in but sync between expiry from browse and from pageIn needed some tweaks, expired messages need to be removed from the cursor in the event of expiry from browse. Also resolve unit test failures from https://issues.apache.org/activemq/browse/AMQ-2481

Added:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
      - copied unchanged from r834543, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Nov 10 17:26:38 2009
@@ -25,7 +25,6 @@
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatchNotification;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Nov 10 17:26:38 2009
@@ -16,6 +16,28 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -54,26 +76,6 @@
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
 
 
 /**
@@ -198,10 +200,12 @@
                     public boolean recoverMessage(Message message) {
                         // Message could have expired while it was being
                         // loaded..
-                        if (broker.isExpired(message)) {
-                            messageExpired(createConnectionContext(), createMessageReference(message));
-                            // drop message will decrement so counter balance here
-                            destinationStatistics.getMessages().increment();
+                        if (message.isExpired()) {
+                            if (broker.isExpired(message)) {
+                                messageExpired(createConnectionContext(), createMessageReference(message));
+                                // drop message will decrement so counter balance here
+                                destinationStatistics.getMessages().increment();
+                            }
                             return true;
                         }
                         if (hasSpace()) {
@@ -439,6 +443,7 @@
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
+                                        LOG.error("expired waiting for space..");
                                         broker.messageExpired(context, message);
                                         destinationStatistics.getExpired().increment();
                                     } else {
@@ -585,7 +590,7 @@
                 return null;
             }
         };
-        doBrowse(true, browsedMessages, this.getMaxExpirePageSize());
+        doBrowse(browsedMessages, this.getMaxExpirePageSize());
     }
 
     public void gc(){
@@ -749,14 +754,15 @@
 
     public Message[] browse() {    
         List<Message> l = new ArrayList<Message>();
-        doBrowse(false, l, getMaxBrowsePageSize());
+        doBrowse(l, getMaxBrowsePageSize());
         return l.toArray(new Message[l.size()]);
     }
     
-    public void doBrowse(boolean forcePageIn, List<Message> l, int max) {
+    
+    public void doBrowse(List<Message> l, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
-            pageInMessages(forcePageIn);
+            pageInMessages(false);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
             synchronized(dispatchMutex) {
                 synchronized (pagedInPendingDispatch) {
@@ -770,7 +776,7 @@
                 }
                 toExpire.clear();
                 synchronized (pagedInMessages) {
-                    addAll(pagedInMessages.values(), l, max, toExpire);   
+                    addAll(pagedInMessages.values(), l, max, toExpire);
                 }
                 for (MessageReference ref : toExpire) {
                     if (broker.isExpired(ref)) {
@@ -787,13 +793,16 @@
                         try {
                             messages.reset();
                             while (messages.hasNext() && l.size() < max) {
-                                MessageReference node = messages.next();
-                                messages.rollback(node.getMessageId());
-                                if (node != null) {
+                                MessageReference node = messages.next();        
+                                if (node.isExpired()) {
                                     if (broker.isExpired(node)) {
                                         messageExpired(connectionContext,
                                                 createMessageReference(node.getMessage()));
-                                    } else if (l.contains(node.getMessage()) == false) {
+                                    }
+                                    messages.remove();
+                                } else {
+                                    messages.rollback(node.getMessageId());
+                                    if (l.contains(node.getMessage()) == false) {
                                         l.add(node.getMessage());
                                     }
                                 }
@@ -806,7 +815,7 @@
             } 
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
-        }
+        }     
     }
 
     private void addAll(Collection<QueueMessageReference> refs,
@@ -1278,7 +1287,7 @@
     }
     
     public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
-        if (LOG.isDebugEnabled()) {      
+        if (LOG.isDebugEnabled()) {
             LOG.debug("message expired: " + reference);
         }
         broker.messageExpired(context, reference);
@@ -1371,12 +1380,14 @@
                             node.incrementReferenceCount();
                             messages.remove();
                             QueueMessageReference ref = createMessageReference(node.getMessage());
-                            if (!broker.isExpired(node)) {
+                            if (ref.isExpired()) {
+                                if (broker.isExpired(ref)) {
+                                    messageExpired(createConnectionContext(), ref);
+                                }
+                            } else {
                                 result.add(ref);
                                 count++;
-                            } else {
-                                messageExpired(createConnectionContext(), ref);
-                            }
+                            }   
                         }
                     } finally {
                         messages.release();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java Tue Nov 10 17:26:38 2009
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 
 
 public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest {
@@ -33,17 +36,25 @@
 
     protected void createMaster() throws Exception {
         master = new BrokerService();
-        master.setBrokerName("shared");
+        master.setBrokerName("shared-master");
+        configureSharedPersistenceAdapter(master);
         master.addConnector(brokerUrl);
         master.start();
     }
     
+    private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception {
+       AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
+       adapter.setDirectory(new File("shared"));
+       broker.setPersistenceAdapter(adapter); 
+    }
+
     protected void createSlave() throws Exception {      
         new Thread(new Runnable() {
             public void run() {
                 try {
                     BrokerService broker = new BrokerService();
-                    broker.setBrokerName("shared");
+                    broker.setBrokerName("shared-slave");
+                    configureSharedPersistenceAdapter(broker);
                     // add transport as a service so that it is bound on start, after store started                
                     final TransportConnector tConnector = new TransportConnector();
                     tConnector.setUri(new URI(brokerUrl));

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java Tue Nov 10 17:26:38 2009
@@ -98,6 +98,23 @@
                 proxy.getQueueSize());
     }
 
+    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {       
+        applyBrokerSpoolingPolicy();
+        final int exprityPeriod = 1000;
+        applyExpiryDuration(exprityPeriod);
+        createProducerAndSendMessages(90000);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem");
+        Thread.sleep(10000);
+        assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 90000,
+                proxy.getQueueSize());
+    }
+    
+
+    private void applyExpiryDuration(int i) {
+        broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
+    }
+
     private void applyBrokerSpoolingPolicy() {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java Tue Nov 10 17:26:38 2009
@@ -98,7 +98,8 @@
         context.checking(new Expectations(){{
             allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
             allowing (managementContext).start();
-            allowing (managementContext).stop();            
+            allowing (managementContext).stop();
+            allowing (managementContext).isConnectorStarted();
             
             // expected MBeans
             allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Tue Nov 10 17:26:38 2009
@@ -152,6 +152,8 @@
         
         assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
+                LOG.info("Stats: received: "  + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+                        + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
                 return view.getQueueSize() == 0;
             }
         }));
@@ -282,7 +284,7 @@
         broker.waitUntilStarted();
         return broker;
     }
-
+	
     protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
         String domain = "org.apache.activemq";
         ObjectName name;

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=834557&r1=834556&r2=834557&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Tue Nov 10 17:26:38 2009
@@ -16,6 +16,19 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -27,16 +40,6 @@
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-import junit.framework.Test;
 
 
 public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
@@ -140,12 +143,16 @@
         final DestinationViewMBean view = createView(destination);
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize());
                 return sendCount == view.getExpiredCount();
             }
         });
         LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
                 + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
                 + ", size= " + view.getQueueSize());
+        
         assertEquals("All sent have expired", sendCount, view.getExpiredCount());
 	}