You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/03/18 20:49:39 UTC

svn commit: r755715 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/bugs/ test/resou...

Author: gtully
Date: Wed Mar 18 19:49:39 2009
New Revision: 755715

URL: http://svn.apache.org/viewvc?rev=755715&view=rev
Log:
partial fix for AMQ2149|http://issues.apache.org/activemq/browse/AMQ-2149 , contention when usage limit reached can lead to out or order mesage dispatch

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Mar 18 19:49:39 2009
@@ -70,6 +70,8 @@
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -81,7 +83,7 @@
  * 
  * @version $Revision: 1.28 $
  */
-public class Queue extends BaseDestination implements Task {
+public class Queue extends BaseDestination implements Task, UsageListener {
     protected static final Log LOG = LogFactory.getLog(Queue.class);
     protected TaskRunnerFactory taskFactory;
     protected TaskRunner taskRunner;    
@@ -99,7 +101,7 @@
     private final ReentrantLock dispatchLock = new ReentrantLock();
     private boolean useConsumerPriority=true;
     private boolean strictOrderDispatch=false;
-    private QueueDispatchSelector  dispatchSelector;
+    private QueueDispatchSelector dispatchSelector;
     private boolean optimizedDispatch=false;
     private boolean firstConsumer = false;
     private int timeBeforeDispatchStarts = 0;
@@ -133,6 +135,16 @@
         }
     }
 
+    // make the queue easily visible in the debugger from its task runner threads
+    final class QueueThread extends Thread {
+        final Queue queue;
+        public QueueThread(Runnable runnable, String name,
+                Queue queue) {
+            super(runnable, name);
+            this.queue = queue;
+        }
+    }
+    
     public void initialize() throws Exception {
         if (this.messages == null) {
             if (destination.isTemporary() || broker == null || store == null) {
@@ -153,9 +165,10 @@
         if (isOptimizedDispatch()) {
             this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
         }else {
+            final Queue queue = this;
             this.executor =  Executors.newSingleThreadExecutor(new ThreadFactory() {
                 public Thread newThread(Runnable runnable) {
-                    Thread thread = new Thread(runnable, "QueueThread:"+destination);
+                    Thread thread = new QueueThread(runnable, "QueueThread:"+destination, queue);
                     thread.setDaemon(true);
                     thread.setPriority(Thread.NORM_PRIORITY);
                     return thread;
@@ -565,6 +578,7 @@
         if (memoryUsage != null) {
             memoryUsage.start();
         }
+        systemUsage.getMemoryUsage().addUsageListener(this);
         messages.start();
         doPageIn(false);
     }
@@ -579,6 +593,8 @@
         if (messages != null) {
             messages.stop();
         }
+        
+        systemUsage.getMemoryUsage().removeUsageListener(this);
         if (memoryUsage != null) {
             memoryUsage.stop();
         }
@@ -1000,6 +1016,15 @@
     public boolean iterate() {
         boolean pageInMoreMessages = false;   
         synchronized(iteratingMutex) {
+            
+            // do early to allow dispatch of these waiting messages
+            synchronized(messagesWaitingForSpace) {
+                while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
+                    Runnable op = messagesWaitingForSpace.removeFirst();
+                    op.run();
+                }
+            }
+            
             BrowserDispatch rd;
 	        while ((rd = getNextBrowserDispatch()) != null) {
 	            pageInMoreMessages = true;
@@ -1078,13 +1103,7 @@
 	                LOG.error("Failed to page in more queue messages ", e);
                 }
 	        }
-	        synchronized(messagesWaitingForSpace) {
-	               while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
-	                   Runnable op = messagesWaitingForSpace.removeFirst();
-	                   op.run();
-	               }
-	        }
-	        return false;
+	        return !messagesWaitingForSpace.isEmpty();
         }
     }
 
@@ -1520,4 +1539,18 @@
         }
         return sub;
     }
+
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+        if (oldPercentUsage > newPercentUsage) {
+            synchronized(messagesWaitingForSpace) {
+                if (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
+                    try {
+                        this.taskRunner.wakeup();
+                    } catch (InterruptedException e) {
+                        LOG.warn(getName() + " failed to wakeup task runner on usageChange: " + e);
+                    }
+                }
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Wed Mar 18 19:49:39 2009
@@ -25,8 +25,6 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.usage.Usage;
-import org.apache.activemq.usage.UsageListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -34,7 +32,7 @@
  *  Store based cursor
  *
  */
-public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
+public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
     private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
     protected final Destination regionDestination;
     private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
@@ -60,11 +58,9 @@
                 cacheEnabled=true;
             }
         } 
-        getSystemUsage().getMemoryUsage().addUsageListener(this);
     }
     
     public final synchronized void stop() throws Exception {
-        getSystemUsage().getMemoryUsage().removeUsageListener(this);
         resetBatch();
         super.stop();
         gc();
@@ -160,7 +156,9 @@
             if (cacheEnabled) {
                 cacheEnabled=false;
                 // sync with store on disabling the cache
-                setBatch(lastCachedId);
+                if (lastCachedId != null) {
+                    setBatch(lastCachedId);
+                }
             }
         }
         size++;
@@ -190,20 +188,6 @@
         batchList.remove(node.getMessageId());
     }
     
-           
-    public final synchronized void onUsageChanged(Usage usage, int oldPercentUsage,
-            int newPercentUsage) {
-        if (oldPercentUsage > newPercentUsage && oldPercentUsage >= memoryUsageHighWaterMark) {
-            storeHasMessages = true;
-            try {
-                fillBatch();
-            } catch (Exception e) {
-                LOG.error("Failed to fill batch ", e);
-            }
-        }
-        
-    }
-    
     public final synchronized void clear() {
         gc();
     }
@@ -229,7 +213,6 @@
             resetBatch();
             this.batchResetNeeded = false;
         }
-        
         if( this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
             this.storeHasMessages = false;
             try {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Wed Mar 18 19:49:39 2009
@@ -32,7 +32,8 @@
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.usage.MemoryUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @author rajdavies
@@ -40,6 +41,7 @@
  */
 public class KahaReferenceStore extends AbstractMessageStore implements ReferenceStore {
 
+    private static final Log LOG = LogFactory.getLog(KahaReferenceStore.class);
     protected final MapContainer<MessageId, ReferenceRecord> messageContainer;
     protected KahaReferenceStoreAdapter adapter;
     private StoreEntry batchEntry;
@@ -120,6 +122,11 @@
                         if ( recoverReference(listener, msg)) {
                             count++;
                             lastBatchId = msg.getMessageId();
+                        } else {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(destination.getQualifiedName() + " did not recover:" + msg.getMessageId());
+                            }
+                            break;
                         }
                     } else {
                         lastBatchId = null;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=755715&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Wed Mar 18 19:49:39 2009
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.Vector;
+
+import junit.framework.TestCase;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AMQ2149Test extends TestCase {
+
+    private static final Log log = LogFactory.getLog(AMQ2149Test.class);
+
+    private String BROKER_URL;
+    private final String SEQ_NUM_PROPERTY = "seqNum";
+
+    final int MESSAGE_LENGTH_BYTES = 75000;
+    final int MAX_TO_SEND  = 2000;
+    final long SLEEP_BETWEEN_SEND_MS = 5;
+    final int NUM_SENDERS_AND_RECEIVERS = 10;
+    
+    BrokerService broker;
+    Vector<Throwable> exceptions = new Vector<Throwable>();
+    
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.addConnector("tcp://localhost:0");
+        broker.deleteAllMessages();
+        
+        SystemUsage usage = new SystemUsage();
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setLimit(2048 * 7 * NUM_SENDERS_AND_RECEIVERS);
+        usage.setMemoryUsage(memoryUsage);
+        broker.setSystemUsage(usage);
+        broker.start();
+
+        BROKER_URL = "failover:("
+            + broker.getTransportConnectors().get(0).getUri()
+            +")?maxReconnectDelay=1000&useExponentialBackOff=false";
+    }
+    
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+    
+    private String buildLongString() {
+        final StringBuilder stringBuilder = new StringBuilder(
+                MESSAGE_LENGTH_BYTES);
+        for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
+            stringBuilder.append((int) (Math.random() * 10));
+        }
+        return stringBuilder.toString();
+    }
+
+    private class Receiver implements MessageListener {
+
+        private final String queueName;
+
+        private final Connection connection;
+
+        private final Session session;
+
+        private final MessageConsumer messageConsumer;
+
+        private volatile long nextExpectedSeqNum = 0;
+
+        public Receiver(String queueName) throws JMSException {
+            this.queueName = queueName;
+            connection = new ActiveMQConnectionFactory(BROKER_URL)
+                    .createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            messageConsumer = session.createConsumer(new ActiveMQQueue(
+                    queueName));
+            messageConsumer.setMessageListener(this);
+            connection.start();
+        }
+
+        public void onMessage(Message message) {
+            try {
+                final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
+                if ((seqNum % 100) == 0) {
+                    log.info(queueName + " received " + seqNum);
+                }
+                if (seqNum != nextExpectedSeqNum) {
+                    log.warn(queueName + " received " + seqNum + " expected "
+                            + nextExpectedSeqNum);
+                    fail(queueName + " received " + seqNum + " expected "
+                            + nextExpectedSeqNum);
+                }
+                ++nextExpectedSeqNum;
+            } catch (Throwable e) {
+                log.error(queueName + " onMessage error", e);
+                exceptions.add(e);
+            }
+        }
+
+    }
+
+    private class Sender implements Runnable {
+
+        private final String queueName;
+
+        private final Connection connection;
+
+        private final Session session;
+
+        private final MessageProducer messageProducer;
+
+        private volatile long nextSequenceNumber = 0;
+
+        public Sender(String queueName) throws JMSException {
+            this.queueName = queueName;
+            connection = new ActiveMQConnectionFactory(BROKER_URL)
+                    .createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            messageProducer = session.createProducer(new ActiveMQQueue(
+                    queueName));
+            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+        }
+
+        public void run() {
+            final String longString = buildLongString();
+            while (nextSequenceNumber <= MAX_TO_SEND) {
+                try {
+                    final Message message = session
+                            .createTextMessage(longString);
+                    message.setLongProperty(SEQ_NUM_PROPERTY,
+                            nextSequenceNumber);
+                    ++nextSequenceNumber;
+                    messageProducer.send(message);
+                } catch (Exception e) {
+                    log.error(queueName + " send error", e);
+                    exceptions.add(e);
+                }
+                try {
+                    Thread.sleep(SLEEP_BETWEEN_SEND_MS);
+                } catch (InterruptedException e) {
+                    log.warn(queueName + " sleep interrupted", e);
+                }
+            }
+        }
+    }
+
+    public void testOutOfOrderWithMemeUsageLimit() throws Exception {
+        Vector<Thread> threads = new Vector<Thread>();
+        
+        for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
+            final String queueName = "test.queue." + i;
+            new Receiver(queueName);
+            Thread thread = new Thread(new Sender(queueName));
+            thread.start();
+            threads.add(thread);
+        }
+        
+        final long expiry = System.currentTimeMillis() + 1000 * 60 * 5;
+        while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
+            Thread sendThread = threads.firstElement();
+            sendThread.join(1000*10);
+            if (!sendThread.isAlive()) {
+                threads.remove(sendThread);
+            }
+        }
+        assertTrue("No timeout waiting for senders to complete", System.currentTimeMillis() < expiry);
+        assertTrue("No exceptions", exceptions.isEmpty());
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml Wed Mar 18 19:49:39 2009
@@ -18,7 +18,7 @@
 <beans>
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <broker brokerName="master" persistent="false" useJmx="false" deleteAllMessagesOnStartup="true"  xmlns="http://activemq.apache.org/schema/core">
+  <broker brokerName="master" useJmx="false" deleteAllMessagesOnStartup="true"  xmlns="http://activemq.apache.org/schema/core">
     <transportConnectors>
       <transportConnector uri="tcp://localhost:62001"/>
     </transportConnectors>

Modified: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml?rev=755715&r1=755714&r2=755715&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml (original)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml Wed Mar 18 19:49:39 2009
@@ -18,7 +18,7 @@
 <beans>
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <broker brokerName="slave" useJmx="false" masterConnectorURI="tcp://localhost:62001"  xmlns="http://activemq.apache.org/schema/core">
+  <broker brokerName="slave" deleteAllMessagesOnStartup="true" useJmx="false" masterConnectorURI="tcp://localhost:62001"  xmlns="http://activemq.apache.org/schema/core">
     <transportConnectors>
       <transportConnector uri="tcp://localhost:62002"/>
     </transportConnectors>