You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/26 18:00:49 UTC
svn commit: r884644 -
/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Author: dejanb
Date: Thu Nov 26 17:00:49 2009
New Revision: 884644
URL: http://svn.apache.org/viewvc?rev=884644&view=rev
Log:
merging 884633 - https://issues.apache.org/activemq/browse/AMQ-2515 - Optimized Acknowledgements and interrupted transport
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=884644&r1=884643&r2=884644&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Nov 26 17:00:49 2009
@@ -134,6 +134,9 @@
private long lastDeliveredSequenceId;
private IOException failureError;
+
+ private long optimizeAckTimestamp = System.currentTimeMillis();
+ private long optimizeAckTimeout = 300;
/**
* Create a MessageConsumer
@@ -788,7 +791,7 @@
}
}
}
-
+
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
@@ -809,12 +812,13 @@
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
- if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
+ if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
+ optimizeAckTimestamp = System.currentTimeMillis();
}
}
} else {
@@ -1074,14 +1078,13 @@
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
- if (pendingAck != null && pendingAck.isDeliveredAck()) {
- // on resumption a pending delivered ack will be out of sync with
- // re deliveries.
- if (LOG.isDebugEnabled()) {
- LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
- }
- pendingAck = null;
+ if (!session.isTransacted()) {
+ // clean, so we don't have duplicates with optimizeAcknowledge
+ synchronized (deliveredMessages) {
+ deliveredMessages.clear();
+ }
}
+ pendingAck = null;
}
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {