You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/17 10:24:26 UTC
svn commit: r955504 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
TopicSubscription.java cursors/AbstractPendingMessageCursor.java
cursors/FilePendingMessageCursor.java cursors/PendingMessageCursor.java
Author: rajdavies
Date: Thu Jun 17 08:24:26 2010
New Revision: 955504
URL: http://svn.apache.org/viewvc?rev=955504&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2475
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu Jun 17 08:24:26 2010
@@ -19,9 +19,7 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.jms.JMSException;
-
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -67,6 +65,7 @@ public class TopicSubscription extends A
protected int maxAuditDepth = 1000;
protected boolean enableAudit = false;
protected ActiveMQMessageAudit audit;
+ protected boolean active = false;
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@@ -86,6 +85,7 @@ public class TopicSubscription extends A
if (enableAudit) {
audit= new ActiveMQMessageAudit(maxAuditDepth, maxProducersToAudit);
}
+ this.active=true;
}
public void add(MessageReference node) throws Exception {
@@ -108,21 +108,33 @@ public class TopicSubscription extends A
}
if (maximumPendingMessages != 0) {
boolean warnedAboutWait = false;
- synchronized(matchedListMutex){
- while (matched.isFull()){
- if (getContext().getStopping().get()) {
- LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
- enqueueCounter.decrementAndGet();
- return;
- }
- if (!warnedAboutWait) {
- LOG.info(toString() + ": Pending message cursor ["+ matched + "] is full, temp usage (" + + matched.getSystemUsage().getTempUsage().getPercentUsage() + "%) or memory usage (" + matched.getSystemUsage().getMemoryUsage().getPercentUsage() + "%) limit reached, blocking message add() pending the release of resources.");
- warnedAboutWait = true;
- }
- matchedListMutex.wait(20);
- }
- matched.addMessageLast(node);
- }
+ while (active) {
+ synchronized (matchedListMutex) {
+ while (matched.isFull()) {
+ if (getContext().getStopping().get()) {
+ LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
+ + node.getMessageId());
+ enqueueCounter.decrementAndGet();
+ return;
+ }
+ if (!warnedAboutWait) {
+ LOG.info(toString() + ": Pending message cursor [" + matched
+ + "] is full, temp usage ("
+ + +matched.getSystemUsage().getTempUsage().getPercentUsage()
+ + "%) or memory usage ("
+ + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
+ + "%) limit reached, blocking message add() pending the release of resources.");
+ warnedAboutWait = true;
+ }
+ matchedListMutex.wait(20);
+ }
+ // Temporary storage could be full, so try to add the message with a bounded wait and retry while this subscription is still active;
+ // see https://issues.apache.org/activemq/browse/AMQ-2475
+ if (matched.tryAddMessageLast(node, 10)) {
+ break;
+ }
+ }
+ }
synchronized (matchedListMutex) {
// NOTE - be careful about the slaveBroker!
@@ -239,6 +251,7 @@ public class TopicSubscription extends A
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
+ @Override
public void afterCommit() throws Exception {
synchronized (TopicSubscription.this) {
if (singleDestination && destination != null) {
@@ -456,7 +469,7 @@ public class TopicSubscription extends A
matched.reset();
while (matched.hasNext() && !isFull()) {
- MessageReference message = (MessageReference) matched.next();
+ MessageReference message = matched.next();
message.decrementReferenceCount();
matched.remove();
// Message may have been sitting in the matched list a
@@ -530,12 +543,14 @@ public class TopicSubscription extends A
broker.getRoot().sendToDeadLetterQueue(getContext(), message);
}
+ @Override
public String toString() {
return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
+ getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
}
public void destroy() {
+ this.active=false;
synchronized (matchedListMutex) {
try {
matched.destroy();
@@ -546,8 +561,9 @@ public class TopicSubscription extends A
setSlowConsumer(false);
}
+ @Override
public int getPrefetchSize() {
- return (int)info.getPrefetchSize();
+ return info.getPrefetchSize();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Thu Jun 17 08:24:26 2010
@@ -33,7 +33,7 @@ import org.apache.activemq.usage.SystemU
*
* @version $Revision$
*/
-public class AbstractPendingMessageCursor implements PendingMessageCursor {
+public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 70;
protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
protected SystemUsage systemUsage;
@@ -76,6 +76,11 @@ public class AbstractPendingMessageCurso
public void addMessageLast(MessageReference node) throws Exception {
}
+
+ public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
+ addMessageLast(node);
+ return true;
+ }
public void addRecoveredMessage(MessageReference node) throws Exception {
addMessageLast(node);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Jun 17 08:24:26 2010
@@ -183,9 +183,14 @@ public class FilePendingMessageCursor ex
* add message to await dispatch
*
* @param node
+ * @throws Exception
*/
@Override
- public synchronized void addMessageLast(MessageReference node) {
+ public synchronized void addMessageLast(MessageReference node) throws Exception {
+ tryAddMessageLast(node, 0);
+ }
+
+ public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
if (!node.isExpired()) {
try {
regionDestination = node.getMessage().getRegionDestination();
@@ -193,7 +198,7 @@ public class FilePendingMessageCursor ex
if (hasSpace() || this.store == null) {
memoryList.add(node);
node.incrementReferenceCount();
- return;
+ return true;
}
}
if (!hasSpace()) {
@@ -202,15 +207,18 @@ public class FilePendingMessageCursor ex
if (hasSpace()) {
memoryList.add(node);
node.incrementReferenceCount();
- return;
+ return true;
} else {
flushToDisk();
}
}
}
- systemUsage.getTempUsage().waitForSpace();
- ByteSequence bs = getByteSequence(node.getMessage());
- getDiskList().addLast(node.getMessageId().toString(), bs);
+ if (systemUsage.getTempUsage().waitForSpace(maxWaitTime)) {
+ ByteSequence bs = getByteSequence(node.getMessage());
+ getDiskList().addLast(node.getMessageId().toString(), bs);
+ return true;
+ }
+ return false;
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node + " first to FilePendingMessageCursor ", e);
@@ -219,6 +227,7 @@ public class FilePendingMessageCursor ex
} else {
discard(node);
}
+ return false;
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=955504&r1=955503&r2=955504&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Thu Jun 17 08:24:26 2010
@@ -85,6 +85,16 @@ public interface PendingMessageCursor ex
* @throws Exception
*/
void addMessageLast(MessageReference node) throws Exception;
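+ /**
+ * Try to add a message to await dispatch; the message may not be added.
+ *
+ * @param node the message reference to add
+ * @param maxWaitTime the maximum time to wait for space before giving up (an implementation may ignore it)
+ * @return true if the message was added, false otherwise
+ * @throws IOException
+ * @throws Exception
+ */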
+ boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
/**
* add message to await dispatch