You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/29 20:57:45 UTC

svn commit: r789411 - in /activemq/sandbox/activemq-flow: activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-store/src/test/java/org/apache/activemq/broker/store/

Author: cmacnaug
Date: Mon Jun 29 18:57:45 2009
New Revision: 789411

URL: http://svn.apache.org/viewvc?rev=789411&view=rev
Log:
Improvements to KahaDBStore recovery time, plus unit test for Map functionality.

Modified:
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=789411&r1=789410&r2=789411&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Mon Jun 29 18:57:45 2009
@@ -138,6 +138,7 @@
 
     public void stop() throws Exception {
         if (started.compareAndSet(true, false)) {
+            flush();
             unload();
         }
     }
@@ -323,41 +324,32 @@
         try {
             long start = System.currentTimeMillis();
             recovering = true;
-            ArrayList<UoWOperation> uow = null;
             Location recoveryPosition = getRecoveryPosition();
             if (recoveryPosition != null) {
                 int redoCounter = 0;
+                Transaction uow = null;
+                int uowCounter = 0;
                 while (recoveryPosition != null) {
 
                     Buffer data = journal.read(recoveryPosition);
                     if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
-                        uow = new ArrayList<UoWOperation>();
+                        uow = pageFile.tx();
                     } else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
-                        if (uow != null) {
-                            final ArrayList<UoWOperation> list = uow;
-                            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                                public void execute(Transaction tx) throws IOException {
-                                    for (UoWOperation op : list) {
-                                        updateIndex(tx, op.bean.toType(), (MessageBuffer) op.bean, op.location);
-                                        rootEntity.setLastUpdate(op.location);
-                                    }
-                                }
-                            });
-                            redoCounter += uow.size();
-                            uow = null;
-                        }
+                        rootEntity.setLastUpdate(recoveryPosition);
+                        uow.commit();
+                        redoCounter += uowCounter;
+                        uowCounter = 0;
+                        uow = null;
                     } else if (data.length == 1 && data.data[0] == CANCEL_UNIT_OF_WORK) {
+                        uow.rollback();
                         uow = null;
                     } else if (data.length == 1 && data.data[0] == FLUSH) {
                     } else {
                         final TypeCreatable message = load(recoveryPosition);
                         final Location location = recoveryPosition;
                         if (uow != null) {
-                            UoWOperation op = new UoWOperation();
-                            op.bean = message;
-                            op.data = data;
-                            op.location = recoveryPosition;
-                            uow.add(op);
+                            updateIndex(uow, message.toType(), (MessageBuffer) message, location);
+                            uowCounter++;
                         } else {
                             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                                 public void execute(Transaction tx) throws IOException {

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=789411&r1=789410&r2=789411&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Mon Jun 29 18:57:45 2009
@@ -122,7 +122,7 @@
 
     // Maps:
     private BTreeIndex<AsciiBuffer, Long> mapIndex;
-    private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache;
+    private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache = new TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer,Buffer>>();
 
     // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.

Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=789411&r1=789410&r2=789411&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Mon Jun 29 18:57:45 2009
@@ -17,8 +17,10 @@
 package org.apache.activemq.broker.store;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.TreeMap;
 
 import junit.framework.TestCase;
 
@@ -201,6 +203,132 @@
         }
     }
     
+    
+    public void testMap() throws Exception {
+        final TreeMap<AsciiBuffer, Buffer> expected = new TreeMap<AsciiBuffer, Buffer>();
+        final AsciiBuffer map = new AsciiBuffer("testMap");
+        
+        for(int i=0; i < 100000; i++)
+        {
+            expected.put(new AsciiBuffer("Key" + i), new AsciiBuffer("Value" + i));
+        }
+        
+        //Test no values present:
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<AsciiBuffer> r = session.mapList(null, 10);
+                assertEquals("Not expecting any maps", false, r.hasNext());
+            }
+        }, null);
+        
+        //Test auto add:
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                session.mapEntryPut(new AsciiBuffer("testMap"), expected.firstKey(), expected.get(expected.firstKey()));
+                assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
+            }
+        }, null);
+        
+        //Test re-add non empty map
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                session.mapAdd(map);
+                assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), expected.get(expected.firstKey()));
+            }
+        }, null);
+        
+        //Test overwrite
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                AsciiBuffer overwrite = new AsciiBuffer("overwrite");
+                session.mapEntryPut(map, expected.firstKey(), overwrite);
+                assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), overwrite);
+            }
+        }, null);
+
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                assertEquals("Value should be in map", session.mapEntryGet(map, expected.firstKey()), new AsciiBuffer("overwrite"));
+            }
+        }, null);
+        
+        //Test map remove:
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                AsciiBuffer overwrite = new AsciiBuffer("overwrite");
+                session.mapRemove(map);
+                Iterator<AsciiBuffer> r = session.mapList(null, 10);
+                assertEquals("Not expecting any maps", false, r.hasNext());
+            }
+        }, null);
+        
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<AsciiBuffer> r = session.mapList(null, 10);
+                assertEquals("Not expecting any maps", false, r.hasNext());
+            }
+        }, null);
+        
+        
+        //Test multiple adds:
+        System.out.println(new Date() + " Adding entries");
+        
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                for(AsciiBuffer k : expected.keySet())
+                {
+                    session.mapEntryPut(map, k, expected.get(k));
+                }
+            }
+        }, null);
+        
+        System.out.println(new Date() + " Checking entries");
+        
+        checkMap(map, expected);
+        
+        //Restart the store and make sure the entries are still there
+        if (isStorePersistent()) {
+            store.stop();
+
+            System.out.println(new Date() + " Restarting store");
+            store = createStore(false);
+            store.start();
+            System.out.println(new Date() + " Started store");
+            
+
+            //Test that the queue was persisted
+            checkMap(map, expected);
+        }
+    }
+    
+    private void checkMap(final AsciiBuffer mapName, final TreeMap<AsciiBuffer, Buffer> expected) throws Exception
+    {
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<AsciiBuffer> r = session.mapEntryListKeys(mapName, expected.firstKey(), expected.size());
+                TreeMap<AsciiBuffer, Buffer> comp = new TreeMap<AsciiBuffer, Buffer>();
+                while(r.hasNext())
+                {
+                    AsciiBuffer key = r.next();
+                    Buffer value = session.mapEntryGet(mapName, key);
+                    comp.put(key, new AsciiBuffer(value.data));
+                }
+                
+                assertEquals("Map in store doesn't match", expected, comp);
+                
+            }
+        }, null);
+    }
+    
     @SuppressWarnings("unchecked")
     private void checkSubscriptions(HashMap<AsciiBuffer, SubscriptionRecord> expected) throws Exception
     {