You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/01/28 18:07:17 UTC

svn commit: r904160 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java main/java/org/apache/activemq/usage/SystemUsage.java test/java/org/apache/activemq/bugs/JmsTimeoutTest.java

Author: dejanb
Date: Thu Jan 28 17:07:16 2010
New Revision: 904160

URL: http://svn.apache.org/viewvc?rev=904160&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2507 - producer flow control timeout

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=904160&r1=904159&r2=904160&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jan 28 17:07:16 2010
@@ -27,15 +27,19 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+import javax.jms.ResourceAllocationException;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -96,7 +100,7 @@
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private final Object sendLock = new Object();
     private ExecutorService executor;
-    protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
     private final Object dispatchMutex = new Object();
     private boolean useConsumerPriority = true;
     private boolean strictOrderDispatch = false;
@@ -118,8 +122,72 @@
             expireMessages();
         }
     };
+    
     private final Object iteratingMutex = new Object() {};
     private static final Scheduler scheduler = Scheduler.getInstance();
+    
+    /**
+     * Pairs a message parked by producer flow control with the connection context it
+     * arrived on and the wall-clock time (in milliseconds) at which its wait expires.
+     */
+    class TimeoutMessage implements Delayed {
+
+        Message message;
+        ConnectionContext context;
+        long trigger;
+        
+        public TimeoutMessage(Message message, ConnectionContext context, long delay) {
+            this.message = message;
+            this.context = context;
+            this.trigger = System.currentTimeMillis() + delay;
+        }
+        
+        /** Returns the time left until the trigger, converted to the requested unit. */
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(trigger - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        /** Orders entries by trigger time, earliest first. */
+        public int compareTo(Delayed delayed) {
+            long other = ((TimeoutMessage)delayed).trigger;
+            if (this.trigger == other) {
+                return 0;
+            }
+            return this.trigger < other ? -1 : 1;
+        }
+        
+    }
+    
+    DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
+    
+    /** Sends an exception response for each send still waiting for space when its timeout expires. */
+    class FlowControlTimeoutTask extends Thread {
+        public void run() {
+            try {
+                while (true) {
+                    // take() blocks until the earliest queued timeout has expired
+                    TimeoutMessage expired = flowControlTimeoutMessages.take();
+                    synchronized (messagesWaitingForSpace) {
+                        // skip messages that are no longer waiting for space
+                        if (messagesWaitingForSpace.remove(expired.message.getMessageId()) == null) {
+                            continue;
+                        }
+                        ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + expired.message.getProducerId() + ") to prevent flooding "
+                                + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
+                        response.setCorrelationId(expired.message.getCommandId());
+                        expired.context.getConnection().dispatchAsync(response);
+                    }
+                }
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Producer Flow Control Timeout Task is stopping");
+                }
+            }
+        }
+    };
+    
+    private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
+    
 
     private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
 
@@ -401,7 +469,7 @@
                 }
 
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+                    throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
                             + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
@@ -412,7 +480,7 @@
                     // for space.
                     final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
                     synchronized (messagesWaitingForSpace) {
-                        messagesWaitingForSpace.add(new Runnable() {
+                        messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
                             public void run() {
 
                                 try {
@@ -446,6 +514,10 @@
                                 }
                             }
                         });
+                        
+                        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+                            flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
+                        }
 
                         registerCallbackForNotFullNotification();
                         context.setDontSendReponse(true);
@@ -497,7 +569,7 @@
                             + " See http://activemq.apache.org/producer-flow-control.html for more info";
 
                     if (systemUsage.isSendFailIfNoSpace()) {
-                        throw new javax.jms.ResourceAllocationException(logMessage);
+                        throw new ResourceAllocationException(logMessage);
                     }
 
                     waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
@@ -619,6 +691,15 @@
         if (getExpireMessagesPeriod() > 0) {
             scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
         }
+        
+        flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
+        
+        // Start flow control timeout task
+        // Prevent trying to start it multiple times
+        if (!flowControlTimeoutTask.isAlive()) {
+            flowControlTimeoutTask.start();
+        }
+        
         doPageIn(false);
     }
 
@@ -631,6 +712,10 @@
         }
 
         scheduler.cancel(expireMessagesTask);
