Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/20 19:56:42 UTC

svn commit: r687399 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/ main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/util/ test/java/org/apache/kahadb/page/

Author: chirino
Date: Wed Aug 20 10:56:36 2008
New Revision: 687399

URL: http://svn.apache.org/viewvc?rev=687399&view=rev
Log:
- Updated Marshaller so that it can tell us what type it operates on.
- Extracted a PageFile layer from the robust hash implementation so that the index implementations can be layered on top of it.  This could also make it easier for multiple indexes to share one page file.


Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/LongMarshaller.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/BytesMarshaller.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/Marshaller.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ObjectMarshaller.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StringMarshaller.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/BytesMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/BytesMarshaller.java?rev=687399&r1=687398&r2=687399&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/BytesMarshaller.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/BytesMarshaller.java Wed Aug 20 10:56:36 2008
@@ -52,4 +52,8 @@
         dataIn.readFully(data);
         return data;
     }
+    
+    public Class<byte[]> getType() {
+        return byte[].class;
+    }
 }

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/LongMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/LongMarshaller.java?rev=687399&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/LongMarshaller.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/LongMarshaller.java Wed Aug 20 10:56:36 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implementation of a Marshaller for a Long
+ * 
+ * @version $Revision: 1.2 $
+ */
+public class LongMarshaller implements Marshaller<Long> {
+    
+    public static final LongMarshaller INSTANCE = new LongMarshaller();
+    
+    /**
+     * Write the payload of this entry to the RawContainer
+     * 
+     * @param object
+     * @param dataOut
+     * @throws IOException
+     */
+    public void writePayload(Long object, DataOutput dataOut) throws IOException {
+        dataOut.writeLong(object);
+    }
+
+    /**
+     * Read the entry from the RawContainer
+     * 
+     * @param dataIn
+     * @return unmarshalled object
+     * @throws IOException
+     */
+    public Long readPayload(DataInput dataIn) throws IOException {
+        return dataIn.readLong();
+    }
+    
+    public Class<Long> getType() {
+        return Long.class;
+    }
+}

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/Marshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/Marshaller.java?rev=687399&r1=687398&r2=687399&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/Marshaller.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/Marshaller.java Wed Aug 20 10:56:36 2008
@@ -27,6 +27,7 @@
  */
 public interface Marshaller<T> {
     
+    Class<T> getType();
        
     /**
      * Write the payload of this entry to the RawContainer
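
The new getType() method lets generic code discover which value type a marshaller
handles without hard coding it. As a minimal sketch (the MarshallerRegistry helper
below is hypothetical, not part of this commit), an index layered on a PageFile
could key its marshallers by type:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kahadb.Marshaller;

    // Hypothetical helper: keys marshallers by the type they operate on, so an
    // index implementation can look up the right marshaller for its key/value
    // classes instead of hard coding one.
    public class MarshallerRegistry {
        private final Map<Class<?>, Marshaller<?>> byType = new HashMap<Class<?>, Marshaller<?>>();

        public void register(Marshaller<?> marshaller) {
            // getType() is the method added to the Marshaller interface in this commit.
            byType.put(marshaller.getType(), marshaller);
        }

        @SuppressWarnings("unchecked")
        public <T> Marshaller<T> lookup(Class<T> type) {
            return (Marshaller<T>) byType.get(type);
        }
    }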

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ObjectMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ObjectMarshaller.java?rev=687399&r1=687398&r2=687399&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ObjectMarshaller.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ObjectMarshaller.java Wed Aug 20 10:56:36 2008
@@ -67,4 +67,8 @@
             throw new IOException(e.getMessage());
         }
     }
+    
+    public Class<Object> getType() {
+        return Object.class;
+    }
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StringMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StringMarshaller.java?rev=687399&r1=687398&r2=687399&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StringMarshaller.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StringMarshaller.java Wed Aug 20 10:56:36 2008
@@ -26,6 +26,9 @@
  * @version $Revision: 1.2 $
  */
 public class StringMarshaller implements Marshaller<String> {
+    
+    public static final StringMarshaller INSTANCE = new StringMarshaller();
+    
     /**
      * Write the payload of this entry to the RawContainer
      * 
@@ -47,4 +50,8 @@
     public String readPayload(DataInput dataIn) throws IOException {
         return dataIn.readUTF();
     }
+    
+    public Class<String> getType() {
+        return String.class;
+    }
 }
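
For reference, the marshallers round-trip values over plain DataOutput/DataInput
streams, so the new INSTANCE singletons can be exercised like this (an illustrative
sketch, not part of the commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.kahadb.LongMarshaller;
    import org.apache.kahadb.StringMarshaller;

    public class MarshallerRoundTrip {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);

            // Write a String and a Long back to back using the new singletons.
            StringMarshaller.INSTANCE.writePayload("hello", out);
            LongMarshaller.INSTANCE.writePayload(Long.valueOf(42), out);
            out.flush();

            // Read them back in the same order.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            String s = StringMarshaller.INSTANCE.readPayload(in);   // "hello"
            Long l = LongMarshaller.INSTANCE.readPayload(in);       // 42
        }
    }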

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=687399&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Wed Aug 20 10:56:36 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.kahadb.Marshaller;
+
+/**
+ * A Page within a file.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class Page {
+    
+    public static final int PAGE_HEADER_SIZE = 10;
+
+    public static final short INVALID_TYPE = -1;
+    public static final short FREE_TYPE = 0;
+    
+
+    private long pageId;
+    
+    // The following fields are persisted
+    private long txId;
+    private short type = FREE_TYPE;
+    private Object data;
+    
+    public void copy(Page other) {
+        this.pageId = other.pageId;
+        this.txId = other.txId;
+        this.type = other.type;
+        this.data = other.data;
+    }
+
+
+    void write(DataOutput os, Marshaller marshaller) throws IOException {
+        os.writeShort(type);
+        os.writeLong(txId);
+        if( marshaller!=null ) {
+            marshaller.writePayload(data, os);
+        }
+    }
+
+    void read(DataInput is, Marshaller marshaller) throws IOException {
+        type = is.readShort();
+        txId = is.readLong();
+        if( marshaller!=null ) {
+            data = marshaller.readPayload(is);
+        }
+    }
+
+    public String toString() {
+        return "Page:" + getPageId();
+    }
+    
+    long getPageId() {
+        return pageId;
+    }
+
+    void setPageId(long id) {
+        this.pageId = id;
+    }
+
+    public long getTxId() {
+        return txId;
+    }
+
+    public void setTxId(long txId) {
+        this.txId = txId;
+    }
+
+    public Object getData() {
+        return data;
+    }
+
+    public void setData(Object data) {
+        this.data = data;
+    }
+
+    public short getType() {
+        return type;
+    }
+
+    public void setType(short type) {
+        this.type = type;
+    }
+
+}
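
The on-disk layout implied by Page.write()/Page.read() is a 2-byte type followed by
an 8-byte txId, which is where PAGE_HEADER_SIZE = 10 comes from; the marshalled data
fills the rest of the page. Sketched out (illustrative only; pageFile is an assumed
PageFile instance, and the snippet assumes code living in the org.apache.kahadb.page
package since Page is package-private):

    // On-disk layout of one page, per Page.write()/Page.read():
    //
    //   offset 0 : short type  (2 bytes)  -- Page.FREE_TYPE, or an index-specific type
    //   offset 2 : long  txId  (8 bytes)  -- txid of the last write to this page
    //   offset 10: payload                -- written by the Marshaller, when one is supplied
    //
    // So the largest payload a page can carry is:
    int maxPayload = pageFile.getPageSize() - Page.PAGE_HEADER_SIZE;  // 4096 - 10 = 4086 by default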

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=687399&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Wed Aug 20 10:56:36 2008
@@ -0,0 +1,1117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.IntrospectionSupport;
+import org.apache.kahadb.util.LRUCache;
+
+/**
+ * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore access to it should 
+ * be externally synchronized.
+ * 
+ * The file has 3 parts:
+ * Metadata Space: 4k : Reserved metadata area used to store persistent config about the file.
+ * Recovery Buffer Space: Page Size * 1000 : A redo log used to prevent partial page writes from making the file inconsistent.
+ * Page Space: The pages in the page file.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class PageFile implements Iterable<Page> {
+    
+    // 4k Default page size.
+    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
+    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=1000;
+    private static final int CONFIG_SPACE_SIZE=1024*4;
+
+    // Recovery header is (long offset)
+    private static final int RECOVERY_HEADER_SIZE=8;  
+    private static final Log LOG = LogFactory.getLog(PageFile.class);
+    
+    private final String name;
+    private File directory;
+    private RandomAccessFile readFile;
+    private RandomAccessFile writeFile;
+
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER;
+    private int initialPageOffset;
+
+    private DataByteArrayInputStream dataIn;
+    private byte[] readBuffer;
+    private long nextFreePageId;
+    
+    private LinkedList<Long> freeList = new LinkedList<Long>();
+    private AtomicBoolean loaded = new AtomicBoolean();
+    private LRUCache<Long, Page> pageCache;
+    
+    private boolean enableRecoveryBuffer=true;
+    private boolean enableSyncedWrites=true;
+    private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
+    private int pageCacheSize = 10;
+    
+    private LinkedHashMap<Long, PageWrite> writes=new LinkedHashMap<Long, PageWrite>();
+    private Thread writerThread;
+    AtomicBoolean stopWriter = new AtomicBoolean();
+    private CountDownLatch checkpointLatch;
+
+    private AtomicLong nextTxid = new AtomicLong();
+    private MetaData metaData;
+    
+    /**
+     * The MetaData object holds the persistent data associated with a PageFile object. 
+     */
+    public static class MetaData {
+        
+        String fileType;
+        String fileTypeVersion;
+        
+        long metaDataTxId=-1;
+        int pageSize;
+        boolean cleanShutdown;
+        long lastTxId;
+        long freePages;
+        
+        public String getFileType() {
+            return fileType;
+        }
+        public void setFileType(String fileType) {
+            this.fileType = fileType;
+        }
+        public String getFileTypeVersion() {
+            return fileTypeVersion;
+        }
+        public void setFileTypeVersion(String version) {
+            this.fileTypeVersion = version;
+        }
+        public long getMetaDataTxId() {
+            return metaDataTxId;
+        }
+        public void setMetaDataTxId(long metaDataTxId) {
+            this.metaDataTxId = metaDataTxId;
+        }
+        public int getPageSize() {
+            return pageSize;
+        }
+        public void setPageSize(int pageSize) {
+            this.pageSize = pageSize;
+        }
+        public boolean isCleanShutdown() {
+            return cleanShutdown;
+        }
+        public void setCleanShutdown(boolean cleanShutdown) {
+            this.cleanShutdown = cleanShutdown;
+        }
+        public long getLastTxId() {
+            return lastTxId;
+        }
+        public void setLastTxId(long lastTxId) {
+            this.lastTxId = lastTxId;
+        }
+        public long getFreePages() {
+            return freePages;
+        }
+        public void setFreePages(long value) {
+            this.freePages = value;
+        }
+    }
+
+    /**
+     * Internally used by the double write buffer implementation used in this class. 
+     */
+    private class PageWrite {
+        final Page page;
+        byte[] current;
+        byte[] diskBound;
+
+        public PageWrite(Page page, byte[] data) {
+            this.page = page;
+            this.current = data;
+        }
+        
+        public void setCurrent(byte[] data) {
+            current=data;
+        }
+
+        void begin() {
+            diskBound = current;
+            current = null;
+        }
+        
+        /**
+         * @return true if there are no pending writes to do.
+         */
+        boolean done() {
+            diskBound=null;
+            return current == null;
+        }
+
+    }
+
+    
+    /**
+     * Creates a PageFile in the specified directory whose data files are named by name.
+     * 
+     * @param directory
+     * @param name
+     */
+    public PageFile(File directory, String name) {
+        this.directory = directory;
+        this.name = name;
+    }
+    
+    /**
+     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
+     * 
+     * @throws IOException 
+     *         if the files cannot be deleted.
+     * @throws IllegalStateException 
+     *         if this PageFile is loaded
+     */
+    public void delete() throws IOException {
+        if( loaded.get() ) {
+            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
+        }
+        File mainPageFile = getMainPageFile();
+        if( mainPageFile.exists() ) {
+            if( !mainPageFile.delete() ) {
+                throw new IOException("Could not delete: "+mainPageFile.getPath());
+            }
+        }
+        File freeFile = getFreeFile();
+        if( freeFile.exists() ) {
+            if( !freeFile.delete() ) {
+                throw new IOException("Could not delete: "+freeFile.getPath());
+            }
+        }
+    }
+    
+    /**
+     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the 
+     * first time the page file is loaded, then this creates the page file in the file system.
+     * 
+     * @throws IOException
+     *         If the page file cannot be loaded. This could be because the existing page file is corrupt, has a bad version, or because 
+     *         there was a disk error.
+     * @throws IllegalStateException 
+     *         If the page file was already loaded.
+     */
+    public void load() throws IOException, IllegalStateException {
+        if (loaded.compareAndSet(false, true)) {
+            
+            if( enablePageCaching ) {
+                pageCache = new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true);
+            }
+            
+            File file = getMainPageFile();
+            IOHelper.mkdirs(file.getParentFile());
+            writeFile = new RandomAccessFile(file, "rw");
+            readFile = new RandomAccessFile(file, "r");
+            
+            if (readFile.length() > 0) {
+                // Load the page size setting because that can't change once the file is created.
+                loadMetaData();
+                pageSize = metaData.getPageSize();
+            } else {
+                // Store the page size setting because that can't change once the file is created.
+                metaData = new MetaData();
+                metaData.setFileType(PageFile.class.getName());
+                metaData.setFileTypeVersion("1");
+                metaData.setPageSize(getPageSize());
+                metaData.setCleanShutdown(true);
+                metaData.setFreePages(-1);
+                metaData.setLastTxId(0);
+                storeMetaData();
+            }
+
+            recoveryBufferSize = (pageSize+RECOVERY_HEADER_SIZE) * MAX_PAGES_IN_RECOVERY_BUFFER;
+            initialPageOffset = CONFIG_SPACE_SIZE+recoveryBufferSize;
+            
+            dataIn = new DataByteArrayInputStream();
+            readBuffer = new byte[pageSize];
+
+            long lastTxId=0;
+            if(  metaData.isCleanShutdown() ) {
+                lastTxId = metaData.getLastTxId();
+                if( metaData.getFreePages()>0 ) {
+                    loadFreeList();
+                } 
+            } else {
+                LOG.debug("Recovering page file...");
+                lastTxId = redoRecoveryUpdates();
+                
+                // Scan all to find the free pages.
+                freeList.clear();
+                long length = readFile.length();
+                Page page = new Page();
+                long offset = initialPageOffset;
+                while( offset < length ) {
+                    // Just read the headers to re-build the free list.
+                    readFile.seek(offset);
+                    readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
+                    dataIn.restart(readBuffer);
+                    page.read(dataIn, null);
+                    
+                    if( page.getType() == Page.FREE_TYPE ) {
+                        freeList.add(offset);
+                    }
+                    offset+=pageSize;
+                }
+                
+            }
+            nextTxid.set( lastTxId+1 );
+            LOG.debug("Last transaction id: "+lastTxId);
+            
+            metaData.setCleanShutdown(false);
+            storeMetaData();
+            getFreeFile().delete();
+            
+            if( writeFile.length() < initialPageOffset) {
+                writeFile.setLength(initialPageOffset);
+            }
+            nextFreePageId=(writeFile.length()-initialPageOffset)/pageSize;
+            startWriter();
+                
+        } else {
+            throw new IllegalStateException("Cannot load the page file when it is allready loaded.");
+        }
+    }
+
+
+    /**
+     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
+     * Once unloaded, you can no longer use the page file to read or write Pages.
+     * 
+     * @throws IOException
+     *         if a disk error occurred while closing down the page file.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void unload() throws IOException {
+        if (loaded.compareAndSet(true, false)) {
+            checkpoint();
+            try {
+                stopWriter();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+            
+            if( freeList.isEmpty() ) {
+                metaData.setFreePages(0);
+            } else {
+                storeFreeList();
+                metaData.setFreePages(freeList.size());
+            }
+            
+            metaData.setLastTxId( nextTxid.get()-1 );
+            metaData.setCleanShutdown(true);
+            storeMetaData();
+            
+            if (readFile != null) {
+                readFile.close();
+                readFile = null;
+                writeFile.close();
+                writeFile=null;
+                freeList.clear();
+                if( pageCache!=null ) {
+                    pageCache=null;
+                }
+                synchronized(writes) {
+                    writes.clear();
+                }
+            }
+        } else {
+            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
+        }
+    }
+
+    
+    /** 
+     * Gives you back a free page that you can write data to.
+     * 
+     * @return a newly allocated page.  
+     * @throws IOException
+     *         If a disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Page allocate() throws IOException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
+        }
+
+        Page page = null;
+        if(!freeList.isEmpty()) {
+            long pageId = freeList.removeFirst();
+            page = new Page();
+            page.setPageId(pageId);
+            page.setType(Page.FREE_TYPE);
+        } else { 
+            // allocate one
+            page = new Page();
+            page.setPageId(nextFreePageId);
+            page.setType(Page.FREE_TYPE);
+            nextFreePageId ++;
+            write(page, null);
+        }
+        addToCache(page);
+        return page;
+    }
+
+    /**
+     * Frees up a previously allocated page so that it can be re-allocated.
+     * 
+     * @param page the page to free up
+     * @throws IOException
+     *         If a disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(Page page) throws IOException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("Cannot free a page when the page file is not loaded");
+        }
+        removeFromCache(page);
+        page.setType(Page.FREE_TYPE);
+        write(page, null);
+        freeList.add(page.getPageId());
+    }
+    
+    /**
+     * Loads a page from disk.
+     * 
+     * @param pageId 
+     *        the id of the page to load
+     * @param marshaller
+     *        the marshaller to use to load the data portion of the Page; may be null if you do not wish to load the data.
+     * @return The page with the given id
+     * @throws IOException
+     *         If a disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Page load(long pageId, Marshaller marshaller) throws IOException {
+        Page page = new Page();
+        page.setPageId(pageId);
+        load(page, marshaller);
+        return page;
+    }
+    
+    /**
+     * Loads a page from disk.  If the page.pageId is not valid then this method will set the page.type to
+     * Page.INVALID_TYPE.
+     * 
+     * @param page - The pageId field must be properly set 
+     * @param marshaller
+     *        the marshaller to use to load the data portion of the Page; may be null if you do not wish to load the data.
+     * @throws IOException
+     *         If a disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void load(Page page, Marshaller marshaller) throws IOException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("Cannot load a page when the page file is not loaded");
+        }
+
+        // Can't load invalid offsets...
+        if (page.getPageId() < 0) {
+            page.setType(Page.INVALID_TYPE);
+            return;
+        }        
+
+        // Try to load it from the cache first...
+        Page t = getFromCache(page.getPageId());
+        if (t != null) {
+            page.copy(t);
+            return;
+        }
+        
+        // Read the page data
+        readFile.seek(toOffset(page.getPageId()));
+        readFile.readFully(readBuffer, 0, pageSize);
+        dataIn.restart(readBuffer);
+        
+        // Unmarshall it.
+        page.read(dataIn, marshaller);
+        
+        // Cache it.
+        addToCache(page);
+    }
+
+    /**
+     * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are not included
+     * in this iteration.
+     * 
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Iterator<Page> iterator() {
+        return iterator(false);
+    }
+
+    /**
+     * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
+     * iterated.
+     * 
+     * @param includeFreePages - if true, free pages are included in the iteration
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Iterator<Page> iterator(final boolean includeFreePages) {
+        
+        if( !loaded.get() ) {
+            throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+        }
+
+        return new Iterator<Page>() {
+            long nextId;
+            Page nextPage;
+            Page lastPage;
+            
+            private void findNextPage() {
+                if( !loaded.get() ) {
+                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+                }
+
+                if( nextPage!=null ) {
+                    return;
+                }
+                
+                try {
+                    while( nextId < PageFile.this.nextFreePageId ) {
+                        readFile.seek(toOffset(nextId));
+                        readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
+                        dataIn.restart(readBuffer);
+                        
+                        Page page = new Page();
+                        page.setPageId(nextId);
+                        page.read(dataIn, null);
+                        
+                        if( includeFreePages || page.getType()!=Page.FREE_TYPE ) {
+                            nextPage = page;
+                            return;
+                        } else {
+                            nextId++;
+                        }
+                    }
+                } catch (IOException e) {
+                    // Reading past the last valid page just ends the iteration.
+                }
+            }
+
+            public boolean hasNext() {
+                findNextPage();
+                return nextPage !=null;
+            }
+
+            public Page next() {
+                findNextPage(); 
+                if( nextPage !=null ) {
+                    lastPage = nextPage;
+                    nextPage=null;
+                    nextId++;
+                    return lastPage;
+                } else {
+                    throw new NoSuchElementException();
+                }
+            }
+            
+            public void remove() {
+                if( lastPage==null ) {
+                    throw new IllegalStateException();
+                }
+                try {
+                    free(lastPage);
+                    lastPage=null;
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+    }
+    
+
+    /**
+     * 
+     * @param page
+     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
+     * @param marshaller
+     *        the marshaller to use to write the data portion of the Page; may be null if you do not wish to write the data.
+     * @throws IOException
+     *         If a disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void write(Page page, Marshaller marshaller) throws IOException {
+        
+        if( !loaded.get() ) {
+            throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
+        }
+        
+        DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
+        page.setTxId(nextTxid.get());
+        page.write(dataOut, marshaller);
+        if (dataOut.size() > pageSize) {
+            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+        }
+        
+        Long key = page.getPageId();
+
+        LOG.debug("Page write request for offset: "+page.getPageId());
+        synchronized( writes ) {
+            // If it's not in the write cache...
+            PageWrite write = writes.get(key);
+            if( write==null ) {
+                write = new PageWrite(page, dataOut.getData());
+                writes.put(key, write);
+            } else {
+                write.setCurrent(dataOut.getData());
+            }
+            
+            // Once we start approaching capacity, notify the writer to start writing
+            if( canStartWriteBatch() ) {
+                writes.notify();
+            }
+        }
+    }
+    
+    /**
+     * Flushes all write buffers to disk and returns the transaction id of the last write done to disk.  The 
+     * transaction id can be used for recovery purposes since it is always incrementing.
+     * 
+     * @return the last transaction id that was fully written to disk.
+     * @throws IOException
+     *         If a disk error occurred.
+     */
+    public long checkpoint() throws IOException {
+
+        if( stopWriter.get() ) {
+            throw new IOException("Page file already stopped: checkpointing is not allowed");
+        }
+        
+        // Set up a latch that gets notified when all buffered writes hit the disk.
+        CountDownLatch checkpointLatch;
+        synchronized( writes ) {
+            if( writes.isEmpty()) {                
+                return nextTxid.get()-1;
+            }
+            if( this.checkpointLatch == null ) {
+                this.checkpointLatch = new CountDownLatch(1);
+            }
+            checkpointLatch = this.checkpointLatch;
+            writes.notify();
+        }        
+        try {
+            checkpointLatch.await();        
+            return nextTxid.get()-1;
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+    }
+
+    
+    public String toString() {
+        return "Page File: "+getMainPageFile();
+    }
+      
+    ///////////////////////////////////////////////////////////////////
+    // Internal Double write implementation follows...
+    ///////////////////////////////////////////////////////////////////
+        
+    private boolean canStartWriteBatch() {
+        int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER);
+        
+        // The constant 10 here controls how soon write batches start going to disk.
+        // It would be nice to figure out how to auto tune that value.  Make it too small and
+        // we reduce throughput because we are locking the write mutex too often doing writes.
+    
+        return capacityUsed >= 10 || checkpointLatch!=null;
+    }
+
+    
+    /**
+     * 
+     * @param timeout
+     * @param unit
+     * @return true if a write was done.
+     * @throws InterruptedException 
+     * @throws IOException 
+     */
+    private boolean doWrites(long timeout, TimeUnit unit) throws IOException {
+                
+        int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
+        ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
+        
+        synchronized( writes ) {            
+            // If there is not enough to write, wait for a notification...
+            if( !canStartWriteBatch() ) {
+                releaseCheckpointWaiter();
+                try {
+                    writes.wait(unit.toMillis(timeout));
+                } catch (InterruptedException e) {
+                    throw new InterruptedIOException();
+                }
+            }
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+                return false;
+            }
+            
+            // build a write batch from the current write cache. 
+            for (PageWrite write : writes.values()) {
+                
+                int l = write.current.length+RECOVERY_HEADER_SIZE;
+                
+                // Will it fit in the batch???
+                if( batchLength + l > recoveryBufferSize ) {
+                    break; // nope.. stop adding to the batch.
+                }
+                
+                batch.add(write);
+                batchLength +=l;
+                
+                // Move the current write to the diskBound write, this lets folks update the 
+                // page again without blocking for this write.
+                write.begin();
+            }
+        }
+        long txId = nextTxid.get();
+
+        if (enableRecoveryBuffer) {
+            // Now the batch array has all the writes, write the batch to the
+            // recovery buffer.
+            writeFile.seek(CONFIG_SPACE_SIZE);
+            // write txid of the batch
+            writeFile.writeLong(txId); 
+            // write the recovery record counter.
+            writeFile.writeInt(batch.size()); 
+            for (PageWrite w : batch) {
+                writeFile.writeLong(w.page.getPageId());
+                writeFile.write(w.diskBound, 0, pageSize);
+            }
+            if( enableSyncedWrites ) {
+                // Sync to make sure recovery buffer writes land on disk..
+                writeFile.getFD().sync();
+            }
+        }
+       
+        
+        StringBuilder pageOffsets = new StringBuilder();
+        for (PageWrite w : batch) {
+            if( pageOffsets.length()!=0 ) {
+                pageOffsets.append(", ");
+            }
+            pageOffsets.append(w.page.getPageId());
+            writeFile.seek(toOffset(w.page.getPageId()));
+            writeFile.write(w.diskBound, 0, pageSize);
+        }
+        
+        // Sync again
+        if( enableSyncedWrites ) {
+            writeFile.getFD().sync();
+            LOG.debug("Page write complete tx: "+txId+", pages: "+pageOffsets);
+        }
+        
+        nextTxid.incrementAndGet();
+
+        synchronized( writes ) {
+            for (PageWrite w : batch) {
+                // If there are no more pending writes, then remove it from the write cache.
+                if( w.done() ) {
+                    writes.remove(w.page.getPageId());
+                }
+            }
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+            }
+        }
+        
+        return true;
+    }
+
+    private void releaseCheckpointWaiter() {
+        if( checkpointLatch!=null ) {
+            checkpointLatch.countDown();
+            checkpointLatch=null;
+        }
+    }       
+    
+    /**
+     * @return the last transaction id successfully written to disk.
+     * @throws IOException
+     */
+    private long redoRecoveryUpdates() throws IOException {
+        
+        if( readFile.length() < CONFIG_SPACE_SIZE+12 ) {
+            return 0;
+        }
+
+        // How many recovery records do we have in the recovery buffer?
+        readFile.seek(CONFIG_SPACE_SIZE);
+        long rc = readFile.readLong();
+        int recordCounter = readFile.readInt();
+        
+        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
+        try {
+            for (int i = 0; i < recordCounter; i++) {
+                long offset = readFile.readLong();
+                byte []data = new byte[pageSize];
+                if( readFile.read(data, 0, pageSize) != pageSize ) {
+                    // Could not fully read the data, probably due to a partial write to the recovery buffer.
+                    throw new IllegalStateException("Invalid recovery record: could not fully read the data");
+                }
+                batch.put(offset, data);
+            }
+        } catch (IllegalStateException e) {
+            // If an error occurred it was because the redo buffer was not fully written out correctly, so don't redo it,
+            // as the pages should still be consistent.
+            return rc-1;
+        }
+        
+        // We need to detect if the recovery buffer write was fully synced to disk.  If it was, we should see some of its writes in the page file. 
+        // If we don't see any writes (even partial ones) in the page file, then we consider that the recovery buffer itself was partially written.
+        // FYI: This depends on the header changing on every write.  It does because the header contains a txid which changes on every write.
+        boolean redoNeeded = false;
+        byte header[] = new byte[Page.PAGE_HEADER_SIZE]; 
+        byte header2[] = new byte[Page.PAGE_HEADER_SIZE]; 
+        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
+            try {
+                readFile.seek(e.getKey());
+                readFile.readFully(header);
+                System.arraycopy(e.getValue(), 0, header2, 0, Page.PAGE_HEADER_SIZE);
+                if( Arrays.equals(header, header2) ) {
+                    redoNeeded = true;
+                    break;
+                }
+            } catch (IOException ignore) {
+                // not all records may have been written..
+            }
+        }
+
+        // Stop here if we don't need to redo...
+        if( !redoNeeded ) {
+            return rc-1;
+        }
+        
+        
+        // Re-apply all the writes in the recovery buffer.
+        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
+            writeFile.seek(e.getKey());
+            writeFile.write(e.getValue());
+        }
+        
+        // And sync it to disk
+        writeFile.getFD().sync(); 
+        return rc;
+    }
+
+    private void startWriter() {
+        stopWriter.set(false);
+        writerThread = new Thread("Page Writer") {
+            @Override
+            public void run() {
+                try {
+                    while( !stopWriter.get() ) {
+                        doWrites(1000, TimeUnit.MILLISECONDS);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } finally {
+                    releaseCheckpointWaiter();
+                }
+            }
+        };
+        writerThread.start();
+    }
+ 
+    private void stopWriter() throws InterruptedException {
+        stopWriter.set(true);
+        writerThread.join();
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Misc Internal Operations
+    ///////////////////////////////////////////////////////////////////
+
+    private File getMainPageFile() {
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
+    }
+    
+    private File getFreeFile() {
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
+    } 
+    
+    private long toOffset(long pageId) {
+        return initialPageOffset+(pageId*pageSize);
+    }
+
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal Cache Related operations
+    ///////////////////////////////////////////////////////////////////
+    
+    private Page getFromCache(long pageId) {
+        Page result = null;
+        if (enablePageCaching) {
+            result = pageCache.get(pageId);
+        }
+        if( result == null ) {
+            synchronized(writes) {
+                PageWrite pageWrite = writes.get(pageId);
+                if( pageWrite != null ) {
+                    result = pageWrite.page;
+                }
+            }
+            if (enablePageCaching && result != null) {
+                pageCache.put(pageId, result);
+            }
+        }
+        return result;
+    }
+
+    private void addToCache(Page page) {
+        if (enablePageCaching) {
+            pageCache.put(page.getPageId(), page);
+        }
+    }
+
+    private void removeFromCache(Page page) {
+        if (enablePageCaching) {
+            pageCache.remove(page.getPageId());
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal MetaData Related Operations
+    ///////////////////////////////////////////////////////////////////
+    
+    private void loadMetaData() throws IOException {
+
+        ByteArrayInputStream is;
+        MetaData v1 = new MetaData();
+        MetaData v2 = new MetaData();
+        try {
+            Properties p = new Properties();
+            byte[] d = new byte[CONFIG_SPACE_SIZE/2];
+            readFile.seek(0);
+            readFile.readFully(d);
+            is = new ByteArrayInputStream(d);
+            p.load(is);
+            IntrospectionSupport.setProperties(v1, p);
+        } catch (IOException e) {
+            v1 = null;
+        }
+        
+        try {
+            Properties p = new Properties();
+            byte[] d = new byte[CONFIG_SPACE_SIZE/2];
+            readFile.seek(CONFIG_SPACE_SIZE/2);
+            readFile.readFully(d);
+            is = new ByteArrayInputStream(d);
+            p.load(is);
+            IntrospectionSupport.setProperties(v2, p);
+        } catch (IOException e) {
+            v2 = null;
+        }
+        
+        if( v1==null && v2==null ) {
+            throw new IOException("Could not load page file meta data");
+        } 
+        
+        if( v1 == null || v1.metaDataTxId<0 ) {
+            metaData = v2;
+        } else if( v2==null || v2.metaDataTxId<0 ) {
+            metaData = v1;
+        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
+            metaData = v1; // use the first since the 2nd could be a partial..
+        } else {
+            metaData = v2; // use the second cause the first is probably a partial.
+        }
+    }
+    
+    private void storeMetaData() throws IOException {
+
+        // Convert the metadata into a property format
+        metaData.metaDataTxId++;
+        Properties p = new Properties();
+        IntrospectionSupport.getProperties(metaData, p, null);
+        
+        ByteArrayOutputStream os = new ByteArrayOutputStream(CONFIG_SPACE_SIZE);
+        p.store(os, "");
+        if( os.size() > CONFIG_SPACE_SIZE/2) { 
+            throw new IOException("Configuation is to larger than: "+CONFIG_SPACE_SIZE/2);
+        }
+        // Fill the rest with space...
+        byte[] filler = new byte[(CONFIG_SPACE_SIZE/2)-os.size()];
+        Arrays.fill(filler, (byte)' ');
+        os.write(filler);
+        os.flush();
+        
+        byte[] d = os.toByteArray();
+
+        // So we don't lose it, write it 2 times...
+        writeFile.seek(0);
+        writeFile.write(d);
+        writeFile.getFD().sync();
+        writeFile.seek(CONFIG_SPACE_SIZE/2);
+        writeFile.write(d);
+        writeFile.getFD().sync();
+    }
+
+    private void storeFreeList() throws IOException {
+        FileOutputStream os = new FileOutputStream(getFreeFile());
+        DataOutputStream dos = new DataOutputStream(os);
+        dos.writeLong(freeList.size());
+        for (Long offset : freeList) {
+            dos.writeLong(offset);
+        }
+        dos.close();
+    }
+
+    private void loadFreeList() throws IOException {
+        freeList.clear();
+        FileInputStream is = new FileInputStream(getFreeFile());
+        DataInputStream dis = new DataInputStream(is);
+        long size = dis.readLong();
+        for ( long i=0 ; i < size; i++) {
+            freeList.add(dis.readLong());
+        }
+        dis.close();
+    }
+    
+
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors 
+    ///////////////////////////////////////////////////////////////////
+    
+    /**
+     * Whether the recovery buffer is used to double buffer page writes.  Enabled by default.
+     * 
+     * @return is the recovery buffer enabled.
+     */
+    public boolean isEnableRecoveryBuffer() {
+        return enableRecoveryBuffer;
+    }
+
+    /**
+     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
+     * may potentially cause partial page writes which can lead to page file corruption.
+     */
+    public void setEnableRecoveryBuffer(boolean doubleBuffer) {
+        this.enableRecoveryBuffer = doubleBuffer;
+    }
+
+    /**
+     * @return Are page writes synced to disk?
+     */
+    public boolean isEnableSyncedWrites() {
+        return enableSyncedWrites;
+    }
+
+    /**
+     * Allows you to enable syncing writes to disk.
+     * @param syncWrites
+     */
+    public void setEnableSyncedWrites(boolean syncWrites) {
+        this.enableSyncedWrites = syncWrites;
+    }
+    
+    /**
+     * @return the page size
+     */
+    public int getPageSize() {
+        return this.pageSize;
+    }
+
+    /**
+     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
+     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
+     * can no longer be changed.
+     * 
+     * @param pageSize the pageSize to set
+     * @throws IllegalStateException
+     *         once the page file is loaded.
+     */
+    public void setPageSize(int pageSize) throws IllegalStateException {
+        if (loaded.get() && pageSize != this.pageSize) {
+            throw new IllegalStateException("Pages already loaded - can't reset page size");
+        }
+        this.pageSize = pageSize;
+    }
+    
+    /**
+     * @return true if read page caching is enabled
+     */
+    public boolean isEnablePageCaching() {
+        return this.enablePageCaching;
+    }
+
+    /**
+     * @param enablePageCaching allows you to enable read page caching
+     */
+    public void setEnablePageCaching(boolean enablePageCaching) {
+        this.enablePageCaching = enablePageCaching;
+    }
+
+    /**
+     * @return the maximum number of pages that will get stored in the read page cache.
+     */
+    public int getPageCacheSize() {
+        return this.pageCacheSize;
+    }
+
+    /**
+     * @param pageCacheSize the maximum number of pages that will get stored in the read page cache.
+     */
+    public void setPageCacheSize(int pageCacheSize) {
+        this.pageCacheSize = pageCacheSize;
+    }
+    
+}
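
Putting the new layer together, a typical single-threaded session against a PageFile
might look like the sketch below. It is based on the signatures above; the directory
and file name are placeholders, the code is assumed to live in the
org.apache.kahadb.page package because Page is package-private, and since PageFile is
not thread safe concurrent callers must synchronize externally:

    package org.apache.kahadb.page;

    import java.io.File;
    import java.io.IOException;

    import org.apache.kahadb.StringMarshaller;

    public class PageFileExample {
        public static void main(String[] args) throws IOException {
            PageFile pf = new PageFile(new File("target/page-data"), "example");
            pf.load();                                   // opens (or creates) the .dat file
            try {
                Page page = pf.allocate();               // take a page off the free list
                page.setType((short) 1);                 // any type other than Page.FREE_TYPE
                page.setData("hello page file");
                pf.write(page, StringMarshaller.INSTANCE);
                pf.checkpoint();                         // block until the write is on disk

                Page readBack = pf.load(page.getPageId(), StringMarshaller.INSTANCE);
                System.out.println(readBack.getData());  // prints: hello page file
            } finally {
                pf.unload();                             // stores metadata and closes the file handles
            }
        }
    }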

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java?rev=687399&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java Wed Aug 20 10:56:36 2008
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.beans.PropertyEditor;
+import java.beans.PropertyEditorManager;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+public final class IntrospectionSupport {
+    
+    private IntrospectionSupport() {
+    }
+
+    public static boolean getProperties(Object target, Map props, String optionPrefix) {
+
+        boolean rc = false;
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        if (optionPrefix == null) {
+            optionPrefix = "";
+        }
+
+        Class clazz = target.getClass();
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            Method method = methods[i];
+            String name = method.getName();
+            Class type = method.getReturnType();
+            Class params[] = method.getParameterTypes();
+            if ((name.startsWith("is") || name.startsWith("get")) && params.length == 0 && type != null && isSettableType(type)) {
+
+                try {
+
+                    Object value = method.invoke(target, new Object[] {});
+                    if (value == null) {
+                        continue;
+                    }
+
+                    String strValue = convertToString(value, type);
+                    if (strValue == null) {
+                        continue;
+                    }
+                    if (name.startsWith("get")) {
+                        name = name.substring(3, 4).toLowerCase()
+                                + name.substring(4);
+                    } else {
+                        name = name.substring(2, 3).toLowerCase()
+                                + name.substring(3);
+                    }
+                    props.put(optionPrefix + name, strValue);
+                    rc = true;
+
+                } catch (Throwable ignore) {
+                    ignore.printStackTrace();
+                }
+
+            }
+        }
+
+        return rc;
+    }
+
+    public static boolean setProperties(Object target, Map props, String optionPrefix) {
+        boolean rc = false;
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        for (Iterator<String> iter = props.keySet().iterator(); iter.hasNext();) {
+            String name = iter.next();
+            if (name.startsWith(optionPrefix)) {
+                Object value = props.get(name);
+                name = name.substring(optionPrefix.length());
+                if (setProperty(target, name, value)) {
+                    iter.remove();
+                    rc = true;
+                }
+            }
+        }
+        return rc;
+    }
+
+    public static Map<String, Object> extractProperties(Map props, String optionPrefix) {
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        HashMap<String, Object> rc = new HashMap<String, Object>(props.size());
+
+        for (Iterator iter = props.keySet().iterator(); iter.hasNext();) {
+            String name = (String)iter.next();
+            if (name.startsWith(optionPrefix)) {
+                Object value = props.get(name);
+                name = name.substring(optionPrefix.length());
+                rc.put(name, value);
+                iter.remove();
+            }
+        }
+
+        return rc;
+    }
+
+    public static boolean setProperties(Object target, Map props) {
+        boolean rc = false;
+
+        if (target == null) {
+            throw new IllegalArgumentException("target was null.");
+        }
+        if (props == null) {
+            throw new IllegalArgumentException("props was null.");
+        }
+
+        for (Iterator iter = props.entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Entry)iter.next();
+            if (setProperty(target, (String)entry.getKey(), entry.getValue())) {
+                iter.remove();
+                rc = true;
+            }
+        }
+
+        return rc;
+    }
+
+    public static boolean setProperty(Object target, String name, Object value) {
+        try {
+            Class clazz = target.getClass();
+            Method setter = findSetterMethod(clazz, name);
+            if (setter == null) {
+                return false;
+            }
+
+            // If the type is null or it matches the needed type, just use the
+            // value directly
+            if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
+                setter.invoke(target, new Object[] {value});
+            } else {
+                // We need to convert it
+                setter.invoke(target, new Object[] {convert(value, setter.getParameterTypes()[0])});
+            }
+            return true;
+        } catch (Throwable ignore) {
+            return false;
+        }
+    }
+
+    private static Object convert(Object value, Class type) throws URISyntaxException {
+        PropertyEditor editor = PropertyEditorManager.findEditor(type);
+        if (editor != null) {
+            editor.setAsText(value.toString());
+            return editor.getValue();
+        }
+        if (type == URI.class) {
+            return new URI(value.toString());
+        }
+        return null;
+    }
+
+    private static String convertToString(Object value, Class type) throws URISyntaxException {
+        PropertyEditor editor = PropertyEditorManager.findEditor(type);
+        if (editor != null) {
+            editor.setValue(value);
+            return editor.getAsText();
+        }
+        if (type == URI.class) {
+            return ((URI)value).toString();
+        }
+        return null;
+    }
+
+    private static Method findSetterMethod(Class clazz, String name) {
+        // Build the method name.
+        name = "set" + name.substring(0, 1).toUpperCase() + name.substring(1);
+        Method[] methods = clazz.getMethods();
+        for (int i = 0; i < methods.length; i++) {
+            Method method = methods[i];
+            Class params[] = method.getParameterTypes();
+            if (method.getName().equals(name) && params.length == 1) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private static boolean isSettableType(Class clazz) {
+        if (PropertyEditorManager.findEditor(clazz) != null) {
+            return true;
+        }
+        if (clazz == URI.class) {
+            return true;
+        }
+        if (clazz == Boolean.class) {
+            return true;
+        }
+        return false;
+    }
+
+    public static String toString(Object target) {
+        return toString(target, Object.class);
+    }
+
+    public static String toString(Object target, Class stopClass) {
+        LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
+        addFields(target, target.getClass(), stopClass, map);
+        StringBuffer buffer = new StringBuffer(simpleName(target.getClass()));
+        buffer.append(" {");
+        Set entrySet = map.entrySet();
+        boolean first = true;
+        for (Iterator iter = entrySet.iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry)iter.next();
+            if (first) {
+                first = false;
+            } else {
+                buffer.append(", ");
+            }
+            buffer.append(entry.getKey());
+            buffer.append(" = ");
+            appendToString(buffer, entry.getValue());
+        }
+        buffer.append("}");
+        return buffer.toString();
+    }
+
+    protected static void appendToString(StringBuffer buffer, Object value) {
+        buffer.append(value);
+    }
+
+    public static String simpleName(Class clazz) {
+        String name = clazz.getName();
+        int p = name.lastIndexOf(".");
+        if (p >= 0) {
+            name = name.substring(p + 1);
+        }
+        return name;
+    }
+
+    private static void addFields(Object target, Class startClass, Class<Object> stopClass, LinkedHashMap<String, Object> map) {
+
+        if (startClass != stopClass) {
+            addFields(target, startClass.getSuperclass(), stopClass, map);
+        }
+
+        Field[] fields = startClass.getDeclaredFields();
+        for (int i = 0; i < fields.length; i++) {
+            Field field = fields[i];
+            if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())
+                || Modifier.isPrivate(field.getModifiers())) {
+                continue;
+            }
+
+            try {
+                field.setAccessible(true);
+                Object o = field.get(target);
+                if (o != null && o.getClass().isArray()) {
+                    try {
+                        o = Arrays.asList((Object[])o);
+                    } catch (Throwable e) { // ignore: not an Object[] (e.g. a primitive array)
+                    }
+                }
+                map.put(field.getName(), o);
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/IntrospectionSupport.java
------------------------------------------------------------------------------
    svn:executable = *
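
For reference, a minimal sketch of driving the IntrospectionSupport utility
above. The ServerConfig bean is hypothetical, invented only for this example.
setProperties() resolves values through setter methods, converting String
values with the JDK PropertyEditor machinery, while the reflective toString()
reads fields directly and skips private ones; that is why the sketch leaves
the bean's fields package-private.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kahadb.util.IntrospectionSupport;

    // Hypothetical bean, present only for this sketch.
    class ServerConfig {
        String host;  // package-private so the reflective toString() sees it
        int port;

        public String getHost() { return host; }
        public void setHost(String host) { this.host = host; }
        public int getPort() { return port; }
        public void setPort(int port) { this.port = port; }
    }

    public class IntrospectionSupportExample {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<String, Object>();
            props.put("server.host", "localhost");
            props.put("server.port", "8080");  // converted to int via a PropertyEditor

            ServerConfig config = new ServerConfig();

            // Applies each "server."-prefixed entry to the matching setter
            // and removes the consumed keys from the map.
            IntrospectionSupport.setProperties(config, props, "server.");

            System.out.println(IntrospectionSupport.toString(config));
            // Prints: ServerConfig {host = localhost, port = 8080}
        }
    }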

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java?rev=687399&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java Wed Aug 20 10:56:36 2008
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+
+import org.apache.kahadb.StringMarshaller;
+
+import junit.framework.TestCase;
+
+public class PageFileTest extends TestCase {
+
+    static final short TEST_TYPE = 65;
+    
+    public void testCRUD() throws IOException {
+        
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+        
+        HashSet<String> expected = new HashSet<String>();
+        
+        // Insert some data into the page file.
+        for( int i=0 ; i < 100; i++) {
+            Page page = pf.allocate();
+            page.setType(TEST_TYPE);
+            
+            String t = "page:"+i;
+            expected.add(t);
+            page.setData(t);
+            pf.write(page, StringMarshaller.INSTANCE);
+        }
+        
+        // Reload it...
+        pf.unload();
+        pf.load();
+        
+        // Iterate it to make sure they are still there...
+        HashSet<String> actual = new HashSet<String>();
+        for (Page page : pf) {
+            pf.load(page, StringMarshaller.INSTANCE);
+            actual.add((String)page.getData());
+        }
+        assertEquals(expected, actual);
+        
+        // Remove the odd records...
+        for( int i=0 ; i < 100; i++) {
+            if( i%2 == 0 ) {
+                continue;
+            }
+            }
+            String t = "page:"+i;
+            expected.remove(t);
+        }
+        for (Page page : pf) {
+            pf.load(page, StringMarshaller.INSTANCE);
+            if( !expected.contains(page.getData()) ) {
+                pf.free(page);
+            }
+        }
+        
+        // Reload it...
+        pf.unload();
+        pf.load();
+        
+        // Iterate it to make sure the even records are still there...
+        actual.clear();
+        for (Page page : pf) {
+            pf.load(page, StringMarshaller.INSTANCE);
+            actual.add((String)page.getData());
+        }
+        assertEquals(expected, actual);
+
+
+        // Update the records...
+        HashSet<String> t = expected;
+        expected = new HashSet<String>();
+        for (String s : t) {
+            expected.add(s+":updated");
+        }
+        for (Page page : pf) {
+            pf.load(page, StringMarshaller.INSTANCE);
+            page.setData(page.getData()+":updated");
+            pf.write(page, StringMarshaller.INSTANCE);
+        }
+ 
+        // Reload it...
+        pf.unload();
+        pf.load();
+
+        // Iterate it to make sure the updated records are still there...
+        actual.clear();
+        for (Page page : pf) {
+            pf.load(page, StringMarshaller.INSTANCE);
+            actual.add((String)page.getData());
+        }
+        assertEquals(expected, actual);
+        
+        pf.unload();
+    }
+    
+}
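
For reference, a condensed sketch of the same create/read/update/delete
lifecycle outside of JUnit. It assumes only the PageFile calls the test
itself makes (delete/load/unload, allocate/write/free, load with a
marshaller, and iteration over pages); the file location and page type
are arbitrary.

    import java.io.File;
    import java.io.IOException;

    import org.apache.kahadb.StringMarshaller;
    import org.apache.kahadb.page.Page;
    import org.apache.kahadb.page.PageFile;

    public class PageFileExample {

        public static void main(String[] args) throws IOException {
            PageFile pf = new PageFile(new File("target/example-data"), "example");
            pf.delete();  // start from an empty file
            pf.load();

            // Create: allocate a page and write a marshalled payload into it.
            Page page = pf.allocate();
            page.setType((short) 1);
            page.setData("hello");
            pf.write(page, StringMarshaller.INSTANCE);

            // Read: iteration yields the pages; load() unmarshals each payload.
            for (Page p : pf) {
                pf.load(p, StringMarshaller.INSTANCE);
                System.out.println(p.getData());
            }

            // Update: rewrite a payload in place.
            for (Page p : pf) {
                pf.load(p, StringMarshaller.INSTANCE);
                p.setData(p.getData() + ":updated");
                pf.write(p, StringMarshaller.INSTANCE);
            }

            // Delete: release pages back to the page file.
            for (Page p : pf) {
                pf.load(p, StringMarshaller.INSTANCE);
                pf.free(p);
            }

            pf.unload();
        }
    }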