You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/20 17:53:02 UTC

svn commit: r901274 - in /activemq/branches/activemq-5.3/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/transport/failover/

Author: gtully
Date: Wed Jan 20 16:53:02 2010
New Revision: 901274

URL: http://svn.apache.org/viewvc?rev=901274&view=rev
Log:
svn merge -c 901273 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2573 - rollback of audit check needs to be synced with redispatch after failover transport resumption, otherwise some redispatched unconsumed messages can get auto-acked as duplicates in error

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
      - copied unchanged from r901273, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Jan 20 16:53:02 2010
@@ -187,6 +187,7 @@
     private DestinationSource destinationSource;
     private final Object ensureConnectionInfoSentMutex = new Object();
     private boolean useDedicatedTaskRunner;
+    protected CountDownLatch transportInterruptionProcessingComplete;
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -1674,6 +1675,7 @@
                 command.visit(new CommandVisitorAdapter() {
                     @Override
                     public Response processMessageDispatch(MessageDispatch md) throws Exception {
+                        waitForTransportInterruptionProcessing();
                         ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
                         if (dispatcher != null) {
                             // Copy in case a embedded broker is dispatching via
@@ -1837,6 +1839,10 @@
 	}
 
     public void transportInterupted() {
+        transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
+        }
         for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
             ActiveMQSession s = i.next();
             s.clearMessagesInProgress();
@@ -2235,4 +2241,21 @@
 	public IOException getFirstFailureError() {
 		return firstFailureError;
 	}
+	
+	protected void waitForTransportInterruptionProcessing() throws InterruptedException {
+        if (transportInterruptionProcessingComplete != null) {
+            while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) {
+                LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
+            }
+            synchronized (this) {
+                transportInterruptionProcessingComplete = null;
+            }
+        }
+    }
+	
+	protected synchronized void transportInterruptionProcessingComplete() {
+	    if (transportInterruptionProcessingComplete != null) {
+	        transportInterruptionProcessingComplete.countDown();
+	    }
+	}
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jan 20 16:53:02 2010
@@ -103,7 +103,7 @@
     protected final ConsumerInfo info;
 
     // These are the messages waiting to be delivered to the client
-    private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+    protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
 
     // The are the messages that were delivered to the consumer but that have
     // not been acknowledged. It's kept in reverse order since we
@@ -640,14 +640,22 @@
     }
     
     void clearMessagesInProgress() {
-        // we are called from inside the transport reconnection logic
-        // which involves us clearing all the connections' consumers
-        // dispatch lists and clearing them
-        // so rather than trying to grab a mutex (which could be already
-        // owned by the message listener calling the send) we will just set
-        // a flag so that the list can be cleared as soon as the
-        // dispatch thread is ready to flush the dispatch list
+        // deal with delivered messages async to avoid lock contention with in-progress acks
         clearDispatchList = true;
+        synchronized (unconsumedMessages.getMutex()) {            
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
+            }
+            // ensure unconsumed messages are rolled back up front, as they may get redelivered to another consumer
+            List<MessageDispatch> list = unconsumedMessages.removeAll();
+            if (!this.info.isBrowser()) {
+                for (MessageDispatch old : list) {
+                    session.connection.rollbackDuplicate(this, old.getMessage());
+                }
+            }
+        }
+        // allow dispatch on this connection to resume
+        session.connection.transportInterruptionProcessingComplete();
     }
 
     void deliverAcks() {
@@ -755,9 +763,7 @@
      * broker to pull a message we are about to receive
      */
     protected void sendPullCommand(long timeout) throws JMSException {
-        synchronized (unconsumedMessages.getMutex()) {
-            clearDispatchListOnReconnect();
-        }
+        clearDispatchList();
         if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
@@ -937,9 +943,7 @@
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
-        synchronized (unconsumedMessages.getMutex()) {
-            clearDispatchListOnReconnect();
-        }
+        clearDispatchList();
         synchronized(deliveredMessages) {
             // Acknowledge all messages so far.
             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -1072,8 +1076,8 @@
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener.get();
         try {
+            clearDispatchList();
             synchronized (unconsumedMessages.getMutex()) {
-                clearDispatchListOnReconnect();
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                         if (listener != null && unconsumedMessages.isRunning()) {
@@ -1119,25 +1123,19 @@
         }
     }
 
-    // called holding unconsumedMessages.getMutex()
-    private void clearDispatchListOnReconnect() {
+    // async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again
+    private void clearDispatchList() {
         if (clearDispatchList) {
-            // we are reconnecting so lets flush the in progress
-            // messages
-            clearDispatchList = false;
-            List<MessageDispatch> list = unconsumedMessages.removeAll();
-            if (!this.info.isBrowser()) {
-                for (MessageDispatch old : list) {
-                    // ensure we don't filter this as a duplicate
-                    session.connection.rollbackDuplicate(this, old.getMessage());
-                }
-            }
-           
-            // clean, so we don't have duplicates with optimizeAcknowledge 
             synchronized (deliveredMessages) {
-                deliveredMessages.clear();        
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
+                }
+                if (clearDispatchList) {
+                    deliveredMessages.clear();
+                    pendingAck = null;
+                    clearDispatchList = false;
+                }
             }
-            pendingAck = null;
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Jan 20 16:53:02 2010
@@ -16,9 +16,71 @@
  */
 package org.apache.activemq;
 
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.blob.BlobDownloader;
 import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.management.JMSSessionStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
@@ -30,20 +92,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import java.io.File;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.blob.BlobDownloader;
-
 /**
  * <P>
  * A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -591,10 +639,20 @@
     }
 
     void clearMessagesInProgress() {
-        executor.clearMessagesInProgress();
-        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
-            ActiveMQMessageConsumer consumer = iter.next();
-            consumer.clearMessagesInProgress();
+        executor.clearMessagesInProgress();        
+        // we are called from inside the transport reconnection logic
+        // which involves us clearing all the connections' consumers
+        // dispatch and delivered lists. So rather than trying to 
+        // grab a mutex (which could be already owned by the message 
+        // listener calling the send or an ack) we allow it to complete in 
+        // a separate thread via the scheduler and notify us via 
+        // connection.transportInterruptionProcessingComplete()
+        //
+        for (final ActiveMQMessageConsumer consumer : consumers) {
+            scheduler.executeAfterDelay(new Runnable() {
+                public void run() {
+                    consumer.clearMessagesInProgress();
+                }}, 0l);
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Wed Jan 20 16:53:02 2010
@@ -390,7 +390,7 @@
     @Test
     public void testFailoverConsumerAckLost() throws Exception {
         // as failure depends on hash order, do a few times
-        for (int i=0; i<4; i++) {
+        for (int i=0; i<3; i++) {
             try {
                 doTestFailoverConsumerAckLost();
             } finally {