You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/22 03:16:06 UTC

svn commit: r687919 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/util/ test/java/org/apache/kahadb/page/

Author: chirino
Date: Thu Aug 21 18:16:06 2008
New Revision: 687919

URL: http://svn.apache.org/viewvc?rev=687919&view=rev
Log:
Added a HashIndex implementation that uses the PageFile layer.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kahadb.page;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.util.ByteArrayOutputStream;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * Represents a Chunk of data in a chunk stream.  Use the PageInputStream and PageOnputStream classes to access
+ * a linked set of chunks on a PageFile.
+ * 
+ */
+public class Chunk extends ByteSequence {
+    
+    static final int HEADER_MAX_SIZE=9;
+
+    boolean last;
+    long next;
+
+    public Chunk() {
+        super();
+    }
+
+    public Chunk(byte[] data, int offset, int length) {
+        super(data, offset, length);
+    }
+
+    public Chunk(byte[] data) {
+        super(data);
+    }
+    
+    @Override
+    public String toString() {
+        return "Chunk{length: "+length+", last; "+last+", next:"+next+"}";
+    }
+
+    public static class ChunkMarshaller implements Marshaller<Chunk> {
+        private final int chunkSize;
+        
+        public ChunkMarshaller(int chunkSize) {
+            this.chunkSize = chunkSize;
+        }
+
+        public Class<Chunk> getType() {
+            return Chunk.class;
+        }
+
+        public void writePayload(Chunk chunk, DataOutput out) throws IOException {
+            if( chunk.last ) {
+                out.writeBoolean(true);
+                out.writeInt(chunk.length);
+                out.write(chunk.data, chunk.offset, chunk.length);
+            } else {
+                out.writeBoolean(false);
+                out.writeLong(chunk.next);
+                out.write(chunk.data, chunk.offset, chunk.length);
+            }
+        }
+
+        public Chunk readPayload(DataInput in) throws IOException {
+            Chunk chunk = new Chunk();
+            if( in.readBoolean() ) {
+                chunk.last=true;
+                chunk.length = in.readInt();
+                chunk.data = new byte[chunk.length];
+                chunk.next=0;
+                in.readFully(chunk.data);
+            } else {
+                chunk.last=false;
+                chunk.next = in.readLong();
+                chunk.length = chunkSize;
+                chunk.data = new byte[chunkSize];
+                in.readFully(chunk.data);
+            }
+            return chunk;
+        }
+
+        public int getChunkSize() {
+            return chunkSize;
+        }
+
+    }
+
+    public static class PageInputStream extends InputStream {
+
+        private PageFile file;
+        private Chunk chunk;
+        private int pos;
+        private int pageCount; 
+        
+        private int markPos;
+        private Chunk markChunk;
+        private int markPageCount; 
+        private ChunkMarshaller marshaller;
+        
+        public PageInputStream(PageFile file, long pageId) throws IOException {
+            this.file = file;
+            this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+            
+            Page page = file.load(pageId, marshaller);
+            if( page.getType() != Page.CHUNK_TYPE ) {
+                throw new EOFException("Chunk stream does not exist at page: "+pageId);
+            }
+            chunk = (Chunk)page.getData();
+            pageCount++;
+            
+        }
+
+        public int read() throws IOException {
+            if (!atEOF()) {
+                return chunk.data[chunk.offset+pos++] & 0xff;
+            } else {
+                return -1;
+            }
+        }
+
+        private boolean atEOF() throws IOException {
+            if( pos < chunk.length ) {
+              return false;  
+            }
+            if( chunk.last ) {
+                return true;
+            }
+            fill();
+            return pos >= chunk.length;
+        }
+
+        private void fill() throws IOException {
+            Page page = file.load(chunk.next, marshaller);
+            if( page.getType() == Page.INVALID_TYPE ) {
+                throw new IOException("Invalid page: "+chunk.next);
+            }
+            chunk = (Chunk)page.getData();
+            pageCount++;
+            pos = 0;
+        }
+        
+        public int read(byte[] b) throws IOException {
+            return read(b, 0, b.length);
+        }
+
+        public int read(byte b[], int off, int len) throws IOException {
+            if (!atEOF()) {
+                int rc=0;
+                while(!atEOF() && rc < len) {
+                    len = Math.min(len, chunk.length - pos);
+                    if (len > 0) {
+                        System.arraycopy(chunk.data, chunk.offset+pos, b, off, len);
+                        pos += len;
+                    }
+                    rc+=len;
+                }
+                return rc;
+            } else {
+                return -1;
+            }
+        }
+
+        public long skip(long len) throws IOException {
+            if (atEOF()) {
+                int rc=0;
+                while(!atEOF() && rc < len) {
+                    len = Math.min(len, chunk.length - pos);
+                    if (len > 0) {
+                        pos += len;
+                    }
+                    rc+=len;
+                }
+                return rc;
+            } else {
+                return -1;
+            }
+        }
+
+        public int available() {
+            return chunk.length - pos;
+        }
+
+        public boolean markSupported() {
+            return true;
+        }
+
+        public void mark(int markpos) {
+            markPos = pos;
+            markChunk = chunk;
+            markPageCount = pageCount;
+        }
+
+        public void reset() {
+            pos = markPos;
+            chunk = markChunk;
+            pageCount = markPageCount;
+        }
+
+        public int getPageCount() {
+            return pageCount;
+        }
+
+    }
+
+    static public class PageOutputStream extends ByteArrayOutputStream {
+
+        private PageFile file;
+        private long pageId;
+        private ChunkMarshaller marshaller;
+        private int pageCount;
+        private ArrayList<Page> pages; 
+
+        public PageOutputStream(PageFile file, long pageId) {
+            this.file = file;
+            this.pageId = pageId;
+            this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+        }
+
+        @Override
+        public void close() throws IOException {
+            super.close();
+            
+            ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+            ByteSequence bs = toByteSequence();
+            
+            int pos = 0;
+            while( pos < bs.length ) {
+                int len = Math.min(marshaller.getChunkSize(), bs.length - pos);
+                Chunk c = new Chunk(bs.data, pos, len);
+                chunks.add(c);
+                pos+=len;
+            }
+            if( chunks.isEmpty() ) {
+                Chunk c = new Chunk(new byte[]{});
+                chunks.add(c);
+            }
+            chunks.get(chunks.size()-1).last = true;
+            
+            // Load the old pages..
+            pages = new ArrayList<Page>();
+            long p = pageId;
+            while( p >= 0  ) {
+                Page page = file.load(p, marshaller);
+                Chunk c = (Chunk)page.getData();
+                if( c!=null && !c.last ) {
+                    p = c.next;
+                } else {
+                    p = -1;
+                }
+                pages.add(page);
+            }
+
+            // Add more if needed.
+            while( pages.size() < chunks.size() ) {
+                pages.add(file.allocate());
+            }
+            
+            // Update the page data.
+            for(int i=0; i < chunks.size(); i++) {
+                Chunk chunk = chunks.get(i);
+                Page page = pages.get(i);
+                page.setType(Page.CHUNK_TYPE);
+                page.setData(chunk);
+                if( !chunk.last ) {
+                    chunk.next = pages.get(i+1).getPageId();
+                }
+            }
+
+            // If there were extra pages.. Free them up.
+            for(int i=chunks.size(); i < pages.size(); i++) {
+                Page page = pages.get(i);
+                page.setData(null);
+                page.setType(Page.FREE_TYPE);
+            }
+            
+            file.write(pages, marshaller);
+            
+            pageCount=chunks.size();
+        }
+        
+        public int getPageCount() {
+            return pageCount;
+        }
+
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.kahadb.page.Chunk.PageInputStream;
+import org.apache.kahadb.page.Chunk.PageOutputStream;
+
+/**
+ * Bin in a HashIndex
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashBin {
+    
+    private final HashIndex index;
+    private final long pageId;
+    private TreeMap<Comparable, Long> data;
+    private int pageCount;
+    
+    /**
+     * Constructor
+     * 
+     * @param hashIndex
+     * @param pageId
+     * @param maximumEntries
+     * @throws IOException 
+     */
+    HashBin(HashIndex hashIndex, long pageId) throws IOException {
+        this.index = hashIndex;
+        this.pageId = pageId;
+    }
+
+    private void load() throws IOException {
+        
+        data = new TreeMap<Comparable, Long>();
+
+        // Using page page streams to store the data makes it easy to marshall the HashBin data,
+        // but it does not give us very good page based caching. As even if the pages are cached,
+        // we will still need to de-marshall from the stream.
+        
+        // I think it will be better to make the bin a btree root.  
+        
+        PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);        
+        DataInputStream is = new DataInputStream(pis);
+        try {
+            
+            int size = is.readInt();
+            for(int i=0; i < size; i++) {
+                Comparable key = (Comparable)index.getKeyMarshaller().readPayload(is);
+                long value = is.readLong();
+                data.put(key, value);
+            }
+            is.close();
+            pageCount = pis.getPageCount();
+        } catch (IOException e) {
+            throw e;
+        }
+    }
+    
+    public void store() throws IOException {
+        PageOutputStream pos = new PageOutputStream(index.getPageFile(), pageId);
+        DataOutputStream os = new DataOutputStream(pos);
+        if( data == null ) {
+            os.writeInt(0);
+        } else {
+            os.writeInt(data.size());
+            for (Map.Entry<Comparable, Long> entry : data.entrySet()) {
+                index.getKeyMarshaller().writePayload(entry.getKey(), os);
+                os.writeLong(entry.getValue());
+            }
+        }
+        os.close();
+        pageCount = pos.getPageCount();
+    }
+
+    public int size() throws IOException {
+        if( data!=null ) {
+            return data.size();
+        } else {
+            
+            // Peek at the page to see how many items it contains.
+            PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);
+            DataInputStream is = new DataInputStream(pis);
+            int size = is.readInt();
+            is.close();
+            
+            return size;
+        }
+    }
+
+    public Long put(Comparable key, Long value) throws IOException {
+        if( data==null ) {
+            load();
+        }
+        Long rc = data.put(key, value);
+        if( !value.equals(rc) ) {
+            store();
+        }
+        return rc;
+    }
+
+    public Long find(Comparable key) throws IOException {
+        if( data==null ) {
+            load();
+        }
+        return data.get(key);
+    }
+    
+    public Map<Comparable, Long> getAll() throws IOException {
+        if( data==null ) {
+            load();
+        }
+        return data;
+    }
+    
+    public Long remove(Comparable key) throws IOException {
+        if( data==null ) {
+            load();
+        }
+        Long rc = data.remove(key);
+        if( rc!=null ) {
+            store();
+        }
+        return rc;
+    }
+
+    public long getPageId() {
+        return pageId;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.StoreEntry;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.page.Chunk.PageInputStream;
+import org.apache.kahadb.page.Chunk.PageOutputStream;
+
+/**
+ * BTree implementation
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class HashIndex implements Index {
+
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+    public static final int INITIALIZING_STATE = 3;
+
+    public static final int RESIZING_PHASE1_STATE = 4;
+    public static final int RESIZING_PHASE2_STATE = 5;
+
+    private static final Log LOG = LogFactory.getLog(HashIndex.class);
+
+    public static final int DEFAULT_BIN_CAPACITY;
+    public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
+    public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
+    public static final int DEFAULT_LOAD_FACTOR;
+
+    static {
+        DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+        DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
+        DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
+        DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
+    }
+
+    private IndexManager indexManager;
+
+    private Marshaller keyMarshaller;
+    private AtomicBoolean loaded = new AtomicBoolean();
+
+    private int size;
+
+    private int increaseThreshold;
+    private int decreaseThreshold;
+
+    // Where the bin page array starts at.
+    private long binPageId;
+    private int binCapacity = DEFAULT_BIN_CAPACITY;
+    private int binsActive;
+    private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
+    private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
+
+    // While resizing, the following contains the new resize data.
+    private int resizeCapacity;
+    private long resizePageId;
+
+    // When the index is initializing or resizing.. state changes so that
+    // on failure it can be properly recovered.
+    private int state;
+
+    // Once binsActive/binCapacity reaches the loadFactor, then we need to
+    // increase the capacity
+    private int loadFactor = DEFAULT_LOAD_FACTOR;
+
+    private PageFile pageFile;
+    // This page holds the index metadata.
+    private long pageId;
+
+    /**
+     * Constructor
+     * 
+     * @param directory
+     * @param name
+     * @param indexManager
+     * @param numberOfBins
+     * @throws IOException
+     */
+    public HashIndex(IndexManager indexManager, PageFile pageFile, long pageId) throws IOException {
+        this.pageFile = pageFile;
+        this.indexManager = indexManager;
+        this.pageId = pageId;
+    }
+
+    public synchronized void load() {
+        if (loaded.compareAndSet(false, true)) {
+            try {
+                Page page = pageFile.load(pageId, null);
+
+                // Is this a brand new index?
+                if (page.getType() == Page.FREE_TYPE) {
+
+                    // We need to create the pages for the bins
+                    Page binPage = pageFile.allocate(binCapacity);
+                    binPageId = binPage.getPageId();
+                    state = INITIALIZING_STATE;
+                    storeMetadata();
+                    pageFile.checkpoint();
+
+                    // If failure happens now we can continue initializing the
+                    // the hash bins...
+
+                } else {
+                    // Lets load it's data
+                    loadMetadata();
+
+                    // If we did not have a clean shutdown...
+                    if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
+                        // Figure out the size and the # of bins that are
+                        // active. Yeah This loads the first page of every bin. :(
+                        // We might want to put this in the metadata page, but
+                        // then that page would be getting updated on every write.
+                        size = 0;
+                        for (int i = 0; i < binCapacity; i++) {
+                            HashBin hashBin = new HashBin(this, binPageId + i);
+                            int t = hashBin.size();
+                            if (t > 0) {
+                                binsActive++;
+                            }
+                            size += t;
+                        }
+                    }
+                }
+
+                if (state == INITIALIZING_STATE) {
+                    // TODO:
+                    // If a failure occurs mid way through us initializing the
+                    // bins.. will the page file still think we have the rest
+                    // of them previously allocated to us?
+
+                    for (int i = 0; i < binCapacity; i++) {
+                        HashBin hashBin = new HashBin(this, binPageId + i);
+                        hashBin.store();
+                    }
+                    size = 0;
+                    binsActive = 0;
+                }
+                
+                if (state == RESIZING_PHASE1_STATE) {
+                    // continue resize phase 1
+                    resizePhase1();
+                }                
+                if (state == RESIZING_PHASE2_STATE) {
+                    // continue resize phase 1
+                    resizePhase2();
+                }                
+
+                calcThresholds();
+
+                state = OPEN_STATE;
+                storeMetadata();
+                pageFile.checkpoint();
+                
+                LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
+
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public synchronized void unload() throws IOException {
+        if (loaded.compareAndSet(true, false)) {
+            state = CLOSED_STATE;
+            storeMetadata();
+        }
+    }
+
+    public synchronized StoreEntry get(Object key) throws IOException {
+        load();
+        Long result = getBin(key).find((Comparable)key);
+        return result != null ? indexManager.getIndex(result) : null;
+    }
+
+    public synchronized void store(Object key, StoreEntry value) throws IOException {
+        load();
+        HashBin bin = getBin(key);
+        if (bin.put((Comparable)key, value.getOffset()) == null) {
+            this.size++;
+            if (bin.size() == 1) {
+                binsActive++;
+            }
+        }
+        if (this.binsActive >= this.increaseThreshold) {
+            int newSize = Math.min(maximumBinCapacity, binCapacity*2);
+            if(binCapacity!=newSize) {
+                resize(newSize);
+            }
+        }
+    }
+
+    public synchronized StoreEntry remove(Object key) throws IOException {
+        load();
+        StoreEntry result = null;
+
+        HashBin bin = getBin(key);
+        Long offset = bin.remove((Comparable)key);
+        if (offset != null) {
+            this.size--;
+            if (bin.size() == 0) {
+                binsActive--;
+            }
+            result = this.indexManager.getIndex(offset);
+        }
+
+        if (this.binsActive <= this.decreaseThreshold) {
+            int newSize = Math.max(minimumBinCapacity, binCapacity/2);
+            if(binCapacity!=newSize) {
+                resize(newSize);
+         
+            }
+        }
+        return result;
+    }
+
+    public synchronized boolean containsKey(Object key) throws IOException {
+        return get(key) != null;
+    }
+
+    public synchronized void clear() throws IOException {
+        load();
+        for (int i = 0; i < binCapacity; i++) {
+            HashBin hashBin = new HashBin(this, binPageId + i);
+            hashBin.store(); // A store before a load.. clears the data out.
+        }
+        size = 0;
+        binsActive = 0;
+    }
+
+    public String toString() {
+        String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
+        return str;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Implementation Methods
+    // /////////////////////////////////////////////////////////////////
+
+    private void loadMetadata() throws IOException {
+        PageInputStream pis = new PageInputStream(pageFile, pageId);
+        DataInputStream is = new DataInputStream(pis);
+        state = is.readInt();
+        binPageId = is.readLong();
+        binCapacity = is.readInt();
+        size = is.readInt();
+        binsActive = is.readInt();
+        resizePageId = is.readLong();
+        resizeCapacity = is.readInt();
+        is.close();
+    }
+
+    private void storeMetadata() throws IOException {
+        PageOutputStream pos = new PageOutputStream(pageFile, pageId);
+        DataOutputStream os = new DataOutputStream(pos);
+        os.writeInt(state);
+        os.writeLong(binPageId);
+        os.writeInt(binCapacity);
+        os.writeInt(size);
+        os.writeInt(binsActive);
+        os.writeLong(resizePageId);
+        os.writeInt(resizeCapacity);
+        os.close();
+    }
+
+    private void resize(int newSize) throws IOException {
+        
+        LOG.debug("Resizing to: "+newSize);
+        
+        state = RESIZING_PHASE1_STATE;
+        resizeCapacity = newSize;
+        resizePageId = pageFile.allocate(resizeCapacity).getPageId();
+        storeMetadata();
+        pageFile.checkpoint();
+        resizePhase1();
+        resizePhase2();        
+    }
+
+    private void resizePhase1() throws IOException {
+        // In Phase 1 we copy the data to the new bins..
+        
+        // Initialize the bins..
+        for (int i = 0; i < resizeCapacity; i++) {
+            HashBin bin = new HashBin(this, resizePageId + i);
+            bin.store();
+        }
+
+        binsActive = 0;
+        // Copy the data from the old bins to the new bins.
+        for (int i = 0; i < binCapacity; i++) {
+            HashBin bin = new HashBin(this, binPageId + i);
+            for (Map.Entry<Comparable, Long> entry : bin.getAll().entrySet()) {
+                HashBin resizeBin = getResizeBin(entry.getKey());
+                resizeBin.put(entry.getKey(), entry.getValue());
+                if( resizeBin.size() == 1) {
+                    binsActive++;
+                }
+            }
+        }
+        
+        // Now we can release the old data.
+        state = RESIZING_PHASE2_STATE;
+        storeMetadata();
+        pageFile.checkpoint();
+    }
+
+    private void resizePhase2() throws IOException {
+        for (int i = 0; i < binCapacity; i++) {
+            HashBin hashBin = new HashBin(this, binPageId + i);
+            hashBin.store(); // A store before a load.. clears the data out.
+        }
+        pageFile.free(binPageId, binCapacity);
+        
+        binCapacity = resizeCapacity;
+        binPageId = resizePageId;
+        resizeCapacity=0;
+        resizePageId=0;
+        state = OPEN_STATE;
+        storeMetadata();
+        pageFile.checkpoint();
+        calcThresholds();
+        
+        LOG.debug("Resizing done.  New bins start at: "+binPageId);
+
+    }
+
+    private void calcThresholds() {
+        increaseThreshold = (binCapacity * loadFactor)/100;
+        decreaseThreshold = (binCapacity * loadFactor * loadFactor ) / 20000;
+    }
+
+    private HashBin getResizeBin(Object key) throws IOException {
+        int i = indexFor(key, resizeCapacity);
+        return new HashBin(this, resizePageId + i);
+    }
+
+    private HashBin getBin(Object key) throws IOException {
+        int i = indexFor(key, binCapacity);
+        return new HashBin(this, binPageId + i);
+    }
+
+    static int indexFor(Object x, int length) {
+        return Math.abs(x.hashCode()%length);
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Property Accessors
+    // /////////////////////////////////////////////////////////////////
+
+    public Marshaller getKeyMarshaller() {
+        return keyMarshaller;
+    }
+
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller
+     */
+    public synchronized void setKeyMarshaller(Marshaller marshaller) {
+        this.keyMarshaller = marshaller;
+    }
+
+    /**
+     * @return number of bins in the index
+     */
+    public int getBinCapacity() {
+        return this.binCapacity;
+    }
+
+    /**
+     * @param binCapacity
+     */
+    public void setBinCapacity(int binCapacity) {
+        if (loaded.get() && binCapacity != this.binCapacity) {
+            throw new RuntimeException("Pages already loaded - can't reset bin capacity");
+        }
+        this.binCapacity = binCapacity;
+    }
+
+    public boolean isTransient() {
+        return false;
+    }
+
+    /**
+     * @return the loadFactor
+     */
+    public int getLoadFactor() {
+        return loadFactor;
+    }
+
+    /**
+     * @param loadFactor the loadFactor to set
+     */
+    public void setLoadFactor(int loadFactor) {
+        this.loadFactor = loadFactor;
+    }
+
+    /**
+     * @return the maximumCapacity
+     */
+    public int setMaximumBinCapacity() {
+        return maximumBinCapacity;
+    }
+
+    /**
+     * @param maximumCapacity the maximumCapacity to set
+     */
+    public void setMaximumBinCapacity(int maximumCapacity) {
+        this.maximumBinCapacity = maximumCapacity;
+    }
+
+    public synchronized int getSize() {
+        return size;
+    }
+
+    public synchronized int getActiveBins() {
+        return binsActive;
+    }
+
+    public long getBinPageId() {
+        return binPageId;
+    }
+
+    public PageFile getPageFile() {
+        return pageFile;
+    }
+
+    public int getBinsActive() {
+        return binsActive;
+    }
+
+}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Thu Aug 21 18:16:06 2008
@@ -39,6 +39,7 @@
 
     public static final short INVALID_TYPE = -1;
     public static final short FREE_TYPE = 0;
+    public static final short CHUNK_TYPE = 1;
     
 
     private long pageId;
@@ -54,12 +55,18 @@
         this.type = other.type;
         this.data = other.data;
     }
+    
+    Page copy() {
+        Page rc = new Page();
+        rc.copy(this);
+        return rc;
+    }
 
 
     void write(DataOutput os, Marshaller marshaller) throws IOException {
         os.writeShort(type);
         os.writeLong(txId);
-        if( marshaller!=null ) {
+        if( marshaller!=null && type!=FREE_TYPE ) {
             marshaller.writePayload(data, os);
         }
     }
@@ -67,8 +74,10 @@
     void read(DataInput is, Marshaller marshaller) throws IOException {
         type = is.readShort();
         txId = is.readLong();
-        if( marshaller!=null ) {
+        if( marshaller!=null && type!=FREE_TYPE ) {
             data = marshaller.readPayload(is);
+        } else {
+            data = null;
         }
     }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Aug 21 18:16:06 2008
@@ -16,8 +16,6 @@
  */
 package org.apache.kahadb.page;
 
-import com.sun.tools.javac.tree.Tree.TopLevel;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -30,12 +28,13 @@
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,10 +43,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.Marshaller;
-import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.page.Chunk.ChunkMarshaller;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -93,12 +91,14 @@
     private AtomicBoolean loaded = new AtomicBoolean();
     private LRUCache<Long, Page> pageCache;
     
-    private boolean enableRecoveryBuffer=true;
-    private boolean enableSyncedWrites=true;
-    private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
+    private boolean enableRecoveryBuffer=false;
+    private boolean enableSyncedWrites=false;
+    private boolean enablePageCaching=true;
+    private boolean enableAsyncWrites=false;
+    
     private int pageCacheSize = 10;
     
-    private LinkedHashMap<Long, PageWrite> writes=new LinkedHashMap<Long, PageWrite>();
+    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
     private Thread writerThread;
     AtomicBoolean stopWriter = new AtomicBoolean();
     private CountDownLatch checkpointLatch;
@@ -168,16 +168,16 @@
      * Internally used by the double write buffer implementation used in this class. 
      */
     private class PageWrite {
-        final Page page;
+        Page page;
         byte[] current;
         byte[] diskBound;
 
         public PageWrite(Page page, byte[] data) {
-            this.page = page;
-            this.current = data;
+            setCurrent(page, data);
         }
         
-        public void setCurrent(byte[] data) {
+        public void setCurrent(Page page, byte[] data) {
+            this.page=page;
             current=data;
         }
 
@@ -193,6 +193,11 @@
             diskBound=null;
             return current == null;
         }
+        
+        @Override
+        public String toString() {
+            return "PageWrite{pageId="+page.getPageId()+"}";
+        }
 
     }
 
@@ -387,22 +392,23 @@
         if( !loaded.get() ) {
             throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
         }
-
+        
         Page page = null;
-        if(!freeList.isEmpty()) {
-            long pageId = freeList.removeFirst();
-            page = new Page();
-            page.setPageId(pageId);
-            page.setType(Page.FREE_TYPE);
-        } else { 
-            // allocate one
+        
+        // We may need to create a new free page...
+        if(freeList.isEmpty()) {
             page = new Page();
             page.setPageId(nextFreePageId);
             page.setType(Page.FREE_TYPE);
             nextFreePageId ++;
+//            LOG.debug("allocated: "+page.getPageId());
             write(page, null);
         }
-        addToCache(page);
+
+        long pageId = freeList.removeFirst();
+        page = new Page();
+        page.setPageId(pageId);
+        page.setType(Page.FREE_TYPE);
         return page;
     }
     
@@ -423,28 +429,30 @@
 
         Page page = null;
         Sequence seq = freeList.removeFirstSequence(count);
-        if(seq!=null) {
-            page = new Page();
-            page.setPageId(seq.getFirst());
-            page.setType(Page.FREE_TYPE);
-        } else {
+        if(seq==null) {
             
-            // allocate the pages..
-            Page t = new Page();
-            while( count > 0 ) {
-                t.setPageId(nextFreePageId);
-                t.setType(Page.FREE_TYPE);
+            // We may need to create a new free page...
+            page = new Page();
+            int c=count;
+            while( c > 0 ) {
+                page.setPageId(nextFreePageId);
+                page.setType(Page.FREE_TYPE);
                 nextFreePageId ++;
-                write(t, null);
-                count--;
-                
+//                LOG.debug("allocate writing: "+page.getPageId());
+                write(page, null);
+                c--;
                 if( page == null ) {
-                    page = t;
+                    page = page;
                 }
             }
             
+            seq = freeList.removeFirstSequence(count);
         }
-        addToCache(page);
+        
+        page = new Page();
+        page.setPageId(seq.getFirst());
+        page.setType(Page.FREE_TYPE);
+//        LOG.debug("allocated: "+page.getPageId());
         return page;
     }
 
@@ -458,13 +466,63 @@
      *         if the PageFile is not loaded
      */
     public void free(Page page) throws IOException {
+        page.setType(Page.FREE_TYPE);
+        free(page.getPageId(), 1);
+    }
+    
+    /**
+     * Frees up a previously allocated page so that it can be re-allocated again.
+     * 
+     * @param page the page to free up
+     * @throws IOException
+     *         If an disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(long pageId) throws IOException {
+        free(pageId, 1);
+    }
+    
+    /**
+     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+     * 
+     * @param page the initial page of the sequence that will be getting freed
+     * @param count the number of pages in the sequence
+     * 
+     * @throws IOException
+     *         If an disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(Page page, int count) throws IOException {
+        page.setType(Page.FREE_TYPE);
+        free(page.getPageId(), count);
+    }
+    
+    /**
+     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+     * 
+     * @param page the initial page of the sequence that will be getting freed
+     * @param count the number of pages in the sequence
+     * 
+     * @throws IOException
+     *         If an disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(long pageId, int count) throws IOException {
         if( !loaded.get() ) {
             throw new IllegalStateException("Cannot free a page when the page file is not loaded");
         }
-        removeFromCache(page);
-        page.setType(Page.FREE_TYPE);
-        write(page, null);
-        freeList.add(page.getPageId());
+        
+        Page page = new Page();
+        long initialId = pageId;
+        for (int i = 0; i < count; i++) {
+            page.setPageId(initialId+i);
+            page.setType(Page.FREE_TYPE);
+//            LOG.debug("free: "+page.getPageId());
+            write(page, null);
+        }
     }
     
     /**
@@ -506,7 +564,7 @@
 
         // Can't load invalid offsets...
         if (page.getPageId() < 0) {
-            page.setTxId(Page.INVALID_TYPE);
+            page.setType(Page.INVALID_TYPE);
             return;
         }        
 
@@ -523,6 +581,7 @@
         dataIn.restart(readBuffer);
         
         // Unmarshall it.
+//        LOG.debug("load: "+page.getPageId());
         page.read(dataIn, marshaller);
         
         // Cache it.
@@ -620,6 +679,26 @@
         };
     }
     
+    /**
+     * Updates multiple pages in a single unit of work.
+     * 
+     * @param pages
+     *        the pages to write. The Pages object must be fully populated with a valid pageId, type, and data.
+     * @param marshaller
+     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
+     * @throws IOException
+     *         If an disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void write(Collection<Page> pages, ChunkMarshaller marshaller) throws IOException {
+        // TODO: Need to update double buffer impl so that it handles a collection of writes.  As is right now,
+        // the pages in this write may be split across multiple write batches which means that they
+        // will not get applied as a unit of work.
+        for (Page page : pages) {
+            write(page, marshaller);
+        }
+    }
 
     /**
      * 
@@ -637,7 +716,7 @@
         if( !loaded.get() ) {
             throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
         }
-        
+                        
         DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
         page.setTxId(nextTxid.get());
         page.write(dataOut, marshaller);
@@ -645,9 +724,12 @@
             throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
         }
         
+        page = page.copy();
         Long key = page.getPageId();
+        addToCache(page);
 
-        LOG.debug("Page write request for offset: "+page.getPageId());
+//        LOG.debug("write: "+page.getPageId());
+        
         synchronized( writes ) {
             // If it's not in the write cache...
             PageWrite write = writes.get(key);
@@ -655,14 +737,26 @@
                 write = new PageWrite(page, dataOut.getData());
                 writes.put(key, write);
             } else {
-                write.setCurrent(dataOut.getData());
+                write.setCurrent(page, dataOut.getData());
             }
             
             // Once we start approaching capacity, notify the writer to start writing
             if( canStartWriteBatch() ) {
-                writes.notify();
+                if( enableAsyncWrites  ) {
+                    writes.notify();
+                } else {
+                    while( canStartWriteBatch() ) {
+                        writeBatch(-1, TimeUnit.MILLISECONDS);
+                    }
+                }
             }
         }
+        
+        if( page.getType() == Page.FREE_TYPE ) {
+            removeFromCache(page);
+            freeList.add(page.getPageId());
+        }
+
     }
     
     /**
@@ -675,7 +769,7 @@
      */
     public long checkpoint() throws IOException {
 
-        if( stopWriter.get() ) {
+        if( enableAsyncWrites && stopWriter.get() ) {
             throw new IOException("Page file already stopped: checkpointing is not allowed");
         }
         
@@ -689,7 +783,13 @@
                 this.checkpointLatch = new CountDownLatch(1);
             }
             checkpointLatch = this.checkpointLatch;
-            writes.notify();
+            if( enableAsyncWrites ) {
+                writes.notify();
+            } else {
+                while( !writes.isEmpty() ) {
+                    writeBatch(-1, TimeUnit.MILLISECONDS);
+                }
+            }
         }        
         try {
             checkpointLatch.await();        
@@ -711,11 +811,14 @@
     private boolean canStartWriteBatch() {
         int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER);
         
-        // The constant 10 here controls how soon write batches start going to disk..
-        // would be nice to figure out how to auto tune that value.  Make to small and
-        // we reduce through put because we are locking the write mutex too offen doing writes
-    
-        return capacityUsed >= 10 || checkpointLatch!=null;
+        if( enableAsyncWrites ) {
+            // The constant 10 here controls how soon write batches start going to disk..
+            // would be nice to figure out how to auto tune that value.  Make to small and
+            // we reduce through put because we are locking the write mutex too offen doing writes
+            return capacityUsed >= 10 || checkpointLatch!=null;
+        } else {
+            return capacityUsed >= 80 || checkpointLatch!=null;
+        }
     }
 
     
@@ -727,14 +830,14 @@
      * @throws InterruptedException 
      * @throws IOException 
      */
-    private boolean doWrites(long timeout, TimeUnit unit) throws IOException {
+    private boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
                 
         int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
         ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
         
         synchronized( writes ) {            
             // If there is not enough to write, wait for a notification...
-            if( !canStartWriteBatch() ) {
+            if( !canStartWriteBatch() && timeout>=0 ) {
                 releaseCheckpointWaiter();
                 try {
                     writes.wait(unit.toMillis(timeout));
@@ -800,9 +903,9 @@
         // Sync again
         if( enableSyncedWrites ) {
             writeFile.getFD().sync();
-            LOG.debug("Page write complete tx: "+txId+", pages: "+pageOffsets);
         }
         
+//        LOG.debug("write done: "+txId+", pages: "+pageOffsets);
         nextTxid.incrementAndGet();
 
         synchronized( writes ) {
@@ -897,27 +1000,35 @@
     }
 
     private void startWriter() {
-        stopWriter.set(false);
-        writerThread = new Thread("Page Writer") {
-            @Override
-            public void run() {
-                try {
-                    while( !stopWriter.get() ) {
-                        doWrites(1000, TimeUnit.MILLISECONDS);
+        synchronized( writes ) {
+            if( enableAsyncWrites ) {
+                stopWriter.set(false);
+                writerThread = new Thread("Page Writer") {
+                    @Override
+                    public void run() {
+                        try {
+                            while( !stopWriter.get() ) {
+                                writeBatch(1000, TimeUnit.MILLISECONDS);
+                            }
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        } finally {
+                            releaseCheckpointWaiter();
+                        }
                     }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                } finally {
-                    releaseCheckpointWaiter();
-                }
+                };
+                writerThread.start();
             }
-        };
-        writerThread.start();
+        }
     }
  
     private void stopWriter() throws InterruptedException {
-        stopWriter.set(true);
-        writerThread.join();
+        synchronized( writes ) {
+            if( enableAsyncWrites ) {
+                stopWriter.set(true);
+                writerThread.join();
+            }
+        }
     }
 
     ///////////////////////////////////////////////////////////////////
@@ -942,21 +1053,17 @@
     ///////////////////////////////////////////////////////////////////
     
     private Page getFromCache(long pageId) {
+        synchronized(writes) {
+            PageWrite pageWrite = writes.get(pageId);
+            if( pageWrite != null ) {
+                return pageWrite.page;
+            }
+        }
+
         Page result = null;
         if (enablePageCaching) {
             result = pageCache.get(pageId);
         }
-        if( result == null ) {
-            synchronized(writes) {
-                PageWrite pageWrite = writes.get(pageId);
-                if( pageWrite != null ) {
-                    result = pageWrite.page;
-                }
-            }
-            if (enablePageCaching) {
-                pageCache.put(pageId, result);
-            }
-        }
         return result;
     }
 
@@ -1109,6 +1216,13 @@
     }
 
     /**
+     * @return the amount of content data that a page can hold.
+     */
+    public int getPageContentSize() {
+        return this.pageSize-Page.PAGE_HEADER_SIZE;
+    }
+    
+    /**
      * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
      * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
      * can no longer be changed.
@@ -1151,5 +1265,13 @@
     public void setPageCacheSize(int pageCacheSize) {
         this.pageCacheSize = pageCacheSize;
     }
+
+    public boolean isEnableAsyncWrites() {
+        return enableAsyncWrites;
+    }
+
+    public void setEnableAsyncWrites(boolean enableAsyncWrites) {
+        this.enableAsyncWrites = enableAsyncWrites;
+    }
     
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java Thu Aug 21 18:16:06 2008
@@ -23,6 +23,9 @@
     public int offset;
     public int length;
 
+    public ByteSequence() {
+    }
+    
     public ByteSequence(byte data[]) {
         this.data = data;
         this.offset = 0;

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java Thu Aug 21 18:16:06 2008
@@ -100,4 +100,22 @@
         return size;
     }
 
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        boolean first=true;
+        T cur = getHead();
+        while( cur!=null ) {
+            if( !first ) {
+                sb.append(", ");
+            }
+            sb.append(cur);
+            first=false;
+            cur = cur.getNext();
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+    
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=687919&r1=687918&r2=687919&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Aug 21 18:16:06 2008
@@ -254,4 +254,9 @@
     public void clear() {
         sequences = new LinkedNodeList<Sequence>();
     }
+    
+    @Override
+    public String toString() {
+        return sequences.toString();
+    }
 }
\ No newline at end of file

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+
+import junit.framework.TestCase;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.page.Chunk.PageInputStream;
+import org.apache.kahadb.page.Chunk.PageOutputStream;
+
+public class ChunkTest extends TestCase {
+
+    static final short TEST_TYPE = 65;
+    
+    public void testChunkStreams() throws IOException {
+        
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+        
+        long id = pf.allocate().getPageId();
+        
+        PageOutputStream pos = new Chunk.PageOutputStream(pf, id);
+        DataOutputStream os = new DataOutputStream(pos);
+        for( int i=0; i < 10000; i++) {
+            os.writeUTF("Test string:"+i);
+        }
+        
+        os.close();
+        System.out.println("Chuncks used: "+pos.getPageCount());
+        
+        // Reload the page file.
+        pf.unload();
+        pf.load();
+        
+        PageInputStream pis = new PageInputStream(pf, id);        
+        DataInputStream is = new DataInputStream(pis);
+        for( int i=0; i < 10000; i++) {
+            assertEquals("Test string:"+i, is.readUTF());
+        }
+        assertEquals(-1, is.read());
+        is.close();
+
+        System.out.println("Chuncks used: "+pis.getPageCount());
+        
+        pf.unload();
+    }
+    
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+
+import org.apache.kahadb.Store;
+import org.apache.kahadb.impl.index.Index;
+import org.apache.kahadb.impl.index.IndexBenchmark;
+
+public class HashIndexBenchMark extends IndexBenchmark {
+
+    @Override
+    protected Index createIndex(File root, String name) throws Exception {
+
+        PageFile pf = new PageFile(root, name);
+        pf.load();
+        HashIndex index = new HashIndex(indexManager, pf,pf.allocate().getPageId());
+        index.setKeyMarshaller(Store.STRING_MARSHALLER);
+        
+//        index.setEnableRecoveryBuffer(false);
+//        index.setEnableSyncedWrites(false);
+        return index;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java?rev=687919&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java Thu Aug 21 18:16:06 2008
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import junit.framework.TestCase;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.impl.index.IndexItem;
+import org.apache.kahadb.impl.index.IndexManager;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Test a HashIndex
+ */
+public class HashTest extends TestCase {
+
+    private static final int COUNT = 10000;
+
+    private HashIndex hashIndex;
+    private File directory;
+    private IndexManager indexManager;
+    private PageFile pf;
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        directory = new File(IOHelper.getDefaultDataDirectory());
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        
+        pf = new PageFile(directory, "im-hash-test");
+        pf.load();
+        indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
+        
+        this.hashIndex = new HashIndex(indexManager, pf, pf.allocate().getPageId());
+        this.hashIndex.setBinCapacity(12);
+        this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
+    }
+    
+    public void testHashIndex() throws Exception {
+        String keyRoot = "key:";
+        this.hashIndex.load();
+        doInsert(keyRoot);
+        this.hashIndex.unload();
+        this.hashIndex.load();
+        checkRetrieve(keyRoot);
+        doRemove(keyRoot);
+        this.hashIndex.unload();
+        this.hashIndex.load();
+        doInsert(keyRoot);
+        doRemoveHalf(keyRoot);
+        doInsertHalf(keyRoot);
+        this.hashIndex.unload();
+        this.hashIndex.load();
+        checkRetrieve(keyRoot);
+        this.hashIndex.unload();
+    }
+
+    void doInsert(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem value = indexManager.createNewIndex();
+            indexManager.storeIndex(value);
+            hashIndex.store(keyRoot + i, value);
+        }
+    }
+
+    void checkRetrieve(String keyRoot) throws IOException {
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+            assertNotNull("Key missing: "+keyRoot + i, item);
+        }
+    }
+
+    void doRemoveHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                hashIndex.remove(keyRoot + i);
+            }
+
+        }
+    }
+
+    void doInsertHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                IndexItem value = indexManager.createNewIndex();
+                indexManager.storeIndex(value);
+                hashIndex.store(keyRoot + i, value);
+            }
+        }
+    }
+
+    void doRemove(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            hashIndex.remove(keyRoot + i);
+        }
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+            assertNull(item);
+        }
+    }
+
+    void doRemoveBackwards(String keyRoot) throws Exception {
+        for (int i = COUNT - 1; i >= 0; i--) {
+            hashIndex.remove(keyRoot + i);
+        }
+        for (int i = 0; i < COUNT; i++) {
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
+            assertNull(item);
+        }
+    }
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        pf.unload();
+    }
+}