You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/12/14 16:23:47 UTC

svn commit: r487235 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Author: jstrachan
Date: Thu Dec 14 07:23:47 2006
New Revision: 487235

URL: http://svn.apache.org/viewvc?view=rev&rev=487235
Log:
Patch for AMQ-1093 to avoid a deadlock when the transport is reconnected from inside a MessageListener that is calling send(). The explicit clearing of the consumer dispatch list is now asynchronous and performed within the existing mutex.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=487235&r1=487234&r2=487235
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Dec 14 07:23:47 2006
@@ -17,24 +17,7 @@
  */
 package org.apache.activemq;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.*;
 import org.apache.activemq.management.JMSConsumerStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
@@ -47,6 +30,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import javax.jms.IllegalStateException;
+import javax.jms.*;
+import javax.jms.Message;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -121,6 +110,7 @@
     private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
     private ExecutorService executorService = null;
     private MessageTransformer transformer;
+    private boolean clearDispatchList;
 
     /**
      * Create a MessageConsumer
@@ -569,7 +559,14 @@
     }
     
     void clearMessagesInProgress(){
-        unconsumedMessages.clear();
+        // We are called from inside the transport reconnection logic,
+        // which clears the dispatch lists of all of the connection's
+        // consumers. Rather than trying to grab a mutex (which could
+        // already be owned by a message listener that is calling send()),
+        // we just set a flag so that the list can be cleared as soon as
+        // the dispatch thread is ready to flush the dispatch list.
+        // The flag is checked and reset while holding the dispatch mutex.
+        clearDispatchList= true;
     }
     
     void deliverAcks(){
@@ -859,7 +856,13 @@
         MessageListener listener = this.messageListener;
         try {
             synchronized(unconsumedMessages.getMutex()){
-	            if (!unconsumedMessages.isClosed()) {
+                if (clearDispatchList) {
+                    // we are reconnecting, so let's flush the in-progress messages
+                    clearDispatchList = false;
+                    unconsumedMessages.clear();
+                }
+
+                if (!unconsumedMessages.isClosed()) {
 	                if (listener != null && unconsumedMessages.isRunning() ) {
 	                    ActiveMQMessage message = createActiveMQMessage(md);
 	                    beforeMessageIsConsumed(md);