You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/17 10:24:26 UTC

svn commit: r955504 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: TopicSubscription.java cursors/AbstractPendingMessageCursor.java cursors/FilePendingMessageCursor.java cursors/PendingMessageCursor.java

Author: rajdavies
Date: Thu Jun 17 08:24:26 2010
New Revision: 955504

URL: http://svn.apache.org/viewvc?rev=955504&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2475

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu Jun 17 08:24:26 2010
@@ -19,9 +19,7 @@ package org.apache.activemq.broker.regio
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.JMSException;
-
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -67,6 +65,7 @@ public class TopicSubscription extends A
     protected int maxAuditDepth = 1000;
     protected boolean enableAudit = false;
     protected ActiveMQMessageAudit audit;
+    protected boolean active = false;
 
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
@@ -86,6 +85,7 @@ public class TopicSubscription extends A
         if (enableAudit) {
             audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
         }
+        this.active=true;
     }
 
     public void add(MessageReference node) throws Exception {
@@ -108,21 +108,33 @@ public class TopicSubscription extends A
             }
             if (maximumPendingMessages != 0) {
                 boolean warnedAboutWait = false;
-            	synchronized(matchedListMutex){
-            		while (matched.isFull()){
-            		    if (getContext().getStopping().get()) {
-            		        LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
-            		        enqueueCounter.decrementAndGet();
-            		        return;
-            		    }
-            		    if (!warnedAboutWait) {
-            		        LOG.info(toString() + ": Pending message cursor ["+ matched + "] is full, temp usage (" + + matched.getSystemUsage().getTempUsage().getPercentUsage() + "%) or memory usage (" + matched.getSystemUsage().getMemoryUsage().getPercentUsage() + "%) limit reached, blocking message add() pending the release of resources.");
-            		        warnedAboutWait = true;
-            		    }
-            			matchedListMutex.wait(20);
-            		}
-            		matched.addMessageLast(node);
-            	}
+                while (active) {
+                    synchronized (matchedListMutex) {
+                        while (matched.isFull()) {
+                            if (getContext().getStopping().get()) {
+                                LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
+                                        + node.getMessageId());
+                                enqueueCounter.decrementAndGet();
+                                return;
+                            }
+                            if (!warnedAboutWait) {
+                                LOG.info(toString() + ": Pending message cursor [" + matched
+                                        + "] is full, temp usage ("
+                                        + +matched.getSystemUsage().getTempUsage().getPercentUsage()
+                                        + "%) or memory usage ("
+                                        + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
+                                        + "%) limit reached, blocking message add() pending the release of resources.");
+                                warnedAboutWait = true;
+                            }
+                            matchedListMutex.wait(20);
+                        }
+                        //Temporary storage could be full - so just try to add the message
+                        //see https://issues.apache.org/activemq/browse/AMQ-2475
+                        if (matched.tryAddMessageLast(node, 10)) {
+                            break;
+                        }
+                    }
+                }
                 synchronized (matchedListMutex) {
                     
                     // NOTE - be careful about the slaveBroker!
@@ -239,6 +251,7 @@ public class TopicSubscription extends A
             if (context.isInTransaction()) {
                 context.getTransaction().addSynchronization(new Synchronization() {
 
+                    @Override
                     public void afterCommit() throws Exception {
                        synchronized (TopicSubscription.this) {
                             if (singleDestination && destination != null) {
@@ -456,7 +469,7 @@ public class TopicSubscription extends A
                     matched.reset();
                    
                     while (matched.hasNext() && !isFull()) {
-                        MessageReference message = (MessageReference) matched.next();
+                        MessageReference message = matched.next();
                         message.decrementReferenceCount();
                         matched.remove();
                         // Message may have been sitting in the matched list a
@@ -530,12 +543,14 @@ public class TopicSubscription extends A
         broker.getRoot().sendToDeadLetterQueue(getContext(), message);
     }
 
+    @Override
     public String toString() {
         return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
     }
 
     public void destroy() {
+        this.active=false;
         synchronized (matchedListMutex) {
             try {
                 matched.destroy();
@@ -546,8 +561,9 @@ public class TopicSubscription extends A
         setSlowConsumer(false);
     }
 
+    @Override
     public int getPrefetchSize() {
-        return (int)info.getPrefetchSize();
+        return info.getPrefetchSize();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Jun 17 08:24:26 2010
@@ -33,7 +33,7 @@ import org.apache.activemq.usage.SystemU
  * 
  * @version $Revision$
  */
-public class AbstractPendingMessageCursor implements PendingMessageCursor {
+public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
     protected int memoryUsageHighWaterMark = 70;
     protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
     protected SystemUsage systemUsage;
@@ -76,6 +76,11 @@ public class AbstractPendingMessageCurso
 
     public void addMessageLast(MessageReference node) throws Exception {
     }
+    
+    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
+        addMessageLast(node);
+        return true;
+    }
 
     public void addRecoveredMessage(MessageReference node) throws Exception {
         addMessageLast(node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Jun 17 08:24:26 2010
@@ -183,9 +183,14 @@ public class FilePendingMessageCursor ex
      * add message to await dispatch
      * 
      * @param node
+     * @throws Exception 
      */
     @Override
-    public synchronized void addMessageLast(MessageReference node) {
+    public synchronized void addMessageLast(MessageReference node) throws Exception {
+        tryAddMessageLast(node, 0);
+    }
+    
+    public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
         if (!node.isExpired()) {
             try {
                 regionDestination = node.getMessage().getRegionDestination();
@@ -193,7 +198,7 @@ public class FilePendingMessageCursor ex
                     if (hasSpace() || this.store == null) {
                         memoryList.add(node);
                         node.incrementReferenceCount();
-                        return;
+                        return true;
                     }
                 }
                 if (!hasSpace()) {
@@ -202,15 +207,18 @@ public class FilePendingMessageCursor ex
                         if (hasSpace()) {
                             memoryList.add(node);
                             node.incrementReferenceCount();
-                            return;
+                            return true;
                         } else {
                             flushToDisk();
                         }
                     }
                 }
-                systemUsage.getTempUsage().waitForSpace();
-                ByteSequence bs = getByteSequence(node.getMessage());
-                getDiskList().addLast(node.getMessageId().toString(), bs);
+                if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
+                    ByteSequence bs = getByteSequence(node.getMessage());
+                    getDiskList().addLast(node.getMessageId().toString(), bs);
+                    return true;
+                }
+                return false;
 
             } catch (Exception e) {
                 LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
@@ -219,6 +227,7 @@ public class FilePendingMessageCursor ex
         } else {
             discard(node);
         }
+        return false;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Thu Jun 17 08:24:26 2010
@@ -85,6 +85,16 @@ public interface PendingMessageCursor ex
      * @throws Exception
      */
     void addMessageLast(MessageReference node) throws Exception;
+    /**
+     * add message to await dispatch - if it can
+     * 
+     * @param node
+     * @param maxWaitTime 
+     * @return true if successful
+     * @throws IOException
+     * @throws Exception
+     */
+    boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
 
     /**
      * add message to await dispatch