You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/06/01 19:28:21 UTC
svn commit: r950171 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
Author: gtully
Date: Tue Jun 1 17:28:21 2010
New Revision: 950171
URL: http://svn.apache.org/viewvc?rev=950171&view=rev
Log:
Resolve durable restart test regressions with KahaDB as the default store: the next message sequence id could get out of sync for a durable subscription that was drained and then reused. Fix for https://issues.apache.org/activemq/browse/AMQ-2755
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=950171&r1=950170&r2=950171&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Jun 1 17:28:21 2010
@@ -92,6 +92,7 @@ public class MessageDatabase extends Ser
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;
+ private static final long NOT_ACKED = -1;
protected class Metadata {
@@ -945,7 +946,7 @@ public class MessageDatabase extends Ser
if (command.hasSubscriptionInfo()) {
String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.put(tx, subscriptionKey, command);
- long ackLocation=-1;
+ long ackLocation=NOT_ACKED;
if (!command.getRetroactive()) {
ackLocation = sd.nextMessageId-1;
}
@@ -1263,6 +1264,16 @@ public class MessageDatabase extends Ser
Entry<String, Long> entry = iterator.next();
addAckLocation(rc, entry.getValue(), entry.getKey());
}
+
+ if (rc.nextMessageId == 0) {
+ // check for existing durable sub all acked out - pull next seq from acks as messages are gone
+ if (!rc.ackPositions.isEmpty()) {
+ Long lastAckedMessageId = rc.ackPositions.lastKey();
+ if (lastAckedMessageId != NOT_ACKED) {
+ rc.nextMessageId = lastAckedMessageId+1;
+ }
+ }
+ }
}
return rc;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java?rev=950171&r1=950170&r2=950171&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableConsumerCloseAndReconnectTest.java Tue Jun 1 17:28:21 2010
@@ -46,6 +46,26 @@ public class DurableConsumerCloseAndReco
private Destination destination;
private int messageCount;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ deleteAllMessages();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ deleteAllMessages();
+ }
+
+ private void deleteAllMessages() throws Exception {
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
+ Connection dummyConnection = fac.createConnection();
+ dummyConnection.start();
+ dummyConnection.close();
+ }
+
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=false");
}
@@ -60,12 +80,7 @@ public class DurableConsumerCloseAndReco
dummyConnection.close();
// now lets try again without one connection open
- consumeMessagesDeliveredWhileConsumerClosed();
- // now delete the db
- ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("vm://localhost?broker.deleteAllMessagesOnStartup=true");
- dummyConnection = fac.createConnection();
- dummyConnection.start();
- dummyConnection.close();
+ consumeMessagesDeliveredWhileConsumerClosed();
}
protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {