You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/08/08 13:51:29 UTC
svn commit: r1511711 - in /activemq/trunk:
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
Author: gtully
Date: Thu Aug 8 11:51:29 2013
New Revision: 1511711
URL: http://svn.apache.org/r1511711
Log:
https://issues.apache.org/jira/browse/AMQ-4212 - fix auto upgrade from ver 1 and 2 to 5 - regression in KahaDBVersionTest
Modified:
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1511711&r1=1511710&r2=1511711&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Aug 8 11:51:29 2013
@@ -1838,31 +1838,34 @@ public abstract class MessageDatabase ex
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
- BTreeIndex<Long, HashSet<String>> oldAckPositions =
- new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
- oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
- oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
- oldAckPositions.load(tx);
-
LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
- // Do the initial build of the data in memory before writing into the store
- // based Ack Positions List to avoid a lot of disk thrashing.
- Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
- while (iterator.hasNext()) {
- Entry<Long, HashSet<String>> entry = iterator.next();
-
- for(String subKey : entry.getValue()) {
- SequenceSet pendingAcks = temp.get(subKey);
- if (pendingAcks == null) {
- pendingAcks = new SequenceSet();
- temp.put(subKey, pendingAcks);
- }
+ if (metadata.version >= 3) {
+ // migrate
+ BTreeIndex<Long, HashSet<String>> oldAckPositions =
+ new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
+ oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
+ oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+ oldAckPositions.load(tx);
+
+
+ // Do the initial build of the data in memory before writing into the store
+ // based Ack Positions List to avoid a lot of disk thrashing.
+ Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
+ while (iterator.hasNext()) {
+ Entry<Long, HashSet<String>> entry = iterator.next();
+
+ for(String subKey : entry.getValue()) {
+ SequenceSet pendingAcks = temp.get(subKey);
+ if (pendingAcks == null) {
+ pendingAcks = new SequenceSet();
+ temp.put(subKey, pendingAcks);
+ }
- pendingAcks.add(entry.getKey());
+ pendingAcks.add(entry.getKey());
+ }
}
}
-
// Now move the pending messages to ack data into the store backed
// structure.
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=1511711&r1=1511710&r2=1511711&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Thu Aug 8 11:51:29 2013
@@ -168,7 +168,7 @@ public class KahaDBVersionTest extends T
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
count++;
// System.err.println(msg.getText());
- assertNotNull(msg);
+ assertNotNull("" + count, msg);
}
LOG.info("Consumed " + count + " from topic");
connection.close();