You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/25 04:36:00 UTC

svn commit: r688600 [1/2] - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/util/ test/java/org/apache/kahadb/page/

Author: chirino
Date: Sun Aug 24 19:35:58 2008
New Revision: 688600

URL: http://svn.apache.org/viewvc?rev=688600&view=rev
Log:
- Added a BTree index based on the xindice implementation, except it uses the PageFile abstraction
- Added a  new Index interface which is Generics based.  
- Updates the indexes to support the generic keys and values.
- The BTree implementation also needed page chaining/overflow support, so I moved the bits in Chunks deeper into the PageFile.  Now when you
  write data to a Page that is larger than the page size, it will automatically chain and overflow to additional pages.  This also provides better caching
  as the unmarshalled page data is cached at the initial page.


Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Index.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexTestSupport.java
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+
+/**
+ * BTreeIndex represents a Variable Magnitude B+Tree in a Page File.
+ * A BTree is a bit flexible in that it can be used for set or
+ * map-based indexing.
+ *
+ * <br>
+ * The Variable Magnitude attribute means that the BTree attempts
+ * to store as many values and pointers on one page as is possible.
+ * 
+ * <br>
+ * The implementation can optionally be a Simple-Prefix B+Tree.
+ * 
+ * <br>
+ * For those who don't know how a Simple-Prefix B+Tree works, the primary
+ * distinction is that instead of promoting actual keys to branch pages,
+ * when leaves are split, a shortest-possible separator is generated at
+ * the pivot.  That separator is what is promoted to the parent branch
+ * (and continuing up the list).  As a result, actual keys and pointers
+ * can only be found at the leaf level.  This also affords the index the
+ * ability to ignore costly merging and redistribution of pages when
+ * deletions occur.  Deletions only affect leaf pages in this
+ * implementation, and so it is entirely possible for a leaf page to be
+ * completely empty after all of its keys have been removed.
+ *
+ * @version $Revision: 541508 $, $Date: 2007-05-24 21:54:12 -0400 (Thu, 24 May 2007) $
+ */
+public class BTreeIndex<Key,Value> implements Index<Key,Value> {
+
+    private static final Log LOG = LogFactory.getLog(BTreeIndex.class);
+
+    /**
+     * BTreeCallback is a callback interface for BTree queries.
+     *
+     * @version $Revision: 541508 $, $Date: 2007-05-24 21:54:12 -0400 (Thu, 24 May 2007) $
+     */
+    static public interface BTreeCallback<Key,Value> {
+
+        /**
+         * onEntry is a callback method for index enumeration; it is
+         * invoked with a key and the value stored under that key.
+         *
+         * @param key The key being reported
+         * @param pointer The value associated with the key being reported
+         * @return false to cancel the enumeration
+         */
+        boolean onEntry(Key key, Value pointer);
+    }   
+    
+    /**
+     * Interface used to determine the simple prefix of two keys.
+     *
+     * @version $Revision: 541508 $, $Date: 2007-05-24 21:54:12 -0400 (Thu, 24 May 2007) $
+     */
+    static public interface Prefixer<Key> {
+        
+        /**
+         * This method should return the shortest prefix of value2 for which the following still holds:<br/>
+         * value1 <= prefix <= value2.<br/><br/>
+         * 
+         * When this method is called, the following is guaranteed:<br/>
+         * value1 < value2<br/><br/>
+         * 
+         * 
+         * @param value1 the smaller of the two keys
+         * @param value2 the larger of the two keys; the result is a prefix of this key
+         * @return the prefix of value2 that satisfies the condition above
+         */
+        public Key getSimplePrefix(Key value1, Key value2);
+    }
+    
+    /**
+     * StringPrefixer is a Prefixer implementation that works on strings.
+     */
+    static public class StringPrefixer implements Prefixer<String> {
+        
+        /**
+         * Walks both strings in parallel and cuts value2 right after the
+         * first character at which it differs from value1.
+         * <br>
+         * Example: for value1 "Hello World" and value2 "Help Me" the first
+         * difference is at the fourth character, so "Help" is returned.
+         * <br>
+         * If no difference is found within the length of the shorter string,
+         * the leading part of value2 of that length is returned (which is
+         * value2 itself when value2 is the shorter or equally long string).
+         * 
+         * @see  Prefixer#getSimplePrefix
+         */
+        public String getSimplePrefix(String value1, String value2) {
+            final int shared = Math.min(value1.length(), value2.length());
+            for (int pos = 0; pos < shared; pos++) {
+                if (value1.charAt(pos) != value2.charAt(pos)) {
+                    // Keep everything up to and including the first differing character.
+                    return value2.substring(0, pos + 1);
+                }
+            }
+            
+            // No difference inside the shared length.
+            return shared == value2.length() ? value2 : value2.substring(0, shared);
+        }
+    }    
+
+    private final PageFile pageFile;
+    private final long rootPageId;
+    private AtomicBoolean loaded = new AtomicBoolean();
+    
+    private final BTreeNode.Marshaller<Key, Value> marshaller = new BTreeNode.Marshaller<Key, Value>(this);
+    private Marshaller<Key> keyMarshaller;
+    private Marshaller<Value> valueMarshaller;
+    private Prefixer<Key> prefixer;
+
+    private BTreeNode<Key,Value> root;
+
+    public BTreeIndex(PageFile pageFile, long rootPageId) {
+        super();
+        this.pageFile = pageFile;
+        this.rootPageId = rootPageId;
+    }
+
+    public void load() throws IOException {
+        if (loaded.compareAndSet(false, true)) {
+            LOG.debug("loading");
+            if( keyMarshaller == null ) {
+                throw new IllegalArgumentException("The keyMarshaller must be set before loading the BTreeIndex");
+            }
+            if( valueMarshaller == null ) {
+                throw new IllegalArgumentException("The valueMarshaller must be set before loading the BTreeIndex");
+            }
+            
+            Transaction tx = pageFile.tx();
+            final Page<BTreeNode<Key,Value>> p = tx.load(rootPageId, null);
+            if( p.getType() == Page.PAGE_FREE_TYPE ) {
+                 // Need to initialize it..
+                tx.execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        root = createNode(p, null);
+                        storeNode(tx, root, true);
+                    }
+                });
+                pageFile.checkpoint();
+            } else {
+                root = loadNode(tx, rootPageId, null);    
+            }
+        }
+    }
+    
+    public void unload() {
+        if (loaded.compareAndSet(true, false)) {
+            root=null;
+        }    
+    }
+    
+    synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
+        assertLoaded();
+        return root.contains(tx, key);
+    }
+
+    synchronized public Value get(Transaction tx, Key key) throws IOException {
+        assertLoaded();
+        return root.get(tx, key);
+    }
+
+    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
+        assertLoaded();
+        return root.put(tx, key, value);
+    }
+
+    synchronized public Value remove(Transaction tx, Key key) throws IOException {
+        assertLoaded();
+        return root.remove(tx, key);
+    }
+    
+    public boolean isTransient() {
+        return false;
+    }
+
+    /**
+     * Not supported by this index yet.
+     *
+     * @param tx the transaction to operate in (unused)
+     * @throws UnsupportedOperationException always
+     */
+    public void clear(Transaction tx) throws IOException {
+        throw new UnsupportedOperationException("Not implemented...");
+    }
+
+
+    /**
+     * Not supported by this index yet.
+     *
+     * @param tx the transaction to operate in (unused)
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     */
+    public int size(Transaction tx) {
+        throw new UnsupportedOperationException("Not implemented...");
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal implementation methods
+    ///////////////////////////////////////////////////////////////////
+    
+    private void assertLoaded() throws IllegalStateException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("The BTreeIndex is not loaded");
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal methods made accessible to BTreeNode
+    ///////////////////////////////////////////////////////////////////
+
+    BTreeNode<Key,Value> loadNode(Transaction tx, long pageId, BTreeNode<Key,Value> parent) throws IOException {
+        Page<BTreeNode<Key,Value>> page = tx.load(pageId, marshaller);
+        BTreeNode<Key, Value> node = page.get();
+        node.setPage(page);
+        node.setParent(parent);
+        return node;
+    }
+
+    BTreeNode<Key,Value> createNode(Transaction tx, BTreeNode<Key,Value> parent) throws IOException {
+        Page<BTreeNode<Key,Value>> p = tx.allocate();
+        BTreeNode<Key,Value> node = new BTreeNode<Key,Value>(this);
+        node.setPage(p);
+        node.setParent(parent);
+        node.setEmpty();
+        p.set(node);
+        return node;
+    }
+
+    BTreeNode<Key,Value> createNode(Page<BTreeNode<Key,Value>> p, BTreeNode<Key,Value> parent) throws IOException {
+        BTreeNode<Key,Value> node = new BTreeNode<Key,Value>(this);
+        node.setPage(p);
+        node.setParent(parent);
+        node.setEmpty();
+        p.set(node);
+        return node;
+    }
+    
+    void storeNode(Transaction tx, BTreeNode<Key,Value> node, boolean overflow) throws IOException {
+        tx.store(node.getPage(), marshaller, overflow);
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors
+    ///////////////////////////////////////////////////////////////////
+
+    public PageFile getPageFile() {
+        return pageFile;
+    }
+    public long getRootPageId() {
+        return rootPageId;
+    }
+
+    public Marshaller<Key> getKeyMarshaller() {
+        return keyMarshaller;
+    }
+    public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
+        this.keyMarshaller = keyMarshaller;
+    }
+
+    public Marshaller<Value> getValueMarshaller() {
+        return valueMarshaller;
+    }
+    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
+        this.valueMarshaller = valueMarshaller;
+    }
+
+    public Prefixer<Key> getPrefixer() {
+        return prefixer;
+    }
+    public void setPrefixer(Prefixer<Key> prefixer) {
+        this.prefixer = prefixer;
+    }
+
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.kahadb.page.BTreeIndex.Prefixer;
+
+
+/**
+ * The BTreeNode class represents a node in the BTree object graph.  It is stored in 
+ * one Page of a PageFile.
+ */
+public final class BTreeNode<Key,Value> {
+    
+    // The index that this node is part of.
+    private final BTreeIndex<Key,Value> index;
+    // The parent node or null if this is the root node of the BTree
+    private BTreeNode<Key,Value> parent;
+    // The page associated with this node
+    private Page<BTreeNode<Key,Value>> page;
+    
+    // Order list of keys in the node
+    private Key[] keys;
+    // Values associated with the Keys. Null if this is a branch node.
+    private Value[] values;
+    // nodeId pointers to children BTreeNodes. Null if this is a leaf node.
+    private long[] children;
+    
+    /**
+     * The Marshaller is used to store and load the data in the BTreeNode into a Page.
+     *  
+     * @param <Key>
+     * @param <Value>
+     */
+    static public class Marshaller<Key,Value> implements org.apache.kahadb.Marshaller<BTreeNode<Key,Value>> {
+        private final BTreeIndex<Key,Value> index;
+        
+        public Marshaller(BTreeIndex<Key,Value> index) {
+            this.index = index;
+        }
+
+        public Class<BTreeNode<Key,Value>> getType() {
+            return null;
+        }
+        
+        public void writePayload(BTreeNode<Key,Value> node, DataOutput os) throws IOException {
+            // Write the keys
+            short count = (short)node.keys.length; // cast may truncate value...
+            if( count != node.keys.length ) {
+                throw new IOException("Too many keys");
+            }
+            
+            os.writeShort(count);
+            for (int i = 0; i < node.keys.length; i++) {
+                index.getKeyMarshaller().writePayload(node.keys[i], os);
+            }
+            
+            if( node.isBranch() ) {
+                // If this is a branch...
+                os.writeBoolean(true);
+                for (int i = 0; i < count+1; i++) {
+                    os.writeLong(node.children[i]);
+                }
+                
+            } else {
+                // If this is a leaf
+                os.writeBoolean(false);
+                for (int i = 0; i < count; i++) {
+                    index.getValueMarshaller().writePayload(node.values[i], os);
+                }
+            }
+        }
+
+        /**
+         * Reads a node in the format produced by writePayload: a short key
+         * count, the keys, a boolean branch flag and then either count+1
+         * child page ids (branch) or count values (leaf).
+         *
+         * @param is the input to read the node from
+         * @return the unmarshalled node; its page and parent are not set here
+         * @throws IOException if the data cannot be read or the key count is negative
+         */
+        @SuppressWarnings("unchecked")
+        public BTreeNode<Key,Value> readPayload(DataInput is) throws IOException {
+            BTreeNode<Key,Value>  node = new BTreeNode<Key,Value>(index);
+            int count = is.readShort();
+            if( count < 0 ) {
+                throw new IOException("Invalid key count: " + count);
+            }
+            
+            node.keys = (Key[])new Object[count];
+            for (int i = 0; i < count; i++) {
+                node.keys[i] = index.getKeyMarshaller().readPayload(is);
+            }
+            
+            if( is.readBoolean() ) {
+                // A branch has one more child pointer than it has keys; all of
+                // them were written, so all of them must be read back.
+                node.children = new long[count+1];
+                for (int i = 0; i < node.children.length; i++) {
+                    node.children[i] = is.readLong();
+                }
+            } else {
+                node.values = (Value[])new Object[count];
+                for (int i = 0; i < count; i++) {
+                    node.values[i] = index.getValueMarshaller().readPayload(is);
+                }
+            }
+            return node;
+        }
+    }
+
+    public BTreeNode(BTreeIndex<Key,Value> index) {
+        this.index = index;
+    }
+    
+    public void setEmpty() {
+        setLeafData(createKeyArray(0), createValueArray(0));
+    }
+    
+
+    /**
+     * Internal (to the BTreeNode) method. Because this method is called only by
+     * BTreeNode itself, no synchronization done inside of this method.
+     * @throws IOException 
+     */
+    private BTreeNode<Key,Value> getChild(Transaction tx, int idx) throws IOException {
+        if (isBranch() && idx >= 0 && idx < children.length) {
+            return this.index.loadNode(tx, children[idx], this);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Removes the entry stored under the given key.  On a branch node the
+     * call is forwarded to the child responsible for the key; on a leaf the
+     * key and its value are dropped and the node is stored again.
+     *
+     * @param tx the transaction used to load and store pages
+     * @param key the key to remove, must not be null
+     * @return the value that was associated with the key, or null if the key was not found
+     * @throws IllegalArgumentException if key is null
+     * @throws IOException if a page cannot be loaded or stored
+     */
+    public synchronized Value remove(Transaction tx, Key key) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        int idx = Arrays.binarySearch(keys, key);
+
+        if(isBranch()) {
+            // Not found: descend at the insertion point; found: the key lives to the right of its separator.
+            idx = idx < 0 ? -(idx + 1) : idx + 1;
+            return getChild(tx, idx).remove(tx, key);
+        } else {
+            if (idx < 0) {
+                return null;
+            } else {
+                Value oldValue = values[idx];
+                setLeafData(arrayDelete(keys, idx), arrayDelete(values, idx));
+                index.storeNode(tx, this, true);
+                return oldValue;
+            }
+        }
+    }
+
+    public synchronized Value put(Transaction tx, Key key, Value value) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        int idx = Arrays.binarySearch(keys, key);
+
+        if( isBranch() ) {
+            idx = idx < 0 ? -(idx + 1) : idx + 1;
+            return getChild(tx, idx).put(tx, key, value);
+        } else {
+            
+            Value oldValue=null;
+            if (idx >= 0) {
+                // Key was found... Overwrite
+                oldValue = values[idx];
+                values[idx] = value;
+                setLeafData(keys, values);
+            } else {
+                // Key was not found, Insert it
+                idx = -(idx + 1);
+                setLeafData(arrayInsert(keys, key, idx), arrayInsert(values, value, idx));
+            }
+            
+            try {
+                index.storeNode(tx, this, allowOverflow());
+            } catch ( Transaction.PageOverflowIOException e ) {
+                // If we get an overflow 
+                split(tx);
+            }
+            
+            return oldValue;
+        }
+    }
+
+    private synchronized void promoteValue(Transaction tx, Key key, long nodeId) throws IOException {
+
+        int idx = Arrays.binarySearch(keys, key);
+        idx = idx < 0 ? -(idx + 1) : idx + 1;
+        setBranchData(arrayInsert(keys, key, idx), arrayInsert(children, nodeId, idx + 1));
+
+        try {
+            index.storeNode(tx, this, allowOverflow());
+        } catch ( Transaction.PageOverflowIOException e ) {
+            split(tx);
+        }
+
+    }
+
+    /**
+     * Splits this node around the middle key (pivot = keys.length / 2) and
+     * promotes a separator key to the parent.  Internal to BTreeNode.
+     * <br>
+     * If this node has no parent, two new nodes receive the two halves and
+     * this node is turned into a branch that points at them.  Otherwise this
+     * node keeps the left half, a new node receives the right half, and the
+     * separator is handed to the parent via promoteValue.
+     *
+     * @param tx the transaction used to allocate and store pages
+     * @throws IOException if a page cannot be allocated or stored
+     */
+    private void split(Transaction tx) throws IOException {
+        Key[] leftKeys;
+        Key[] rightKeys;
+        Value[] leftValues=null;
+        Value[] rightValues=null;
+        long[] leftNodeIds=null;
+        long[] rightNodeIds=null;
+        Key separator;
+
+        int vc = keys.length;
+        int pivot = vc / 2;
+
+        // Split the node into two nodes
+        if( isBranch() ) {
+
+            // Branch: the key at index pivot goes to neither half; the left half
+            // gets the keys before it and the right half the keys after it.
+            leftKeys = createKeyArray(pivot);
+            leftNodeIds = new long[leftKeys.length + 1];
+            rightKeys = createKeyArray(vc - (pivot + 1));
+            rightNodeIds = new long[rightKeys.length + 1];
+
+            System.arraycopy(keys, 0, leftKeys, 0, leftKeys.length);
+            System.arraycopy(children, 0, leftNodeIds, 0, leftNodeIds.length);
+            System.arraycopy(keys, leftKeys.length + 1, rightKeys, 0, rightKeys.length);
+            System.arraycopy(children, leftNodeIds.length, rightNodeIds, 0, rightNodeIds.length);
+
+            // Is it a Simple Prefix BTree??
+            // NOTE(review): with a prefixer the separator is derived from the keys
+            // next to the pivot rather than from keys[pivot] itself - confirm that
+            // it still bounds the keys stored under the last left child.
+            Prefixer<Key> prefixer = index.getPrefixer();
+            if(prefixer!=null) {
+                separator = prefixer.getSimplePrefix(leftKeys[leftKeys.length - 1], rightKeys[0]);
+            } else {
+                separator = keys[leftKeys.length];
+            }
+                
+            
+        } else {
+
+            // Leaf: the first pivot entries stay on the left, all remaining
+            // entries go to the right.
+            leftKeys = createKeyArray(pivot);
+            leftValues = createValueArray(leftKeys.length);
+            rightKeys = createKeyArray(vc - pivot);
+            rightValues = createValueArray(rightKeys.length);
+
+            System.arraycopy(keys, 0, leftKeys, 0, leftKeys.length);
+            System.arraycopy(values, 0, leftValues, 0, leftValues.length);
+            System.arraycopy(keys, leftKeys.length, rightKeys, 0, rightKeys.length);
+            System.arraycopy(values, leftValues.length, rightValues, 0, rightValues.length);
+
+            // separator = getSeparator(leftVals[leftVals.length - 1],
+            // rightVals[0]);
+            // The first key of the right half is promoted as the separator.
+            separator = rightKeys[0];
+
+        }
+
+        // Promote the pivot to the parent branch
+        if (parent == null) {
+            
+            // This can only happen if this is the root
+            // Both halves move into newly created nodes whose parent is this node.
+            BTreeNode<Key,Value> rNode = this.index.createNode(tx, this);
+            BTreeNode<Key,Value> lNode = this.index.createNode(tx, this);
+
+            if( isBranch() ) {
+                rNode.setBranchData(rightKeys, rightNodeIds);
+                lNode.setBranchData(leftKeys, leftNodeIds);
+            } else {
+                rNode.setLeafData(rightKeys, rightValues);
+                lNode.setLeafData(leftKeys, leftValues);
+            }
+
+            // This node becomes a branch holding only the separator and the two new children.
+            Key[] v = createKeyArray(1);
+            v[0]=separator;
+            setBranchData(v, new long[] { lNode.getPageId(), rNode.getPageId() });
+
+            index.storeNode(tx, this, true);
+            index.storeNode(tx, rNode, true);
+            index.storeNode(tx, lNode, true);
+            
+        } else {
+            // This node keeps the left half; a new sibling under the same parent takes the right half.
+            BTreeNode<Key,Value> rNode = this.index.createNode(tx, parent);
+
+            if( isBranch() ) {
+                setBranchData(leftKeys, leftNodeIds);
+                rNode.setBranchData(rightKeys, rightNodeIds);
+            } else {
+                setLeafData(leftKeys, leftValues);
+                rNode.setLeafData(rightKeys, rightValues);
+            }
+
+            index.storeNode(tx, this, true);
+            index.storeNode(tx, rNode, true);
+            // Register the new sibling with the parent; promoteValue may split the parent in turn.
+            parent.promoteValue(tx, separator, rNode.getPageId());
+        }
+    }
+
+    // ///////////////////////////////////////////////////////////////
+
+    public synchronized Value get(Transaction tx, Key key) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        int idx = Arrays.binarySearch(keys, key);
+
+        if( isBranch() ) {
+            idx = idx < 0 ? -(idx + 1) : idx + 1;
+            return getChild(tx, idx).get(tx, key);
+        } else {
+            if (idx < 0) {
+                return null;
+            } else {
+                return values[idx];
+            }
+        }
+    }
+
+    public synchronized boolean contains(Transaction tx, Key key) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        int idx = Arrays.binarySearch(keys, key);
+        if( isBranch() ) {
+            idx = idx < 0 ? -(idx + 1) : idx + 1;
+            return getChild(tx, idx).contains(tx, key);
+        } else {
+            if (idx < 0) {
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Reports every entry stored in this node and, for a branch, in all of
+     * its children in child order, to the callback.  The enumeration stops
+     * as soon as the callback returns false.
+     *
+     * @param tx the transaction used to load child pages
+     * @param callback receives each key together with its value
+     * @throws IOException if a child page cannot be loaded
+     */
+    void iterate(Transaction tx, BTreeIndex.BTreeCallback<Key,Value> callback) throws IOException {
+        iterateUntilCancelled(tx, callback);
+    }
+
+    /**
+     * Performs the enumeration for iterate.
+     *
+     * @return false if the callback cancelled the enumeration, true otherwise
+     */
+    private boolean iterateUntilCancelled(Transaction tx, BTreeIndex.BTreeCallback<Key,Value> callback) throws IOException {
+        if(isBranch()) {
+            for (int i = 0; i < children.length; i++) {
+                if (!getChild(tx, i).iterateUntilCancelled(tx, callback)) {
+                    // Propagate the cancellation so that no further sibling is visited.
+                    return false;
+                }
+            }
+        } else {
+            for (int i = 0; i < keys.length; i++) {
+                if (!callback.onEntry(keys[i], values[i])) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Implementation methods
+    ///////////////////////////////////////////////////////////////////
+    
+    private boolean allowOverflow() {
+        // Only allow page overflow if there are <= 3 keys in the node.  Otherwise a split will occur on overflow
+        return this.keys.length<=3;
+    }
+
+
+    private void setLeafData(Key[] keys, Value[] values) {
+        this.keys = keys;
+        this.values = values;
+        this.children = null;
+    }
+    
+    private void setBranchData(Key[] keys, long[] nodeIds) {
+        this.keys = keys;
+        this.children = nodeIds;
+        this.values = null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Key[] createKeyArray(int size) {
+        return (Key[])new Object[size];
+    }
+
+    @SuppressWarnings("unchecked")
+    private Value[] createValueArray(int size) {
+        return (Value[])new Object[size];
+    }
+    
+    @SuppressWarnings("unchecked")
+    static private <T> T[] arrayDelete(T[] vals, int idx) {
+        T[] newVals = (T[])new Object[vals.length - 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        if (idx < newVals.length) {
+            System.arraycopy(vals, idx + 1, newVals, idx, newVals.length - idx);
+        }
+        return newVals;
+    }
+    
+//    static private long[] arrayDelete(long[] vals, int idx) {
+//        long[] newVals = new long[vals.length - 1];
+//        if (idx > 0) {
+//            System.arraycopy(vals, 0, newVals, 0, idx);
+//        }
+//        if (idx < newVals.length) {
+//            System.arraycopy(vals, idx + 1, newVals, idx, newVals.length - idx);
+//        }
+//        return newVals;
+//    }
+
+    @SuppressWarnings("unchecked")
+    static private <T> T[] arrayInsert(T[] vals, T val, int idx) {
+        T[] newVals = (T[])new Object[vals.length + 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        newVals[idx] = val;
+        if (idx < vals.length) {
+            System.arraycopy(vals, idx, newVals, idx + 1, vals.length - idx);
+        }
+        return newVals;
+    }
+
+
+    static private long[] arrayInsert(long[] vals, long val, int idx) {
+        
+        long[] newVals = new long[vals.length + 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        newVals[idx] = val;
+        if (idx < vals.length) {
+            System.arraycopy(vals, idx, newVals, idx + 1, vals.length - idx);
+        }
+        return newVals;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors
+    ///////////////////////////////////////////////////////////////////
+    private boolean isBranch() {
+        return children!=null;
+    }
+
+    public long getPageId() {
+        return page.getPageId();
+    }
+
+    public BTreeNode<Key, Value> getParent() {
+        return parent;
+    }
+
+    public void setParent(BTreeNode<Key, Value> parent) {
+        this.parent = parent;
+    }
+
+    public Page<BTreeNode<Key, Value>> getPage() {
+        return page;
+    }
+
+    public void setPage(Page<BTreeNode<Key, Value>> page) {
+        this.page = page;
+    }
+}
+
+

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java Sun Aug 24 19:35:58 2008
@@ -16,139 +16,89 @@
  */
 package org.apache.kahadb.page;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.kahadb.page.Chunk.PageInputStream;
-import org.apache.kahadb.page.Chunk.PageOutputStream;
-
 /**
  * Bin in a HashIndex
  * 
  * @version $Revision: 1.1.1.1 $
  */
-class HashBin {
+class HashBin<Key, Value> {
     
-    private final HashIndex index;
-    private final long pageId;
-    private TreeMap<Comparable, Long> data;
-    private int pageCount;
-    
-    /**
-     * Constructor
-     * 
-     * @param hashIndex
-     * @param pageId
-     * @param maximumEntries
-     * @throws IOException 
-     */
-    HashBin(HashIndex hashIndex, long pageId) throws IOException {
-        this.index = hashIndex;
-        this.pageId = pageId;
-    }
+    
+    static public class Marshaller<Key, Value> implements org.apache.kahadb.Marshaller<HashBin<Key, Value>> {
+        private final HashIndex<Key, Value> hashIndex;
 
-    private void load(Transaction tx) throws IOException {
+        public Marshaller(HashIndex<Key, Value> index) {
+            this.hashIndex = index;
+        }
         
-        data = new TreeMap<Comparable, Long>();
+        public Class<HashBin<Key, Value>> getType() {
+            return null;
+        }
 
-        // Using page page streams to store the data makes it easy to marshall the HashBin data,
-        // but it does not give us very good page based caching. As even if the pages are cached,
-        // we will still need to de-marshall from the stream.
-        
-        // I think it will be better to make the bin a btree root.  
-        PageInputStream pis = new PageInputStream(tx, pageId);        
-        DataInputStream is = new DataInputStream(pis);
-        try {
-            
+        public HashBin<Key, Value> readPayload(DataInput is) throws IOException {
+            HashBin<Key, Value> bin = new HashBin<Key, Value>();
             int size = is.readInt();
             for(int i=0; i < size; i++) {
-                Comparable key = (Comparable)index.getKeyMarshaller().readPayload(is);
-                long value = is.readLong();
-                data.put(key, value);
+                Key key = hashIndex.getKeyMarshaller().readPayload(is);
+                Value value = hashIndex.getValueMarshaller().readPayload(is);
+                bin.data.put(key, value);
             }
-            is.close();
-            pageCount = pis.getPageCount();
-        } catch (IOException e) {
-            throw e;
+            return bin;
         }
-    }
-    
-    public void store(Transaction tx) throws IOException {
-        PageOutputStream pos = new PageOutputStream(tx, pageId);
-        DataOutputStream os = new DataOutputStream(pos);
-        if( data == null ) {
-            os.writeInt(0);
-        } else {
-            os.writeInt(data.size());
-            for (Map.Entry<Comparable, Long> entry : data.entrySet()) {
-                index.getKeyMarshaller().writePayload(entry.getKey(), os);
-                os.writeLong(entry.getValue());
+
+        public void writePayload(HashBin<Key, Value> bin, DataOutput os) throws IOException {
+            os.writeInt(bin.data.size());
+            for (Map.Entry<Key, Value> entry : bin.data.entrySet()) {
+                hashIndex.getKeyMarshaller().writePayload(entry.getKey(), os);
+                hashIndex.getValueMarshaller().writePayload(entry.getValue(), os);
             }
         }
-        os.close();
-        pageCount = pos.getPageCount();
+        
     }
-
-    public int size(Transaction tx) throws IOException {
-        if( data!=null ) {
-            return data.size();
-        } else {
-            
-            // Peek at the page to see how many items it contains.
-            PageInputStream pis = new PageInputStream(tx, pageId);
-            DataInputStream is = new DataInputStream(pis);
-            int size = is.readInt();
-            is.close();
-            
-            return size;
-        }
+    
+    private Page<HashBin<Key, Value>> page;
+    private TreeMap<Key, Value> data = new TreeMap<Key, Value>();
+    
+    /**
+     * @return the number of key/value entries currently held by this bin
+     */
+    public int size() {
+        return this.data.size();
     }
 
-    public Long put(Transaction tx, Comparable key, Long value) throws IOException {
-        if( data==null ) {
-            load(tx);
-        }
-        Long rc = data.put(key, value);
-        if( !value.equals(rc) ) {
-            store(tx);
-        }
-        return rc;
+    /**
+     * Adds or replaces an entry in this bin's in-memory map. Nothing is written
+     * to the page here; persisting the bin is left to the caller.
+     *
+     * @param key the key to store
+     * @param value the value to associate with the key
+     * @return the value previously stored for the key, or null if there was none
+     * @throws IOException declared by the signature; this body performs no I/O
+     */
+    public Value put(Key key, Value value) throws IOException {
+        Value previous = this.data.put(key, value);
+        return previous;
     }
 
-    public Long find(Transaction tx, Comparable key) throws IOException {
-        if( data==null ) {
-            load(tx);
-        }
+    /**
+     * Looks up a key in this bin's in-memory map.
+     *
+     * @param key the key to look up
+     * @return the value stored for the key, or null if the map holds none
+     * @throws IOException declared by the signature; this body performs no I/O
+     */
+    public Value get(Key key) throws IOException {
         return data.get(key);
     }
     
-    public Map<Comparable, Long> getAll(Transaction tx) throws IOException {
-        if( data==null ) {
-            load(tx);
-        }
+    /**
+     * @param key the key to test
+     * @return true when this bin's in-memory map has an entry for the key
+     * @throws IOException declared by the signature; this body performs no I/O
+     */
+    public boolean containsKey(Key key) throws IOException {
+        return this.data.containsKey(key);
+    }
+    
+    /**
+     * Exposes this bin's entries. The returned map is the bin's own backing map,
+     * not a copy, so changes made through it alter the bin.
+     *
+     * @param tx not used by this body
+     * @return the backing map of this bin
+     * @throws IOException declared by the signature; this body performs no I/O
+     */
+    public Map<Key, Value> getAll(Transaction tx) throws IOException {
         return data;
     }
     
-    public Long remove(Transaction tx, Comparable key) throws IOException {
-        if( data==null ) {
-            load(tx);
-        }
-        Long rc = data.remove(key);
-        if( rc!=null ) {
-            store(tx);
-        }
-        return rc;
+    /**
+     * Removes an entry from this bin's in-memory map. Nothing is written to the
+     * page here; persisting the bin is left to the caller.
+     *
+     * @param key the key to remove
+     * @return the value that was stored for the key, or null if there was none
+     * @throws IOException declared by the signature; this body performs no I/O
+     */
+    public Value remove(Key key) throws IOException {
+        Value removed = this.data.remove(key);
+        return removed;
     }
 
-    public long getPageId() {
-        return pageId;
+    /**
+     * @return the page this bin is currently attached to
+     */
+    public Page<HashBin<Key, Value>> getPage() {
+        return this.page;
     }
 
-    public int getPageCount() {
-        return pageCount;
+    /**
+     * Attaches this bin to a page and also installs the bin as that page's
+     * content, so the two reference each other afterwards.
+     *
+     * @param page the page to attach; must not be null because it is dereferenced
+     */
+    public void setPage(Page<HashBin<Key, Value>> page) {
+        this.page = page;
+        // The parameter and the field now refer to the same page object.
+        page.set(this);
     }
 
+
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java Sun Aug 24 19:35:58 2008
@@ -16,8 +16,11 @@
  */
 package org.apache.kahadb.page;
 
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.Externalizable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,11 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.Marshaller;
-import org.apache.kahadb.StoreEntry;
-import org.apache.kahadb.impl.index.Index;
 import org.apache.kahadb.impl.index.IndexManager;
-import org.apache.kahadb.page.Chunk.PageInputStream;
-import org.apache.kahadb.page.Chunk.PageOutputStream;
 import org.apache.kahadb.page.PageFile.PageFileTransaction;
 import org.apache.kahadb.page.Transaction.Closure;
 import org.apache.kahadb.page.Transaction.CallableClosure;
@@ -39,7 +38,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class HashIndex implements Index {
+public class HashIndex<Key,Value> implements Index<Key,Value> {
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
@@ -62,30 +61,17 @@
         DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
     }
 
-    private IndexManager indexManager;
-
-    private Marshaller keyMarshaller;
     private AtomicBoolean loaded = new AtomicBoolean();
 
-    private int size;
 
     private int increaseThreshold;
     private int decreaseThreshold;
 
     // Where the bin page array starts at.
-    private long binPageId;
-    private int binCapacity = DEFAULT_BIN_CAPACITY;
-    private int binsActive;
     private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
     private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
 
-    // While resizing, the following contains the new resize data.
-    private int resizeCapacity;
-    private long resizePageId;
-
-    // When the index is initializing or resizing.. state changes so that
-    // on failure it can be properly recovered.
-    private int state;
+
 
     // Once binsActive/binCapacity reaches the loadFactor, then we need to
     // increase the capacity
@@ -95,6 +81,66 @@
     // This page holds the index metadata.
     private long pageId;
 
+    /**
+     * Bookkeeping record of the index: state, location and capacity of the bin
+     * page array, entry counters and the data of an in-progress resize.
+     * {@link #read} and {@link #write} process the persistent fields in the
+     * same fixed order, which must be kept in sync.
+     */
+    static class Metadata {
+
+        // Page holding this record. Not part of the serialized form: write() does not emit it.
+        private Page<Metadata> page;
+
+        // When the index is initializing or resizing.. state changes so that
+        // on failure it can be properly recovered.
+        private int state;
+        private long binPageId;
+        private int binCapacity = DEFAULT_BIN_CAPACITY;
+        private int binsActive;
+        private int size;
+        // While resizing, the following contains the new resize data.
+        private int resizeCapacity;
+        private long resizePageId;
+
+        /**
+         * Populates the persistent fields from the given input.
+         *
+         * @param is source of the serialized record
+         * @throws IOException if reading from the input fails
+         */
+        public void read(DataInput is) throws IOException {
+            this.state = is.readInt();
+            this.binPageId = is.readLong();
+            this.binCapacity = is.readInt();
+            this.size = is.readInt();
+            this.binsActive = is.readInt();
+            this.resizePageId = is.readLong();
+            this.resizeCapacity = is.readInt();
+        }
+
+        /**
+         * Writes the persistent fields to the given output, in the order read() expects.
+         *
+         * @param os destination of the serialized record
+         * @throws IOException if writing to the output fails
+         */
+        public void write(DataOutput os) throws IOException {
+            os.writeInt(this.state);
+            os.writeLong(this.binPageId);
+            os.writeInt(this.binCapacity);
+            os.writeInt(this.size);
+            os.writeInt(this.binsActive);
+            os.writeLong(this.resizePageId);
+            os.writeInt(this.resizeCapacity);
+        }
+
+        /**
+         * Marshaller that delegates to {@link Metadata#read} and {@link Metadata#write}.
+         */
+        static class Marshaller implements org.apache.kahadb.Marshaller<Metadata> {
+            public Class<Metadata> getType() {
+                return Metadata.class;
+            }
+
+            public Metadata readPayload(DataInput dataIn) throws IOException {
+                Metadata loaded = new Metadata();
+                loaded.read(dataIn);
+                return loaded;
+            }
+
+            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
+                object.write(dataOut);
+            }
+        }
+    }
+    
+    private Metadata metadata = new Metadata();
+    
+    private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller();
+    private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller<Key,Value>(this);
+    private Marshaller<Key> keyMarshaller;
+    private Marshaller<Value> valueMarshaller;
+
+    
     /**
      * Constructor
      * 
@@ -104,60 +150,58 @@
      * @param numberOfBins
      * @throws IOException
      */
-    public HashIndex(IndexManager indexManager, PageFile pageFile, long pageId) throws IOException {
+    public HashIndex(PageFile pageFile, long pageId) throws IOException {
         this.pageFile = pageFile;
-        this.indexManager = indexManager;
         this.pageId = pageId;
     }
 
     public synchronized void load() {
-        PageFileTransaction tx = pageFile.tx();
+        Transaction tx = pageFile.tx();
         try {
             
             if (loaded.compareAndSet(false, true)) {
                 try {
-                    Page page = tx.load(pageId, null);
-    
+                    final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
                     // Is this a brand new index?
-                    if (page.getType() == Page.FREE_TYPE) {
-    
+                    if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
                         // We need to create the pages for the bins
                         tx.execute(new Transaction.Closure<IOException>(){
                             public void execute(Transaction tx) throws IOException {
-                                Page binPage = tx.allocate(binCapacity);
-                                binPageId = binPage.getPageId();
-                                state = INITIALIZING_STATE;
-                                storeMetadata(tx);
+                                Page binPage = tx.allocate(metadata.binCapacity);
+                                metadata.binPageId = binPage.getPageId();
+                                metadata.state = INITIALIZING_STATE;
+                                metadata.page = metadataPage;
+                                metadataPage.set(metadata);
+                                tx.store(metadataPage, metadataMarshaller, true);
                             }
                         });
                         pageFile.checkpoint();
     
                         // If failure happens now we can continue initializing the
                         // the hash bins...
-    
                     } else {
-                        // Lets load it's data
-                        loadMetadata(tx);
-    
+
+                        metadata = metadataPage.get();
+                        metadata.page = metadataPage;
+                        
                         // If we did not have a clean shutdown...
-                        if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
+                        if (metadata.state == OPEN_STATE || metadata.state == RESIZING_PHASE1_STATE) {
                             // Figure out the size and the # of bins that are
                             // active. Yeah This loads the first page of every bin. :(
                             // We might want to put this in the metadata page, but
                             // then that page would be getting updated on every write.
-                            size = 0;
-                            for (int i = 0; i < binCapacity; i++) {
-                                HashBin hashBin = new HashBin(this, binPageId + i);
-                                int t = hashBin.size(tx);
+                            metadata.size = 0;
+                            for (int i = 0; i < metadata.binCapacity; i++) {
+                                int t = sizeOfBin(metadata.binPageId);
                                 if (t > 0) {
-                                    binsActive++;
+                                    metadata.binsActive++;
                                 }
-                                size += t;
+                                metadata.size += t;
                             }
                         }
                     }
     
-                    if (state == INITIALIZING_STATE) {
+                    if (metadata.state == INITIALIZING_STATE) {
                         // TODO:
                         // If a failure occurs mid way through us initializing the
                         // bins.. will the page file still think we have the rest
@@ -165,36 +209,31 @@
     
                         tx.execute(new Closure<IOException>(){
                             public void execute(Transaction tx) throws IOException {
-                                for (int i = 0; i < binCapacity; i++) {
-                                    HashBin hashBin = new HashBin(HashIndex.this, binPageId + i);
-                                    hashBin.store(tx);
-                                }
+                                clear(tx);
                             }
                         });
-                        size = 0;
-                        binsActive = 0;
                     }
                     
-                    if (state == RESIZING_PHASE1_STATE) {
+                    if (metadata.state == RESIZING_PHASE1_STATE) {
                         // continue resize phase 1
                         resizePhase1();
                     }                
-                    if (state == RESIZING_PHASE2_STATE) {
+                    if (metadata.state == RESIZING_PHASE2_STATE) {
                         // continue resize phase 1
                         resizePhase2();
                     }                
     
                     calcThresholds();
     
-                    state = OPEN_STATE;
+                    metadata.state = OPEN_STATE;
                     tx.execute(new Closure<IOException>(){
                         public void execute(Transaction tx) throws IOException {
-                            storeMetadata(tx);
+                            tx.store(metadataPage, metadataMarshaller, true);
                         }
                     });
                     pageFile.checkpoint();
                     
-                    LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
+                    LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
     
                 } catch (IOException e) {
                     throw new RuntimeException(e);
@@ -207,116 +246,110 @@
         }
     }
 
+    /**
+     * Placeholder that has not been implemented yet: it ignores
+     * {@code binPageId} and always reports a size of 0.
+     *
+     * @param binPageId id of the bin page to inspect (currently unused)
+     * @return always 0
+     */
+    private int sizeOfBin(long binPageId) {
+        // TODO Auto-generated method stub
+        final int unknownSize = 0;
+        return unknownSize;
+    }
+
     public synchronized void unload() throws IOException {
         if (loaded.compareAndSet(true, false)) {
-            state = CLOSED_STATE;
+            metadata.state = CLOSED_STATE;
             pageFile.tx().execute(new Closure<IOException>(){
                 public void execute(Transaction tx) throws IOException {
-                    storeMetadata(tx);
+                    tx.store(metadata.page, metadataMarshaller, true);
                 }
             });
         }
     }
 
-    public synchronized StoreEntry get(final Object key) throws IOException {
-        return pageFile.tx().execute(new CallableClosure<StoreEntry,IOException>(){
-            public StoreEntry execute(Transaction tx) throws IOException {
-                return get(tx, key);
-            }
-        });
+    /**
+     * Looks up the value stored for a key. Calls {@code load()} first, then
+     * asks the bin selected for the key.
+     *
+     * @param tx the transaction used to load the bin
+     * @param key the key to look up
+     * @return whatever the selected bin returns for the key
+     * @throws IOException propagated from loading the bin or from the bin lookup
+     */
+    public synchronized Value get(Transaction tx, Key key) throws IOException {
+        load();
+        HashBin<Key,Value> bin = getBin(tx, key);
+        return bin.get(key);
     }
-
-    public synchronized StoreEntry get(Transaction tx, Object key) throws IOException {
+    
+    public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
         // TODO: multiple loads is smelly..
         load();
-        Long result = getBin(key).find(tx, (Comparable)key);
-        return result != null ? indexManager.getIndex(result) : null;
+        return getBin(tx, key).containsKey(key);
     }
 
-    public synchronized void store(final Object key, final StoreEntry value) throws IOException {
-        pageFile.tx().execute(new Closure<IOException>(){
-            public void execute(Transaction tx) throws IOException {
-                store(tx, key, value);
-            }
-        });
-    }
-
-    public synchronized void store(Transaction tx, Object key, StoreEntry value) throws IOException {
+    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
         // TODO: multiple loads is smelly..
         load();
-        HashBin bin = getBin(key);
-        if (bin.put(tx, (Comparable)key, value.getOffset()) == null) {
-            this.size++;
-            if (bin.size(tx) == 1) {
-                binsActive++;
+        HashBin<Key,Value> bin = getBin(tx, key);
+
+        int originalSize = bin.size();
+        Value result = bin.put(key,value);
+        store(tx, bin);
+
+        int newSize = bin.size();
+
+        if (newSize != originalSize) {
+            metadata.size++;
+            if (newSize == 1) {
+                metadata.binsActive++;
             }
         }
-        if (this.binsActive >= this.increaseThreshold) {
-            int newSize = Math.min(maximumBinCapacity, binCapacity*2);
-            if(binCapacity!=newSize) {
+
+        if (metadata.binsActive >= this.increaseThreshold) {
+            newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
+            if(metadata.binCapacity!=newSize) {
                 resize(newSize);
             }
         }
-    }
-
-    public synchronized StoreEntry remove(final Object key) throws IOException {
-        return pageFile.tx().execute(new CallableClosure<StoreEntry,IOException>(){
-            public StoreEntry execute(Transaction tx) throws IOException {
-                return remove(tx, key);
-            }
-        });
+        return result;
     }
     
-    public synchronized StoreEntry remove(Transaction tx, Object key) throws IOException {
+    synchronized public Value remove(Transaction tx, Key key) throws IOException {
         // TODO: multiple loads is smelly..
         load();
-        StoreEntry result = null;
 
-        HashBin bin = getBin(key);
-        Long offset = bin.remove(tx, (Comparable)key);
-        if (offset != null) {
-            this.size--;
-            if (bin.size(tx) == 0) {
-                binsActive--;
+        HashBin<Key,Value> bin = getBin(tx, key);
+        int originalSize = bin.size();
+        Value result = bin.remove(key);
+        int newSize = bin.size();
+        
+        if (newSize != originalSize) {
+            store(tx, bin);
+
+            metadata.size--;
+            if (newSize == 0) {
+                metadata.binsActive--;
             }
-            result = this.indexManager.getIndex(offset);
         }
 
-        if (this.binsActive <= this.decreaseThreshold) {
-            int newSize = Math.max(minimumBinCapacity, binCapacity/2);
-            if(binCapacity!=newSize) {
+        if (metadata.binsActive <= this.decreaseThreshold) {
+            newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
+            if(metadata.binCapacity!=newSize) {
                 resize(newSize);
-         
             }
         }
         return result;
     }
-
-    public synchronized boolean containsKey(Object key) throws IOException {
-        return get(key) != null;
-    }
     
-    public synchronized boolean containsKey(Transaction tx, Object key) throws IOException {
-        return get(tx, key) != null;
-    }
-
-    public synchronized void clear() throws IOException {
-        pageFile.tx().execute(new Closure<IOException>(){
-            public void execute(Transaction tx) throws IOException {
-                clear(tx);
-            }
-        });
-    }
 
     public synchronized void clear(Transaction tx) throws IOException {
         // TODO: multiple loads is smelly..
         load();
-        for (int i = 0; i < binCapacity; i++) {
-            HashBin hashBin = new HashBin(this, binPageId + i);
-            hashBin.store(tx); // A store before a load.. clears the data out.
+        for (int i = 0; i < metadata.binCapacity; i++) {
+            long pageId = metadata.binPageId + i;
+            clearBinAtPage(tx, pageId);
         }
-        size = 0;
-        binsActive = 0;
+        metadata.size = 0;
+        metadata.binsActive = 0;
+    }
+
+    /**
+     * Puts a new, empty bin on the given page and stores it through the
+     * supplied transaction.
+     *
+     * @param tx the transaction used to load the page and store the empty bin
+     * @param pageId id of the page that should hold the empty bin
+     * @throws IOException propagated from the load and store calls
+     */
+    private void clearBinAtPage(Transaction tx, long pageId) throws IOException {
+        // NOTE(review): loaded with a null marshaller, presumably so the old
+        // payload is not unmarshalled - confirm against Transaction.load.
+        Page<HashBin<Key,Value>> target = tx.load(pageId, null);
+        HashBin<Key, Value> empty = new HashBin<Key,Value>();
+        empty.setPage(target);
+        target.set(empty);
+        store(tx, empty);
+    }
 
     public String toString() {
@@ -328,40 +361,19 @@
     // Implementation Methods
     // /////////////////////////////////////////////////////////////////
 
-    private void loadMetadata(Transaction tx) throws IOException {
-        PageInputStream pis = new PageInputStream(tx, pageId);
-        DataInputStream is = new DataInputStream(pis);
-        state = is.readInt();
-        binPageId = is.readLong();
-        binCapacity = is.readInt();
-        size = is.readInt();
-        binsActive = is.readInt();
-        resizePageId = is.readLong();
-        resizeCapacity = is.readInt();
-        is.close();
-    }
-
-    private void storeMetadata(Transaction tx) throws IOException {
-        PageOutputStream pos = new PageOutputStream(tx, pageId);
-        DataOutputStream os = new DataOutputStream(pos);
-        os.writeInt(state);
-        os.writeLong(binPageId);
-        os.writeInt(binCapacity);
-        os.writeInt(size);
-        os.writeInt(binsActive);
-        os.writeLong(resizePageId);
-        os.writeInt(resizeCapacity);
-        os.close();
+    /**
+     * Stores a bin on the page it is attached to, using this index's hash bin
+     * marshaller.
+     *
+     * @param tx the transaction that performs the store
+     * @param bin the bin to store; its page is taken from {@code bin.getPage()}
+     * @throws IOException propagated from the transaction's store call
+     */
+    public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
+        Page<HashBin<Key,Value>> binPage = bin.getPage();
+        tx.store(binPage, hashBinMarshaller, true);
     }
 
+    
     private void resize(final int newSize) throws IOException {
         LOG.debug("Resizing to: "+newSize);
         pageFile.tx().execute(new Closure<IOException>(){
             public void execute(Transaction tx) throws IOException {
-                state = RESIZING_PHASE1_STATE;
-                resizeCapacity = newSize;
-                resizePageId = tx.allocate(resizeCapacity).getPageId();
-                storeMetadata(tx);
+                metadata.state = RESIZING_PHASE1_STATE;
+                metadata.resizeCapacity = newSize;
+                metadata.resizePageId = tx.allocate(metadata.resizeCapacity).getPageId();
+                tx.store(metadata.page, metadataMarshaller, true);
             }
         });
         pageFile.checkpoint();
@@ -376,27 +388,29 @@
             public void execute(Transaction tx) throws IOException {
                 
                 // Initialize the bins..
-                for (int i = 0; i < resizeCapacity; i++) {
-                    HashBin bin = new HashBin(HashIndex.this, resizePageId + i);
-                    bin.store(tx);
+                for (int i = 0; i < metadata.resizeCapacity; i++) {
+                    long pageId = metadata.resizePageId + i;
+                    clearBinAtPage(tx, pageId);
                 }
 
-                binsActive = 0;
+                metadata.binsActive = 0;
                 // Copy the data from the old bins to the new bins.
-                for (int i = 0; i < binCapacity; i++) {
-                    HashBin bin = new HashBin(HashIndex.this, binPageId + i);
-                    for (Map.Entry<Comparable, Long> entry : bin.getAll(tx).entrySet()) {
-                        HashBin resizeBin = getResizeBin(entry.getKey());
-                        resizeBin.put(tx, entry.getKey(), entry.getValue());
-                        if( resizeBin.size(tx) == 1) {
-                            binsActive++;
+                for (int i = 0; i < metadata.binCapacity; i++) {
+                    
+                    HashBin<Key,Value> bin = getBin(tx, i);
+                    for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
+                        HashBin<Key,Value> resizeBin = getResizeBin(tx, entry.getKey());
+                        resizeBin.put(entry.getKey(), entry.getValue());
+                        store(tx, resizeBin);
+                        if( resizeBin.size() == 1) {
+                            metadata.binsActive++;
                         }
                     }
                 }
                 
                 // Now we can release the old data.
-                state = RESIZING_PHASE2_STATE;
-                storeMetadata(tx);
+                metadata.state = RESIZING_PHASE2_STATE;
+                tx.store(metadata.page, metadataMarshaller, true);
             }
         });
         pageFile.checkpoint();
@@ -406,42 +420,56 @@
         // In phase 2 we free the old bins and switch the the new bins.
         pageFile.tx().execute(new Closure<IOException>(){
             public void execute(Transaction tx) throws IOException {
-                for (int i = 0; i < binCapacity; i++) {
-                    HashBin hashBin = new HashBin(HashIndex.this, binPageId + i);
-                    hashBin.store(tx); // A store before a load.. clears the data out.
+                for (int i = 0; i < metadata.binCapacity; i++) {
+                    long pageId = metadata.binPageId + i;
+                    clearBinAtPage(tx, pageId);
                 }
-                tx.free(binPageId, binCapacity);
+                tx.free(metadata.binPageId, metadata.binCapacity);
                 
-                binCapacity = resizeCapacity;
-                binPageId = resizePageId;
-                resizeCapacity=0;
-                resizePageId=0;
-                state = OPEN_STATE;
-                storeMetadata(tx);
+                metadata.binCapacity = metadata.resizeCapacity;
+                metadata.binPageId = metadata.resizePageId;
+                metadata.resizeCapacity=0;
+                metadata.resizePageId=0;
+                metadata.state = OPEN_STATE;
+                tx.store(metadata.page, metadataMarshaller, true);
             }
         });
 
         pageFile.checkpoint();
         calcThresholds();
-        LOG.debug("Resizing done.  New bins start at: "+binPageId);
+        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);
     }
 
     private void calcThresholds() {
-        increaseThreshold = (binCapacity * loadFactor)/100;
-        decreaseThreshold = (binCapacity * loadFactor * loadFactor ) / 20000;
+        increaseThreshold = (metadata.binCapacity * loadFactor)/100;
+        decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
+    }
+
+    /**
+     * Selects the bin for a key in the resize target area, using the resize
+     * capacity from the metadata to compute the slot.
+     *
+     * @param tx the transaction used to load the bin
+     * @param key the key whose bin is wanted
+     * @return the bin loaded for the key's slot
+     * @throws IOException propagated from loading the bin
+     */
+    private HashBin<Key,Value> getResizeBin(Transaction tx, Key key) throws IOException {
+        final int slot = indexFor(key, metadata.resizeCapacity);
+        return getResizeBin(tx, slot);
+    }
 
-    private HashBin getResizeBin(Object key) throws IOException {
-        int i = indexFor(key, resizeCapacity);
-        return new HashBin(this, resizePageId + i);
+    /**
+     * Loads the bin stored {@code i} pages after the resize start page and
+     * attaches it to the page it was loaded from.
+     *
+     * @param tx the transaction used to load the page
+     * @param i offset of the bin from {@code metadata.resizePageId}
+     * @return the bin held by the loaded page
+     * @throws IOException propagated from the transaction's load call
+     */
+    private HashBin<Key,Value> getResizeBin(Transaction tx, int i) throws IOException {
+        final long binPage = metadata.resizePageId + i;
+        Page<HashBin<Key, Value>> page = tx.load(binPage, hashBinMarshaller);
+        HashBin<Key, Value> bin = page.get();
+        bin.setPage(page);
+        return bin;
     }
 
-    private HashBin getBin(Object key) throws IOException {
-        int i = indexFor(key, binCapacity);
-        return new HashBin(this, binPageId + i);
+    /**
+     * Selects the bin for a key in the active bin area, using the current bin
+     * capacity from the metadata to compute the slot.
+     *
+     * @param tx the transaction used to load the bin
+     * @param key the key whose bin is wanted
+     * @return the bin loaded for the key's slot
+     * @throws IOException propagated from loading the bin
+     */
+    private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
+        final int slot = indexFor(key, metadata.binCapacity);
+        return getBin(tx, slot);
     }
 
-    static int indexFor(Object x, int length) {
+    /**
+     * Loads the bin stored {@code i} pages after the first bin page and
+     * attaches it to the page it was loaded from.
+     *
+     * @param tx the transaction used to load the page
+     * @param i offset of the bin from {@code metadata.binPageId}
+     * @return the bin held by the loaded page
+     * @throws IOException propagated from the transaction's load call
+     */
+    private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
+        final long binPage = metadata.binPageId + i;
+        Page<HashBin<Key, Value>> page = tx.load(binPage, hashBinMarshaller);
+        HashBin<Key, Value> bin = page.get();
+        bin.setPage(page);
+        return bin;
+    }
+
+    /**
+     * Maps a key to a slot number: the absolute value of the remainder of the
+     * key's hash code divided by {@code length}. For a positive {@code length}
+     * the result lies in the range 0 to {@code length - 1}.
+     *
+     * @param x the key; must not be null because its hashCode() is called
+     * @param length the number of slots; must not be 0 because it is used as a divisor
+     * @return the slot number for the key
+     */
+    int indexFor(Key x, int length) {
         return Math.abs(x.hashCode()%length);
     }
 
@@ -449,7 +477,7 @@
     // Property Accessors
     // /////////////////////////////////////////////////////////////////
 
-    public Marshaller getKeyMarshaller() {
+    /**
+     * @return the marshaller currently configured for key objects
+     */
+    public Marshaller<Key> getKeyMarshaller() {
         return keyMarshaller;
     }
 
@@ -458,25 +486,37 @@
      * 
      * @param marshaller
      */
-    public synchronized void setKeyMarshaller(Marshaller marshaller) {
+    public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) {
         this.keyMarshaller = marshaller;
     }
 
+    /**
+     * @return the marshaller currently configured for value objects
+     */
+    public Marshaller<Value> getValueMarshaller() {
+        return this.valueMarshaller;
+    }
+    /**
+     * Set the marshaller for value objects
+     * 
+     * @param valueMarshaller the marshaller to use for value objects
+     */
+    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
+        this.valueMarshaller = valueMarshaller;
+    }
+    
     /**
      * @return number of bins in the index
      */
     public int getBinCapacity() {
-        return this.binCapacity;
+        return metadata.binCapacity;
     }
 
     /**
      * @param binCapacity
      */
     public void setBinCapacity(int binCapacity) {
-        if (loaded.get() && binCapacity != this.binCapacity) {
+        if (loaded.get() && binCapacity != metadata.binCapacity) {
             throw new RuntimeException("Pages already loaded - can't reset bin capacity");
         }
-        this.binCapacity = binCapacity;
+        metadata.binCapacity = binCapacity;
     }
 
     public boolean isTransient() {
@@ -511,16 +551,16 @@
         this.maximumBinCapacity = maximumCapacity;
     }
 
-    public synchronized int getSize() {
-        return size;
+    public synchronized int size(Transaction tx) {
+        return metadata.size;
     }
 
     public synchronized int getActiveBins() {
-        return binsActive;
+        return metadata.binsActive;
     }
 
     public long getBinPageId() {
-        return binPageId;
+        return metadata.binPageId;
     }
 
     public PageFile getPageFile() {
@@ -528,7 +568,7 @@
     }
 
     public int getBinsActive() {
-        return binsActive;
+        return metadata.binsActive;
     }
 
 }

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Index.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Index.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Index.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Index.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.IOException;
+
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreEntry;
+
+/**
+ * A generics-based key/value index; simpler than a Map.
+ * Methods that read or modify index data take a Transaction as their first argument.
+ * @version $Revision: 1.2 $
+ */
+public interface Index<Key,Value> {
+    
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller the marshaller to use for keys
+     */
+    void setKeyMarshaller(Marshaller<Key> marshaller);
+    
+    /**
+     * Set the marshaller for value objects
+     * 
+     * @param marshaller the marshaller to use for values
+     */
+    void setValueMarshaller(Marshaller<Value> marshaller);
+
+    /**
+     * load indexes
+     */
+    void load() throws IOException;
+
+    /**
+     * unload indexes
+     * 
+     * @throws IOException
+     */
+    void unload() throws IOException;
+
+    /**
+     * clear the index
+     * 
+     * @param tx the transaction in which the index is cleared
+     * @throws IOException
+     */
+    void clear(Transaction tx) throws IOException;
+
+    /**
+     * @param key the key to look for
+     * @return true if it contains the key
+     * @throws IOException
+     */
+    boolean containsKey(Transaction tx, Key key) throws IOException;
+
+    /**
+     * remove the index key
+     * 
+     * @param key the key to remove
+     * @return the value removed
+     * @throws IOException
+     */
+    Value remove(Transaction tx, Key key) throws IOException;
+
+    /**
+     * store the key, item
+     * @param key the key to store the entry under
+     * @param entry the value to store
+     * @return NOTE(review): not documented by the author; presumably the value previously stored under the key - confirm
+     * @throws IOException
+     */
+    Value put(Transaction tx, Key key, Value entry) throws IOException;
+
+    /**
+     * @param key the key to look up
+     * @return the entry
+     * @throws IOException
+     */
+    Value get(Transaction tx, Key key) throws IOException;
+
+    /**
+     * @return true if the index is transient
+     */
+    boolean isTransient();
+
+
+    /**
+     * return the size of the index
+     * @return the size of the index
+     */
+    int size(Transaction tx);
+}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Sun Aug 24 19:35:58 2008
@@ -33,79 +33,92 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-class Page {
-    
-    public static final int PAGE_HEADER_SIZE = 10;
+class Page<T> {
 
-    public static final short INVALID_TYPE = -1;
-    public static final short FREE_TYPE = 0;
-    public static final short CHUNK_TYPE = 1;
-    
+    public static final int PAGE_HEADER_SIZE = 17;
+
+    public static final byte PAGE_FREE_TYPE = 0;
+    public static final byte PAGE_PART_TYPE = 1;
+    public static final byte PAGE_END_TYPE = 2;
+
+    long pageId;
 
-    private long pageId;
-    
     // The following fields are persisted
-    private long txId;
-    private short type = FREE_TYPE;
-    private Object data;
-    
-    public void copy(Page other) {
+    byte type = PAGE_FREE_TYPE;
+    long txId;
+    // Points to the next page in the chunk stream; makePageEnd() stores a size here instead
+    long next;
+    T data;
+
+    public Page() {
+    }
+
+    public Page(long pageId) {
+        this.pageId=pageId;
+    }
+
+    public void copy(Page<T> other) {
         this.pageId = other.pageId;
         this.txId = other.txId;
         this.type = other.type;
+        this.next = other.next;
         this.data = other.data;
     }
-    
-    Page copy() {
-        Page rc = new Page();
+
+    Page<T> copy() {
+        Page<T> rc = new Page<T>();
         rc.copy(this);
         return rc;
     }
 
+    void makeFree(long txId) { // marks this page as free, stamped with the given transaction id
+        this.type = Page.PAGE_FREE_TYPE;
+        this.txId = txId;
+        this.data = null; // a free page keeps no payload reference
+        this.next = 0; // and no link / size value
+    }
+    
+    public void makePagePart(long next, long txId) {
+        this.type = Page.PAGE_PART_TYPE;
+        this.next = next; // link to the page that follows this one
+        this.txId = txId;
+    }
+    
+    public void makePageEnd(long size, long txId) {
+        this.type = Page.PAGE_END_TYPE;
+        this.next = size; // for PAGE_END_TYPE pages the 'next' field holds the given size (no separate size field is declared)
+        this.txId = txId;
+    }
 
-    void write(DataOutput os, Marshaller marshaller) throws IOException {
-        os.writeShort(type);
+    void write(DataOutput os) throws IOException {
+        os.writeByte(type);
         os.writeLong(txId);
-        if( marshaller!=null && type!=FREE_TYPE ) {
-            marshaller.writePayload(data, os);
-        }
+        os.writeLong(next);
     }
 
-    void read(DataInput is, Marshaller marshaller) throws IOException {
-        type = is.readShort();
+    void read(DataInput is) throws IOException {
+        type = is.readByte();
         txId = is.readLong();
-        if( marshaller!=null && type!=FREE_TYPE ) {
-            data = marshaller.readPayload(is);
-        } else {
-            data = null;
-        }
+        next = is.readLong();
     }
 
     public String toString() {
         return "Page:" + getPageId();
     }
-    
-    long getPageId() {
-        return pageId;
-    }
 
-    void setPageId(long id) {
-        this.pageId = id;
+    public long getPageId() {
+        return pageId;
     }
 
     public long getTxId() {
         return txId;
     }
 
-    public void setTxId(long txId) {
-        this.txId = txId;
-    }
-
-    public Object getData() {
+    public T get() {
         return data;
     }
 
-    public void setData(Object data) {
+    public void set(T data) {
         this.data = data;
     }
 
@@ -113,8 +126,9 @@
         return type;
     }
 
-    public void setType(short type) {
-        this.type = type;
+    public long getNext() {
+        return next;
     }
 
+
 }