You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/01/28 18:22:27 UTC
svn commit: r904171 - in /activemq/branches/activemq-5.3/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
main/java/org/apache/activemq/usage/SystemUsage.java
test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Author: dejanb
Date: Thu Jan 28 17:22:27 2010
New Revision: 904171
URL: http://svn.apache.org/viewvc?rev=904171&view=rev
Log:
merging 904160 - https://issues.apache.org/activemq/browse/AMQ-2507 - producer flow control timeout
Added:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
- copied unchanged from r904160, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=904171&r1=904170&r2=904171&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jan 28 17:22:27 2010
@@ -27,15 +27,19 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -96,7 +100,7 @@
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object();
private ExecutorService executor;
- protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+ protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
@@ -118,8 +122,72 @@
expireMessages();
}
};
+
private final Object iteratingMutex = new Object() {};
private static final Scheduler scheduler = Scheduler.getInstance();
+
+ class TimeoutMessage implements Delayed {
+
+ Message message;
+ ConnectionContext context;
+ long trigger;
+
+ public TimeoutMessage(Message message, ConnectionContext context, long delay) {
+ this.message = message;
+ this.context = context;
+ this.trigger = System.currentTimeMillis() + delay;
+ }
+
+ public long getDelay(TimeUnit unit) {
+ long n = trigger - System.currentTimeMillis();
+ return unit.convert(n, TimeUnit.MILLISECONDS);
+ }
+
+ public int compareTo(Delayed delayed) {
+ long other = ((TimeoutMessage)delayed).trigger;
+ int returnValue;
+ if (this.trigger < other) {
+ returnValue = -1;
+ } else if (this.trigger > other) {
+ returnValue = 1;
+ } else {
+ returnValue = 0;
+ }
+ return returnValue;
+ }
+
+ }
+
+ DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
+
+ class FlowControlTimeoutTask extends Thread {
+
+ public void run() {
+ TimeoutMessage timeout;
+ try {
+ while (true) {
+ timeout = flowControlTimeoutMessages.take();
+ if (timeout != null) {
+ synchronized (messagesWaitingForSpace) {
+ if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
+ ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
+ response.setCorrelationId(timeout.message.getCommandId());
+ timeout.context.getConnection().dispatchAsync(response);
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Producer Flow Control Timeout Task is stopping");
+ }
+ }
+ }
+ };
+
+ private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
+
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
@@ -401,7 +469,7 @@
}
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
@@ -412,7 +480,7 @@
// for space.
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) {
- messagesWaitingForSpace.add(new Runnable() {
+ messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
public void run() {
try {
@@ -446,6 +514,10 @@
}
}
});
+
+ if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+ flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
+ }
registerCallbackForNotFullNotification();
context.setDontSendReponse(true);
@@ -497,7 +569,7 @@
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException(logMessage);
+ throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
@@ -616,6 +688,15 @@
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
+
+ flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
+
+ // Start flow control timeout task
+ // Prevent trying to start it multiple times
+ if (!flowControlTimeoutTask.isAlive()) {
+ flowControlTimeoutTask.start();
+ }
+
doPageIn(false);
}
@@ -628,6 +709,10 @@
}
scheduler.cancel(expireMessagesTask);
+
+ if (flowControlTimeoutTask.isAlive()) {
+ flowControlTimeoutTask.interrupt();
+ }
if (messages != null) {
messages.stop();
@@ -1074,9 +1159,11 @@
// do early to allow dispatch of these waiting messages
synchronized (messagesWaitingForSpace) {
- while (!messagesWaitingForSpace.isEmpty()) {
+ Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
+ while (it.hasNext()) {
if (!memoryUsage.isFull()) {
- Runnable op = messagesWaitingForSpace.removeFirst();
+ Runnable op = it.next();
+ it.remove();
op.run();
} else {
registerCallbackForNotFullNotification();
@@ -1286,7 +1373,7 @@
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException(logMessage);
+ throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
@@ -1619,18 +1706,24 @@
}
}
- private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
- long nextWarn = start + blockedProducerWarningInterval;
- while (!usage.waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
+ private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+ if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+ if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
+ throw new ResourceAllocationException(warning);
}
-
- long now = System.currentTimeMillis();
- if (now >= nextWarn) {
- LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
- nextWarn = now + blockedProducerWarningInterval;
+ } else {
+ long start = System.currentTimeMillis();
+ long nextWarn = start + blockedProducerWarningInterval;
+ while (!usage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+
+ long now = System.currentTimeMillis();
+ if (now >= nextWarn) {
+ LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+ nextWarn = now + blockedProducerWarningInterval;
+ }
}
}
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=904171&r1=904170&r2=904171&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu Jan 28 17:22:27 2010
@@ -43,6 +43,10 @@
*/
private boolean sendFailIfNoSpaceExplicitySet;
private boolean sendFailIfNoSpace;
+
+ private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
+ private long sendFailIfNoSpaceAfterTimeout = 0;
+
private List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() {
@@ -154,6 +158,19 @@
this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
}
+ public long getSendFailIfNoSpaceAfterTimeout() {
+ if (sendFailIfNoSpaceAfterTimeoutExplicitySet || parent == null) {
+ return sendFailIfNoSpaceAfterTimeout;
+ } else {
+ return parent.getSendFailIfNoSpaceAfterTimeout();
+ }
+ }
+
+ public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
+ this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true;
+ this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
+ }
+
public void setName(String name) {
this.name = name;
this.memoryUsage.setName(name + ":memory");