You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/04/06 21:20:12 UTC

svn commit: r762464 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ test/java/org/apache/activemq/bugs/

Author: gtully
Date: Mon Apr  6 19:20:12 2009
New Revision: 762464

URL: http://svn.apache.org/viewvc?rev=762464&view=rev
Log:
ensure duplicate acks are sent immediately and suppress outstanding delivery acks on transport resumption, AMQ-2149|https://issues.apache.org/activemq/browse/AMQ-2149

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=762464&r1=762463&r2=762464&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Apr  6 19:20:12 2009
@@ -1836,12 +1836,6 @@
             TransportListener listener = iter.next();
             listener.transportResumed();
         }
-        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
-            ActiveMQSession s = i.next();
-            // deliverAcks at this point is too early as acks can arrive at the broker
-            // before redispatch of messages and hence be out or order
-            s.transportResumed();
-        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=762464&r1=762463&r2=762464&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Apr  6 19:20:12 2009
@@ -1054,6 +1054,14 @@
                             session.connection.rollbackDuplicate(this, old.getMessage());
                         }
                     }
+                    if (pendingAck != null && pendingAck.isDeliveredAck()) {
+                        // on resumption a pending delivered ack will be out of sync with
+                        // re deliveries.
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
+                        }   
+                        pendingAck = null;
+                    }
                 }
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
@@ -1085,12 +1093,15 @@
                     } else {
                         // ignore duplicate
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug(getConsumerId() + " ignoring duplicate: " + md.getMessage());
+                            LOG.debug(getConsumerId() + " ignoring(auto acking) duplicate: " + md.getMessage());
                         }
                         // in a transaction ack delivery of duplicates to ensure prefetch extension kicks in.
                         // the normal ack will happen in the transaction.
-                        ackLater(md, session.isTransacted() ? 
-                                MessageAck.DELIVERED_ACK_TYPE : MessageAck.STANDARD_ACK_TYPE);
+                        if (session.isTransacted()) {
+                            ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
+                        } else {
+                            acknowledge(md);
+                        }
                     }
                 }
             }
@@ -1159,13 +1170,6 @@
         return lastDeliveredSequenceId;
     }
 
-    // on resumption re deliveries will percolate acks in their own good time
-    public void transportResumed() {
-        pendingAck = null; 
-        additionalWindowSize = 0;
-        deliveredCounter = 0;
-    }
-
 	public IOException getFailureError() {
 		return failureError;
 	}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=762464&r1=762463&r2=762464&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Apr  6 19:20:12 2009
@@ -1962,12 +1962,4 @@
             syncSendPacket(ack);
         }
     }
-
-    public void transportResumed() {
-        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
-            ActiveMQMessageConsumer consumer = iter.next();
-            consumer.transportResumed();
-        }
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=762464&r1=762463&r2=762464&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Mon Apr  6 19:20:12 2009
@@ -39,8 +39,6 @@
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.util.LoggingBrokerPlugin;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
@@ -64,19 +62,20 @@
     private final String SEQ_NUM_PROPERTY = "seqNum";
 
     final int MESSAGE_LENGTH_BYTES = 75 * 1024;
-    final long SLEEP_BETWEEN_SEND_MS = 3;
+    final long SLEEP_BETWEEN_SEND_MS = 25;
     final int NUM_SENDERS_AND_RECEIVERS = 10;
     final Object brokerLock = new Object();
     
     private static final long DEFAULT_BROKER_STOP_PERIOD = 20 * 1000;
-    private static final long DEFAULT_NUM_TO_SEND = 1500;
+    private static final long DEFAULT_NUM_TO_SEND = 1400;
     
     long brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
     long numtoSend = DEFAULT_NUM_TO_SEND;
+    long sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
     String brokerURL = DEFAULT_BROKER_URL;
     
     int numBrokerRestarts = 0;
-    final static int MAX_BROKER_RESTARTS = 4;
+    final static int MAX_BROKER_RESTARTS = 5;
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
@@ -110,6 +109,7 @@
         dataDirFile = new File("target/"+ getName());
         numtoSend = DEFAULT_NUM_TO_SEND;
         brokerStopPeriod = DEFAULT_BROKER_STOP_PERIOD;
+        sleepBetweenSend = SLEEP_BETWEEN_SEND_MS;
         brokerURL = DEFAULT_BROKER_URL;
     }
     
@@ -244,9 +244,9 @@
                     LOG.error(dest + " send error", e);
                     exceptions.add(e);
                 }
-                if (SLEEP_BETWEEN_SEND_MS > 0) {
+                if (sleepBetweenSend > 0) {
                     try {
-                        Thread.sleep(SLEEP_BETWEEN_SEND_MS);
+                        Thread.sleep(sleepBetweenSend);
                     } catch (InterruptedException e) {
                         LOG.warn(dest + " sleep interrupted", e);
                     }
@@ -301,7 +301,7 @@
     }
 
 
-    public void x_testOrderWithRestart() throws Exception {
+    public void testOrderWithRestart() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 broker.deleteAllMessages();     
@@ -323,7 +323,7 @@
         verifyStats(true);
     }
         
-    public void x_testTopicOrderWithRestart() throws Exception {
+    public void testTopicOrderWithRestart() throws Exception {
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {
                 broker.deleteAllMessages();
@@ -351,7 +351,8 @@
     }
     
     public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception {
-        numtoSend = 15000;
+        numtoSend = 10000;
+        sleepBetweenSend = 3;
         brokerStopPeriod = 30 * 1000;
               
         createBroker(new Configurer() {