You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/06 20:59:05 UTC

svn commit: r692707 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/ main/java/org/apache/kahadb/index/ main/java/org/apache/kahadb/store/ test/java/org/apache/kahadb/index/

Author: chirino
Date: Sat Sep  6 11:59:04 2008
New Revision: 692707

URL: http://svn.apache.org/viewvc?rev=692707&view=rev
Log:
Updated the Index interface so that a Transaction is passed to the load and unload methods.
This allows the initialization and shutdown to be done as part of a larger unit of work.
HashIndex resizing is now done in the context of a transaction so it simplified tremendously as
it does not need to worry about a partial resize operation occuring.

Also added BTreeVisitor class that I forgot to add in a previous commit.


Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java?rev=692707&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java Sat Sep  6 11:59:04 2008
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implementation of a Marshaller for a Integer
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class IntegerMarshaller implements Marshaller<Integer> {
+    
+    public static final IntegerMarshaller INSTANCE = new IntegerMarshaller();
+    
+    public void writePayload(Integer object, DataOutput dataOut) throws IOException {
+        dataOut.writeInt(object);
+    }
+
+    public Integer readPayload(DataInput dataIn) throws IOException {
+        return dataIn.readInt();
+    }
+    
+    public Class<Integer> getType() {
+        return Integer.class;
+    }
+}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java Sat Sep  6 11:59:04 2008
@@ -139,34 +139,28 @@
         this(pageFile, page.getPageId());
     }
 
