You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/08/08 13:51:29 UTC

svn commit: r1511711 - in /activemq/trunk: activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java

Author: gtully
Date: Thu Aug  8 11:51:29 2013
New Revision: 1511711

URL: http://svn.apache.org/r1511711
Log:
https://issues.apache.org/jira/browse/AMQ-4212 - fix automatic upgrade of KahaDB stores from versions 1 and 2 to version 5; resolves the regression seen in KahaDBVersionTest

Modified:
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1511711&r1=1511710&r2=1511711&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Aug  8 11:51:29 2013
@@ -1838,31 +1838,34 @@ public abstract class MessageDatabase ex
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         @Override
                         public void execute(Transaction tx) throws IOException {
-                            BTreeIndex<Long, HashSet<String>> oldAckPositions =
-                                new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
-                            oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
-                            oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
-                            oldAckPositions.load(tx);
-
                             LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
 
-                            // Do the initial build of the data in memory before writing into the store
-                            // based Ack Positions List to avoid a lot of disk thrashing.
-                            Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
-                            while (iterator.hasNext()) {
-                                Entry<Long, HashSet<String>> entry = iterator.next();
-
-                                for(String subKey : entry.getValue()) {
-                                    SequenceSet pendingAcks = temp.get(subKey);
-                                    if (pendingAcks == null) {
-                                        pendingAcks = new SequenceSet();
-                                        temp.put(subKey, pendingAcks);
-                                    }
+                            if (metadata.version >= 3) {
+                                // migrate
+                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
+                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
+                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
+                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+                                oldAckPositions.load(tx);
+
+
+                                // Do the initial build of the data in memory before writing into the store
+                                // based Ack Positions List to avoid a lot of disk thrashing.
+                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
+                                while (iterator.hasNext()) {
+                                    Entry<Long, HashSet<String>> entry = iterator.next();
+
+                                    for(String subKey : entry.getValue()) {
+                                        SequenceSet pendingAcks = temp.get(subKey);
+                                        if (pendingAcks == null) {
+                                            pendingAcks = new SequenceSet();
+                                            temp.put(subKey, pendingAcks);
+                                        }
 
-                                    pendingAcks.add(entry.getKey());
+                                        pendingAcks.add(entry.getKey());
+                                    }
                                 }
                             }
-
                             // Now move the pending messages to ack data into the store backed
                             // structure.
                             value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=1511711&r1=1511710&r2=1511711&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Thu Aug  8 11:51:29 2013
@@ -168,7 +168,7 @@ public class KahaDBVersionTest extends T
                 TextMessage msg = (TextMessage) topicConsumer.receive(10000);
                 count++;
                 // System.err.println(msg.getText());
-                assertNotNull(msg);
+                assertNotNull("" + count, msg);
             }
             LOG.info("Consumed " + count + " from topic");
             connection.close();