You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/20 17:53:02 UTC
svn commit: r901274 - in /activemq/branches/activemq-5.3/activemq-core/src:
main/java/org/apache/activemq/
test/java/org/apache/activemq/transport/failover/
Author: gtully
Date: Wed Jan 20 16:53:02 2010
New Revision: 901274
URL: http://svn.apache.org/viewvc?rev=901274&view=rev
Log:
svn merge -c 901273 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2573 - rollback of audit check needs to be synced with redispatch after failover transport resumption, otherwise some redispatched unconsumed messages can get auto-acked as duplicates in error
Added:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
- copied unchanged from r901273, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Jan 20 16:53:02 2010
@@ -187,6 +187,7 @@
private DestinationSource destinationSource;
private final Object ensureConnectionInfoSentMutex = new Object();
private boolean useDedicatedTaskRunner;
+ protected CountDownLatch transportInterruptionProcessingComplete;
/**
* Construct an <code>ActiveMQConnection</code>
@@ -1674,6 +1675,7 @@
command.visit(new CommandVisitorAdapter() {
@Override
public Response processMessageDispatch(MessageDispatch md) throws Exception {
+ waitForTransportInterruptionProcessing();
ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
if (dispatcher != null) {
// Copy in case a embedded broker is dispatching via
@@ -1837,6 +1839,10 @@
}
public void transportInterupted() {
+ transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
+ }
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
@@ -2235,4 +2241,21 @@
public IOException getFirstFailureError() {
return firstFailureError;
}
+
+ protected void waitForTransportInterruptionProcessing() throws InterruptedException {
+ if (transportInterruptionProcessingComplete != null) {
+ while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) {
+ LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
+ }
+ synchronized (this) {
+ transportInterruptionProcessingComplete = null;
+ }
+ }
+ }
+
+ protected synchronized void transportInterruptionProcessingComplete() {
+ if (transportInterruptionProcessingComplete != null) {
+ transportInterruptionProcessingComplete.countDown();
+ }
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jan 20 16:53:02 2010
@@ -103,7 +103,7 @@
protected final ConsumerInfo info;
// These are the messages waiting to be delivered to the client
- private final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+ protected final MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
// The are the messages that were delivered to the consumer but that have
// not been acknowledged. It's kept in reverse order since we
@@ -640,14 +640,22 @@
}
void clearMessagesInProgress() {
- // we are called from inside the transport reconnection logic
- // which involves us clearing all the connections' consumers
- // dispatch lists and clearing them
- // so rather than trying to grab a mutex (which could be already
- // owned by the message listener calling the send) we will just set
- // a flag so that the list can be cleared as soon as the
- // dispatch thread is ready to flush the dispatch list
+ // deal with delivered messages async to avoid lock contention with in pogress acks
clearDispatchList = true;
+ synchronized (unconsumedMessages.getMutex()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size() + ") on transport interrupt");
+ }
+ // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
+ List<MessageDispatch> list = unconsumedMessages.removeAll();
+ if (!this.info.isBrowser()) {
+ for (MessageDispatch old : list) {
+ session.connection.rollbackDuplicate(this, old.getMessage());
+ }
+ }
+ }
+ // allow dispatch on this connection to resume
+ session.connection.transportInterruptionProcessingComplete();
}
void deliverAcks() {
@@ -755,9 +763,7 @@
* broker to pull a message we are about to receive
*/
protected void sendPullCommand(long timeout) throws JMSException {
- synchronized (unconsumedMessages.getMutex()) {
- clearDispatchListOnReconnect();
- }
+ clearDispatchList();
if (info.getPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
MessagePull messagePull = new MessagePull();
messagePull.configure(info);
@@ -937,9 +943,7 @@
* @throws JMSException
*/
public void acknowledge() throws JMSException {
- synchronized (unconsumedMessages.getMutex()) {
- clearDispatchListOnReconnect();
- }
+ clearDispatchList();
synchronized(deliveredMessages) {
// Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
@@ -1072,8 +1076,8 @@
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener.get();
try {
+ clearDispatchList();
synchronized (unconsumedMessages.getMutex()) {
- clearDispatchListOnReconnect();
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
@@ -1119,25 +1123,19 @@
}
}
- // called holding unconsumedMessages.getMutex()
- private void clearDispatchListOnReconnect() {
+ // async (on next call) clear delivered as they will be auto-acked as duplicates if they arrive again
+ private void clearDispatchList() {
if (clearDispatchList) {
- // we are reconnecting so lets flush the in progress
- // messages
- clearDispatchList = false;
- List<MessageDispatch> list = unconsumedMessages.removeAll();
- if (!this.info.isBrowser()) {
- for (MessageDispatch old : list) {
- // ensure we don't filter this as a duplicate
- session.connection.rollbackDuplicate(this, old.getMessage());
- }
- }
-
- // clean, so we don't have duplicates with optimizeAcknowledge
synchronized (deliveredMessages) {
- deliveredMessages.clear();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getConsumerId() + " async clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
+ }
+ if (clearDispatchList) {
+ deliveredMessages.clear();
+ pendingAck = null;
+ clearDispatchList = false;
+ }
}
- pendingAck = null;
}
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Jan 20 16:53:02 2010
@@ -16,9 +16,71 @@
*/
package org.apache.activemq;
+import java.io.File;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+
+import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionId;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
@@ -30,20 +92,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import java.io.File;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.blob.BlobDownloader;
-
/**
* <P>
* A <CODE>Session</CODE> object is a single-threaded context for producing
@@ -591,10 +639,20 @@
}
void clearMessagesInProgress() {
- executor.clearMessagesInProgress();
- for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
- ActiveMQMessageConsumer consumer = iter.next();
- consumer.clearMessagesInProgress();
+ executor.clearMessagesInProgress();
+ // we are called from inside the transport reconnection logic
+ // which involves us clearing all the connections' consumers
+ // dispatch and delivered lists. So rather than trying to
+ // grab a mutex (which could be already owned by the message
+ // listener calling the send or an ack) we allow it to complete in
+ // a separate thread via the scheduler and notify us via
+ // connection.transportInterruptionProcessingComplete()
+ //
+ for (final ActiveMQMessageConsumer consumer : consumers) {
+ scheduler.executeAfterDelay(new Runnable() {
+ public void run() {
+ consumer.clearMessagesInProgress();
+ }}, 0l);
}
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=901274&r1=901273&r2=901274&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Wed Jan 20 16:53:02 2010
@@ -390,7 +390,7 @@
@Test
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order, do a few times
- for (int i=0; i<4; i++) {
+ for (int i=0; i<3; i++) {
try {
doTestFailoverConsumerAckLost();
} finally {