You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/29 20:57:45 UTC
svn commit: r789411 - in /activemq/sandbox/activemq-flow:
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/
activemq-store/src/test/java/org/apache/activemq/broker/store/
Author: cmacnaug
Date: Mon Jun 29 18:57:45 2009
New Revision: 789411
URL: http://svn.apache.org/viewvc?rev=789411&view=rev
Log:
Improvements to KahaDBStore recovery time, plus unit test for Map functionality.
Modified:
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=789411&r1=789410&r2=789411&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Mon Jun 29 18:57:45 2009
@@ -138,6 +138,7 @@
public void stop() throws Exception {
if (started.compareAndSet(true, false)) { // only the caller that flips started from true to false performs the shutdown
+ flush(); // added in this revision: flush before unloading (what flush() persists is not visible here; presumably pending updates, confirm)
unload();
}
}
@@ -323,41 +324,32 @@
try {
long start = System.currentTimeMillis();
recovering = true;
- ArrayList<UoWOperation> uow = null;
Location recoveryPosition = getRecoveryPosition();
if (recoveryPosition != null) {
int redoCounter = 0;
+ Transaction uow = null;
+ int uowCounter = 0;
while (recoveryPosition != null) {
Buffer data = journal.read(recoveryPosition);
if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
- uow = new ArrayList<UoWOperation>();
+ uow = pageFile.tx();
} else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
- if (uow != null) {
- final ArrayList<UoWOperation> list = uow;
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- for (UoWOperation op : list) {
- updateIndex(tx, op.bean.toType(), (MessageBuffer) op.bean, op.location);
- rootEntity.setLastUpdate(op.location);
- }
- }
- });
- redoCounter += uow.size();
- uow = null;
- }
+ rootEntity.setLastUpdate(recoveryPosition);
+ uow.commit();
+ redoCounter += uowCounter;
+ uowCounter = 0;
+ uow = null;
} else if (data.length == 1 && data.data[0] == CANCEL_UNIT_OF_WORK) {
+ uow.rollback();
uow = null;
} else if (data.length == 1 && data.data[0] == FLUSH) {
} else {
final TypeCreatable message = load(recoveryPosition);
final Location location = recoveryPosition;
if (uow != null) {
- UoWOperation op = new UoWOperation();
- op.bean = message;
- op.data = data;
- op.location = recoveryPosition;
- uow.add(op);
+ updateIndex(uow, message.toType(), (MessageBuffer) message, location);
+ uowCounter++;
} else {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=789411&r1=789410&r2=789411&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Mon Jun 29 18:57:45 2009
@@ -122,7 +122,7 @@
// Maps:
private BTreeIndex<AsciiBuffer, Long> mapIndex;
- private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache;
+ private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache = new TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer,Buffer>>();
// /////////////////////////////////////////////////////////////////
// Lifecycle Methods.
Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=789411&r1=789410&r2=789411&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Mon Jun 29 18:57:45 2009
@@ -17,8 +17,10 @@
package org.apache.activemq.broker.store;
import java.io.IOException;
+import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.TreeMap;
import junit.framework.TestCase;
@@ -201,6 +203,132 @@
}
}
+ /**
+ * Exercises the session map API against the store under test: listing maps,
+ * implicit map creation on first put, re-adding an existing map, overwriting
+ * an entry, removing a map, and bulk insertion of 100000 entries. When
+ * isStorePersistent() is true the store is stopped, recreated with
+ * createStore(false) and started again, and the entries are re-verified.
+ *
+ * NOTE(review): most assertEquals calls below pass the value read from the
+ * store in the "expected" position and the reference value in the "actual"
+ * position, so a failure message would report them the wrong way round.
+ *
+ * @throws Exception if any store operation or assertion fails
+ */
+ public void testMap() throws Exception {
+ final TreeMap<AsciiBuffer, Buffer> expected = new TreeMap<AsciiBuffer, Buffer>();
+ final AsciiBuffer map = new AsciiBuffer("testMap");
+ // Reference data: "Key<i>" -> "Value<i>" for i in [0, 100000).
+ for(int i=0; i < 100000; i++)
+ {
+ expected.put(new AsciiBuffer("Key" + i), new AsciiBuffer("Value" + i));
+ }
+
+ //Test no values present:
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<AsciiBuffer> r = session.mapList(null, 10);
+ assertEquals("Not expecting any maps", false, r.hasNext());
+ }
+ }, null);
+
+ //Test auto add: put into a map that was never explicitly added
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ session.mapEntryPut(new AsciiBuffer("testMap"), expected.firstKey(), expected.get(expected.firstKey()));
+ assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
+ }
+ }, null);
+
+ //Test re-add non empty map: the existing entry must still be readable afterwards
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ session.mapAdd(map);
+ assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
+ }
+ }, null);
+
+ //Test overwrite
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ AsciiBuffer overwrite = new AsciiBuffer("overwrite");
+ session.mapEntryPut(map, expected.firstKey(), overwrite);
+ assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), overwrite);
+ }
+ }, null);
+ // Read the overwritten value back in a separate execute() call.
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), new AsciiBuffer("overwrite"));
+ }
+ }, null);
+
+ //Test map remove:
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ AsciiBuffer overwrite = new AsciiBuffer("overwrite"); // NOTE(review): unused local
+ session.mapRemove(map);
+ Iterator<AsciiBuffer> r = session.mapList(null, 10);
+ assertEquals("Not expecting any maps", false, r.hasNext());
+ }
+ }, null);
+ // Check again in a separate execute() call that no maps are listed.
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<AsciiBuffer> r = session.mapList(null, 10);
+ assertEquals("Not expecting any maps", false, r.hasNext());
+ }
+ }, null);
+
+
+ //Test multiple adds: all reference entries are written within one execute() call
+ System.out.println(new Date() + " Adding entries");
+
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ for(AsciiBuffer k : expected.keySet())
+ {
+ session.mapEntryPut(map, k, expected.get(k));
+ }
+ }
+ }, null);
+
+ System.out.println(new Date() + " Checking entries");
+
+ checkMap(map, expected);
+
+ //Restart the store and make sure the entries are still there
+ if (isStorePersistent()) {
+ store.stop();
+
+ System.out.println(new Date() + " Restarting store");
+ store = createStore(false);
+ store.start();
+ System.out.println(new Date() + " Started store");
+
+
+ //Test that the map entries were persisted
+ checkMap(map, expected);
+ }
+ }
+ /**
+ * Reads the named map back from the store and asserts that it equals the
+ * expected contents. Keys are listed starting at expected.firstKey() with a
+ * limit of expected.size(); each value is then fetched individually.
+ *
+ * @param mapName name of the map to read from the store
+ * @param expected reference contents the stored map must match
+ * @throws Exception if the store operation or the assertion fails
+ */
+ private void checkMap(final AsciiBuffer mapName, final TreeMap<AsciiBuffer, Buffer> expected) throws Exception
+ {
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<AsciiBuffer> r = session.mapEntryListKeys(mapName, expected.firstKey(), expected.size());
+ TreeMap<AsciiBuffer, Buffer> comp = new TreeMap<AsciiBuffer, Buffer>();
+ while(r.hasNext())
+ {
+ AsciiBuffer key = r.next();
+ Buffer value = session.mapEntryGet(mapName, key);
+ comp.put(key, new AsciiBuffer(value.data)); // NOTE(review): wraps value.data directly; if Buffer carries an offset/length they would be ignored here, confirm
+ }
+
+ assertEquals("Map in store doesn't match", expected, comp);
+
+ }
+ }, null);
+ }
+
@SuppressWarnings("unchecked")
private void checkSubscriptions(HashMap<AsciiBuffer, SubscriptionRecord> expected) throws Exception
{