You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/06/01 19:28:21 UTC

svn commit: r950171 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/MessageDatabase.java test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java

Author: gtully
Date: Tue Jun  1 17:28:21 2010
New Revision: 950171

URL: http://svn.apache.org/viewvc?rev=950171&view=rev
Log:
resolve durable restart test regressions with kahaDB as default store, the next message sequence id could get out of sync for a durable sub when drained and reused, fix for https://issues.apache.org/activemq/browse/AMQ-2755

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=950171&r1=950170&r2=950171&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Jun  1 17:28:21 2010
@@ -92,6 +92,7 @@ public class MessageDatabase extends Ser
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
+    private static final long NOT_ACKED = -1;
 
 
     protected class Metadata {
@@ -945,7 +946,7 @@ public class MessageDatabase extends Ser
         if (command.hasSubscriptionInfo()) {
             String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.put(tx, subscriptionKey, command);
-            long ackLocation=-1;
+            long ackLocation=NOT_ACKED;
             if (!command.getRetroactive()) {
                 ackLocation = sd.nextMessageId-1;
             }
@@ -1263,6 +1264,16 @@ public class MessageDatabase extends Ser
                 Entry<String, Long> entry = iterator.next();
                 addAckLocation(rc, entry.getValue(), entry.getKey());
             }
+            
+            if (rc.nextMessageId == 0) {
+                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
+                if (!rc.ackPositions.isEmpty()) {
+                    Long lastAckedMessageId = rc.ackPositions.lastKey();
+                    if (lastAckedMessageId != NOT_ACKED) {
+                        rc.nextMessageId = lastAckedMessageId+1;
+                    }
+                }
+            }
 
         }
         return rc;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java?rev=950171&r1=950170&r2=950171&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java Tue Jun  1 17:28:21 2010
@@ -46,6 +46,26 @@ public class DurableConsumerCloseAndReco
     private Destination destination;
     private int messageCount;
 
+    
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        deleteAllMessages();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        deleteAllMessages();
+    }
+
+    private void deleteAllMessages() throws Exception {
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
+        Connection dummyConnection = fac.createConnection();
+        dummyConnection.start();
+        dummyConnection.close();
+    }
+    
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false");
     }
@@ -60,12 +80,7 @@ public class DurableConsumerCloseAndReco
         dummyConnection.close();
 
         // now lets try again without one connection open
-        consumeMessagesDeliveredWhileConsumerClosed();
-        // now delete the db
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
-        dummyConnection = fac.createConnection();
-        dummyConnection.start();
-        dummyConnection.close();
+        consumeMessagesDeliveredWhileConsumerClosed();       
     }
 
     protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {