You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/01/28 18:07:17 UTC
svn commit: r904160 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Queue.java
main/java/org/apache/activemq/usage/SystemUsage.java
test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Author: dejanb
Date: Thu Jan 28 17:07:16 2010
New Revision: 904160
URL: http://svn.apache.org/viewvc?rev=904160&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2507 - producer flow control timeout
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=904160&r1=904159&r2=904160&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jan 28 17:07:16 2010
@@ -27,15 +27,19 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
@@ -96,7 +100,7 @@
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object();
private ExecutorService executor;
- protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+ protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
private final Object dispatchMutex = new Object();
private boolean useConsumerPriority = true;
private boolean strictOrderDispatch = false;
@@ -118,8 +122,72 @@
expireMessages();
}
};
+
private final Object iteratingMutex = new Object() {};
private static final Scheduler scheduler = Scheduler.getInstance();
+
+    /**
+     * Delay-queue entry that pairs a message held back by producer flow control
+     * with the connection context it arrived on and the wall-clock instant
+     * (milliseconds) at which its wait expires.
+     */
+    class TimeoutMessage implements Delayed {
+
+        Message message;
+        ConnectionContext context;
+        long trigger;
+
+        /**
+         * @param message the message waiting for space
+         * @param context the connection context the message was sent on
+         * @param delay milliseconds from now after which the entry expires
+         */
+        public TimeoutMessage(Message message, ConnectionContext context, long delay) {
+            this.trigger = System.currentTimeMillis() + delay;
+            this.context = context;
+            this.message = message;
+        }
+
+        /**
+         * @return the time left until the trigger instant, converted to the
+         *         requested unit; zero or negative once the entry has expired
+         */
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(trigger - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        /**
+         * Orders entries by trigger instant, earliest first. The argument is
+         * cast to {@code TimeoutMessage}.
+         */
+        public int compareTo(Delayed delayed) {
+            long otherTrigger = ((TimeoutMessage) delayed).trigger;
+            if (this.trigger == otherTrigger) {
+                return 0;
+            }
+            return this.trigger < otherTrigger ? -1 : 1;
+        }
+
+    }
+
+ DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
+
+    /**
+     * Thread that fails sends which have been parked by producer flow control
+     * for longer than their timeout.
+     * <p>
+     * Each expired {@link TimeoutMessage} taken from
+     * {@code flowControlTimeoutMessages} is looked up by message id in
+     * {@code messagesWaitingForSpace}. If the send is still parked there it is
+     * removed and an {@link ExceptionResponse} wrapping a
+     * {@link ResourceAllocationException} is dispatched on the producer's
+     * connection; if it is no longer present nothing is sent. The loop ends
+     * when the thread is interrupted.
+     */
+    class FlowControlTimeoutTask extends Thread {
+
+        public void run() {
+            try {
+                while (true) {
+                    // take() blocks until an entry has expired and never returns null,
+                    // so no null check is needed on the result
+                    TimeoutMessage timeout = flowControlTimeoutMessages.take();
+                    synchronized (messagesWaitingForSpace) {
+                        if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
+                            ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
+                                    + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
+                            // correlate with the original send so the producer can match the failure to it
+                            response.setCorrelationId(timeout.message.getCommandId());
+                            timeout.context.getConnection().dispatchAsync(response);
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Producer Flow Control Timeout Task is stopping");
+                }
+                // restore the interrupt status that the blocking take() cleared
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+ private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
+
private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
@@ -401,7 +469,7 @@
}
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
@@ -412,7 +480,7 @@
// for space.
final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) {
- messagesWaitingForSpace.add(new Runnable() {
+ messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
public void run() {
try {
@@ -446,6 +514,10 @@
}
}
});
+
+ if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+ flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
+ }
registerCallbackForNotFullNotification();
context.setDontSendReponse(true);
@@ -497,7 +569,7 @@
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException(logMessage);
+ throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
@@ -619,6 +691,15 @@
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
+
+ flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
+
+ // Start flow control timeout task
+ // Prevent trying to start it multiple times
+ if (!flowControlTimeoutTask.isAlive()) {
+ flowControlTimeoutTask.start();
+ }
+
doPageIn(false);
}
@@ -631,6 +712,10 @@
}
scheduler.cancel(expireMessagesTask);
+
+ if (flowControlTimeoutTask.isAlive()) {
+ flowControlTimeoutTask.interrupt();
+ }
if (messages != null) {
messages.stop();
@@ -1077,9 +1162,11 @@
// do early to allow dispatch of these waiting messages
synchronized (messagesWaitingForSpace) {
- while (!messagesWaitingForSpace.isEmpty()) {
+ Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
+ while (it.hasNext()) {
if (!memoryUsage.isFull()) {
- Runnable op = messagesWaitingForSpace.removeFirst();
+ Runnable op = it.next();
+ it.remove();
op.run();
} else {
registerCallbackForNotFullNotification();
@@ -1289,7 +1376,7 @@
final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException(logMessage);
+ throw new ResourceAllocationException(logMessage);
}
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
@@ -1622,18 +1709,24 @@
}
}
- private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
- long start = System.currentTimeMillis();
- long nextWarn = start + blockedProducerWarningInterval;
- while (!usage.waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
+ private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+ if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+ if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
+ throw new ResourceAllocationException(warning);
}
-
- long now = System.currentTimeMillis();
- if (now >= nextWarn) {
- LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
- nextWarn = now + blockedProducerWarningInterval;
+ } else {
+ long start = System.currentTimeMillis();
+ long nextWarn = start + blockedProducerWarningInterval;
+ while (!usage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+
+ long now = System.currentTimeMillis();
+ if (now >= nextWarn) {
+ LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+ nextWarn = now + blockedProducerWarningInterval;
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=904160&r1=904159&r2=904160&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu Jan 28 17:07:16 2010
@@ -43,6 +43,9 @@
*/
private boolean sendFailIfNoSpaceExplicitySet;
private boolean sendFailIfNoSpace;
+ private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
+ private long sendFailIfNoSpaceAfterTimeout = 0;
+
private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage() {
@@ -155,6 +158,19 @@
this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
}
+    /**
+     * Returns the send-fail-after-timeout value in effect for this instance.
+     * A value set explicitly here wins; otherwise the parent's value is used,
+     * and an instance without a parent returns its own field.
+     */
+    public long getSendFailIfNoSpaceAfterTimeout() {
+        boolean inheritFromParent = !sendFailIfNoSpaceAfterTimeoutExplicitySet && parent != null;
+        return inheritFromParent ? parent.getSendFailIfNoSpaceAfterTimeout() : sendFailIfNoSpaceAfterTimeout;
+    }
+
+    /**
+     * Stores the timeout on this instance and flags it as explicitly set, so
+     * that {@link #getSendFailIfNoSpaceAfterTimeout()} no longer defers to the
+     * parent.
+     *
+     * @param sendFailIfNoSpaceAfterTimeout the timeout value to store
+     */
+    public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
+        this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
+        this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true;
+    }
+
public void setName(String name) {
this.name = name;
this.memoryUsage.setName(name + ":memory");
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java?rev=904160&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java Thu Jan 28 17:07:16 2010
@@ -0,0 +1,91 @@
+package org.apache.activemq.bugs;
+
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Checks that a producer blocked by the broker's memory limit receives a
+ * {@link JMSException} instead of blocking forever once the broker is
+ * configured with a send-fail-after-timeout value.
+ */
+public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger logger = Logger.getLogger(JmsTimeoutTest.class);
+
+    private int messageSize = 1024 * 64;
+    private int messageCount = 10000;
+    private final AtomicInteger exceptionCount = new AtomicInteger(0);
+
+    /**
+     * Test the case where the broker is blocked due to a memory limit
+     * and a producer timeout is set on the connection.
+     * @throws Exception
+     */
+    public void testBlockedProducerConnectionTimeout() throws Exception {
+        final ActiveMQConnection cx = (ActiveMQConnection) createConnection();
+        try {
+            final ActiveMQDestination queue = createDestination("testqueue");
+
+            // setUp() configures the broker to fail a blocked send after 5 seconds;
+            // the client-side send timeout is set to 10 seconds
+            cx.setSendTimeout(10000);
+
+            Runnable r = new Runnable() {
+                public void run() {
+                    try {
+                        logger.info("Sender thread starting");
+                        Session session = cx.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageProducer producer = session.createProducer(queue);
+                        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                        TextMessage message = session.createTextMessage(createMessageText());
+                        for (int count = 0; count < messageCount; count++) {
+                            producer.send(message);
+                            // Currently after the timeout producer just
+                            // returns but there is no way to know that
+                            // the send timed out
+                        }
+                        logger.info("Done sending..");
+                    } catch (JMSException e) {
+                        logger.info("Producer send failed", e);
+                        exceptionCount.incrementAndGet();
+                    }
+                }
+            };
+            cx.start();
+            Thread producerThread = new Thread(r);
+            producerThread.start();
+            producerThread.join(30000);
+        } finally {
+            // close the connection even if the test body throws
+            cx.close();
+        }
+        // We should have a few timeout exceptions as memory store will fill up
+        assertTrue(exceptionCount.get() > 0);
+    }
+
+    /**
+     * Creates the embedded broker with a 5 MB memory limit and a 5000 ms
+     * send-fail-after-timeout value before the base class set-up runs.
+     */
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:61616";
+        broker = createBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024);
+        broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+        super.setUp();
+    }
+
+    /**
+     * Builds the message payload: an opening tag padded with 'X' up to
+     * {@code messageSize} characters, followed by the closing tag.
+     */
+    private String createMessageText() {
+        // only this method touches the buffer, so an unsynchronized builder is enough
+        StringBuilder buffer = new StringBuilder(messageSize + "</filler>".length());
+        buffer.append("<filler>");
+        for (int i = buffer.length(); i < messageSize; i++) {
+            buffer.append('X');
+        }
+        buffer.append("</filler>");
+        return buffer.toString();
+    }
+
+}
\ No newline at end of file