-    synchronized public void load() throws IOException {
+    synchronized public void load(Transaction tx) throws IOException {
         if (loaded.compareAndSet(false, true)) {
             LOG.debug("loading");
             if( keyMarshaller == null ) {
-                throw new IllegalArgumentException("The keyMarshaller must be set before loading the BTreeIndex");
+                throw new IllegalArgumentException("The key marshaller must be set before loading the BTreeIndex");
             }
             if( valueMarshaller == null ) {
-                throw new IllegalArgumentException("The valueMarshaller must be set before loading the BTreeIndex");
+                throw new IllegalArgumentException("The value marshaller must be set before loading the BTreeIndex");
             }
             
-            Transaction tx = pageFile.tx();
             final Page<BTreeNode<Key,Value>> p = tx.load(pageId, null);
             if( p.getType() == Page.PAGE_FREE_TYPE ) {
                  // Need to initialize it..
-                tx.execute(new Transaction.Closure<IOException>(){
-                    public void execute(Transaction tx) throws IOException {
-                        root = createNode(p, null);
-                        storeNode(tx, root, true);
-                    }
-                });
-                pageFile.checkpoint();
+                root = createNode(p, null);
+                storeNode(tx, root, true);
             } else {
                 root = loadNode(tx, pageId, null);    
             }
         }
     }
     
-    synchronized public void unload() {
+    synchronized public void unload(Transaction tx) {
         if (loaded.compareAndSet(true, false)) {
             root=null;
         }    

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=692707&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Sat Sep  6 11:59:04 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+/**
+ * Interface used to selectively visit the entries in a BTree.
+ * 
+ * @param <Key>
+ * @param <Value>
+ */
+public interface BTreeVisitor<Key,Value> {
+    
+    /**
+     * Do you want to visit the range of BTree entries between the first and and second key?
+     * 
+     * @param first if null indicates the range of values before the second key. 
+     * @param second if null indicates the range of values after the first key.
+     * @return true if you want to visit the values between the first and second key.
+     */
+    boolean isInterestedInKeysBetween(Key first, Key second);
+    
+    /**
+     * The keys and values of a BTree leaf node.
+     * 
+     * @param keys
+     * @param values
+     */
+    void visit(Key[] keys, Value[] values);
+    
+}
\ No newline at end of file

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java Sat Sep  6 11:59:04 2008
@@ -41,10 +41,7 @@
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
-    public static final int INITIALIZING_STATE = 3;
 
-    public static final int RESIZING_PHASE1_STATE = 4;
-    public static final int RESIZING_PHASE2_STATE = 5;
 
     private static final Log LOG = LogFactory.getLog(HashIndex.class);
 
@@ -91,9 +88,6 @@
         private int binCapacity = DEFAULT_BIN_CAPACITY;
         private int binsActive;
         private int size;
-        // While resizing, the following contains the new resize data.
-        private int resizeCapacity;
-        private long resizePageId;
 
         
         public void read(DataInput is) throws IOException {
@@ -102,8 +96,6 @@
             binCapacity = is.readInt();
             size = is.readInt();
             binsActive = is.readInt();
-            resizePageId = is.readLong();
-            resizeCapacity = is.readInt();
         }
         public void write(DataOutput os) throws IOException {
             os.writeInt(state);
@@ -111,8 +103,6 @@
             os.writeInt(binCapacity);
             os.writeInt(size);
             os.writeInt(binsActive);
-            os.writeLong(resizePageId);
-            os.writeInt(resizeCapacity);
         }
         
         static class Marshaller implements org.apache.kahadb.Marshaller<Metadata> {
@@ -154,127 +144,74 @@
         this.pageId = pageId;
     }
 
-    public synchronized void load() {
-        Transaction tx = pageFile.tx();
-        try {
-            
-            if (loaded.compareAndSet(false, true)) {
-                try {
-                    final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
-                    // Is this a brand new index?
-                    if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
-                        // We need to create the pages for the bins
-                        tx.execute(new Transaction.Closure<IOException>(){
-                            public void execute(Transaction tx) throws IOException {
-                                Page binPage = tx.allocate(metadata.binCapacity);
-                                metadata.binPageId = binPage.getPageId();
-                                metadata.state = INITIALIZING_STATE;
-                                metadata.page = metadataPage;
-                                metadataPage.set(metadata);
-                                tx.store(metadataPage, metadataMarshaller, true);
-                            }
-                        });
-                        pageFile.checkpoint();
-    
-                        // If failure happens now we can continue initializing the
-                        // the hash bins...
-                    } else {
-
-                        metadata = metadataPage.get();
-                        metadata.page = metadataPage;
-                        
-                        // If we did not have a clean shutdown...
-                        if (metadata.state == OPEN_STATE || metadata.state == RESIZING_PHASE1_STATE) {
-                            // Figure out the size and the # of bins that are
-                            // active. Yeah This loads the first page of every bin. :(
-                            // We might want to put this in the metadata page, but
-                            // then that page would be getting updated on every write.
-                            metadata.size = 0;
-                            for (int i = 0; i < metadata.binCapacity; i++) {
-                                int t = sizeOfBin(metadata.binPageId);
-                                if (t > 0) {
-                                    metadata.binsActive++;
-                                }
-                                metadata.size += t;
-                            }
+    public synchronized void load(Transaction tx) throws IOException {
+        if (loaded.compareAndSet(false, true)) {
+            final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
+            // Is this a brand new index?
+            if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
+                // We need to create the pages for the bins
+                Page binPage = tx.allocate(metadata.binCapacity);
+                metadata.binPageId = binPage.getPageId();
+                metadata.page = metadataPage;
+                metadataPage.set(metadata);
+                clear(tx);
+
+                // If failure happens now we can continue initializing the
+                // the hash bins...
+            } else {
+
+                metadata = metadataPage.get();
+                metadata.page = metadataPage;
+                
+                // If we did not have a clean shutdown...
+                if (metadata.state == OPEN_STATE ) {
+                    // Figure out the size and the # of bins that are
+                    // active. Yeah This loads the first page of every bin. :(
+                    // We might want to put this in the metadata page, but
+                    // then that page would be getting updated on every write.
+                    metadata.size = 0;
+                    for (int i = 0; i < metadata.binCapacity; i++) {
+                        int t = sizeOfBin(tx, i);
+                        if (t > 0) {
+                            metadata.binsActive++;
                         }
+                        metadata.size += t;
                     }
-    
-                    if (metadata.state == INITIALIZING_STATE) {
-                        // TODO:
-                        // If a failure occurs mid way through us initializing the
-                        // bins.. will the page file still think we have the rest
-                        // of them previously allocated to us?
-    
-                        tx.execute(new Closure<IOException>(){
-                            public void execute(Transaction tx) throws IOException {
-                                clear(tx);
-                            }
-                        });
-                    }
-                    
-                    if (metadata.state == RESIZING_PHASE1_STATE) {
-                        // continue resize phase 1
-                        resizePhase1();
-                    }                
-                    if (metadata.state == RESIZING_PHASE2_STATE) {
-                        // continue resize phase 1
-                        resizePhase2();
-                    }                
-    
-                    calcThresholds();
-    
-                    metadata.state = OPEN_STATE;
-                    tx.execute(new Closure<IOException>(){
-                        public void execute(Transaction tx) throws IOException {
-                            tx.store(metadataPage, metadataMarshaller, true);
-                        }
-                    });
-                    pageFile.checkpoint();
-                    
-                    LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
-    
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
                 }
             }
+
+            calcThresholds();
+
+            metadata.state = OPEN_STATE;
+            tx.store(metadataPage, metadataMarshaller, true);
             
-        } finally {
-            // All pending updates should have been committed by now.
-            assert tx.isReadOnly();
+            LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
         }
     }
 
-    private int sizeOfBin(long binPageId) {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    public synchronized void unload() throws IOException {
+    public synchronized void unload(Transaction tx) throws IOException {
         if (loaded.compareAndSet(true, false)) {
             metadata.state = CLOSED_STATE;
-            pageFile.tx().execute(new Closure<IOException>(){
-                public void execute(Transaction tx) throws IOException {
-                    tx.store(metadata.page, metadataMarshaller, true);
-                }
-            });
+            tx.store(metadata.page, metadataMarshaller, true);
         }
     }
 
+    private int sizeOfBin(Transaction tx, int index) throws IOException {
+        return getBin(tx, index).size();
+    }
+
     public synchronized Value get(Transaction tx, Key key) throws IOException {
-        load();
+        assertLoaded();
         return getBin(tx, key).get(key);
     }
     
     public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
-        // TODO: multiple loads is smelly..
-        load();
+        assertLoaded();
         return getBin(tx, key).containsKey(key);
     }
 
     synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
-        // TODO: multiple loads is smelly..
-        load();
+        assertLoaded();
         HashBin<Key,Value> bin = getBin(tx, key);
 
         int originalSize = bin.size();
@@ -293,15 +230,14 @@
         if (metadata.binsActive >= this.increaseThreshold) {
             newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
             if(metadata.binCapacity!=newSize) {
-                resize(newSize);
+                resize(tx, newSize);
             }
         }
         return result;
     }
     
     synchronized public Value remove(Transaction tx, Key key) throws IOException {
-        // TODO: multiple loads is smelly..
-        load();
+        assertLoaded();
 
         HashBin<Key,Value> bin = getBin(tx, key);
         int originalSize = bin.size();
@@ -320,7 +256,7 @@
         if (metadata.binsActive <= this.decreaseThreshold) {
             newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
             if(metadata.binCapacity!=newSize) {
-                resize(newSize);
+                resize(tx, newSize);
             }
         }
         return result;
@@ -328,8 +264,7 @@
     
 
     public synchronized void clear(Transaction tx) throws IOException {
-        // TODO: multiple loads is smelly..
-        load();
+        assertLoaded();
         for (int i = 0; i < metadata.binCapacity; i++) {
             long pageId = metadata.binPageId + i;
             clearBinAtPage(tx, pageId);
@@ -365,109 +300,80 @@
     // Implementation Methods
     // /////////////////////////////////////////////////////////////////
 
+    private void assertLoaded() throws IllegalStateException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("The HashIndex is not loaded");
+        }
+    }
+
     public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
         tx.store(bin.getPage(), hashBinMarshaller, true);
     }
 
+    // While resizing, the following contains the new resize data.
     
-    private void resize(final int newSize) throws IOException {
+    private void resize(Transaction tx, final int newSize) throws IOException {
         LOG.debug("Resizing to: "+newSize);
-        pageFile.tx().execute(new Closure<IOException>(){
-            public void execute(Transaction tx) throws IOException {
-                metadata.state = RESIZING_PHASE1_STATE;
-                metadata.resizeCapacity = newSize;
-                metadata.resizePageId = tx.allocate(metadata.resizeCapacity).getPageId();
-                tx.store(metadata.page, metadataMarshaller, true);
-            }
-        });
-        pageFile.checkpoint();
         
-        resizePhase1();
-        resizePhase2();        
-    }
+        int resizeCapacity = newSize;
+        long resizePageId = tx.allocate(resizeCapacity).getPageId();
 
-    private void resizePhase1() throws IOException {
         // In Phase 1 we copy the data to the new bins..
-        pageFile.tx().execute(new Closure<IOException>(){
-            public void execute(Transaction tx) throws IOException {
-                
-                // Initialize the bins..
-                for (int i = 0; i < metadata.resizeCapacity; i++) {
-                    long pageId = metadata.resizePageId + i;
-                    clearBinAtPage(tx, pageId);
-                }
+        // Initialize the bins..
+        for (int i = 0; i < resizeCapacity; i++) {
+            long pageId = resizePageId + i;
+            clearBinAtPage(tx, pageId);
+        }
 
-                metadata.binsActive = 0;
-                // Copy the data from the old bins to the new bins.
-                for (int i = 0; i < metadata.binCapacity; i++) {
-                    
-                    HashBin<Key,Value> bin = getBin(tx, i);
-                    for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
-                        HashBin<Key,Value> resizeBin = getResizeBin(tx, entry.getKey());
-                        resizeBin.put(entry.getKey(), entry.getValue());
-                        store(tx, resizeBin);
-                        if( resizeBin.size() == 1) {
-                            metadata.binsActive++;
-                        }
-                    }
+        metadata.binsActive = 0;
+        // Copy the data from the old bins to the new bins.
+        for (int i = 0; i < metadata.binCapacity; i++) {
+            
+            HashBin<Key,Value> bin = getBin(tx, i);
+            for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
+                HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity);
+                resizeBin.put(entry.getKey(), entry.getValue());
+                store(tx, resizeBin);
+                if( resizeBin.size() == 1) {
+                    metadata.binsActive++;
                 }
-                
-                // Now we can release the old data.
-                metadata.state = RESIZING_PHASE2_STATE;
-                tx.store(metadata.page, metadataMarshaller, true);
             }
-        });
-        pageFile.checkpoint();
-    }
-
-    private void resizePhase2() throws IOException {
+        }
+        
         // In phase 2 we free the old bins and switch the the new bins.
-        pageFile.tx().execute(new Closure<IOException>(){
-            public void execute(Transaction tx) throws IOException {
-                for (int i = 0; i < metadata.binCapacity; i++) {
-                    long pageId = metadata.binPageId + i;
-                    clearBinAtPage(tx, pageId);
-                }
-                tx.free(metadata.binPageId, metadata.binCapacity);
-                
-                metadata.binCapacity = metadata.resizeCapacity;
-                metadata.binPageId = metadata.resizePageId;
-                metadata.resizeCapacity=0;
-                metadata.resizePageId=0;
-                metadata.state = OPEN_STATE;
-                tx.store(metadata.page, metadataMarshaller, true);
-            }
-        });
-
-        pageFile.checkpoint();
+        tx.free(metadata.binPageId, metadata.binCapacity);
+        
+        metadata.binCapacity = resizeCapacity;
+        metadata.binPageId = resizePageId;
+        metadata.state = OPEN_STATE;
+        tx.store(metadata.page, metadataMarshaller, true);
         calcThresholds();
-        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);
+
+        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);        
+        resizeCapacity=0;
+        resizePageId=0;
     }
 
     private void calcThresholds() {
         increaseThreshold = (metadata.binCapacity * loadFactor)/100;
         decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
     }
-
-    private HashBin<Key,Value> getResizeBin(Transaction tx, Key key) throws IOException {
-        int i = indexFor(key, metadata.resizeCapacity);
-        return getResizeBin(tx, i);
+    
+    private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
+        return getBin(tx, key, metadata.binPageId, metadata.binCapacity);
     }
 
-    private HashBin<Key,Value> getResizeBin(Transaction tx, int i) throws IOException {
-        Page<HashBin<Key, Value>> page = tx.load(metadata.resizePageId + i, hashBinMarshaller);
-        HashBin<Key, Value> rc = page.get();
-        rc.setPage(page);
-        return rc;
+    private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
+        return getBin(tx, i, metadata.binPageId);
     }
-
-    private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
-        int i = indexFor(key, metadata.binCapacity);
-        return getBin(tx, i);
+    
+    private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException {
+        int i = indexFor(key, capacity);
+        return getBin(tx, i, basePage);
     }
 
-    private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
-        Page<HashBin<Key, Value>> page = tx.load(metadata.binPageId + i, hashBinMarshaller);
+    private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException {
+        Page<HashBin<Key, Value>> page = tx.load(basePage + i, hashBinMarshaller);
         HashBin<Key, Value> rc = page.get();
         rc.setPage(page);
         return rc;

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java Sat Sep  6 11:59:04 2008
@@ -47,14 +47,14 @@
     /**
      * load indexes
      */
-    void load() throws IOException;
+    void load(Transaction tx) throws IOException;
 
     /**
      * unload indexes
      * 
      * @throws IOException
      */
-    void unload() throws IOException;
+    void unload(Transaction tx) throws IOException;
 
     /**
      * clear the index

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Sat Sep  6 11:59:04 2008
@@ -227,7 +227,7 @@
                     }
                     metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                     metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
-                    metadata.destinations.load();
+                    metadata.destinations.load(tx);
                 }
             });
             
@@ -285,7 +285,7 @@
                 }
             });
 
-            metadata.destinations.unload();
+//            metadata.destinations.unload(tx);
             pageFile.unload();
             metadata = new Metadata();
         }
@@ -611,16 +611,16 @@
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         sd.orderIndex.clear(tx);
         sd.messageIdIndex.clear(tx);
-        sd.orderIndex.unload();
-        sd.messageIdIndex.unload();
+        sd.orderIndex.unload(tx);
+        sd.messageIdIndex.unload(tx);
         tx.free(sd.orderIndex.getPageId());
         tx.free(sd.messageIdIndex.getPageId());
 
         if (sd.subscriptions != null) {
             sd.subscriptions.clear(tx);
             sd.subscriptionAcks.clear(tx);
-            sd.subscriptions.unload();
-            sd.subscriptionAcks.unload();
+            sd.subscriptions.unload(tx);
+            sd.subscriptionAcks.unload(tx);
             tx.free(sd.subscriptions.getPageId());
             tx.free(sd.subscriptionAcks.getPageId());
         }
@@ -842,22 +842,22 @@
         // Configure the marshalers and load.
         rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
         rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
-        rc.orderIndex.load();
+        rc.orderIndex.load(tx);
 
         rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
         rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
-        rc.messageIdIndex.load();
+        rc.messageIdIndex.load(tx);
 
         // If it was a topic...
         if (topic) {
 
             rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
             rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
-            rc.subscriptions.load();
+            rc.subscriptions.load(tx);
 
             rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
             rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
-            rc.subscriptionAcks.load();
+            rc.subscriptionAcks.load(tx);
 
             rc.ackLocations = new TreeMap<Location, HashSet<String>>();
             rc.subscriptionCursors = new HashMap<String, Location>();

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Sat Sep  6 11:59:04 2008
@@ -61,7 +61,8 @@
         createPageFileAndIndex(100);
 
         BTreeIndex index = ((BTreeIndex)this.index);
-        this.index.load();
+        this.index.load(tx);
+        tx.commit();
         
         doInsert(50);
         
@@ -80,7 +81,7 @@
 
         assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
 
-        this.index.unload();
+        this.index.unload(tx);
     }
     
     public void testPruning() throws Exception {
@@ -88,7 +89,8 @@
 
         BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
 
-        this.index.load();
+        this.index.load(tx);
+        tx.commit();
      
         int minLeafDepth = index.getMinLeafDepth(tx);
         int maxLeafDepth = index.getMaxLeafDepth(tx);
@@ -110,19 +112,23 @@
         assertEquals(1, minLeafDepth);
         assertEquals(1, maxLeafDepth);
 
-        this.index.unload();
+        this.index.unload(tx);
+        tx.commit();
     }
 
     public void testIteration() throws Exception {
         createPageFileAndIndex(100);
         BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
-        this.index.load();
+        this.index.load(tx);
+        tx.commit();
           
         // Insert in reverse order..
         doInsertReverse(1000);
         
-        this.index.unload();
-        this.index.load();
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
 
         // BTree should iterate it in sorted order.
         int counter=0;
@@ -133,7 +139,8 @@
             counter++;
         }
 
-        this.index.unload();
+        this.index.unload(tx);
+        tx.commit();
     }
     
     void doInsertReverse(int count) throws Exception {

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java Sat Sep  6 11:59:04 2008
@@ -57,23 +57,27 @@
     }
 
     protected void tearDown() throws Exception {
+        Transaction tx = pf.tx();
         for (Index i : indexes.values()) {
             try {
-                i.unload();
+                i.unload(tx);
             } catch (Throwable ignore) {
             }
         }
+        tx.commit();
     }
 
     abstract protected Index<String, Long> createIndex() throws Exception;
 
     synchronized private Index<String, Long> openIndex(String name) throws Exception {
+        Transaction tx = pf.tx();
         Index<String, Long> index = indexes.get(name);
         if (index == null) {
             index = createIndex();
-            index.load();
+            index.load(tx);
             indexes.put(name, index);
         }
+        tx.commit();
         return index;
     }
 

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java Sat Sep  6 11:59:04 2008
@@ -67,22 +67,29 @@
 
     public void testIndex() throws Exception {
         createPageFileAndIndex(500);
-        
-        this.index.load();
+        this.index.load(tx);
+        tx.commit();
         doInsert(COUNT);
-        this.index.unload();
-        this.index.load();
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
         checkRetrieve(COUNT);
         doRemove(COUNT);
-        this.index.unload();
-        this.index.load();
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
         doInsert(COUNT);
         doRemoveHalf(COUNT);
         doInsertHalf(COUNT);
-        this.index.unload();
-        this.index.load();
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
         checkRetrieve(COUNT);
-        this.index.unload();
+        this.index.unload(tx);
+        tx.commit();
     }
 
     void doInsert(int count) throws Exception {