You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/01/28 18:22:27 UTC

svn commit: r904171 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java main/java/org/apache/activemq/usage/SystemUsage.java test/java/org/apache/activemq/bugs/JmsTimeoutTest.java

Author: dejanb
Date: Thu Jan 28 17:22:27 2010
New Revision: 904171

URL: http://svn.apache.org/viewvc?rev=904171&view=rev
Log:
merging 904160 - https://issues.apache.org/activemq/browse/AMQ-2507 - producer flow control timeout

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
      - copied unchanged from r904160, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=904171&r1=904170&r2=904171&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jan 28 17:22:27 2010
@@ -27,15 +27,19 @@
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+import javax.jms.ResourceAllocationException;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -96,7 +100,7 @@
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private final Object sendLock = new Object();
     private ExecutorService executor;
-    protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
     private final Object dispatchMutex = new Object();
     private boolean useConsumerPriority = true;
     private boolean strictOrderDispatch = false;
@@ -118,8 +122,72 @@
             expireMessages();
         }
     };
+    
     private final Object iteratingMutex = new Object() {};
     private static final Scheduler scheduler = Scheduler.getInstance();
+    
+    class TimeoutMessage implements Delayed {
+
+        Message message;
+        ConnectionContext context;
+        long trigger;
+        
+        public TimeoutMessage(Message message, ConnectionContext context, long delay) {
+            this.message = message;
+            this.context = context;
+            this.trigger = System.currentTimeMillis() + delay;
+        }
+        
+        public long getDelay(TimeUnit unit) {
+            long n = trigger - System.currentTimeMillis();
+            return unit.convert(n, TimeUnit.MILLISECONDS);
+        }
+
+        public int compareTo(Delayed delayed) {
+            long other = ((TimeoutMessage)delayed).trigger;
+            int returnValue;
+            if (this.trigger < other) {
+              returnValue = -1;
+            } else if (this.trigger > other) {
+              returnValue = 1;
+            } else {
+              returnValue = 0;
+            }
+            return returnValue;
+        }
+        
+    }
+    
+    DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
+    
+    class FlowControlTimeoutTask extends Thread {
+        
+        public void run() {
+            TimeoutMessage timeout;
+            try {
+                while (true) {
+                    timeout = flowControlTimeoutMessages.take();
+                    if (timeout != null) {
+                        synchronized (messagesWaitingForSpace) {
+                            if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
+                                ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
+                                        + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
+                                response.setCorrelationId(timeout.message.getCommandId());
+                                timeout.context.getConnection().dispatchAsync(response);
+                            }
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Producer Flow Control Timeout Task is stopping");
+                }
+            }
+        }
+    };
+    
+    private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
+    
 
     private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
 
@@ -401,7 +469,7 @@
                 }
 
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+                    throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
                             + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
@@ -412,7 +480,7 @@
                     // for space.
                     final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
                     synchronized (messagesWaitingForSpace) {
-                        messagesWaitingForSpace.add(new Runnable() {
+                        messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
                             public void run() {
 
                                 try {
@@ -446,6 +514,10 @@
                                 }
                             }
                         });
+                        
+                        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+                            flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
+                        }
 
                         registerCallbackForNotFullNotification();
                         context.setDontSendReponse(true);
@@ -497,7 +569,7 @@
                             + " See http://activemq.apache.org/producer-flow-control.html for more info";
 
                     if (systemUsage.isSendFailIfNoSpace()) {
-                        throw new javax.jms.ResourceAllocationException(logMessage);
+                        throw new ResourceAllocationException(logMessage);
                     }
 
                     waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
@@ -616,6 +688,15 @@
         if (getExpireMessagesPeriod() > 0) {
             scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
         }
+        
+        flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
+        
+        // Start flow control timeout task
+        // Prevent trying to start it multiple times
+        if (!flowControlTimeoutTask.isAlive()) {
+            flowControlTimeoutTask.start();
+        }
+        
         doPageIn(false);
     }
 
@@ -628,6 +709,10 @@
         }
 
         scheduler.cancel(expireMessagesTask);
+        
+        if (flowControlTimeoutTask.isAlive()) {
+            flowControlTimeoutTask.interrupt();
+        }
 
         if (messages != null) {
             messages.stop();
@@ -1074,9 +1159,11 @@
 
             // do early to allow dispatch of these waiting messages
             synchronized (messagesWaitingForSpace) {
-                while (!messagesWaitingForSpace.isEmpty()) {
+                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
+                while (it.hasNext()) {
                     if (!memoryUsage.isFull()) {
-                        Runnable op = messagesWaitingForSpace.removeFirst();
+                        Runnable op = it.next();
+                        it.remove();
                         op.run();
                     } else {
                         registerCallbackForNotFullNotification();
@@ -1286,7 +1373,7 @@
                 final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                         + " See http://activemq.apache.org/producer-flow-control.html for more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException(logMessage);
+                    throw new ResourceAllocationException(logMessage);
                 }
 
                 waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
@@ -1619,18 +1706,24 @@
         }
     }
 
-    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
-        long start = System.currentTimeMillis();
-        long nextWarn = start + blockedProducerWarningInterval;
-        while (!usage.waitForSpace(1000)) {
-            if (context.getStopping().get()) {
-                throw new IOException("Connection closed, send aborted.");
+    private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
+        if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
+            if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
+                throw new ResourceAllocationException(warning);
             }
-
-            long now = System.currentTimeMillis();
-            if (now >= nextWarn) {
-                LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
-                nextWarn = now + blockedProducerWarningInterval;
+        } else {
+            long start = System.currentTimeMillis();
+            long nextWarn = start + blockedProducerWarningInterval;
+            while (!usage.waitForSpace(1000)) {
+                if (context.getStopping().get()) {
+                    throw new IOException("Connection closed, send aborted.");
+                }
+    
+                long now = System.currentTimeMillis();
+                if (now >= nextWarn) {
+                    LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+                    nextWarn = now + blockedProducerWarningInterval;
+                }
             }
         }
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java?rev=904171&r1=904170&r2=904171&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java Thu Jan 28 17:22:27 2010
@@ -43,6 +43,10 @@
      */
     private boolean sendFailIfNoSpaceExplicitySet;
     private boolean sendFailIfNoSpace;
+
+    private boolean sendFailIfNoSpaceAfterTimeoutExplicitySet;
+    private long sendFailIfNoSpaceAfterTimeout = 0;
+    
     private List<SystemUsage> children = new CopyOnWriteArrayList<SystemUsage>();
 
     public SystemUsage() {
@@ -154,6 +158,19 @@
         this.sendFailIfNoSpaceExplicitySet = sendFailIfNoSpaceExplicitySet;
     }
 
+    public long getSendFailIfNoSpaceAfterTimeout() {
+        if (sendFailIfNoSpaceAfterTimeoutExplicitySet || parent == null) {
+            return sendFailIfNoSpaceAfterTimeout;
+        } else {
+            return parent.getSendFailIfNoSpaceAfterTimeout();
+        }
+    }
+
+    public void setSendFailIfNoSpaceAfterTimeout(long sendFailIfNoSpaceAfterTimeout) {
+        this.sendFailIfNoSpaceAfterTimeoutExplicitySet = true;
+        this.sendFailIfNoSpaceAfterTimeout = sendFailIfNoSpaceAfterTimeout;
+    }
+
     public void setName(String name) {
         this.name = name;
         this.memoryUsage.setName(name + ":memory");