You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/26 18:00:49 UTC

svn commit: r884644 - /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Author: dejanb
Date: Thu Nov 26 17:00:49 2009
New Revision: 884644

URL: http://svn.apache.org/viewvc?rev=884644&view=rev
Log:
merging 884633 - https://issues.apache.org/activemq/browse/AMQ-2515 - Optimized Acknowledgements and interrupted transport

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=884644&r1=884643&r2=884644&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Nov 26 17:00:49 2009
@@ -134,6 +134,9 @@
     private long lastDeliveredSequenceId;
 
     private IOException failureError;
+    
+    private long optimizeAckTimestamp = System.currentTimeMillis();
+    private long optimizeAckTimeout = 300;
 
     /**
      * Create a MessageConsumer
@@ -788,7 +791,7 @@
             }
         }
     }
-
+    
     private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
         if (unconsumedMessages.isClosed()) {
             return;
@@ -809,12 +812,13 @@
                         if (!deliveredMessages.isEmpty()) {
                             if (optimizeAcknowledge) {
                                 ackCounter++;
-                                if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
+                                if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
                                 	MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                 	if (ack != null) {
                             		    deliveredMessages.clear();
                             		    ackCounter = 0;
                             		    session.sendAck(ack);
+                            		    optimizeAckTimestamp = System.currentTimeMillis();
                                 	}
                                 }
                             } else {
@@ -1074,14 +1078,13 @@
                             session.connection.rollbackDuplicate(this, old.getMessage());
                         }
                     }
-                    if (pendingAck != null && pendingAck.isDeliveredAck()) {
-                        // on resumption a pending delivered ack will be out of sync with
-                        // re deliveries.
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
-                        }   
-                        pendingAck = null;
+                    if (!session.isTransacted()) {
+                        // clear delivered messages, so we don't have duplicates with optimizeAcknowledge
+                        synchronized (deliveredMessages) {
+                            deliveredMessages.clear();
+                        }
                     }
+                    pendingAck = null;
                 }
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {