You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:16:34 UTC
svn commit: r900385 - in
/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq:
ActiveMQMessageConsumer.java broker/region/PrefetchSubscription.java
transport/failover/FailoverTransport.java
Author: gtully
Date: Mon Jan 18 13:16:34 2010
New Revision: 900385
URL: http://svn.apache.org/viewvc?rev=900385&view=rev
Log:
merge -c 897988 https://svn.apache.org/repos/asf/activemq/trunk - reduce unmatched ack exceptions, tidy up prefetchExtension https://issues.apache.org/activemq/browse/AMQ-2560
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=900385&r1=900384&r2=900385&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Jan 18 13:16:34 2010
@@ -937,6 +937,9 @@
* @throws JMSException
*/
public void acknowledge() throws JMSException {
+ synchronized (unconsumedMessages.getMutex()) {
+ clearDispatchListOnReconnect();
+ }
synchronized(deliveredMessages) {
// Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=900385&r1=900384&r2=900385&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jan 18 13:16:34 2010
@@ -199,7 +199,7 @@
}
}
if (LOG.isTraceEnabled()) {
- LOG.trace("ack:" + ack);
+ LOG.info("ack:" + ack);
}
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
@@ -256,9 +256,6 @@
prefetchExtension = Math.max(
prefetchExtension, index );
}
- } else {
- prefetchExtension = Math.max(0,
- prefetchExtension - index);
}
destination = node.getRegionDestination();
callDispatchMatched = true;
@@ -319,8 +316,6 @@
} else if (ack.isRedeliveredAck()) {
// Message was re-delivered but it was not yet considered to be
// a DLQ message.
- // Acknowledge all dispatched messages up till the message id of
- // the ack.
boolean inAckRange = false;
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
@@ -349,9 +344,6 @@
throw new JMSException("Poison ack cannot be transacted: "
+ ack);
}
- // Acknowledge all dispatched messages up till the message id of
- // the
- // acknowledgment.
int index = 0;
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=900385&r1=900384&r2=900385&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Jan 18 13:16:34 2010
@@ -201,26 +201,27 @@
transport.setTransportListener(disposedListener);
ServiceSupport.dispose(transport);
+ boolean reconnectOk = false;
synchronized (reconnectMutex) {
- boolean reconnectOk = false;
if(started) {
LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
LOG.debug("Transport failed with the following exception:", e);
reconnectOk = true;
- }
-
+ }
initialized = false;
failedConnectTransportURI=connectedTransportURI;
connectedTransportURI = null;
connected=false;
- if(reconnectOk) {
+
+ // notify before any reconnect attempt so ack state can be whacked
+ if (transportListener != null) {
+ transportListener.transportInterupted();
+ }
+
+ if (reconnectOk) {
reconnectTask.wakeup();
}
}
-
- if (transportListener != null) {
- transportListener.transportInterupted();
- }
}
}
@@ -412,8 +413,8 @@
// Skipping send of ShutdownInfo command when not connected.
return;
}
- if(command instanceof RemoveInfo) {
- // Simulate response to RemoveInfo command
+ if(command instanceof RemoveInfo || command.isMessageAck()) {
+ // Simulate response to RemoveInfo command or ack (as it will be stale)
stateTracker.track(command);
Response response = new Response();
response.setCorrelationId(command.getCommandId());
@@ -432,7 +433,7 @@
while (transport == null && !disposed
&& connectionFailure == null
&& !Thread.currentThread().isInterrupted()) {
- LOG.trace("Waiting for transport to reconnect.");
+ LOG.trace("Waiting for transport to reconnect..: " + command);
long end = System.currentTimeMillis();
if (timeout > 0 && (end - start > timeout)) {
timedout = true;
@@ -698,7 +699,7 @@
t.setTransportListener(myTransportListener);
try {
if (started) {
- restoreTransport(t);
+ restoreTransport(t);
}
reconnectDelay = initialReconnectDelay;
failedConnectTransportURI=null;
@@ -856,7 +857,7 @@
bt.setTransport(t);
backups.add(bt);
}
- }catch(Exception e) {
+ } catch(Exception e) {
LOG.debug("Failed to build backup ",e);
}
}