Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/22 17:53:43 UTC

svn commit: r688106 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/page/ test/java/org/apache/kahadb/page/

Author: chirino
Date: Fri Aug 22 08:53:42 2008
New Revision: 688106

URL: http://svn.apache.org/viewvc?rev=688106&view=rev
Log:
Update the PageFile public interface so that all read/update operations have to be done via a Transaction object.  Updated the 
HashIndex to support doing index operations in the context of the Transaction.  The idea is that multiple page and index updates 
can be performed in a single Unit of Work controlled by the Transaction object.  This should make recovery of higher-level 
complex operations easier.
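
For example, a caller can now batch several page updates into one unit of work through the new closure API.  A minimal
sketch, assuming a loaded PageFile named pageFile is in scope (note that the Transaction interface is package-private at
this revision, so code like this lives in org.apache.kahadb.page):

    pageFile.tx().execute(new Transaction.Closure<IOException>() {
        public void execute(Transaction tx) throws IOException {
            Page page = tx.allocate();      // allocate a free page inside the unit of work
            page.setType(Page.CHUNK_TYPE);  // mark it as a chunk page
            tx.write(page, null);           // null marshaller writes the page header only
        }
    });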


Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java Fri Aug 22 08:53:42 2008
@@ -108,7 +108,7 @@
 
     public static class PageInputStream extends InputStream {
 
-        private PageFile file;
+        private Transaction tx;
         private Chunk chunk;
         private int pos;
         private int pageCount; 
@@ -118,11 +118,11 @@
         private int markPageCount; 
         private ChunkMarshaller marshaller;
         
-        public PageInputStream(PageFile file, long pageId) throws IOException {
-            this.file = file;
-            this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+        public PageInputStream(Transaction tx, long pageId) throws IOException {
+            this.tx = tx;
+            this.marshaller = new ChunkMarshaller(tx.getPageFile().getPageContentSize()-HEADER_MAX_SIZE);
             
-            Page page = file.load(pageId, marshaller);
+            Page page = tx.load(pageId, marshaller);
             if( page.getType() != Page.CHUNK_TYPE ) {
                 throw new EOFException("Chunk stream does not exist at page: "+pageId);
             }
@@ -151,7 +151,7 @@
         }
 
         private void fill() throws IOException {
-            Page page = file.load(chunk.next, marshaller);
+            Page page = tx.load(chunk.next, marshaller);
             if( page.getType() == Page.INVALID_TYPE ) {
                 throw new IOException("Invalid page: "+chunk.next);
             }
@@ -225,16 +225,16 @@
 
     static public class PageOutputStream extends ByteArrayOutputStream {
 
-        private PageFile file;
         private long pageId;
         private ChunkMarshaller marshaller;
         private int pageCount;
-        private ArrayList<Page> pages; 
+        private ArrayList<Page> pages;
+        private final Transaction tx; 
 
-        public PageOutputStream(PageFile file, long pageId) {
-            this.file = file;
+        public PageOutputStream(Transaction tx, long pageId) {
+            this.tx = tx;
             this.pageId = pageId;
-            this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+            this.marshaller = new ChunkMarshaller(tx.getPageFile().getPageContentSize()-HEADER_MAX_SIZE);
         }
 
         @Override
@@ -261,7 +261,7 @@
             pages = new ArrayList<Page>();
             long p = pageId;
             while( p >= 0  ) {
-                Page page = file.load(p, marshaller);
+                Page page = tx.load(p, marshaller);
                 Chunk c = (Chunk)page.getData();
                 if( c!=null && !c.last ) {
                     p = c.next;
@@ -273,7 +273,7 @@
 
             // Add more if needed.
             while( pages.size() < chunks.size() ) {
-                pages.add(file.allocate());
+                pages.add(tx.allocate());
             }
             
             // Update the page data.
@@ -285,6 +285,7 @@
                 if( !chunk.last ) {
                     chunk.next = pages.get(i+1).getPageId();
                 }
+                tx.write(page, marshaller);
             }
 
             // If there were extra pages.. Free them up.
@@ -292,10 +293,9 @@
                 Page page = pages.get(i);
                 page.setData(null);
                 page.setType(Page.FREE_TYPE);
+                tx.write(page, marshaller);
             }
             
-            file.write(pages, marshaller);
-            
             pageCount=chunks.size();
         }
         

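With this change the chunk streams resolve pages through the caller's Transaction instead of holding a PageFile
reference.  A minimal read sketch, assuming pageId points at an existing chunk stream and the surrounding method
declares IOException:

    Transaction tx = pageFile.tx();
    DataInputStream is = new DataInputStream(new Chunk.PageInputStream(tx, pageId));
    try {
        int size = is.readInt();  // read back whatever the writer stored first
    } finally {
        is.close();
    }
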
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java Fri Aug 22 08:53:42 2008
@@ -50,7 +50,7 @@
         this.pageId = pageId;
     }
 
-    private void load() throws IOException {
+    private void load(Transaction tx) throws IOException {
         
         data = new TreeMap<Comparable, Long>();
 
@@ -59,8 +59,7 @@
         // we will still need to de-marshall from the stream.
         
         // I think it will be better to make the bin a btree root.  
-        
-        PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);        
+        PageInputStream pis = new PageInputStream(tx, pageId);        
         DataInputStream is = new DataInputStream(pis);
         try {
             
@@ -77,8 +76,8 @@
         }
     }
     
-    public void store() throws IOException {
-        PageOutputStream pos = new PageOutputStream(index.getPageFile(), pageId);
+    public void store(Transaction tx) throws IOException {
+        PageOutputStream pos = new PageOutputStream(tx, pageId);
         DataOutputStream os = new DataOutputStream(pos);
         if( data == null ) {
             os.writeInt(0);
@@ -93,13 +92,13 @@
         pageCount = pos.getPageCount();
     }
 
-    public int size() throws IOException {
+    public int size(Transaction tx) throws IOException {
         if( data!=null ) {
             return data.size();
         } else {
             
             // Peek at the page to see how many items it contains.
-            PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);
+            PageInputStream pis = new PageInputStream(tx, pageId);
             DataInputStream is = new DataInputStream(pis);
             int size = is.readInt();
             is.close();
@@ -108,38 +107,38 @@
         }
     }
 
-    public Long put(Comparable key, Long value) throws IOException {
+    public Long put(Transaction tx, Comparable key, Long value) throws IOException {
         if( data==null ) {
-            load();
+            load(tx);
         }
         Long rc = data.put(key, value);
         if( !value.equals(rc) ) {
-            store();
+            store(tx);
         }
         return rc;
     }
 
-    public Long find(Comparable key) throws IOException {
+    public Long find(Transaction tx, Comparable key) throws IOException {
         if( data==null ) {
-            load();
+            load(tx);
         }
         return data.get(key);
     }
     
-    public Map<Comparable, Long> getAll() throws IOException {
+    public Map<Comparable, Long> getAll(Transaction tx) throws IOException {
         if( data==null ) {
-            load();
+            load(tx);
         }
         return data;
     }
     
-    public Long remove(Comparable key) throws IOException {
+    public Long remove(Transaction tx, Comparable key) throws IOException {
         if( data==null ) {
-            load();
+            load(tx);
         }
         Long rc = data.remove(key);
         if( rc!=null ) {
-            store();
+            store(tx);
         }
         return rc;
     }
@@ -148,4 +147,8 @@
         return pageId;
     }
 
+    public int getPageCount() {
+        return pageCount;
+    }
+
 }

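Every HashBin operation now threads the caller's Transaction through its lazy load() and store() calls, so a bin
update participates in the surrounding unit of work.  A hypothetical caller sketch (index is a HashIndex and pageId
a bin page, both illustrative):

    final HashBin bin = new HashBin(index, pageId);
    pageFile.tx().execute(new Transaction.Closure<IOException>() {
        public void execute(Transaction tx) throws IOException {
            Long previous = bin.put(tx, "some-key", 42L);  // lazily loads the bin, stores via tx
            if (previous == null) {
                // the key was not present before
            }
        }
    });
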
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java Fri Aug 22 08:53:42 2008
@@ -30,6 +30,9 @@
 import org.apache.kahadb.impl.index.IndexManager;
 import org.apache.kahadb.page.Chunk.PageInputStream;
 import org.apache.kahadb.page.Chunk.PageOutputStream;
+import org.apache.kahadb.page.PageFile.PageFileTransaction;
+import org.apache.kahadb.page.Transaction.Closure;
+import org.apache.kahadb.page.Transaction.CallableClosure;
 
 /**
 * Hash index implementation
@@ -108,101 +111,143 @@
     }
 
     public synchronized void load() {
-        if (loaded.compareAndSet(false, true)) {
-            try {
-                Page page = pageFile.load(pageId, null);
-
-                // Is this a brand new index?
-                if (page.getType() == Page.FREE_TYPE) {
-
-                    // We need to create the pages for the bins
-                    Page binPage = pageFile.allocate(binCapacity);
-                    binPageId = binPage.getPageId();
-                    state = INITIALIZING_STATE;
-                    storeMetadata();
-                    pageFile.checkpoint();
-
-                    // If failure happens now we can continue initializing the
-                    // the hash bins...
-
-                } else {
-                    // Lets load it's data
-                    loadMetadata();
-
-                    // If we did not have a clean shutdown...
-                    if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
-                        // Figure out the size and the # of bins that are
-                        // active. Yeah This loads the first page of every bin. :(
-                        // We might want to put this in the metadata page, but
-                        // then that page would be getting updated on every write.
-                        size = 0;
-                        for (int i = 0; i < binCapacity; i++) {
-                            HashBin hashBin = new HashBin(this, binPageId + i);
-                            int t = hashBin.size();
-                            if (t > 0) {
-                                binsActive++;
+        PageFileTransaction tx = pageFile.tx();
+        try {
+            
+            if (loaded.compareAndSet(false, true)) {
+                try {
+                    Page page = tx.load(pageId, null);
+    
+                    // Is this a brand new index?
+                    if (page.getType() == Page.FREE_TYPE) {
+    
+                        // We need to create the pages for the bins
+                        tx.execute(new Transaction.Closure<IOException>(){
+                            public void execute(Transaction tx) throws IOException {
+                                Page binPage = tx.allocate(binCapacity);
+                                binPageId = binPage.getPageId();
+                                state = INITIALIZING_STATE;
+                                storeMetadata(tx);
+                            }
+                        });
+                        pageFile.checkpoint();
+    
+                        // If failure happens now we can continue initializing
+                        // the hash bins...
+    
+                    } else {
+                        // Let's load its data
+                        loadMetadata(tx);
+    
+                        // If we did not have a clean shutdown...
+                        if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
+                            // Figure out the size and the # of bins that are
+                            // active. Yeah, this loads the first page of every bin. :(
+                            // We might want to put this in the metadata page, but
+                            // then that page would be getting updated on every write.
+                            size = 0;
+                            for (int i = 0; i < binCapacity; i++) {
+                                HashBin hashBin = new HashBin(this, binPageId + i);
+                                int t = hashBin.size(tx);
+                                if (t > 0) {
+                                    binsActive++;
+                                }
+                                size += t;
                             }
-                            size += t;
                         }
                     }
-                }
-
-                if (state == INITIALIZING_STATE) {
-                    // TODO:
-                    // If a failure occurs mid way through us initializing the
-                    // bins.. will the page file still think we have the rest
-                    // of them previously allocated to us?
-
-                    for (int i = 0; i < binCapacity; i++) {
-                        HashBin hashBin = new HashBin(this, binPageId + i);
-                        hashBin.store();
+    
+                    if (state == INITIALIZING_STATE) {
+                        // TODO:
+                        // If a failure occurs mid way through us initializing the
+                        // bins.. will the page file still think we have the rest
+                        // of them previously allocated to us?
+    
+                        tx.execute(new Closure<IOException>(){
+                            public void execute(Transaction tx) throws IOException {
+                                for (int i = 0; i < binCapacity; i++) {
+                                    HashBin hashBin = new HashBin(HashIndex.this, binPageId + i);
+                                    hashBin.store(tx);
+                                }
+                            }
+                        });
+                        size = 0;
+                        binsActive = 0;
                     }
-                    size = 0;
-                    binsActive = 0;
+                    
+                    if (state == RESIZING_PHASE1_STATE) {
+                        // continue resize phase 1
+                        resizePhase1();
+                    }                
+                    if (state == RESIZING_PHASE2_STATE) {
+                        // continue resize phase 2
+                        resizePhase2();
+                    }                
+    
+                    calcThresholds();
+    
+                    state = OPEN_STATE;
+                    tx.execute(new Closure<IOException>(){
+                        public void execute(Transaction tx) throws IOException {
+                            storeMetadata(tx);
+                        }
+                    });
+                    pageFile.checkpoint();
+                    
+                    LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
+    
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
                 }
-                
-                if (state == RESIZING_PHASE1_STATE) {
-                    // continue resize phase 1
-                    resizePhase1();
-                }                
-                if (state == RESIZING_PHASE2_STATE) {
-                    // continue resize phase 1
-                    resizePhase2();
-                }                
-
-                calcThresholds();
-
-                state = OPEN_STATE;
-                storeMetadata();
-                pageFile.checkpoint();
-                
-                LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
-
-            } catch (IOException e) {
-                throw new RuntimeException(e);
             }
+            
+        } finally {
+            // All pending updates should have been committed by now.
+            assert tx.isReadOnly();
         }
     }
 
     public synchronized void unload() throws IOException {
         if (loaded.compareAndSet(true, false)) {
             state = CLOSED_STATE;
-            storeMetadata();
+            pageFile.tx().execute(new Closure<IOException>(){
+                public void execute(Transaction tx) throws IOException {
+                    storeMetadata(tx);
+                }
+            });
         }
     }
 
-    public synchronized StoreEntry get(Object key) throws IOException {
+    public synchronized StoreEntry get(final Object key) throws IOException {
+        return pageFile.tx().execute(new CallableClosure<StoreEntry,IOException>(){
+            public StoreEntry execute(Transaction tx) throws IOException {
+                return get(tx, key);
+            }
+        });
+    }
+
+    public synchronized StoreEntry get(Transaction tx, Object key) throws IOException {
+        // TODO: multiple loads is smelly..
         load();
-        Long result = getBin(key).find((Comparable)key);
+        Long result = getBin(key).find(tx, (Comparable)key);
         return result != null ? indexManager.getIndex(result) : null;
     }
 
-    public synchronized void store(Object key, StoreEntry value) throws IOException {
+    public synchronized void store(final Object key, final StoreEntry value) throws IOException {
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                store(tx, key, value);
+            }
+        });
+    }
+
+    public synchronized void store(Transaction tx, Object key, StoreEntry value) throws IOException {
+        // TODO: multiple loads is smelly..
         load();
         HashBin bin = getBin(key);
-        if (bin.put((Comparable)key, value.getOffset()) == null) {
+        if (bin.put(tx, (Comparable)key, value.getOffset()) == null) {
             this.size++;
-            if (bin.size() == 1) {
+            if (bin.size(tx) == 1) {
                 binsActive++;
             }
         }
@@ -214,15 +259,24 @@
         }
     }
 
-    public synchronized StoreEntry remove(Object key) throws IOException {
+    public synchronized StoreEntry remove(final Object key) throws IOException {
+        return pageFile.tx().execute(new CallableClosure<StoreEntry,IOException>(){
+            public StoreEntry execute(Transaction tx) throws IOException {
+                return remove(tx, key);
+            }
+        });
+    }
+    
+    public synchronized StoreEntry remove(Transaction tx, Object key) throws IOException {
+        // TODO: multiple loads is smelly..
         load();
         StoreEntry result = null;
 
         HashBin bin = getBin(key);
-        Long offset = bin.remove((Comparable)key);
+        Long offset = bin.remove(tx, (Comparable)key);
         if (offset != null) {
             this.size--;
-            if (bin.size() == 0) {
+            if (bin.size(tx) == 0) {
                 binsActive--;
             }
             result = this.indexManager.getIndex(offset);
@@ -241,12 +295,25 @@
     public synchronized boolean containsKey(Object key) throws IOException {
         return get(key) != null;
     }
+    
+    public synchronized boolean containsKey(Transaction tx, Object key) throws IOException {
+        return get(tx, key) != null;
+    }
 
     public synchronized void clear() throws IOException {
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                clear(tx);
+            }
+        });
+    }
+
+    public synchronized void clear(Transaction tx) throws IOException {
+        // TODO: multiple loads is smelly..
         load();
         for (int i = 0; i < binCapacity; i++) {
             HashBin hashBin = new HashBin(this, binPageId + i);
-            hashBin.store(); // A store before a load.. clears the data out.
+            hashBin.store(tx); // A store before a load.. clears the data out.
         }
         size = 0;
         binsActive = 0;
@@ -261,8 +328,8 @@
     // Implementation Methods
     // /////////////////////////////////////////////////////////////////
 
-    private void loadMetadata() throws IOException {
-        PageInputStream pis = new PageInputStream(pageFile, pageId);
+    private void loadMetadata(Transaction tx) throws IOException {
+        PageInputStream pis = new PageInputStream(tx, pageId);
         DataInputStream is = new DataInputStream(pis);
         state = is.readInt();
         binPageId = is.readLong();
@@ -274,8 +341,8 @@
         is.close();
     }
 
-    private void storeMetadata() throws IOException {
-        PageOutputStream pos = new PageOutputStream(pageFile, pageId);
+    private void storeMetadata(Transaction tx) throws IOException {
+        PageOutputStream pos = new PageOutputStream(tx, pageId);
         DataOutputStream os = new DataOutputStream(pos);
         os.writeInt(state);
         os.writeLong(binPageId);
@@ -287,65 +354,76 @@
         os.close();
     }
 
-    private void resize(int newSize) throws IOException {
-        
+    private void resize(final int newSize) throws IOException {
         LOG.debug("Resizing to: "+newSize);
-        
-        state = RESIZING_PHASE1_STATE;
-        resizeCapacity = newSize;
-        resizePageId = pageFile.allocate(resizeCapacity).getPageId();
-        storeMetadata();
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                state = RESIZING_PHASE1_STATE;
+                resizeCapacity = newSize;
+                resizePageId = tx.allocate(resizeCapacity).getPageId();
+                storeMetadata(tx);
+            }
+        });
         pageFile.checkpoint();
+        
         resizePhase1();
         resizePhase2();        
     }
 
     private void resizePhase1() throws IOException {
         // In Phase 1 we copy the data to the new bins..
-        
-        // Initialize the bins..
-        for (int i = 0; i < resizeCapacity; i++) {
-            HashBin bin = new HashBin(this, resizePageId + i);
-            bin.store();
-        }
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                
+                // Initialize the bins..
+                for (int i = 0; i < resizeCapacity; i++) {
+                    HashBin bin = new HashBin(HashIndex.this, resizePageId + i);
+                    bin.store(tx);
+                }
 
-        binsActive = 0;
-        // Copy the data from the old bins to the new bins.
-        for (int i = 0; i < binCapacity; i++) {
-            HashBin bin = new HashBin(this, binPageId + i);
-            for (Map.Entry<Comparable, Long> entry : bin.getAll().entrySet()) {
-                HashBin resizeBin = getResizeBin(entry.getKey());
-                resizeBin.put(entry.getKey(), entry.getValue());
-                if( resizeBin.size() == 1) {
-                    binsActive++;
+                binsActive = 0;
+                // Copy the data from the old bins to the new bins.
+                for (int i = 0; i < binCapacity; i++) {
+                    HashBin bin = new HashBin(HashIndex.this, binPageId + i);
+                    for (Map.Entry<Comparable, Long> entry : bin.getAll(tx).entrySet()) {
+                        HashBin resizeBin = getResizeBin(entry.getKey());
+                        resizeBin.put(tx, entry.getKey(), entry.getValue());
+                        if( resizeBin.size(tx) == 1) {
+                            binsActive++;
+                        }
+                    }
                 }
+                
+                // Now we can release the old data.
+                state = RESIZING_PHASE2_STATE;
+                storeMetadata(tx);
             }
-        }
-        
-        // Now we can release the old data.
-        state = RESIZING_PHASE2_STATE;
-        storeMetadata();
+        });
         pageFile.checkpoint();
     }
 
     private void resizePhase2() throws IOException {
-        for (int i = 0; i < binCapacity; i++) {
-            HashBin hashBin = new HashBin(this, binPageId + i);
-            hashBin.store(); // A store before a load.. clears the data out.
-        }
-        pageFile.free(binPageId, binCapacity);
-        
-        binCapacity = resizeCapacity;
-        binPageId = resizePageId;
-        resizeCapacity=0;
-        resizePageId=0;
-        state = OPEN_STATE;
-        storeMetadata();
+        // In phase 2 we free the old bins and switch to the new bins.
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                for (int i = 0; i < binCapacity; i++) {
+                    HashBin hashBin = new HashBin(HashIndex.this, binPageId + i);
+                    hashBin.store(tx); // A store before a load.. clears the data out.
+                }
+                tx.free(binPageId, binCapacity);
+                
+                binCapacity = resizeCapacity;
+                binPageId = resizePageId;
+                resizeCapacity=0;
+                resizePageId=0;
+                state = OPEN_STATE;
+                storeMetadata(tx);
+            }
+        });
+
         pageFile.checkpoint();
         calcThresholds();
-        
         LOG.debug("Resizing done.  New bins start at: "+binPageId);
-
     }
 
     private void calcThresholds() {

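HashIndex now exposes each operation in two forms: a convenience overload that opens its own transaction via
pageFile.tx().execute(...), and an overload taking an explicit Transaction so that several index operations commit
as one unit of work.  A sketch of the explicit form, assuming final locals index (a loaded HashIndex) and entry
(a StoreEntry obtained from the IndexManager), both illustrative:

    pageFile.tx().execute(new Transaction.Closure<IOException>() {
        public void execute(Transaction tx) throws IOException {
            index.store(tx, "order:1", entry);  // add or replace an entry
            index.remove(tx, "order:0");        // removed in the same unit of work
        }
    });
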
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Aug 22 08:53:42 2008
@@ -63,7 +63,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class PageFile implements Iterable<Page> {
+public class PageFile {
     
     // 4k Default page size.
     public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
@@ -201,6 +201,360 @@
 
     }
 
+    /**
+     * Provides transaction update access to the PageFile.  All operations that modify 
+     * the PageFile are done via a Transaction.
+     */
+    class PageFileTransaction implements Transaction {
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#getPageFile()
+         */
+        public PageFile getPageFile() {
+            return PageFile.this;
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#allocate()
+         */
+        public Page allocate() throws IOException {
+            if( !loaded.get() ) {
+                throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
+            }
+            
+            Page page = null;
+            
+            // We may need to create a new free page...
+            if(freeList.isEmpty()) {
+                page = new Page();
+                page.setPageId(nextFreePageId);
+                page.setType(Page.FREE_TYPE);
+                nextFreePageId ++;
+//                LOG.debug("allocated: "+page.getPageId());
+                write(page, null);
+            }
+
+            long pageId = freeList.removeFirst();
+            page = new Page();
+            page.setPageId(pageId);
+            page.setType(Page.FREE_TYPE);
+            return page;
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#allocate(int)
+         */
+        public Page allocate(int count) throws IOException {
+            if( !loaded.get() ) {
+                throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
+            }
+
+            Page page = null;
+            Sequence seq = freeList.removeFirstSequence(count);
+            if(seq==null) {
+                
+                // We may need to create a new free page...
+                page = new Page();
+                int c=count;
+                while( c > 0 ) {
+                    page.setPageId(nextFreePageId);
+                    page.setType(Page.FREE_TYPE);
+                    nextFreePageId ++;
+//                    LOG.debug("allocate writing: "+page.getPageId());
+                    write(page, null);
+                    c--;
+                }
+                
+                seq = freeList.removeFirstSequence(count);
+            }
+            
+            page = new Page();
+            page.setPageId(seq.getFirst());
+            page.setType(Page.FREE_TYPE);
+//            LOG.debug("allocated: "+page.getPageId());
+            return page;
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#free(org.apache.kahadb.page.Page)
+         */
+        public void free(Page page) throws IOException {
+            page.setType(Page.FREE_TYPE);
+            free(page.getPageId(), 1);
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#free(long)
+         */
+        public void free(long pageId) throws IOException {
+            free(pageId, 1);
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#free(org.apache.kahadb.page.Page, int)
+         */
+        public void free(Page page, int count) throws IOException {
+            page.setType(Page.FREE_TYPE);
+            free(page.getPageId(), count);
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#free(long, int)
+         */
+        public void free(long pageId, int count) throws IOException {
+            if( !loaded.get() ) {
+                throw new IllegalStateException("Cannot free a page when the page file is not loaded");
+            }
+            
+            Page page = new Page();
+            long initialId = pageId;
+            for (int i = 0; i < count; i++) {
+                page.setPageId(initialId+i);
+                page.setType(Page.FREE_TYPE);
+//                LOG.debug("free: "+page.getPageId());
+                write(page, null);
+            }
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#write(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+         */
+        public void write(Page page, Marshaller marshaller) throws IOException {
+            
+            if( !loaded.get() ) {
+                throw new IllegalStateException("Cannot write a page when the page file is not loaded");
+            }
+                            
+            DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
+            page.setTxId(nextTxid.get());
+            page.write(dataOut, marshaller);
+            if (dataOut.size() > pageSize) {
+                throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+            }
+            
+            page = page.copy();
+            Long key = page.getPageId();
+            addToCache(page);
+
+//            LOG.debug("write: "+page.getPageId());
+            
+            synchronized( writes ) {
+                // If it's not in the write cache...
+                PageWrite write = writes.get(key);
+                if( write==null ) {
+                    write = new PageWrite(page, dataOut.getData());
+                    writes.put(key, write);
+                } else {
+                    write.setCurrent(page, dataOut.getData());
+                }
+                
+                // Once we start approaching capacity, notify the writer to start writing
+                if( canStartWriteBatch() ) {
+                    if( enableAsyncWrites  ) {
+                        writes.notify();
+                    } else {
+                        while( canStartWriteBatch() ) {
+                            writeBatch(-1, TimeUnit.MILLISECONDS);
+                        }
+                    }
+                }
+            }
+            
+            if( page.getType() == Page.FREE_TYPE ) {
+                removeFromCache(page);
+                freeList.add(page.getPageId());
+            }
+
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#load(long, org.apache.kahadb.Marshaller)
+         */
+        public Page load(long pageId, Marshaller marshaller) throws IOException {
+            Page page = new Page();
+            page.setPageId(pageId);
+            load(page, marshaller);
+            return page;
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+         */
+        public void load(Page page, Marshaller marshaller) throws IOException {
+            if( !loaded.get() ) {
+                throw new IllegalStateException("Cannot load a page when the page file is not loaded");
+            }
+
+            // Can't load invalid offsets...
+            if (page.getPageId() < 0) {
+                page.setType(Page.INVALID_TYPE);
+                return;
+            }        
+
+            // Try to load it from the cache first...
+            Page t = getFromCache(page.getPageId());
+            if (t != null) {
+                page.copy(t);
+                return;
+            }
+            
+            // Read the page data
+            readFile.seek(toOffset(page.getPageId()));
+            readFile.readFully(readBuffer, 0, pageSize);
+            dataIn.restart(readBuffer);
+            
+            // Unmarshall it.
+//            LOG.debug("load: "+page.getPageId());
+            page.read(dataIn, marshaller);
+            
+            // Cache it.
+            addToCache(page);
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#iterator()
+         */
+        public Iterator<Page> iterator() {
+            return iterator(false);
+        }
+        
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#iterator(boolean)
+         */
+        public Iterator<Page> iterator(final boolean includeFreePages) {
+            
+            if( !loaded.get() ) {
+                throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+            }
+
+            return new Iterator<Page>() {
+                long nextId;
+                Page nextPage;
+                Page lastPage;
+                
+                private void findNextPage() {
+                    if( !loaded.get() ) {
+                        throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+                    }
+
+                    if( nextPage!=null ) {
+                        return;
+                    }
+                    
+                    try {
+                        while( nextId < PageFile.this.nextFreePageId ) {
+                            readFile.seek(toOffset(nextId));
+                            readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
+                            dataIn.restart(readBuffer);
+                            
+                            Page page = new Page();
+                            page.setPageId(nextId);
+                            page.read(dataIn, null);
+                            
+                            if( includeFreePages || page.getType()!=Page.FREE_TYPE ) {
+                                nextPage = page;
+                                return;
+                            } else {
+                                nextId++;
+                            }
+                        }
+                    } catch (IOException e) {
+                    }
+                }
+
+                public boolean hasNext() {
+                    findNextPage();
+                    return nextPage !=null;
+                }
+
+                public Page next() {
+                    findNextPage(); 
+                    if( nextPage !=null ) {
+                        lastPage = nextPage;
+                        nextPage=null;
+                        nextId++;
+                        return lastPage;
+                    } else {
+                        throw new NoSuchElementException();
+                    }
+                }
+                
+                public void remove() {
+                    if( lastPage==null ) {
+                        throw new IllegalStateException();
+                    }
+                    try {
+                        free(lastPage);
+                        lastPage=null;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#commit()
+         */
+        public void commit() throws IOException {
+        }
+        
+        /**
+         * Rolls back the transaction.
+         */
+        private void rollback() throws IOException {
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#execute(org.apache.kahadb.page.PageFile.Closure)
+         */
+        public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
+            boolean success=false;
+            try {
+                closure.execute(this);
+                success=true;
+            } finally {
+                if( success ) {
+                    commit();
+                } else {
+                    rollback();
+                }
+            }
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#execute(org.apache.kahadb.page.PageFile.CallableClosure)
+         */
+        public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
+            boolean success=false;
+            try {
+                R rc = closure.execute(this);
+                success=true;
+                return rc;
+            } finally {
+                if( success ) {
+                    commit();
+                } else {
+                    rollback();
+                }
+            }
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.kahadb.page.ITransaction#isReadOnly()
+         */
+        public boolean isReadOnly() {
+            return false;
+        }
+    }
+    
+    public PageFileTransaction tx() {
+        return new PageFileTransaction();
+    }
     
     /**
     * Creates a PageFile in the specified directory whose data files are named by name.
@@ -377,388 +731,7 @@
             throw new IllegalStateException("Cannot unload the page file when it is not loaded");
         }
     }
-
-    
-    /** 
-     * Allocates a free page that you can write data to.
-     * 
-     * @return a newly allocated page.  
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public Page allocate() throws IOException {
-        if( !loaded.get() ) {
-            throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
-        }
-        
-        Page page = null;
         
-        // We may need to create a new free page...
-        if(freeList.isEmpty()) {
-            page = new Page();
-            page.setPageId(nextFreePageId);
-            page.setType(Page.FREE_TYPE);
-            nextFreePageId ++;
-//            LOG.debug("allocated: "+page.getPageId());
-            write(page, null);
-        }
-
-        long pageId = freeList.removeFirst();
-        page = new Page();
-        page.setPageId(pageId);
-        page.setType(Page.FREE_TYPE);
-        return page;
-    }
-    
-    /** 
-     * Allocates a block of free pages that you can write data to.
-     * 
-     * @param count the number of sequential pages to allocate
-     * @return the first page of the sequential set. 
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public Page allocate(int count) throws IOException {
-        if( !loaded.get() ) {
-            throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
-        }
-
-        Page page = null;
-        Sequence seq = freeList.removeFirstSequence(count);
-        if(seq==null) {
-            
-            // We may need to create a new free page...
-            page = new Page();
-            int c=count;
-            while( c > 0 ) {
-                page.setPageId(nextFreePageId);
-                page.setType(Page.FREE_TYPE);
-                nextFreePageId ++;
-//                LOG.debug("allocate writing: "+page.getPageId());
-                write(page, null);
-                c--;
-                if( page == null ) {
-                    page = page;
-                }
-            }
-            
-            seq = freeList.removeFirstSequence(count);
-        }
-        
-        page = new Page();
-        page.setPageId(seq.getFirst());
-        page.setType(Page.FREE_TYPE);
-//        LOG.debug("allocated: "+page.getPageId());
-        return page;
-    }
-
-    /**
-     * Frees up a previously allocated page so that it can be re-allocated again.
-     * 
-     * @param page the page to free up
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void free(Page page) throws IOException {
-        page.setType(Page.FREE_TYPE);
-        free(page.getPageId(), 1);
-    }
-    
-    /**
-     * Frees up a previously allocated page so that it can be re-allocated again.
-     * 
-     * @param page the page to free up
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void free(long pageId) throws IOException {
-        free(pageId, 1);
-    }
-    
-    /**
-     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
-     * 
-     * @param page the initial page of the sequence that will be getting freed
-     * @param count the number of pages in the sequence
-     * 
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void free(Page page, int count) throws IOException {
-        page.setType(Page.FREE_TYPE);
-        free(page.getPageId(), count);
-    }
-    
-    /**
-     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
-     * 
-     * @param page the initial page of the sequence that will be getting freed
-     * @param count the number of pages in the sequence
-     * 
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void free(long pageId, int count) throws IOException {
-        if( !loaded.get() ) {
-            throw new IllegalStateException("Cannot free a page when the page file is not loaded");
-        }
-        
-        Page page = new Page();
-        long initialId = pageId;
-        for (int i = 0; i < count; i++) {
-            page.setPageId(initialId+i);
-            page.setType(Page.FREE_TYPE);
-//            LOG.debug("free: "+page.getPageId());
-            write(page, null);
-        }
-    }
-    
-    /**
-     * Loads a page from disk.
-     * 
-     * @param pageId 
-     *        the id of the page to load
-     * @param marshaller
-     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
-     * @return The page with the given id
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public Page load(long pageId, Marshaller marshaller) throws IOException {
-        Page page = new Page();
-        page.setPageId(pageId);
-        load(page, marshaller);
-        return page;
-    }
-    
-    /**
-     * Loads a page from disk.  If the page.pageId is not valid then then this method will set the page.type to
-     * Page.INVALID_TYPE.
-     * 
-     * @param page - The pageId field must be properly set 
-     * @param marshaller
-     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void load(Page page, Marshaller marshaller) throws IOException {
-        if( !loaded.get() ) {
-            throw new IllegalStateException("Cannot load a page when the page file is not loaded");
-        }
-
-        // Can't load invalid offsets...
-        if (page.getPageId() < 0) {
-            page.setType(Page.INVALID_TYPE);
-            return;
-        }        
-
-        // Try to load it from the cache first...
-        Page t = getFromCache(page.getPageId());
-        if (t != null) {
-            page.copy(t);
-            return;
-        }
-        
-        // Read the page data
-        readFile.seek(toOffset(page.getPageId()));
-        readFile.readFully(readBuffer, 0, pageSize);
-        dataIn.restart(readBuffer);
-        
-        // Unmarshall it.
-//        LOG.debug("load: "+page.getPageId());
-        page.read(dataIn, marshaller);
-        
-        // Cache it.
-        addToCache(page);
-    }
-
-    /**
-     * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are not included
-     * in this iteration.
-     * 
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public Iterator<Page> iterator() {
-        return iterator(false);
-    }
-
-    /**
-     * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
-     * iterated.
-     * 
-     * @param includeFreePages - if true, free pages are included in the iteration
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public Iterator<Page> iterator(final boolean includeFreePages) {
-        
-        if( !loaded.get() ) {
-            throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
-        }
-
-        return new Iterator<Page>() {
-            long nextId;
-            Page nextPage;
-            Page lastPage;
-            
-            private void findNextPage() {
-                if( !loaded.get() ) {
-                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
-                }
-
-                if( nextPage!=null ) {
-                    return;
-                }
-                
-                try {
-                    while( nextId < PageFile.this.nextFreePageId ) {
-                        readFile.seek(toOffset(nextId));
-                        readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
-                        dataIn.restart(readBuffer);
-                        
-                        Page page = new Page();
-                        page.setPageId(nextId);
-                        page.read(dataIn, null);
-                        
-                        if( includeFreePages || page.getType()!=Page.FREE_TYPE ) {
-                            nextPage = page;
-                            return;
-                        } else {
-                            nextId++;
-                        }
-                    }
-                } catch (IOException e) {
-                }
-            }
-
-            public boolean hasNext() {
-                findNextPage();
-                return nextPage !=null;
-            }
-
-            public Page next() {
-                findNextPage(); 
-                if( nextPage !=null ) {
-                    lastPage = nextPage;
-                    nextPage=null;
-                    nextId++;
-                    return lastPage;
-                } else {
-                    throw new NoSuchElementException();
-                }
-            }
-            
-            public void remove() {
-                if( lastPage==null ) {
-                    throw new IllegalStateException();
-                }
-                try {
-                    free(lastPage);
-                    lastPage=null;
-                } catch (IOException e) {
-                    new RuntimeException(e);
-                }
-            }
-        };
-    }
-    
-    /**
-     * Updates multiple pages in a single unit of work.
-     * 
-     * @param pages
-     *        the pages to write. The Pages object must be fully populated with a valid pageId, type, and data.
-     * @param marshaller
-     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void write(Collection<Page> pages, ChunkMarshaller marshaller) throws IOException {
-        // TODO: Need to update double buffer impl so that it handles a collection of writes.  As is right now,
-        // the pages in this write may be split across multiple write batches which means that they
-        // will not get applied as a unit of work.
-        for (Page page : pages) {
-            write(page, marshaller);
-        }
-    }
-
-    /**
-     * 
-     * @param page
-     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
-     * @param marshaller
-     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
-     * @throws IOException
-     *         If an disk error occurred.
-     * @throws IllegalStateException
-     *         if the PageFile is not loaded
-     */
-    public void write(Page page, Marshaller marshaller) throws IOException {
-        
-        if( !loaded.get() ) {
-            throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
-        }
-                        
-        DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
-        page.setTxId(nextTxid.get());
-        page.write(dataOut, marshaller);
-        if (dataOut.size() > pageSize) {
-            throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
-        }
-        
-        page = page.copy();
-        Long key = page.getPageId();
-        addToCache(page);
-
-//        LOG.debug("write: "+page.getPageId());
-        
-        synchronized( writes ) {
-            // If it's not in the write cache...
-            PageWrite write = writes.get(key);
-            if( write==null ) {
-                write = new PageWrite(page, dataOut.getData());
-                writes.put(key, write);
-            } else {
-                write.setCurrent(page, dataOut.getData());
-            }
-            
-            // Once we start approaching capacity, notify the writer to start writing
-            if( canStartWriteBatch() ) {
-                if( enableAsyncWrites  ) {
-                    writes.notify();
-                } else {
-                    while( canStartWriteBatch() ) {
-                        writeBatch(-1, TimeUnit.MILLISECONDS);
-                    }
-                }
-            }
-        }
-        
-        if( page.getType() == Page.FREE_TYPE ) {
-            removeFromCache(page);
-            freeList.add(page.getPageId());
-        }
-
-    }
-    
     /**
      * Flushes all write buffers to disk and returns the transaction id of the last write done to disk.  The 
     * transaction id can be used for recovery purposes since it is always incrementing.

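Note that commit() and rollback() are still empty stubs at this revision, so execute(...) only wraps the closure in
commit/rollback bookkeeping around the existing write cache.  The value-returning variant looks like this, assuming
a final long pageId in scope:

    Page page = pageFile.tx().execute(new Transaction.CallableClosure<Page, IOException>() {
        public Page execute(Transaction tx) throws IOException {
            return tx.load(pageId, null);  // null marshaller reads the header without unmarshalling data
        }
    });
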
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=688106&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Fri Aug 22 08:53:42 2008
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kahadb.Marshaller;
+
+/**
+ * The interface used to read/update a PageFile object.  Using a transaction allows you to
+ * do multiple update operations in a single unit of work.
+ */
+interface Transaction extends Iterable<Page> {
+    
+    /**
+     * This closure interface is intended for the end user to implement callbacks for the Transaction.execute() method.
+     * 
+     * @param <T> The type of exceptions that operation will throw.
+     */
+    public interface Closure <T extends Throwable> {
+        public void execute(Transaction tx) throws T;
+    }
+
+    /**
+     * This closure interface is intended for the end user to implement callbacks for the Transaction.execute() method.
+     * 
+     * @param <R> The type of result that the closure produces.
+     * @param <T> The type of exceptions that operation will throw.
+     */
+    public interface CallableClosure<R, T extends Throwable> {
+        public R execute(Transaction tx) throws T;
+    }
+
+    public PageFile getPageFile();
+
+    /** 
+     * Allocates a free page that you can write data to.
+     * 
+     * @return a newly allocated page.  
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Page allocate() throws IOException;
+
+    /** 
+     * Allocates a block of free pages that you can write data to.
+     * 
+     * @param count the number of sequential pages to allocate
+     * @return the first page of the sequential set. 
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Page allocate(int count) throws IOException;
+
+    /**
+     * Frees up a previously allocated page so that it can be re-allocated.
+     * 
+     * @param page the page to free up
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(Page page) throws IOException;
+
+    /**
+     * Frees up a previously allocated page so that it can be re-allocated.
+     * 
+     * @param pageId the id of the page to free up
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(long pageId) throws IOException;
+
+    /**
+     * Frees up a previously allocated sequence of pages so that they can be re-allocated.
+     * 
+     * @param page the initial page of the sequence to be freed
+     * @param count the number of pages in the sequence
+     * 
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(Page page, int count) throws IOException;
+
+    /**
+     * Frees up a previously allocated sequence of pages so that they can be re-allocated.
+     * 
+     * @param pageId the id of the initial page of the sequence to be freed
+     * @param count the number of pages in the sequence
+     * 
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void free(long pageId, int count) throws IOException;
+
+    /**
+     * Writes a page and its data to the page file.
+     * 
+     * @param page
+     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
+     * @param marshaller
+     *        the marshaller to use to write the data portion of the Page; may be null if you do not wish to write the data.
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void write(Page page, Marshaller marshaller) throws IOException;
+
+    /**
+     * Loads a page from disk.
+     * 
+     * @param pageId 
+     *        the id of the page to load
+     * @param marshaller
+     *        the marshaller to use to load the data portion of the Page; may be null if you do not wish to load the data.
+     * @return the page with the given id
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Page load(long pageId, Marshaller marshaller) throws IOException;
+
+    /**
+     * Loads a page from disk.  If the page.pageId is not valid then this method will set the page.type to
+     * Page.INVALID_TYPE.
+     * 
+     * @param page - The pageId field must be properly set 
+     * @param marshaller
+     *        the marshaller to use to load the data portion of the Page; may be null if you do not wish to load the data.
+     * @throws IOException
+     *         If a disk error occurs.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public void load(Page page, Marshaller marshaller) throws IOException;
+
+    /**
+     * Allows you to iterate through all active Pages in the page file.  Pages with type Page.FREE_TYPE are
+     * not included in this iteration. 
+     * 
+     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
+     * 
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Iterator<Page> iterator();
+
+    /**
+     * Allows you to iterate through all active Pages in the page file, optionally including free pages.
+     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
+     * 
+     * @param includeFreePages - if true, free pages are included in the iteration
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    public Iterator<Page> iterator(final boolean includeFreePages);
+
+    /**
+     * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
+     * with the transaction are written to disk or none are.
+     */
+    public void commit() throws IOException;
+
+    /**
+     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
+     * If the closure throws an Exception, then the transaction is rolled back.
+     * 
+     * @param <T> The type of exception that the closure may throw.
+     * @param closure - the work to be executed.
+     * @throws T if the closure throws it
+     * @throws IOException If the commit fails.
+     */
+    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException;
+
+    /**
+     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
+     * If the closure throws an Exception, then the transaction is rolled back.
+     * 
+     * @param <R> The type of result that the closure produces.
+     * @param <T> The type of exception that the closure may throw.
+     * @param closure - the work to be executed.
+     * @return the result produced by the closure
+     * @throws T if the closure throws it
+     * @throws IOException If the commit fails.
+     */
+    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException;
+
+    /**
+     * @return true if there are no uncommitted page file updates associated with this transaction.
+     */
+    public boolean isReadOnly();
+
+}
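
For reference, a minimal usage sketch of the new unit-of-work style. The
method names are taken from the interface above and from the updated tests
below; pf is assumed to be an already loaded PageFile, and TEST_TYPE is a
hypothetical application-defined page type constant, as in PageFileTest:

    // Explicit style: allocate and write in one transaction, commit atomically.
    Transaction tx = pf.tx();
    Page page = tx.allocate();
    page.setType(TEST_TYPE);
    page.setData("hello");
    tx.write(page, StringMarshaller.INSTANCE);
    tx.commit();                       // all updates hit disk together, or none do
    final long pageId = page.getPageId();

    // Closure style: commits on normal return, rolls back if the closure throws.
    pf.tx().execute(new Transaction.Closure<IOException>() {
        public void execute(Transaction tx) throws IOException {
            Page p = tx.load(pageId, StringMarshaller.INSTANCE);
            p.setData(p.getData() + ":updated");
            tx.write(p, StringMarshaller.INSTANCE);
        }
    });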

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java Fri Aug 22 08:53:42 2008
@@ -37,9 +37,11 @@
         pf.delete();
         pf.load();
         
-        long id = pf.allocate().getPageId();
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
         
-        PageOutputStream pos = new Chunk.PageOutputStream(pf, id);
+        PageOutputStream pos = new Chunk.PageOutputStream(tx, id);
         DataOutputStream os = new DataOutputStream(pos);
         for( int i=0; i < 10000; i++) {
             os.writeUTF("Test string:"+i);
@@ -47,12 +49,14 @@
         
         os.close();
         System.out.println("Chuncks used: "+pos.getPageCount());
+        tx.commit();
         
         // Reload the page file.
         pf.unload();
         pf.load();
+        tx = pf.tx();
         
-        PageInputStream pis = new PageInputStream(pf, id);        
+        PageInputStream pis = new PageInputStream(tx, id);        
         DataInputStream is = new DataInputStream(pis);
         for( int i=0; i < 10000; i++) {
             assertEquals("Test string:"+i, is.readUTF());

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java Fri Aug 22 08:53:42 2008
@@ -29,7 +29,12 @@
 
         PageFile pf = new PageFile(root, name);
         pf.load();
-        HashIndex index = new HashIndex(indexManager, pf,pf.allocate().getPageId());
+        
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        HashIndex index = new HashIndex(indexManager, pf, id);
         index.setKeyMarshaller(Store.STRING_MARSHALLER);
         
 //        index.setEnableRecoveryBuffer(false);

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java Fri Aug 22 08:53:42 2008
@@ -51,7 +51,11 @@
         pf.load();
         indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
         
-        this.hashIndex = new HashIndex(indexManager, pf, pf.allocate().getPageId());
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+        
+        this.hashIndex = new HashIndex(indexManager, pf, id);
         this.hashIndex.setBinCapacity(12);
         this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
     }

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java Fri Aug 22 08:53:42 2008
@@ -37,24 +37,27 @@
         HashSet<String> expected = new HashSet<String>();
         
         // Insert some data into the page file.
+        Transaction tx = pf.tx();
         for( int i=0 ; i < 100; i++) {
-            Page page = pf.allocate();
+            Page page = tx.allocate();
             page.setType(TEST_TYPE);
             
             String t = "page:"+i;
             expected.add(t);
             page.setData(t);
-            pf.write(page, StringMarshaller.INSTANCE);
+            tx.write(page, StringMarshaller.INSTANCE);
+            tx.commit();
         }
         
         // Reload it...
         pf.unload();
         pf.load();
+        tx = pf.tx();
         
         // Iterate it to make sure they are still there..
         HashSet<String> actual = new HashSet<String>();
-        for (Page page : pf) {
-            pf.load(page, StringMarshaller.INSTANCE);
+        for (Page page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
             actual.add((String)page.getData());
         }
         assertEquals(expected, actual);
@@ -67,46 +70,49 @@
             String t = "page:"+i;
             expected.remove(t);
         }
-        for (Page page : pf) {
-            pf.load(page, StringMarshaller.INSTANCE);
+        for (Page page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
             if( !expected.contains(page.getData()) ) {
-                pf.free(page);
+                tx.free(page);
             }
         }
+        tx.commit();
         
         // Reload it...
         pf.unload();
         pf.load();
+        tx = pf.tx();
         
         // Iterate it to make sure the even records are still there..
         actual.clear();
-        for (Page page : pf) {
-            pf.load(page, StringMarshaller.INSTANCE);
+        for (Page page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
             actual.add((String)page.getData());
         }
         assertEquals(expected, actual);
 
-
         // Update the records...
         HashSet<String> t = expected;
         expected = new HashSet<String>();
         for (String s : t) {
             expected.add(s+":updated");
         }
-        for (Page page : pf) {
-            pf.load(page, StringMarshaller.INSTANCE);
+        for (Page page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
             page.setData(page.getData()+":updated");
-            pf.write(page, StringMarshaller.INSTANCE);
+            tx.write(page, StringMarshaller.INSTANCE);
         }
- 
+        tx.commit();
+        
         // Reload it...
         pf.unload();
         pf.load();
+        tx = pf.tx();
 
         // Iterate it to make sure the updated records are still there..
         actual.clear();
-        for (Page page : pf) {
-            pf.load(page, StringMarshaller.INSTANCE);
+        for (Page page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
             actual.add((String)page.getData());
         }
         assertEquals(expected, actual);
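
The tests above drive each unit of work with an explicit commit(); the same
work can also be phrased with the closure forms, which commit on normal return
and roll back on a thrown exception. A minimal sketch using CallableClosure to
count the active pages (read-only, so the commit is effectively a no-op):

    int count = pf.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
        public Integer execute(Transaction tx) throws IOException {
            int n = 0;
            for (Page page : tx) {     // Transaction is Iterable<Page>; free pages are skipped
                n++;
            }
            return n;
        }
    });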