You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/11/10 17:57:28 UTC

svn commit: r834543 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/ft/ test/java/org/apache/activemq/broker/region/ test/java/org/apache/acti...

Author: gtully
Date: Tue Nov 10 16:57:28 2009
New Revision: 834543

URL: http://svn.apache.org/viewvc?rev=834543&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2481 - there is no need to force a page-in, but the synchronization between expiry from browse and expiry from pageIn needed some tweaks: expired messages need to be removed from the cursor when they are expired from browse. Also resolve unit test failures from https://issues.apache.org/activemq/browse/AMQ-2481

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Nov 10 16:57:28 2009
@@ -25,7 +25,6 @@
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatchNotification;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Nov 10 16:57:28 2009
@@ -16,6 +16,28 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -54,26 +76,6 @@
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
 
 
 /**
@@ -198,10 +200,12 @@
                     public boolean recoverMessage(Message message) {
                         // Message could have expired while it was being
                         // loaded..
-                        if (broker.isExpired(message)) {
-                            messageExpired(createConnectionContext(), createMessageReference(message));
-                            // drop message will decrement so counter balance here
-                            destinationStatistics.getMessages().increment();
+                        if (message.isExpired()) {
+                            if (broker.isExpired(message)) {
+                                messageExpired(createConnectionContext(), createMessageReference(message));
+                                // drop message will decrement so counter balance here
+                                destinationStatistics.getMessages().increment();
+                            }
                             return true;
                         }
                         if (hasSpace()) {
@@ -439,6 +443,7 @@
                                     // While waiting for space to free up... the
                                     // message may have expired.
                                     if (message.isExpired()) {
+                                        LOG.error("expired waiting for space..");
                                         broker.messageExpired(context, message);
                                         destinationStatistics.getExpired().increment();
                                     } else {
@@ -585,7 +590,7 @@
                 return null;
             }
         };
-        doBrowse(true, browsedMessages, this.getMaxExpirePageSize());
+        doBrowse(browsedMessages, this.getMaxExpirePageSize());
     }
 
     public void gc(){
@@ -749,14 +754,15 @@
 
     public Message[] browse() {    
         List<Message> l = new ArrayList<Message>();
-        doBrowse(false, l, getMaxBrowsePageSize());
+        doBrowse(l, getMaxBrowsePageSize());
         return l.toArray(new Message[l.size()]);
     }
     
-    public void doBrowse(boolean forcePageIn, List<Message> l, int max) {
+    
+    public void doBrowse(List<Message> l, int max) {
         final ConnectionContext connectionContext = createConnectionContext();
         try {
-            pageInMessages(forcePageIn);
+            pageInMessages(false);
             List<MessageReference> toExpire = new ArrayList<MessageReference>();
             synchronized(dispatchMutex) {
                 synchronized (pagedInPendingDispatch) {
@@ -770,7 +776,7 @@
                 }
                 toExpire.clear();
                 synchronized (pagedInMessages) {
-                    addAll(pagedInMessages.values(), l, max, toExpire);   
+                    addAll(pagedInMessages.values(), l, max, toExpire);
                 }
                 for (MessageReference ref : toExpire) {
                     if (broker.isExpired(ref)) {
@@ -787,13 +793,16 @@
                         try {
                             messages.reset();
                             while (messages.hasNext() && l.size() < max) {
-                                MessageReference node = messages.next();
-                                messages.rollback(node.getMessageId());
-                                if (node != null) {
+                                MessageReference node = messages.next();        
+                                if (node.isExpired()) {
                                     if (broker.isExpired(node)) {
                                         messageExpired(connectionContext,
                                                 createMessageReference(node.getMessage()));
-                                    } else if (l.contains(node.getMessage()) == false) {
+                                    }
+                                    messages.remove();
+                                } else {
+                                    messages.rollback(node.getMessageId());
+                                    if (l.contains(node.getMessage()) == false) {
                                         l.add(node.getMessage());
                                     }
                                 }
@@ -806,7 +815,7 @@
             } 
         } catch (Exception e) {
             LOG.error("Problem retrieving message for browse", e);
-        }
+        }     
     }
 
     private void addAll(Collection<QueueMessageReference> refs,
@@ -1278,7 +1287,7 @@
     }
     
     public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
-        if (LOG.isDebugEnabled()) {      
+        if (LOG.isDebugEnabled()) {
             LOG.debug("message expired: " + reference);
         }
         broker.messageExpired(context, reference);
@@ -1371,12 +1380,14 @@
                             node.incrementReferenceCount();
                             messages.remove();
                             QueueMessageReference ref = createMessageReference(node.getMessage());
-                            if (!broker.isExpired(node)) {
+                            if (ref.isExpired()) {
+                                if (broker.isExpired(ref)) {
+                                    messageExpired(createConnectionContext(), ref);
+                                }
+                            } else {
                                 result.add(ref);
                                 count++;
-                            } else {
-                                messageExpired(createConnectionContext(), ref);
-                            }
+                            }   
                         }
                     } finally {
                         messages.release();

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java?rev=834543&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java Tue Nov 10 16:57:28 2009
@@ -0,0 +1,79 @@
+package org.apache.activemq.util;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Debugging tool that records the call paths by which tracked methods are reached.
+ * To use, add a call at the top of the method of interest:<pre>
+ *  public void someMethod() {
+ *      ThreadTracker.track("someMethod");
+ *      ...
+ *  }</pre>
+ * and later call {@link #result()} to write each captured stack trace,
+ * together with its counter, to the log. Traces are grouped per tracked
+ * name and, within a name, by stack depth (see <code>Tracker</code>).
+ */
+public class ThreadTracker {
+
+    static final Log LOG = LogFactory.getLog(ThreadTracker.class);  
+    static HashMap<String, Tracker> trackers = new HashMap<String, Tracker>(); // tracked name -> its Tracker
+    
+    /**
+     * Record the current call stack against <code>name</code>, creating its Tracker on first use.
+     * @param name the label (typically the method name) under which traces are grouped
+     */
+    public static void track(String name) {
+        Tracker t;
+        synchronized(trackers) { // guards lookup/creation in the shared map
+            t = trackers.get(name);
+            if (t == null) {
+                t = new Tracker();
+                trackers.put(name, t);
+            }
+        }
+        t.track(); // done outside the map lock; Tracker.track() synchronizes on the Tracker itself
+    }
+    
+    /**
+     * Log, at info level, every tracker's captured traces together with their counts.
+     */
+    public static void result() {
+        for (Entry<String, Tracker> t: trackers.entrySet()) { // NOTE(review): iterates without the lock track() takes - confirm no concurrent track() calls
+            LOG.info("Tracker: " + t.getKey() + ", " + t.getValue().size() + " entry points...");
+            for (Trace trace : t.getValue().values()) {
+                LOG.info("count: " + trace.count, trace); // Trace is a Throwable, passed as the log call's throwable argument
+            }
+            LOG.info("Tracker: " + t.getKey() + ", done.");
+        }
+    }
+
+}
+
+@SuppressWarnings("serial")
+class Trace extends Throwable { // a Throwable created only to snapshot the call stack at the point of construction
+    public int count; // incremented by Tracker.track() on repeat hits; starts at 0, so the first hit is not counted
+    public final int size; // number of stack frames captured at construction
+    Trace() {
+        super();
+        size = this.getStackTrace().length;
+    }
+}
+
+@SuppressWarnings("serial")
+class Tracker extends HashMap<Integer, Trace> { // maps stack depth -> the first Trace recorded at that depth
+    public void track() {
+        Trace current = new Trace(); // captures the current call stack
+        synchronized(this) {
+            Trace exist = get(current.size); // keyed by depth only: distinct call paths of equal depth share one entry
+            if (exist != null) {
+                exist.count++;
+            } else {
+                put(current.size, current);
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/ThreadTracker.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveSingleUrlTest.java Tue Nov 10 16:57:28 2009
@@ -16,11 +16,14 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.store.amq.AMQPersistenceAdapter;
 
 
 public class QueueMasterSlaveSingleUrlTest extends QueueMasterSlaveTest {
@@ -33,17 +36,25 @@
 
     protected void createMaster() throws Exception {
         master = new BrokerService();
-        master.setBrokerName("shared");
+        master.setBrokerName("shared-master");
+        configureSharedPersistenceAdapter(master);
         master.addConnector(brokerUrl);
         master.start();
     }
     
+    private void configureSharedPersistenceAdapter(BrokerService broker) throws Exception { // called for both the master and the slave broker
+       AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
+       adapter.setDirectory(new File("shared")); // relative path; master and slave are both given this same directory
+       broker.setPersistenceAdapter(adapter); 
+    }
+
     protected void createSlave() throws Exception {      
         new Thread(new Runnable() {
             public void run() {
                 try {
                     BrokerService broker = new BrokerService();
-                    broker.setBrokerName("shared");
+                    broker.setBrokerName("shared-slave");
+                    configureSharedPersistenceAdapter(broker);
                     // add transport as a service so that it is bound on start, after store started                
                     final TransportConnector tConnector = new TransportConnector();
                     tConnector.setUri(new URI(brokerUrl));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java Tue Nov 10 16:57:28 2009
@@ -98,6 +98,23 @@
                 proxy.getQueueSize());
     }
 
+    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {       
+        applyBrokerSpoolingPolicy();
+        final int exprityPeriod = 1000; // handed to setExpireMessagesPeriod via applyExpiryDuration
+        applyExpiryDuration(exprityPeriod);
+        createProducerAndSendMessages(90000);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem");
+        Thread.sleep(10000); // per the log message above, meant to span several expiry passes
+        assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 90000, // all 90000 sent messages must still be queued
+                proxy.getQueueSize());
+    }
+    
+
+    private void applyExpiryDuration(int i) { // i is passed through unchanged; units not visible here - presumably milliseconds, TODO confirm
+        broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); // applies to the broker's default policy entry
+    }
+
     private void applyBrokerSpoolingPolicy() {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java Tue Nov 10 16:57:28 2009
@@ -98,7 +98,8 @@
         context.checking(new Expectations(){{
             allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
             allowing (managementContext).start();
-            allowing (managementContext).stop();            
+            allowing (managementContext).stop();
+            allowing (managementContext).isConnectorStarted();
             
             // expected MBeans
             allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Tue Nov 10 16:57:28 2009
@@ -152,6 +152,8 @@
         
         assertTrue("all messages expired - queue size gone to zero " + view.getQueueSize(), Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
+                LOG.info("Stats: received: "  + received.get() + ", size= " + view.getQueueSize() + ", enqueues: " + view.getEnqueueCount() + ", dequeues: " + view.getDequeueCount()
+                        + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount()); // progress trace; the condition itself is only the queue-size check below
                 return view.getQueueSize() == 0;
             }
         }));
@@ -282,7 +284,7 @@
         broker.waitUntilStarted();
         return broker;
     }
-
+	
     protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
         String domain = "org.apache.activemq";
         ObjectName name;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=834543&r1=834542&r2=834543&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java Tue Nov 10 16:57:28 2009
@@ -16,6 +16,19 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -27,16 +40,6 @@
 import org.apache.activemq.util.Wait;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.management.ObjectName;
-import junit.framework.Test;
 
 
 public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
@@ -140,12 +143,16 @@
         final DestinationViewMBean view = createView(destination);
         Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
+                LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+                        + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+                        + ", size= " + view.getQueueSize()); // progress trace, logged on every evaluation of this condition
                 return sendCount == view.getExpiredCount();
             }
         });
         LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
                 + ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
                 + ", size= " + view.getQueueSize());
+        
         assertEquals("All sent have expired", sendCount, view.getExpiredCount());
 	}