Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/16 20:56:38 UTC

svn commit: r826036 [1/2] - in /activemq/sandbox/activemq-flow: activemq-util/src/main/java/org/apache/activemq/util/list/ hawtdb/src/main/java/org/apache/hawtdb/internal/index/ hawtdb/src/main/java/org/apache/hawtdb/internal/page/ hawtdb/src/test/java...

Author: chirino
Date: Fri Oct 16 18:56:37 2009
New Revision: 826036

URL: http://svn.apache.org/viewvc?rev=826036&view=rev
Log:
ConcurrentPageFile is in better shape now; indexes are almost working.


Added:
    activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentTransaction.java
Modified:
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java
    activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java
    activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java
    activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java
    activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java
    activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java
    activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java
    activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java
    activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java

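The bulk of this revision reworks ConcurrentPageFile around Redo batches that move through an explicit life cycle, building -> stored -> synced -> performed -> released, described in the comments further down. As orientation only -- the actual code below tracks these states with four Redo pointers (buildingRedo, storedRedos, syncedRedos, performedRedos) into a linked list, not an enum -- here is a rough, hypothetical sketch of that state machine:

    // Orientation sketch, not part of the commit. State names follow the
    // comments in the ConcurrentPageFile.java diff below; the enum is hypothetical.
    enum RedoState { BUILDING, STORED, SYNCED, PERFORMED, RELEASED }

    final class RedoLifecycle {
        RedoState state = RedoState.BUILDING;

        void store()   { advance(RedoState.BUILDING,  RedoState.STORED);    } // redo record written to the page file
        void sync()    { advance(RedoState.STORED,    RedoState.SYNCED);    } // file.sync() ran; the redo survives a crash
        void perform() { advance(RedoState.SYNCED,    RedoState.PERFORMED); } // updates applied to the original pages
        void release() { advance(RedoState.PERFORMED, RedoState.RELEASED);  } // redo pages freed once unreferenced by snapshots

        private void advance(RedoState from, RedoState to) {
            if (state != from) throw new IllegalStateException(state + " -> " + to);
            state = to;
        }
    }
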
Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java Fri Oct 16 18:56:37 2009
@@ -17,13 +17,14 @@
 package org.apache.activemq.util.list;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 /**
  * Provides a list of LinkedNode objects. 
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class LinkedNodeList<T extends LinkedNode<T>> {
+public class LinkedNodeList<T extends LinkedNode<T>> implements Iterable<T> {
 
     T head;
     int size;
@@ -48,6 +49,9 @@
     }
 
     public T getTail() {
+        if( head==null ) {
+            return null;
+        }
         return head.prev;
     }
     
@@ -167,4 +171,29 @@
         }
         return rc;
     }
+
+    public Iterator<T> iterator() {
+        return new Iterator<T>() {
+            T next = getHead();
+            private T last;
+            
+            public boolean hasNext() {
+                return next!=null;
+            }
+
+            public T next() {
+                last = next;
+                next = last.getNext();
+                return last;
+            }
+
+            public void remove() {
+                if( last==null ) {
+                    throw new IllegalStateException();
+                }
+                last.unlink();
+                last=null;
+            }
+        };
+    }
 }

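Two behavior changes here: getTail() now returns null on an empty list instead of throwing a NullPointerException, and LinkedNodeList is now Iterable, so it works with the for-each loop and its iterator supports remove() via unlink(). A minimal usage sketch (not from the commit; the Job class is hypothetical, while addLast() and unlink() are existing LinkedNodeList/LinkedNode APIs used elsewhere in this commit):

    import java.util.Iterator;

    import org.apache.activemq.util.list.LinkedNode;
    import org.apache.activemq.util.list.LinkedNodeList;

    class Job extends LinkedNode<Job> {
        final int id;
        Job(int id) { this.id = id; }
    }

    public class IterationExample {
        public static void main(String[] args) {
            LinkedNodeList<Job> jobs = new LinkedNodeList<Job>();
            jobs.addLast(new Job(1));
            jobs.addLast(new Job(2));

            // New: for-each works because the list now implements Iterable<T>.
            for (Job job : jobs) {
                System.out.println("job " + job.id);
            }

            // The iterator's remove() unlinks the current node from the list.
            for (Iterator<Job> i = jobs.iterator(); i.hasNext();) {
                if (i.next().id == 1) {
                    i.remove();
                }
            }
        }
    }
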
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java Fri Oct 16 18:56:37 2009
@@ -59,6 +59,8 @@
 
         private Marshaller<Key> keyMarshaller;
         private Marshaller<Value> valueMarshaller;
+        private boolean deferredEncoding;
+        private Prefixer<Key> prefixer;
 
         public BTreeIndex<Key, Value> create(Paged paged, int page) {
             BTreeIndex<Key, Value> index = createInstance(paged, page);
@@ -79,7 +81,7 @@
             if (valueMarshaller == null) {
                 throw new IllegalArgumentException("The value marshaller must be set before calling open");
             }
-            return new BTreeIndex<Key, Value>(paged, page, keyMarshaller, valueMarshaller, null);
+            return new BTreeIndex<Key, Value>(paged, page, this);
         }
 
         public Marshaller<Key> getKeyMarshaller() {
@@ -97,6 +99,23 @@
         public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
             this.valueMarshaller = valueMarshaller;
         }
+
+        public boolean isDeferredEncoding() {
+            return deferredEncoding;
+        }
+
+        public void setDeferredEncoding(boolean deferredEncoding) {
+            this.deferredEncoding = deferredEncoding;
+        }
+
+        public Prefixer<Key> getPrefixer() {
+            return prefixer;
+        }
+
+        public void setPrefixer(Prefixer<Key> prefixer) {
+            this.prefixer = prefixer;
+        }
+        
     }
 
     private final BTreeNode.BTreeNodeEncoderDecoder<Key, Value> PAGE_ENCODER_DECODER = new BTreeNode.BTreeNodeEncoderDecoder<Key, Value>(this);
@@ -106,13 +125,15 @@
     private final Marshaller<Key> keyMarshaller;
     private final Marshaller<Value> valueMarshaller;
     private final Prefixer<Key> prefixer;
+    private final boolean deferredEncoding;
     
-    private BTreeIndex(Paged paged, int page, Marshaller<Key> keyMarshaller, Marshaller<Value> valueMarshaller, Prefixer<Key> prefixer) {
+    public BTreeIndex(Paged paged, int page, Factory<Key, Value> factory) {
         this.paged = paged;
         this.page = page;
-        this.keyMarshaller = keyMarshaller;
-        this.valueMarshaller = valueMarshaller;
-        this.prefixer = prefixer;
+        this.keyMarshaller = factory.getKeyMarshaller();
+        this.valueMarshaller = factory.getValueMarshaller();
+        this.deferredEncoding = factory.isDeferredEncoding();
+        this.prefixer = factory.getPrefixer();
     }
 
     public boolean containsKey(Key key) {
@@ -192,17 +213,30 @@
     }
     
     void storeNode(BTreeNode<Key, Value> node) {
-        paged.put(PAGE_ENCODER_DECODER, node.getPage(), node);
+        if( deferredEncoding ) {
+            paged.put(PAGE_ENCODER_DECODER, node.getPage(), node);
+        } else {
+            PAGE_ENCODER_DECODER.store(paged, node.getPage(), node);
+        }
     }
     
     BTreeNode<Key, Value> loadNode(int page) {
-        BTreeNode<Key, Value> node = paged.get(PAGE_ENCODER_DECODER, page);
+        BTreeNode<Key, Value> node;
+        if( deferredEncoding ) {
+            node = paged.get(PAGE_ENCODER_DECODER, page);
+        } else {
+            node = PAGE_ENCODER_DECODER.load(paged, page);
+        }
         node.setPage(page);
         return node;
     }
     
     void free( int page ) {
-        paged.remove(PAGE_ENCODER_DECODER, page);
+        if( deferredEncoding ) {
+            paged.remove(PAGE_ENCODER_DECODER, page);
+        } else {
+            PAGE_ENCODER_DECODER.remove(paged, page);
+        }
         paged.allocator().free(page, 1);
     }
 

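The new deferredEncoding option switches how B-tree nodes reach storage: when true, storeNode()/loadNode()/free() hand live node objects to paged.put()/get()/remove() so encoding can be deferred; when false, nodes are encoded eagerly through the page encoder/decoder. A configuration sketch, assuming the Factory shown above is a public nested class and that StringMarshaller/LongMarshaller are hypothetical application-supplied marshallers:

    // Sketch only (not from the commit): StringMarshaller/LongMarshaller are
    // hypothetical stand-ins for application-supplied Marshaller implementations.
    static BTreeIndex<String, Long> createIndex(Paged paged) {
        BTreeIndex.Factory<String, Long> factory = new BTreeIndex.Factory<String, Long>();
        factory.setKeyMarshaller(StringMarshaller.INSTANCE);
        factory.setValueMarshaller(LongMarshaller.INSTANCE);
        // true: live nodes go through paged.put()/get() and encoding is deferred;
        // false: nodes are encoded eagerly via the page encoder/decoder.
        factory.setDeferredEncoding(true);
        int rootPage = paged.allocator().alloc(1);
        return factory.create(paged, rootPage);
    }
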
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java Fri Oct 16 18:56:37 2009
@@ -166,7 +166,13 @@
     }
 
     <Key,Value> int index(Key x) {
-        return Math.abs(x.hashCode()%capacity);
+        try {
+            return Math.abs(x.hashCode()%capacity);
+        } catch (ArithmeticException e) {
+            // Can only happen when capacity is 0; log for diagnosis and rethrow.
+            e.printStackTrace();
+            throw e;
+        }
     }
     
     @Override

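Some context for the catch block above: x.hashCode() % capacity can only throw ArithmeticException when capacity is zero, so the stack trace effectively flags an uninitialized bin count rather than a bad key. A common hardening of this pattern -- a sketch, not what this commit does -- also avoids Math.abs(), which returns a negative value for Integer.MIN_VALUE:

    // Sketch of the conventional guard for hashCode-to-bin mapping.
    static int binIndex(Object key, int capacity) {
        if (capacity <= 0) {
            throw new IllegalStateException("capacity must be positive: " + capacity);
        }
        // Mask the sign bit instead of Math.abs(); Math.abs(Integer.MIN_VALUE)
        // is still negative, while (hash & 0x7fffffff) is always non-negative.
        return (key.hashCode() & 0x7fffffff) % capacity;
    }
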
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java Fri Oct 16 18:56:37 2009
@@ -16,20 +16,20 @@
  */
 package org.apache.hawtdb.internal.page;
 
+import java.io.Externalizable;
 import java.io.IOException;
+import java.io.ObjectInput;
 import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.zip.CRC32;
 
 import javolution.io.Struct;
@@ -38,11 +38,9 @@
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
-import org.apache.hawtdb.api.Allocator;
 import org.apache.hawtdb.api.EncoderDecoder;
 import org.apache.hawtdb.api.IOPagingException;
 import org.apache.hawtdb.api.OptimisticUpdateException;
-import org.apache.hawtdb.api.OutOfSpaceException;
 import org.apache.hawtdb.api.Paged;
 import org.apache.hawtdb.api.PagingException;
 import org.apache.hawtdb.api.Transaction;
@@ -71,561 +69,471 @@
  */
 public final class ConcurrentPageFile {
 
-    private static final String MAGIC = "HawtDB:MVCC Page File:1.0\n";
+    private static final String MAGIC = "HawtDB:1.0\n";
     private static final int FILE_HEADER_SIZE = 1024 * 4;
 
     public static final int PAGE_ALLOCATED = -1;
     public static final int PAGE_FREED = -2;
-//    public static final int PAGE_CACHED_WRITE = -3;
     public static final int HEADER_SIZE = 1024*4;
 
-    static class CacheUpdate {
-        private final int page;
-        private Object value;
-        private EncoderDecoder<?> marshaller;
-
-        public CacheUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
-            this.page = page;
-            this.value = value;
-            this.marshaller = marshaller;
-        }
-
-        public void reset(Object value, EncoderDecoder<?> marshaller) {
-            this.value = value;
-            this.marshaller = marshaller;
-        }
+    /**
+     * The first 4K of the file is used to hold 2 copies of the header.
+     * Each copy is 2K big.  The header is checksummed so that corruption
+     * can be detected. 
+     */
+    static private class Header extends Struct {
+        
+        /** Identifies the file format */
+        public final UTF8String magic = new UTF8String(32);
+        /** The oldest applied commit revision */
+        public final Signed64 base_revision = new Signed64();
+        /** The size of each page in the page file */
+        public final Signed32 page_size = new Signed32();
+        /** The page location of the free page list */
+        public final Signed32 free_list_page = new Signed32();
+        /** points at the latest redo page which is guaranteed to be fully stored */
+        public final Signed32 redo_page = new Signed32();
+        /** The page location of the latest redo page. Not guaranteed to be fully stored */ 
+        public final Signed32 unsynced_redo_page = new Signed32();
+        
+        /** The size of all the previous fields */
+        private static final int USED_FIELDS_SIZE = 32 + 8 + 4 + 4 + 4 + 4;
 
-        @SuppressWarnings("unchecked")
-        <T> T value() {
-            return (T) value;
-        }
+        /** reserves header space for future use */
+        public final UTF8String reserved = new UTF8String((FILE_HEADER_SIZE/2)-(USED_FIELDS_SIZE+8));
         
-        @SuppressWarnings("unchecked")
-        public List<Integer> store(Paged paged) {
-            return ((EncoderDecoder)marshaller).store(paged, page, value);
+        /** a checksum of all the previous fields. The reserved space 
+         * positions this right at the end of a 2k block */
+        public final Signed64 checksum = new Signed64();
+        
+        public String toString() { 
+            return "{ base_revision: "+this.base_revision.get()+
+            ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
+            ", redo_page: "+unsynced_redo_page.get()+", checksum: "+checksum.get()+
+            " }";
         }
     }
-    
+
     /**
-     * Transaction objects are NOT thread safe. Users of this object should
-     * guard it from concurrent access.
+     * Tracks the page changes that were part of a commit.
+     * 
+     * Commits can be merged; when that happens, this tracks a range of commits.
      * 
      * @author chirino
      */
-    private final class ConcurrentTransaction implements Transaction {
-        private HashMap<Integer, CacheUpdate> cache;
-        private HashMap<Integer, Integer> updates;
-        private Snapshot snapshot;
-        
-        private final Allocator txallocator = new Allocator() {
-            
-            public void free(int pageId, int count) {
-                // TODO: this is not a very efficient way to handle allocation ranges.
-                int end = pageId+count;
-                for (int key = pageId; key < end; key++) {
-                    Integer previous = getUpdates().put(key, PAGE_FREED);
-                    
-                    // If it was an allocation that was done in this
-                    // tx, then we can directly release it.
-                    assert previous!=null;
-                    if( previous == PAGE_ALLOCATED) {
-                        getUpdates().remove(key);
-                        allocator.free(key, 1);
-                    }
-                }
-            }
-            
-            public int alloc(int count) throws OutOfSpaceException {
-                int pageId = allocator.alloc(count);
-                // TODO: this is not a very efficient way to handle allocation ranges.
-                int end = pageId+count;
-                for (int key = pageId; key < end; key++) {
-                    getUpdates().put(key, PAGE_ALLOCATED);
-                }
-                return pageId;
-            }
+    static class Commit extends LinkedNode<Commit> {
 
-            public void unfree(int pageId, int count) {
-                throw new UnsupportedOperationException();
-            }
-            
-            public void clear() throws UnsupportedOperationException {
-                throw new UnsupportedOperationException();
-            }
-
-            public int getLimit() {
-                return allocator.getLimit();
-            }
-
-            public boolean isAllocated(int page) {
-                return allocator.isAllocated(page);
-            }
-
-        };
-
-        public <T> T get(EncoderDecoder<T> marshaller, int page) {
-            // Perhaps the page was updated in the current transaction...
-            CacheUpdate rc = cache == null ? null : cache.get(page);
-            if( rc != null ) {
-                return rc.<T>value();
-            }
-            
-            // No?  Then ask the snapshot to load the object.
-            return snapshot().cacheLoad(marshaller, page);
-        }
-
-        public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
-            Integer update = getUpdates().get(page);
-            if (update == null) {
-                // This is the first time this transaction updates the page...
-                snapshot();
-                update = allocator.alloc(1);
-                getUpdates().put(page, update);
-                getCacheUpdates().put(page, new CacheUpdate(update, value, marshaller));
-            } else {
-                // We have updated it before...
-                switch (update) {
-                case PAGE_FREED:
-                    throw new PagingException("You should never try to write a page that has been freed.");
-                case PAGE_ALLOCATED:
-                    getCacheUpdates().put(page, new CacheUpdate(page, value, marshaller));
-                    break;
-                default:
-                    CacheUpdate cu = getCacheUpdates().get(page);
-                    if( cu == null ) {
-                        throw new PagingException("You should never try to store mix using the cached objects with normal page updates.");
-                    }
-                    cu.reset(value, marshaller);
-                }
-            }
-        }
+        /** the redo that this commit is stored in */
+        private final Redo redo;        
+        /** oldest revision in the commit range. */
+        private long base; 
+        /** newest revision in the commit range, will match base if this only tracks one commit */ 
+        private long head;
+        /** set to the open snapshot whose head is this commit */
+        private Snapshot snapshot;
+        /** all the page updates that are part of this commit */
+        private HashMap<Integer, Integer> updates;
+        /** the deferred updates that need to be done as part of this commit */
+        private HashMap<Integer, DeferredUpdate> deferredUpdates;
 
-        public <T> void remove(EncoderDecoder<T> marshaller, int page) {
-            marshaller.remove(ConcurrentTransaction.this, page);
+        public Commit(long version, HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates, Redo redo) {
+            this.redo = redo;
+            this.head = this.base = version;
+            this.updates = updates;
+            this.deferredUpdates = deferredUpdates;
         }
         
-        public Allocator allocator() {
-            return txallocator;
+        public String toString() { 
+            int updateSize = updates==null ? 0 : updates.size();
+            int cacheSize = deferredUpdates==null ? 0 : deferredUpdates.size();
+            return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+" }";
         }
 
-        public void read(int pageId, Buffer buffer) throws IOPagingException {
-           
-            Integer updatedPageId = updates == null ? null : updates.get(pageId);
-            if (updatedPageId != null) {
-                switch (updatedPageId) {
-                case PAGE_ALLOCATED:
-                case PAGE_FREED:
-                    // TODO: Perhaps use a RuntimeException subclass.
-                    throw new PagingException("You should never try to read a page that has been allocated or freed.");
-                default:
-                    // read back in the updated we had done.
-                    pageFile.read(updatedPageId, buffer);
+        public long commitCheck(HashMap<Integer, Integer> newUpdate) {
+            for (Integer page : newUpdate.keySet()) {
+                if( updates.containsKey( page ) ) {
+                    throw new OptimisticUpdateException();
                 }
-            } else {
-                // Get the data from the snapshot.
-                snapshot().read(pageId, buffer);
             }
+            return head;
         }
 
-        public ByteBuffer slice(SliceType type, int page, int count) throws IOPagingException {
-            //TODO: need to improve the design of ranged ops..
-            if( type==SliceType.READ ) {
-                Integer updatedPageId = updates == null ? null : updates.get(page);
-                if (updatedPageId != null) {
-                    switch (updatedPageId) {
-                    case PAGE_ALLOCATED:
-                    case PAGE_FREED:
-                        throw new PagingException("You should never try to read a page that has been allocated or freed.");
-                    }
-                    return pageFile.slice(type, updatedPageId, count);
+        public void putAll(HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+            if( updates!=null ) {
+                if( this.updates == null ) {
+                    this.updates = updates;
                 } else {
-                    // Get the data from the snapshot.
-                    return snapshot().slice(page, count);
+                    this.updates.putAll(updates);
                 }
-                
-            } else {
-                Integer update = getUpdates().get(page);
-                if (update == null) {
-                    update = allocator.alloc(count);
-                    
-                    if (type==SliceType.READ_WRITE) {
-                        ByteBuffer slice = snapshot().slice(page, count);
-                        try {
-                            pageFile.write(update, slice);
-                        } finally { 
-                            pageFile.unslice(slice);
-                        }
-                    }
-                    
-                    int end = page+count;
-                    for (int i = page; i < end; i++) {
-                        getUpdates().put(i, PAGE_ALLOCATED);
-                    }
-                    getUpdates().put(page, update);
-                    
-                    return pageFile.slice(type, update, count);
+            }
+            if( deferredUpdates!=null ) {
+                if( this.deferredUpdates == null ) {
+                    this.deferredUpdates = deferredUpdates;
                 } else {
-                    switch (update) {
-                    case PAGE_FREED:
-                        throw new PagingException("You should never try to write a page that has been freed.");
-                    case PAGE_ALLOCATED:
-                        break;
-                    default:
-                        page = update;
-                    }
+                    this.deferredUpdates.putAll(deferredUpdates);
                 }
-                return pageFile.slice(type, page, count);
-                
             }
-            
         }
         
-        public void unslice(ByteBuffer buffer) {
-            pageFile.unslice(buffer);
+    }
+    
+    /**
+     * Aggregates a group of commits so that they can be operated on more efficiently.
+     * 
+     */
+    static private class Redo extends LinkedNode<Redo> implements Externalizable {
+        private static final long serialVersionUID = 1188640492489990493L;
+        
+        /** the pageId that this redo batch is stored at */
+        private transient int page=-1;
+        /** points to a previous redo batch page */
+        public int previous=-1;
+        /** was the redo loaded in the {@link recover} method */
+        private transient boolean recovered;
+        /** the commits stored in the redo */ 
+        private transient LinkedNodeList<Commit> commits = new LinkedNodeList<Commit>();
+        /** all the page updates that are part of the redo */
+        private ConcurrentHashMap<Integer, Integer> updates = new ConcurrentHashMap<Integer, Integer>();
+        /** the deferred updates that need to be done in this redo */
+        private transient ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates = new ConcurrentHashMap<Integer, DeferredUpdate>();
+        /** tracks how many snapshots are referencing the redo */
+        private int references;
+        /** set to the open snapshot whose head is before this redo */
+        private Snapshot prevSnapshot;
+        /** set to the open snapshot whose head is this redo */
+        private Snapshot snapshot;
+        /** the oldest commit in this redo */
+        public long base=-1;
+        /** the newest commit in this redo */
+        public long head;
+        
+        @SuppressWarnings("unused")
+        public Redo() {
+        }
+        
+        public Redo(long head) {
+            this.head = head;
         }
 
-        public void write(int page, Buffer buffer) throws IOPagingException {
-            Integer update = getUpdates().get(page);
-            if (update == null) {
-                // We are updating an existing page in the snapshot...
-                snapshot();
-                update = allocator.alloc(1);
-                getUpdates().put(page, update);
-                page = update;
-            } else {
-                switch (update) {
-                case PAGE_FREED:
-                    throw new PagingException("You should never try to write a page that has been freed.");
-                case PAGE_ALLOCATED:
-                    break;
-                default:
-                    page = update;
-                }
-            }
-            pageFile.write(page, buffer);
+        public String toString() { 
+            int count = updates==null ? 0 : updates.size();
+            return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
+        }
+        
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(head);
+            out.writeLong(base);
+            out.writeInt(previous);
+            out.writeObject(updates);
         }
 
+        @SuppressWarnings("unchecked")
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            head = in.readLong();
+            base = in.readLong();
+            previous = in.readInt();
+            updates = (ConcurrentHashMap<Integer, Integer>) in.readObject();
+        }        
 
-        public void commit() throws IOPagingException {
-            boolean failed = true;
-            try {
-                if (updates!=null) {
-                    Update previousUpdate = snapshot==null ? null : snapshot.updates.get(0);
-                    ConcurrentPageFile.this.commit(previousUpdate, updates, cache);
-                }
-                failed = false;
-            } finally {
-                // Rollback if the commit fails.
-                if (failed) {
-                    freeAllocatedPages();
-                }
-                ConcurrentPageFile.this.releaseSnapshot(snapshot);
-                updates = null;
-                cache = null;
-                snapshot = null;
-            }
+        public int pageCount() {
+            int rc = 0;
+            rc = updates.size();
+            // TODO: we can probably get an idea of how many pages the deferred update will use.
+            // rc += deferredUpdates.size();
+            return rc;
         }
 
-        public void rollback() throws IOPagingException {
-            try {
-                if (updates!=null) {
-                    freeAllocatedPages();
+        public long commitCheck(HashMap<Integer, Integer> newUpdate) {
+            for (Integer page : newUpdate.keySet()) {
+                if( updates.containsKey( page ) ) {
+                    throw new OptimisticUpdateException();
                 }
-            } finally {
-                ConcurrentPageFile.this.releaseSnapshot(snapshot);
-                updates = null;
-                cache = null;
-                snapshot = null;
             }
+            return head;
         }
 
-        private void freeAllocatedPages() {
-            for (Entry<Integer, Integer> entry : updates.entrySet()) {
-                switch (entry.getValue()) {
-                case PAGE_FREED:
-                    // Don't need to do anything..
-                    break;
-                case PAGE_ALLOCATED:
-                default:
-                    // We need to free the page that was allocated for the
-                    // update..
-                    allocator.free(entry.getKey(), 1);
-                }
+        public void putAll(HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+            if( updates!=null ) {
+                this.updates.putAll(updates);
             }
-        }
-
-        public Snapshot snapshot() {
-            if (snapshot == null) {
-                snapshot = aquireSnapshot();
+            if( deferredUpdates!=null ) {
+                this.deferredUpdates.putAll(deferredUpdates);
             }
-            return snapshot;
         }
 
-        public boolean isReadOnly() {
-            return updates == null;
-        }
+    }
 
-        public HashMap<Integer, CacheUpdate> getCacheUpdates() {
-            if( cache==null ) {
-                cache = new HashMap<Integer, CacheUpdate>();
-            }
-            return cache;
+    /**
+     * Provides a snapshot view of the page file.
+     *  
+     * @author chirino
+     */
+    abstract class Snapshot {
+        /** The number of transactions that are holding this snapshot open */
+        protected int references;
+        
+        public String toString() { 
+            return "{ references: "+this.references+" }";
         }
 
-        private HashMap<Integer, Integer> getUpdates() {
-            if (updates == null) {
-                updates = new HashMap<Integer, Integer>();
-            }
-            return updates;
+        public int mapPageId(int page) {
+            return page;
         }
-
-        public int getPageSize() {
-            return pageFile.getPageSize();
+        
+        public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+            return readCache.cacheLoad(marshaller, page);
         }
-
-        public String toString() { 
-            int updatesSize = updates==null ? 0 : updates.size();
-            return "{ snapshot: "+this.snapshot+", updates: "+updatesSize+" }";
+        
+        public void read(int pageId, Buffer buffer) throws IOPagingException {
+            pageId = mapPageId(pageId);
+            pageFile.read(pageId, buffer);
         }
 
-        public int pages(int length) {
-            return pageFile.pages(length);
+        public ByteBuffer slice(int pageId, int count) {
+            pageId = mapPageId(pageId);
+            return pageFile.slice(SliceType.READ, pageId, count);
         }
+        
+        abstract public Snapshot open();
+        abstract public void close();
+        abstract public long commitCheck(HashMap<Integer, Integer> pageUpdates);
+    }
+    
+    class PreviousSnapshot extends Snapshot {
+        private final Redo redo;
 
-        public void flush() {
-            ConcurrentPageFile.this.flush();
+        public PreviousSnapshot(Redo redo) {
+            this.redo = redo;
+        }
+        
+        @Override
+        public Snapshot open() {
+            references++;
+            if( references==1 ) {
+                redo.references++;
+                redo.prevSnapshot = this;
+            }
+            return this;
+        }
+
+        @Override
+        public void close() {
+            references--;
+            if( references==0 ) {
+                redo.references--;
+                redo.prevSnapshot = null;
+            }
+        }
+        
+        public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
+            long rc = 0;
+            Redo cur = redo;
+            while (cur != null) {
+                rc = cur.commitCheck(pageUpdates);
+                cur = cur.getNext();
+            }
+            return rc;
         }
 
     }
 
-    static class Update extends LinkedNode<Update> implements Serializable {
-        private static final long serialVersionUID = 9160865012544031094L;
-
-        transient final AtomicBoolean applied = new AtomicBoolean();
-        private final Redo redo;
+    class RedosSnapshot extends Snapshot {
         
-        long base;
-        long head;
-        Snapshot snapshot;
+        /** The snapshot will load page updates from the following redo list. */
+        protected final List<Redo> redosInSnapshot;
         
-        HashMap<Integer, Integer> updates;
-        transient HashMap<Integer, CacheUpdate> cache;
-
-        public Update(long version, HashMap<Integer, Integer> updates, HashMap<Integer, CacheUpdate> cache, Redo redo) {
-            this.redo = redo;
-            this.head = this.base = version;
-            this.updates = updates;
-            this.cache = new HashMap<Integer, CacheUpdate>();
+        public RedosSnapshot() {
+            this.redosInSnapshot = snapshotRedos();
         }
         
-        /**
-         * Merges previous updates that can be merged with this update. 
-         */
-        public void mergePrevious() {
-            prev = getPrevious();
-            while( prev!=null && prev.snapshot==null && prev.redo==redo ) {
-                
-                assert prev.head+1 == this.base;
-                this.base = prev.base;
-                
-                if( prev.updates!=null ) {
-                    if(this.updates!=null) {
-                        prev.updates.putAll(this.updates);
-                    }
-                    this.updates = prev.updates;
+        public Snapshot open() {
+            references++;
+            if( references==1 ) {
+                for (Redo redo : redosInSnapshot) {
+                    redo.references++;
                 }
-                
-                if( prev.cache!=null ) {
-                    if( this.cache!=null ) {
-                        prev.cache.putAll(this.cache);
+                redosInSnapshot.get(0).snapshot = this;
+            }
+            return this;
+        }
+
+        public void close() {
+            references--;
+            if( references==0 ) {
+                for (Redo redo : redosInSnapshot) {
+                    redo.references--;
+                }
+                redosInSnapshot.get(0).snapshot = null;
+            }
+        }
+        
+        public int mapPageId(int page) {
+            // It may be in the redos..
+            for (Redo redo : redosInSnapshot) {
+                Integer updatedPage = redo.updates.get(page);
+                if (updatedPage != null) {
+                    switch (updatedPage) {
+                    case PAGE_FREED:
+                        throw new PagingException("You should never try to read page that has been freed.");
+                    case PAGE_ALLOCATED:
+                        return page;
+                    default:
+                        return updatedPage;
                     }
-                    this.cache = prev.cache;
                 }
-                prev.unlink();
             }
+            
+            return super.mapPageId(page);
         }
-
         
+        public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+            for (Redo redo : redosInSnapshot) {
+                DeferredUpdate cu  = redo.deferredUpdates.get(page);
+                if (cu != null) {
+                    return cu.<T>value();
+                }
+            }
+            return super.cacheLoad(marshaller, page);
+        }
         
-        public String toString() { 
-            int updateSize = updates==null ? 0 : updates.size();
-            int cacheSize = cache==null ? 0 : cache.size();
-            return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+", applied: "+applied+" }";
+        public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
+            long rc = 0;
+            Redo cur = redosInSnapshot.get(0).getNext();
+            while (cur != null) {
+                rc = cur.commitCheck(pageUpdates);
+                cur = cur.getNext();
+            }
+            return rc;
         }
         
+        
     }
+    
+    class CommitsSnapshot extends RedosSnapshot {
+        /** The snapshot will load page updates from the following commit and all its previous linked commits. */
+        private final Commit commit;
 
-    /**
-     * @author chirino
-     */
-    private class Snapshot {
-
-        private final ArrayList<Update> updates;
-        private int references;
-
-        public Snapshot(ArrayList<Update> updates) {
-            this.updates = updates;
+        public CommitsSnapshot(Commit commit) {
+            this.commit= commit;
         }
 
-        private int mapPageId(int page) {
-            for (Update update : updates) {
-                if( update.updates!=null ) {
-                    Integer updatedPage = update.updates.get(page);
-                    if (updatedPage != null) {
-                        switch (updatedPage) {
-                        case PAGE_FREED:
-                            throw new PagingException("You should never try to read page that has been freed.");
-                        case PAGE_ALLOCATED:
-                            break;
-                        default:
-                            page = updatedPage;
-                        }
-                        break;
-                    }
+        public Snapshot open() {
+            references++;
+            if( references==1 ) {
+                for (Redo redo : redosInSnapshot) {
+                    redo.references++;
                 }
+                commit.redo.references++;
+                commit.snapshot = this;
             }
-            return page;
-        }
-
-        void read(int pageId, Buffer buffer) throws IOPagingException {
-            pageId = mapPageId(pageId);
-            pageFile.read(pageId, buffer);
-        }
-
-        public ByteBuffer slice(int pageId, int count) {
-            pageId = mapPageId(pageId);
-            return pageFile.slice(SliceType.READ, pageId, count);
+            return this;
         }
         
-        private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
-            // See if any of the updates in the snapshot have an update of the 
-            // requested page...
-            for (Update update : updates) {
-                if( update.cache!=null ) {
-                    CacheUpdate cu = update.cache.get(pageId);
-                    if (cu != null) {
-                        return cu.<T>value();
-                    }
+        public void close() {
+            references--;
+            if( references==0 ) {
+                for (Redo redo : redosInSnapshot) {
+                    redo.references--;
                 }
+                commit.redo.references--;
+                commit.snapshot = null;
             }
-            return readCache.cacheLoad(marshaller, pageId);
         }
 
-        public String toString() { 
-            return "{ updates: "+this.updates.size()+", references: "+this.references+" }";
-        }
-    }
 
-    class ReadCache {
-        Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
-        
-        @SuppressWarnings("unchecked")
-        private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
-            T rc = (T) map.get(pageId);
-            if( rc ==null ) {
-                rc = marshaller.load(pageFile, pageId);
-                map.put(pageId, rc);
+        public int mapPageId(int page) {
+            
+            // Check to see if it's in the current update list...
+            Commit update = this.commit;
+            while (update!=null) {
+                Integer updatedPage = update.updates.get(page);
+                if (updatedPage != null) {
+                    switch (updatedPage) {
+                    case PAGE_FREED:
+                        throw new PagingException("You should never try to read page that has been freed.");
+                    case PAGE_ALLOCATED:
+                        return page;
+                    default:
+                        return updatedPage;
+                    }
+                }
+                update = update.getPrevious();
             }
-            return rc;
-        }        
-    }
-    
-    private final ReadCache readCache = new ReadCache();
-    
-    /**
-     * The redo log is composed of a linked list of RedoBatch records.  A 
-     * RedoBatch stores the redo data for multiple updates.
-     * 
-     * @author chirino
-     */
-    static private class Redo implements Serializable {
-        private static final long serialVersionUID = 1188640492489990493L;
-        
-        /** the pageId that this redo batch is stored at */
-        private transient int page=-1;
-        private transient boolean recovered;
-        
-        private long base;
-        private long head;
-        
-        final HashMap<Integer, Integer> updates = new HashMap<Integer, Integer>();
-        transient final HashMap<Integer, CacheUpdate> cache = new HashMap<Integer, CacheUpdate>();
-        
-        /** points to a previous redo batch page */
-        public int previous=-1;
-
-        public String toString() { 
-            int count = updates==null ? 0 : updates.size();
-            return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
+            
+            return super.mapPageId(page);
         }
 
-        public int pageCount() {
-            int rc = 0;
-            if( updates!=null ) {
-                rc = updates.size();
+        public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+            // Check to see if it's in the current update list...
+            Commit update = this.commit;
+            while (update!=null) {
+                DeferredUpdate du = update.deferredUpdates.get(page);
+                if (du != null) {
+                    return du.<T>value();
+                }
+                update = update.getPrevious();
             }
-            if( cache!=null ) {
-                rc = cache.size();
+            
+            return super.cacheLoad(marshaller, page);
+        }
+        
+        public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
+            long rc = 0;
+            
+            Commit next = this.commit.getNext();
+            while (next != null) {
+                rc = next.commitCheck(pageUpdates);
+                next = next.getNext();
+            }
+            
+            Redo cur = this.commit.redo.getNext();
+            while (cur != null) {
+                rc = cur.commitCheck(pageUpdates);
+                cur = cur.getNext();
             }
             return rc;
         }
         
     }
     
-    private class Header extends Struct {
-        public final UTF8String file_magic = new UTF8String(80);
-        public final Signed64 base_revision = new Signed64();
-        public final Signed32 page_size = new Signed32();
-        public final Signed32 free_list_page = new Signed32();
-        /** points at the latest redo page which might have been partially stored */ 
-        public final Signed32 redo_page = new Signed32();
-        /** points at the latest redo page which is guaranteed to be fully stored */
-        public final Signed32 synced_redo_page = new Signed32();
-        public final Signed64 checksum = new Signed64();
-        
-        public String toString() { 
-            return "{ base_revision: "+this.base_revision.get()+
-            ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
-            ", redo_page: "+redo_page.get()+", checksum: "+checksum.get()+
-            " }";
-        }
-    }
+    private final MemoryMappedFile file;
+    final SimpleAllocator allocator;
+    final PageFile pageFile;
+    private static final int updateBatchSize = 1024;
 
+
+    /** The header structure of the file */
+    private final Header header = new Header();
     
-    /**
-     * A list of updates to the page file. The list head points to the most
-     * recent update.
-     */
-//    private final HashMap<Long, Update> updatesMap = new HashMap<Long, Update>();
-    private final LinkedNodeList<Update> updatesList = new LinkedNodeList<Update>();
-    
-    /**
-     * This is the next redo that will get logged.  It is currently being built.
-     */
-    Redo nextRedo;
+    int lastRedoPage = -1;
     
-    /**
-     * These are stored redos that are waiting for a file sync.  They may or may not survive 
-     * a failure.
-     */
-    private final LinkedList<Redo> unsyncedRedos = new LinkedList<Redo>();
+    private final LinkedNodeList<Redo> redos = new LinkedNodeList<Redo>();
     
-    /**
-     * These are stored redos that have been file synced.  They should survive a failure.
-     */
-    private final LinkedList<Redo> syncedRedos = new LinkedList<Redo>();
+    //
+    // The following Redo references point to linked nodes in the redos list above.
+    // They are used to designate the life cycle state of each redo object.
+    //
     
-    /**
-     * These are updates which have been applied but who's temp pages have not yet been 
-     * freed since they are being used by some snapshots.
-     */
-    private final LinkedList<Update> updatesWaitingCleanup = new LinkedList<Update>();
+    /** The current redo that is currently being built */
+    Redo buildingRedo;
+    /** The stored redos.  These may or may not survive a failure. */
+    Redo storedRedos;
+    /** The synced redos.  A file sync occurred after these redos were stored; they should survive a failure. */
+    Redo syncedRedos;
+    /** The performed redos.  Their updates have been applied to the original page file. */
+    Redo performedRedos;
     
-    private final Header header = new Header();
+    /** Used to cache read objects */
+    private ReadCache readCache = new ReadCache();
 
-    private final PageFile pageFile;
-    private final SimpleAllocator allocator;
-    private final MemoryMappedFile file;
+    /** Mutex for data structures which are used during housekeeping tasks like redo management. Once acquired, you can also acquire the TRANSACTION_MUTEX. */
+    private final Object HOUSE_KEEPING_MUTEX = "HOUSE_KEEPING_MUTEX";
+
+    /** Mutex for data structures which transaction threads access. Never attempt to acquire the HOUSE_KEEPING_MUTEX once this mutex is acquired.  */
+    private final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX";
+    
 
     /**
      * This is the free page list at the base revision.  It does not track allocations in transactions
@@ -644,139 +552,210 @@
         this.allocator = pageFile.allocator();
         ByteBuffer slice = file.slice(false, 0, FILE_HEADER_SIZE);
         this.header.setByteBuffer(slice, slice.position());
-        this.updatesList.addLast(new Update(0, null, null, null));
     }
 
     public Transaction tx() {
-        return new ConcurrentTransaction();
+        return new ConcurrentTransaction(this);
+    }
+
+    /**
+     * Attempts to commit a set of page updates.
+     * 
+     * @param updatedSnapshot the snapshot this commit read against, or null if the transaction never opened one
+     * @param pageUpdates the page redirections being committed
+     * @param deferredUpdates the deferred-encoding updates being committed
+     */
+    void commit(Snapshot updatedSnapshot, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+        
+        boolean fullRedo=false;
+        synchronized (TRANSACTION_MUTEX) {
+            
+            // we need to figure out the revision id of this commit...
+            long rev;
+            if( updatedSnapshot!=null ) {
+                
+                // Check for an OptimisticUpdateException: verify that the new commit's
+                // updates don't conflict with a commit that occurred subsequent to the
+                // snapshot that this commit started operating on.
+                
+                // Note: every deferred update has an entry in the pageUpdates, so no need to 
+                // check to see if that map also conflicts.
+                rev = updatedSnapshot.commitCheck(pageUpdates);
+                
+            } else {
+                rev = buildingRedo.head;
+            }
+            rev++;
+
+            if( buildingRedo.base == -1 ) {
+                buildingRedo.base = rev;
+            }
+
+            buildingRedo.head = rev;
+            
+            // TODO: This map merging has to be a bit CPU intensive.. need 
+            // to look for ways to optimize it out.
+            buildingRedo.putAll(pageUpdates, deferredUpdates);
+            
+            Commit last = buildingRedo.commits.getTail();
+            if( last==null || last.snapshot!=null ) {
+                last = new Commit(rev, pageUpdates, deferredUpdates, buildingRedo);
+                buildingRedo.commits.addLast(last);
+            } else {
+                // we can merge into the previous commit.
+                last.putAll(pageUpdates, deferredUpdates);
+            }
+            
+            if( buildingRedo.pageCount() > updateBatchSize ) {
+                fullRedo = true;
+            }
+        }
+        
+        if( fullRedo ) {
+            synchronized (HOUSE_KEEPING_MUTEX) {
+                storeRedos(false);
+                // TODO: do the following actions async.
+                syncRedos();
+                performRedos();
+            }
+        }
     }
-
+    
     /**
      * Used to initialize a new file or to clear out the 
      * contents of an existing file.
      */
     public void reset() {
-        updatesList.clear();
-        updatesList.addLast(new Update(0, null, null, null));
-        unsyncedRedos.clear();
-        updatesWaitingCleanup.clear();
-        allocator.clear(); 
-        baseRevisionFreePages.clear();
-        baseRevisionFreePages.add(0, allocator.getLimit());
-
-        // Initialize the file header..
-        this.header.setByteBufferPosition(0);
-        this.header.file_magic.set(MAGIC);
-        this.header.base_revision.set(0);
-        this.header.free_list_page.set(-1);
-        this.header.page_size.set(pageFile.getPageSize());
-        this.header.redo_page.set(-1);
-        replicateHeader();
-    }
+        synchronized (HOUSE_KEEPING_MUTEX) {
+            redos.clear();
+            performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1);
+            redos.addFirst(buildingRedo);
+            
+            lastRedoPage = -1;
+            readCache.clear();
+            
+            allocator.clear(); 
+            baseRevisionFreePages.clear();
+            baseRevisionFreePages.add(0, allocator.getLimit());
     
+            // Initialize the file header..
+            Header h = header();
+            h.setByteBufferPosition(0);
+            h.magic.set(MAGIC);
+            h.base_revision.set(0);
+            h.free_list_page.set(-1);
+            h.page_size.set(pageFile.getPageSize());
+            h.reserved.set("");
+            h.unsynced_redo_page.set(-1);
+            replicateHeader();
+        }
+    }    
     /**
      * Loads an existing file and replays the redo
      * logs to put it in a consistent state.
      */
     public void recover() {
-        
-        unsyncedRedos.clear();
-        updatesWaitingCleanup.clear();
+        synchronized (HOUSE_KEEPING_MUTEX) {
 
-        this.header.setByteBufferPosition(0);
-        long baseRevision = header.base_revision.get();
-        updatesList.clear();
-        updatesList.addLast(new Update(baseRevision, null, null, null));
-
-        // Initialize the free page list.
-        int pageId = header.free_list_page.get();
-        if( pageId >= 0 ) {
-            baseRevisionFreePages = loadObject(pageId);
-            allocator.copy(baseRevisionFreePages);
-            Extent.unfree(pageFile, pageId);
-        } else {
-            allocator.clear(); 
-            baseRevisionFreePages.add(0, allocator.getLimit());
-        }
-        
-        // Load the redo batches.
-        pageId = header.redo_page.get();
-        while( pageId >= 0 ) {
-            Redo redo = loadObject(pageId); 
-            redo.page = pageId;
-            redo.recovered = true;
-            Extent.unfree(pageFile, pageId);
+            redos.clear();
+            performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1);
+            redos.addFirst(buildingRedo);
+            lastRedoPage = -1;
+            readCache.clear();
+    
+            Header h = header();
+            if( !MAGIC.equals( h.magic.get()) ) {
+                throw new PagingException("The file header is not of the expected type.");
+            }
             
-            pageId=-1;
-            if( baseRevision < redo.head ) {
-                
-                // add first since we are loading redo objects oldest to youngest
-                // but want to put them in the list youngest to oldest.
-                unsyncedRedos.addFirst(redo);
+            long baseRevision = h.base_revision.get();
+    
+            // Initialize the free page list.
+            int pageId = h.free_list_page.get();
+            if( pageId >= 0 ) {
+                baseRevisionFreePages = loadObject(pageId);
+                allocator.copy(baseRevisionFreePages);
+                Extent.unfree(pageFile, pageId);
+            } else {
+                allocator.clear(); 
+                baseRevisionFreePages.add(0, allocator.getLimit());
+            }
+            
+            // Load the redo batches.
+            pageId = h.unsynced_redo_page.get();
+            while( pageId >= 0 ) {
+                Redo redo = loadObject(pageId); 
+                redo.page = pageId;
+                redo.recovered = true;
+                Extent.unfree(pageFile, pageId);
                 
-                if( baseRevision < redo.base ) {
-                    pageId=redo.previous;
+                if( buildingRedo.head == -1 ) {
+                    buildingRedo.head = redo.head;
+                }
+    
+                pageId=-1;
+                if( baseRevision < redo.head ) {
+                    
+                    // add first since we are loading redo objects oldest to youngest
+                    // but want to put them in the list youngest to oldest.
+                    redos.addFirst(redo);
+                    syncedRedos = redo;
+                    
+                    if( baseRevision < redo.base ) {
+                        pageId=redo.previous;
+                    }
                 }
             }
-        }
-        
-        // Apply all the redos..
-        applyRedos();
+            
+            // Apply all the redos..
+            performRedos();
+        }        
     }
 
-
     /**
      * Once this method returns, any previously committed transactions 
     * are flushed to the disk, ensuring that they will not be lost
-     * upon failure. 
+     * upon failure.
      */
     public void flush() {
-        
-        // Write out the current redo if it has data...
-        Redo redo;
-        synchronized (this) {
-            redo = nextRedo;
-            nextRedo = null;
-        }
-        if( redo == null ) {
-            store(redo);
-        }
-
-        // Find out if there are unsynced redos...
-        redo = null;
-        synchronized (this) {
-            if( !unsyncedRedos.isEmpty() ) {
-                for (Redo r : unsyncedRedos) {
-                    syncedRedos.add(r);
-                    redo = r;
-                }
-            }
+        synchronized (HOUSE_KEEPING_MUTEX) {
+            storeRedos(true);
+            syncRedos();
         }
+    }   
+    
+    // /////////////////////////////////////////////////////////////////
+    //
+    // Methods which transition redos through their life cycle states:
+    //
+    //    building -> stored -> synced -> performed -> released
+    //
+    // The HOUSE_KEEPING_MUTEX must be acquired before these methods are called.
+    //
+    // /////////////////////////////////////////////////////////////////
+    
+    /**
+     * Attempts to perform a redo state change: building -> stored
+     */
+    private void storeRedos(boolean force) {
+        Redo redo;
         
-        // Yep.. we had some.. 
-        if( redo!=null ) {
-            // This is a slow operation..
-            file.sync();
-            // Update the header so that it knows about the redos that are 
-            // guaranteed to survive a failure.
-            header().synced_redo_page.set(redo.page);
-            replicateHeader();
+        // We synchronize with the transactions so that they see the state change.
+        synchronized (TRANSACTION_MUTEX) {
+            // Re-checking since storing the redo may not be needed.
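+            // i.e. we store when forced (and the building redo is non-empty),
+            // or once the batch has grown past updateBatchSize pages.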
+            if( (force && buildingRedo.base!=-1 ) || buildingRedo.pageCount() > updateBatchSize ) {
+                redo = buildingRedo;
+                buildingRedo = new Redo(redo.head);
+                redos.addLast(buildingRedo);
+            } else {
+                return;
+            }
         }
-    }
-
-    private Header header() {
-        this.header.getByteBuffer().position(0);
-        this.header.setByteBufferPosition(0);
-        Header h = this.header;
-        return h;
-    }
-    
-    public void store(Redo redo) {
         
         // Write any outstanding deferred cache updates...
-        if( redo.cache != null ) {
-            for (Entry<Integer, CacheUpdate> entry : redo.cache.entrySet()) {
-                CacheUpdate cu = entry.getValue();
+        if( redo.deferredUpdates != null ) {
+            for (Entry<Integer, DeferredUpdate> entry : redo.deferredUpdates.entrySet()) {
+                DeferredUpdate cu = entry.getValue();
                 List<Integer> allocatedPages = cu.store(pageFile);
                 for (Integer page : allocatedPages) {
                     // add any allocated pages to the update list so that the free 
@@ -787,85 +766,154 @@
         }
 
         // Link it to the last redo.
-        Redo last = unsyncedRedos.getLast();
-        if( last!=null ) {
-            redo.previous = last.page; 
-        }
+        redo.previous = lastRedoPage; 
         
-        // Store the redo.
-        redo.page = storeObject(redo);
-        synchronized (this) {
-            unsyncedRedos.add(redo);
-        }
+        // Store the redo record.
+        lastRedoPage = redo.page = storeObject(redo);
 
         // Update the header to know about the new redo page.
-        header().redo_page.set(redo.page);
+        header().unsynced_redo_page.set(redo.page);
         replicateHeader();
     }
-
+    
     /**
-     *  Frees up space by applying redos and releasing the pages that
-     *  the redo was stored on. 
+     * Performs a file sync. 
+     * 
+     * This allows two types of redo state changes to occur:
+     * <ul>
+     * <li> stored -> synced
+     * <li> performed -> released
+     * </ul>
      */
-    public void applyRedos() {
+    private void syncRedos() {
 
-        // We can only apply redos which we know are not partially stored on disk
-        // and which hold revisions which are older than the oldest active snapshot.
-        ArrayList<Redo> redoList = new ArrayList<Redo>();
-        synchronized (this) {
-            long snapshotHeadRev = Long.MAX_VALUE;
-            Update cur = updatesList.getHead();
-            while( cur!=null ) {
-                if( cur.snapshot!=null ) {
-                    snapshotHeadRev = cur.head;
-                    break;
-                }
+        // This is a slow operation..
+        file.sync();
+        Header h = header();
+
+        // Update the base_revision with the last performed revision.
+        if (performedRedos!=syncedRedos) {
+            Redo lastPerformedRedo = syncedRedos.getPrevious();
+            h.base_revision.set(lastPerformedRedo.head);
+        }
+
+        // Were there some redos in the stored state?
+        if (storedRedos!=buildingRedo) {
+            
+            // The last stored is actually synced now..
+            Redo lastStoredRedo = buildingRedo.getPrevious();
+            // Let the header know about it..
+            h.redo_page.set(lastStoredRedo.page);
+            
+            // We synchronize with the transactions so that they see the state change.
+            synchronized (TRANSACTION_MUTEX) {
+                // Transition stored -> synced.
+                storedRedos = buildingRedo;
+            }
+        }
+        
+        // Once a redo has been performed, subsequently synced, and is no longer referenced,
+        // its allocated recovery space can be released.
+        while( performedRedos!=syncedRedos ) {
+            if( performedRedos.references!=0 ) {
+                break;
             }
             
-            for (Iterator<Redo> i = this.unsyncedRedos.iterator(); i.hasNext();) {
-                Redo redo = i.next();
-                if (redo.base > snapshotHeadRev) {
-                    // we can't apply the rest of the updates, since a snapshot depends on 
-                    // the current base revision.
-                    // the rest of the updates will have incrementing revision numbers too.
+            // Free the update pages associated with the redo.
+            for (Entry<Integer, Integer> entry : performedRedos.updates.entrySet()) {
+                int key = entry.getKey();
+                int value = entry.getValue();
+        
+                switch( value ) {
+                case PAGE_ALLOCATED:
+                    // It was a new page that was written.. we don't need to 
+                    // free it.
+                    break;
+                case PAGE_FREED:
+                    // The update freed a previous page.. now is when we actually free it.
+                    allocator.free(key, 1);
                     break;
+                default:
+                    // This updated the 'key' page; now we can release the 'value' page
+                    // since it has been copied over the 'key' page and is no longer needed.
+                    allocator.free(value, 1);
                 }
-                redoList.add(redo);
-                i.remove();
             }
+            
+            // Free the redo record itself.
+            Extent.free(pageFile, performedRedos.page);
+            
+            // No need to sync with transactions since they don't use the performedRedos variable.
+            // Transition performed -> released
+            performedRedos = performedRedos.getNext();
+            
+            // Removes the released redo from the redo list.
+            performedRedos.getPrevious().unlink();
         }
-        
-        // Perhaps we can't do any work...
-        if( redoList.isEmpty() ) {
+
+        // Store the free list..
+        int previousFreeListPage = h.free_list_page.get();
+        h.free_list_page.set(storeObject(baseRevisionFreePages));
+        replicateHeader();
+
+        // Release the previous free list.
+        if (previousFreeListPage >= 0) {
+            Extent.free(pageFile, previousFreeListPage);
+        }
+    }
+
+    /**
+     * Attempts to perform a redo state change: synced -> performed
+     * 
+     * Once a redo is performed, new snapshots will not reference 
+     * the redo anymore.
+     */
+    public void performRedos() {
+
+        if( syncedRedos==storedRedos ) {
+            // There are no redos in the synced state for us to transition.
+            return;
+        }
+              
+        // The last performed redo MIGHT still have an open snapshot.
+        // We can't transition from synced until that snapshot closes.
+        Redo lastPerformed = syncedRedos.getPrevious();
+        if( lastPerformed!=null && lastPerformed.references!=0) {
             return;
         }
         
-        long baseRevision = header().base_revision.get();
-
-        for (Redo redo : redoList) {
-            // revision numbers should be sequentially increasing.
-            assert baseRevision+1==redo.base;
+        while( syncedRedos!=storedRedos ) {
             
-            for (Entry<Integer, Integer> entry : redo.updates.entrySet()) {
+            // Performing the redo actually applies the updates to the original page locations. 
+            for (Entry<Integer, Integer> entry : syncedRedos.updates.entrySet()) {
                 int key = entry.getKey();
                 int value = entry.getValue();
                 switch( value ) {
                 case PAGE_ALLOCATED:
-                    if( redo.recovered ) {
+                    if( syncedRedos.recovered ) {
+                        // If we are recovering, the allocator MIGHT not have this
+                        // page marked as allocated.  This makes sure it's allocated so that
+                        // a new transaction does not get this page and overwrite it in error.
                         allocator.unfree(key, 1);
                     }
+                    // Update the persistent free list.  This gets stored on the next sync.
                     baseRevisionFreePages.remove(key, 1);
                     break;
                 case PAGE_FREED:
-                    if( redo.recovered ) {
-                        allocator.free(key, 1);
-                    }
+                    // The actual free gets done on the next file sync.
+                    // Update the persistent free list.  This gets stored on the next sync.
                     baseRevisionFreePages.add(key, 1);
                     break;
                 default:
-                    if( redo.recovered ) {
+                    if( syncedRedos.recovered ) {
+                        // If we are recovering, the allocator MIGHT not have this
+                        // page marked as allocated.  This makes sure it's allocated so that
+                        // a new transaction does not get this page and overwrite it in error.
                         allocator.unfree(key, 1);
                     }
+                    
+                    // Perform the update by copying the updated 'redo page' to the original
+                    // page location.
                     ByteBuffer slice = pageFile.slice(SliceType.READ, value, 1);
                     try {
                         pageFile.write(key, slice);
@@ -874,34 +922,103 @@
                     }
                 }
             }
-            baseRevision = redo.base;
+            
+            // We synchronize with the transactions so that they see the state change.
+            synchronized (TRANSACTION_MUTEX) {
+                // Transition synced -> performed
+                syncedRedos = syncedRedos.getNext();
+            }
+            
+            lastPerformed = syncedRedos.getPrevious();
+            // We have to stop if the last redo performed has an open snapshot.
+            if( lastPerformed.references!=0 ) {
+                break;
+            }
         }
-
+    }
+    
+    // /////////////////////////////////////////////////////////////////
+    // Snapshot management
+    // /////////////////////////////////////////////////////////////////
+    
+    Snapshot openSnapshot() {
+        synchronized(TRANSACTION_MUTEX) {
             
-        // force to ensure all data is fully stored before the header 
-        // starts making reference to new stuff
-        file.sync();
+            // Is it a partial redo snapshot?
+            // If there are commits in the building redo..
+            Snapshot snapshot;
+            Commit commit = buildingRedo.commits.getTail();
+            if( commit!=null ) {
+                snapshot = commit.snapshot != null ? commit.snapshot : new CommitsSnapshot(commit);
+                return snapshot.open();
+            }
 
-        
-        Header h = header();
-        int previousFreeListPage = h.free_list_page.get();
-        h.free_list_page.set(storeObject(baseRevisionFreePages));
-        h.base_revision.set(baseRevision);
-        replicateHeader();
-        
-        // Release the previous free list.
-        if( previousFreeListPage>=0 ) {
-            Extent.free(pageFile, previousFreeListPage);
+            // Perhaps this snapshot has to deal with full redos..
+            if( syncedRedos!=buildingRedo ) {
+                Redo lastRedo = buildingRedo.getPrevious();
+                snapshot = lastRedo.snapshot != null ? lastRedo.snapshot : new RedosSnapshot();
+                return snapshot.open();
+            }
+            
+            // Otherwise the snapshot does not cover any previous updates.
+            snapshot = buildingRedo.prevSnapshot != null ? buildingRedo.prevSnapshot : new PreviousSnapshot(buildingRedo);
+            return snapshot.open();
         }
-        
-        // Free the space associated with the redo batches
-        if( !redoList.isEmpty() ) {
-            for (Redo redo : redoList) {
-                Extent.free(pageFile, redo.page);
+    }
+    
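+    /**
+     * Returns the redos, youngest first, that a snapshot opened now would
+     * have to consult: every stored and synced redo, i.e. from the redo just
+     * before the building redo back to the oldest synced one.  Returns an
+     * empty list when there are none.
+     */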
+    private List<Redo> snapshotRedos() {
+        if( syncedRedos!=buildingRedo ) {
+            ArrayList<Redo> rc = new ArrayList<Redo>(4);
+            Redo cur = buildingRedo.getPrevious();
+            while( true ) {
+                rc.add(cur);
+                if( cur == syncedRedos ) {
+                    break;
+                }
+                cur = cur.getPrevious();
+            }
+            return rc;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    void closeSnapshot(Snapshot snapshot) {
+        if( snapshot!=null ) {
+            synchronized(TRANSACTION_MUTEX) {
+                snapshot.close();
             }
         }
     }
+    
+    // /////////////////////////////////////////////////////////////////
+    // TODO:
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * Suspends (quiesces) access to the concurrent page file.
+     * Access can be restored using the {@link #resume()} method.
+     * 
+     * @param reads if true, read only transactions are suspended as well. 
+     * @param blocking if true, transactions will block until the {@link #resume()} method 
+     *          is called, otherwise they will receive errors.
+     * @param drain if true, in progress transactions are allowed to complete, otherwise they
+     *        are also suspended. 
+     */
+    public void suspend(boolean reads, boolean blocking, boolean drain) {
+    }
 
+    /**
+     * Resumes a previously suspended page file. 
+     */
+    public void resume() {
+    }
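+    
+    // A minimal sketch, assuming a hypothetical ReentrantReadWriteLock named
+    // 'accessLock' guarding transaction entry (no such field exists yet), and
+    // ignoring the reads/blocking/drain flags for brevity:
+    //
+    //   public void suspend(boolean reads, boolean blocking, boolean drain) {
+    //       accessLock.writeLock().lock();   // block new transactions
+    //   }
+    //
+    //   public void resume() {
+    //       accessLock.writeLock().unlock(); // let transactions proceed
+    //   }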
+    
+    
+    // /////////////////////////////////////////////////////////////////
+    // Helper methods
+    // /////////////////////////////////////////////////////////////////
+    
     private int storeObject(Object value) {
         try {
             ExtentOutputStream eos = new ExtentOutputStream(pageFile);
@@ -927,6 +1044,14 @@
         }
     }
     
+
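+    // Rewinds the header buffer so that callers always read and write the
+    // header struct starting at offset 0.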
+    private Header header() {
+        this.header.getByteBuffer().position(0);
+        this.header.setByteBufferPosition(0);
+        Header h = this.header;
+        return h;
+    }
+    
     private void replicateHeader() {
         // Calculate the checksum of the header so that we can tell if the
         // header is corrupted.
@@ -939,109 +1064,53 @@
         // Copy the header so we can survive a partial update.
         ByteBuffer header = file.read(0, this.header.size());
         file.write(FILE_HEADER_SIZE / 2, header);
-    }
-
+    }    
+    
     // /////////////////////////////////////////////////////////////////
-    // Transaction calls back to these methods...
+    // Simple Helper Classes
     // /////////////////////////////////////////////////////////////////
-    synchronized private Snapshot aquireSnapshot() {
-        Snapshot snapshot = updatesList.getTail().snapshot;
-        if (snapshot == null) {
-            ArrayList<Update> updates = updatesList.toArrayListReversed();
-            updates.get(0).snapshot = new Snapshot(updates);
+
+    class ReadCache {
+        private final Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
+
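+        // Returns the decoded object for the page, loading it via the
+        // marshaller and caching the result on a cache miss.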
+        @SuppressWarnings("unchecked")
+        private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+            T rc = (T) map.get(pageId);
+            if( rc ==null ) {
+                rc = marshaller.load(pageFile, pageId);
+                map.put(pageId, rc);
+            }
+            return rc;
         }
-        snapshot.references++;
-        return snapshot;
+
+        public void clear() {
+            map.clear();
+        }        
     }
     
-    synchronized private void releaseSnapshot(Snapshot snapshot) {
-        if( snapshot!=null ) {
-            synchronized(this) {
-                snapshot.references--;
-                if( snapshot.references==0 ) {
-                    Update update = snapshot.updates.get(0);
-                    update.snapshot=null;
-                    Update next = update.getNext();
-                    if( next !=null ) {
-                        next.mergePrevious();
-                    }
-                }
-            }
+    static class DeferredUpdate {
+        private final int page;
+        private Object value;
+        private EncoderDecoder<?> marshaller;
+
+        public DeferredUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
+            this.page = page;
+            this.value = value;
+            this.marshaller = marshaller;
         }
-    }
 
-    /**
-     * Attempts to commit a set of page updates.
-     * 
-     * @param previousUpdate
-     * @param pageUpdates
-     * @param cache
-     */
-    private void commit(Update previousUpdate, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, CacheUpdate> cache) {
-        
-        Redo fullRedo=null;
-        synchronized (this) {
-            // Perhaps a concurrent update came in before this one...
-            long rev;
-            if( previousUpdate!=null ) {
-                rev = previousUpdate.head;
-                Update concurrentUpdate = previousUpdate.getNext();
-                while( concurrentUpdate != null ) {
-                    // Yep.. there were concurrent updates.  
-                    // Make sure we don't don't have update conflict.
-                    for (Integer page : pageUpdates.keySet()) {
-                        if( concurrentUpdate.updates.containsKey(page) ) {
-                            throw new OptimisticUpdateException();
-                        }
-                    }
-        
-                    rev = concurrentUpdate.head;
-                    concurrentUpdate = concurrentUpdate.getNext();
-                }
-            } else {
-                rev = updatesList.getTail().head;
-            }
-            rev++;
+        public void reset(Object value, EncoderDecoder<?> marshaller) {
+            this.value = value;
+            this.marshaller = marshaller;
+        }
 
-            if( nextRedo == null ) {
-                nextRedo = new Redo();
-                nextRedo.base = rev;
-            }
-            
-            nextRedo.head = rev;
-            nextRedo.updates.putAll(pageUpdates);
-            nextRedo.cache.putAll(cache);
-            
-            Update value = new Update(rev, pageUpdates, cache, nextRedo);
-            updatesList.addLast(value);
-            value.mergePrevious();
-            
-            if( nextRedo.pageCount() > 10 ) {
-                fullRedo = nextRedo;
-                nextRedo = null;
-            }
+        @SuppressWarnings("unchecked")
+        <T> T value() {
+            return (T) value;
         }
-        if( fullRedo!=null ) {
-            store(fullRedo);
+        
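+        // Encodes the deferred value to its page via the marshaller and
+        // returns any additional pages the encoder allocated.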
+        @SuppressWarnings("unchecked")
+        public List<Integer> store(Paged paged) {
+            return ((EncoderDecoder)marshaller).store(paged, page, value);
         }
-    }
-    
-    /**
-     * The quiesce method is used to pause/stop access to the concurrent page file.
-     * access can be restored using the {@link #resume()} method.    
-     * 
-     * @param reads if true, the suspend will also suspend read only transactions. 
-     * @param blocking if true, transactions will block until the {@link #resume()} method 
-     *          is called, otherwise they will receive errors.
-     * @param drain if true, in progress transactions are allowed to complete, otherwise they
-     *        also are suspended. 
-     */
-    public void suspend(boolean reads, boolean blocking, boolean drain) {
-    }
-
-    /**
-     * Resumes a previously suspended page file. 
-     */
-    public void resume() {
-    }
-}
+    }}

Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java Fri Oct 16 18:56:37 2009
@@ -59,7 +59,7 @@
         if (concurrentPageFile != null) {
             concurrentPageFile.suspend(true, false, drainOnClose);
             concurrentPageFile.flush();
-            concurrentPageFile.applyRedos();
+            concurrentPageFile.performRedos();
             concurrentPageFile=null;
         }
         super.close();