+        
+        if (flowControlTimeoutTask.isAlive()) {
+            flowControlTimeoutTask.interrupt();
+        }
 
         if (messages != null) {
             messages.stop();
@@ -1077,9 +1162,11 @@
 
             // do early to allow dispatch of these waiting messages
             synchronized (messagesWaitingForSpace) {
-                while (!messagesWaitingForSpace.isEmpty()) {
+                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
+                while (it.hasNext()) {
                     if (!memoryUsage.isFull()) {
-                        Runnable op = messagesWaitingForSpace.removeFirst();
+                        Runnable op = it.next();
+                        it.remove();
                         op.run();
                     } else {
                         registerCallbackForNotFullNotification();
@@ -1289,7 +1376,7 @@
                 final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                         + " See http://activemq.apache.org/producer-flow-control.html for more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException(logMessage);
+                    throw new ResourceAllocationException(logMessage);
                 }
 
                 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
@@ -1622,18 +1709,24 @@
         }
     }
 
-    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-        long nextWarn = start + blockedProducerWarningInterval;
-        while (!usage.waitForSpace(1000)) {
-            if (context.getStopping().get()) {
-                throw new IOException("Connection closed, send aborted.");
+    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
+                throw new ResourceAllocationException(warning);
             }
-
-            long now = System.currentTimeMillis();
-            if (now >= nextWarn) {
-                LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
-                nextWarn = now + blockedProducerWarningInterval;
+        } else {
+            long start = System.currentTimeMillis();
+            long nextWarn = start + blockedProducerWarningInterval;
+            while (!usage.waitForSpace(1000)) {
+                if (context.getStopping().get()) {
+                    throw new IOException("Connection closed, send aborted.");
+                }
+    
+                long now = System.currentTimeMillis();
+                if (now >= nextWarn) {
+                    LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+                    nextWarn = now + blockedProducerWarningInterval;
+                }
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=904160&r1=904159&r2=904160&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu Jan 28 17:07:16 2010
@@ -43,6 +43,9 @@
      */
     private boolean sendFailIfNoSpaceExplicitySet;
     private boolean sendFailIfNoSpace;
+    private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
+    private long sendFailIfNoSpaceAfterTimeout = 0;
+    
     private final List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
 
     public SystemUsage() {
@@ -155,6 +158,19 @@
         this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
     }
 
+    /** Returns the timeout set on this instance, or the parent's value when none was set here. */
+    public long getSendFailIfNoSpaceAfterTimeout() {
+        boolean inherit = !sendFailIfNoSpaceAfterTimeoutExplicitySet && parent != null;
+        return inherit ? parent.getSendFailIfNoSpaceAfterTimeout() : sendFailIfNoSpaceAfterTimeout;
+    }
+
+    /** Stores the timeout and records that it was set explicitly on this instance. */
+    public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
+        this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
+        // once flagged, the getter stops delegating to the parent
+        this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true;
+    }
+
     public void setName(String name) {
         this.name = name;
         this.memoryUsage.setName(name + ":memory");

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java?rev=904160&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java Thu Jan 28 17:07:16 2010
@@ -0,0 +1,91 @@
+package org.apache.activemq.bugs;
+
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.log4j.Logger;
+
+
+public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
+
+		private final static Logger logger = Logger.getLogger( JmsTimeoutTest.class );
+	
+		private int messageSize=1024*64;
+		private int messageCount=10000;
+		private final AtomicInteger exceptionCount = new AtomicInteger(0);
+		
+	    /**
+	     * Sends persistent messages to a broker with a small memory limit and
+	     * expects the sender to receive a JMSException rather than finish normally.
+	     * @throws Exception
+	     */
+	    public void testBlockedProducerConnectionTimeout() throws Exception {
+	        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+	        final ActiveMQDestination queue = createDestination("testqueue");
+	        
+	        // connection send timeout is 10000 (presumably ms); the broker's own timeout in setUp is 5000
+	        cx.setSendTimeout(10000);
+	        	
+	        Runnable r = new Runnable() {
+	            public void run() {
+	                try {
+	                	logger.info("Sender thread starting");
+	                    Session session = cx.createSession(false, 1);
+	                    MessageProducer producer = session.createProducer(queue);
+	                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+	                    
+	                    TextMessage message = session.createTextMessage(createMessageText());
+	                    for(int count=0; count<messageCount; count++){
+	                    	producer.send(message);
+	                    	// Currently after the timeout producer just
+	                    	// returns but there is no way to know that
+	                    	// the send timed out
+	                    }	  
+	                    logger.info("Done sending..");
+	                } catch (JMSException e) {
+	                    e.printStackTrace();
+	                	exceptionCount.incrementAndGet();
+	                	return;
+	                }
+	            }
+	        };
+	        cx.start();
+	        Thread producerThread = new Thread(r);
+	        producerThread.start();
+	        producerThread.join(30000);
+	        cx.close();
+	        // Expect at least one exception; the sender thread stops at the first JMSException it catches
+	        assertTrue(exceptionCount.get() > 0);
+	    }
+
+	    protected void setUp() throws Exception {
+	        bindAddress = "tcp://localhost:61616";
+	        broker = createBroker();
+	        broker.setDeleteAllMessagesOnStartup(true);
+	        broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
+	        broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+	        super.setUp();
+	    }
+
+        private String createMessageText() {
+	        StringBuffer buffer = new StringBuffer();
+	        buffer.append("<filler>");
+	        for (int i = buffer.length(); i < messageSize; i++) {
+	            buffer.append('X');
+	        }
+	        buffer.append("</filler>");
+	        return buffer.toString();
+	    }
+	    
+	}
\ No newline at end of file