Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/15 19:04:15 UTC

svn commit: r825564 [4/5] - in /activemq/sandbox/activemq-flow: ./ activemq-util/src/main/java/org/apache/activemq/util/buffer/ hawtdb/ hawtdb/src/ hawtdb/src/main/ hawtdb/src/main/java/ hawtdb/src/main/java/org/ hawtdb/src/main/java/org/apache/ hawtdb...

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,1047 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CRC32;
+
+import javolution.io.Struct;
+
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+import org.apache.hawtdb.api.Allocator;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.api.OptimisticUpdateException;
+import org.apache.hawtdb.api.OutOfSpaceException;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.api.PagingException;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.api.Paged.SliceType;
+import org.apache.hawtdb.internal.io.MemoryMappedFile;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ * Provides concurrent page file access via Multiversion concurrency control
+ * (MVCC).
+ * 
+ * Once a transaction begins working against the data, it acquires a snapshot of
+ * all the data in the page file. This snapshot is used to provide the
+ * transaction a consistent view of the data in spite of it being concurrently
+ * modified by other transactions.
+ * 
+ * When a transaction does a page update, the update is stored in a temporary
+ * page location. Subsequent reads of the original page are redirected to the
+ * temporary page. If the transaction rolls back, the temporary pages are
+ * freed. If the transaction commits, the page updates are assigned the next
+ * snapshot version number and the update gets queued so that it can be applied
+ * atomically at a later time.
+ * 
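+ * A hedged usage sketch (pf, page and buffer are assumed to be set up by the
+ * caller; see ConcurrentPageFileFactory for obtaining a ConcurrentPageFile):
+ * <pre>
+ * Transaction tx = pf.tx();
+ * try {
+ *     tx.write(page, buffer);      // staged in a temp page until commit
+ *     tx.commit();                 // may throw OptimisticUpdateException
+ * } catch (OptimisticUpdateException e) {
+ *     tx.rollback();               // the temp pages get freed
+ * }
+ * </pre>
+ * 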
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public final class ConcurrentPageFile {
+
+    private static final String MAGIC = "HawtDB:MVCC Page File:1.0\n";
+    private static final int FILE_HEADER_SIZE = 1024 * 4;
+
+    public static final int PAGE_ALLOCATED = -1;
+    public static final int PAGE_FREED = -2;
+//    public static final int PAGE_CACHED_WRITE = -3;
+    public static final int HEADER_SIZE = 1024*4;
+
+    static class CacheUpdate {
+        private final int page;
+        private Object value;
+        private EncoderDecoder<?> marshaller;
+
+        public CacheUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
+            this.page = page;
+            this.value = value;
+            this.marshaller = marshaller;
+        }
+
+        public void reset(Object value, EncoderDecoder<?> marshaller) {
+            this.value = value;
+            this.marshaller = marshaller;
+        }
+
+        @SuppressWarnings("unchecked")
+        <T> T value() {
+            return (T) value;
+        }
+        
+        @SuppressWarnings("unchecked")
+        public List<Integer> store(Paged paged) {
+            return ((EncoderDecoder)marshaller).store(paged, page, value);
+        }
+    }
+    
+    /**
+     * Transaction objects are NOT thread safe. Users of this object should
+     * guard it from concurrent access.
+     * 
+     * @author chirino
+     */
+    private final class ConcurrentTransaction implements Transaction {
+        private HashMap<Integer, CacheUpdate> cache;
+        private HashMap<Integer, Integer> updates;
+        private Snapshot snapshot;
+        
+        private final Allocator txallocator = new Allocator() {
+            
+            public void free(int pageId, int count) {
+                // TODO: this is not a very efficient way to handle allocation ranges.
+                int end = pageId+count;
+                for (int key = pageId; key < end; key++) {
+                    Integer previous = getUpdates().put(key, PAGE_FREED);
+                    
+                    // If it was an allocation that was done in this
+                    // tx, then we can directly release it.
+                    assert previous!=null;
+                    if( previous == PAGE_ALLOCATED) {
+                        getUpdates().remove(key);
+                        allocator.free(key, 1);
+                    }
+                }
+            }
+            
+            public int alloc(int count) throws OutOfSpaceException {
+                int pageId = allocator.alloc(count);
+                // TODO: this is not a very efficient way to handle allocation ranges.
+                int end = pageId+count;
+                for (int key = pageId; key < end; key++) {
+                    getUpdates().put(key, PAGE_ALLOCATED);
+                }
+                return pageId;
+            }
+
+            public void unfree(int pageId, int count) {
+                throw new UnsupportedOperationException();
+            }
+            
+            public void clear() throws UnsupportedOperationException {
+                throw new UnsupportedOperationException();
+            }
+
+            public int getLimit() {
+                return allocator.getLimit();
+            }
+
+            public boolean isAllocated(int page) {
+                return allocator.isAllocated(page);
+            }
+
+        };
+
+        public <T> T get(EncoderDecoder<T> marshaller, int page) {
+            // Perhaps the page was updated in the current transaction...
+            CacheUpdate rc = cache == null ? null : cache.get(page);
+            if( rc != null ) {
+                return rc.<T>value();
+            }
+            
+            // No?  Then ask the snapshot to load the object.
+            return snapshot().cacheLoad(marshaller, page);
+        }
+
+        public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
+            Integer update = getUpdates().get(page);
+            if (update == null) {
+                // This is the first time this transaction updates the page...
+                snapshot();
+                update = allocator.alloc(1);
+                getUpdates().put(page, update);
+                getCacheUpdates().put(page, new CacheUpdate(update, value, marshaller));
+            } else {
+                // We have updated it before...
+                switch (update) {
+                case PAGE_FREED:
+                    throw new PagingException("You should never try to write a page that has been freed.");
+                case PAGE_ALLOCATED:
+                    getCacheUpdates().put(page, new CacheUpdate(page, value, marshaller));
+                    break;
+                default:
+                    CacheUpdate cu = getCacheUpdates().get(page);
+                    if( cu == null ) {
+                        throw new PagingException("You should never try to store mix using the cached objects with normal page updates.");
+                    }
+                    cu.reset(value, marshaller);
+                }
+            }
+        }
+
+        public <T> void remove(EncoderDecoder<T> marshaller, int page) {
+            marshaller.remove(ConcurrentTransaction.this, page);
+        }
+        
+        public Allocator allocator() {
+            return txallocator;
+        }
+
+        public void read(int pageId, Buffer buffer) throws IOPagingException {
+           
+            Integer updatedPageId = updates == null ? null : updates.get(pageId);
+            if (updatedPageId != null) {
+                switch (updatedPageId) {
+                case PAGE_ALLOCATED:
+                case PAGE_FREED:
+                    // TODO: Perhaps use a RuntimeException subclass.
+                    throw new PagingException("You should never try to read a page that has been allocated or freed.");
+                default:
+                    // Read back the update we had done.
+                    pageFile.read(updatedPageId, buffer);
+                }
+            } else {
+                // Get the data from the snapshot.
+                snapshot().read(pageId, buffer);
+            }
+        }
+
+        public ByteBuffer slice(SliceType type, int page, int count) throws IOPagingException {
+            //TODO: need to improve the design of ranged ops..
+            if( type==SliceType.READ ) {
+                Integer updatedPageId = updates == null ? null : updates.get(page);
+                if (updatedPageId != null) {
+                    switch (updatedPageId) {
+                    case PAGE_ALLOCATED:
+                    case PAGE_FREED:
+                        throw new PagingException("You should never try to read a page that has been allocated or freed.");
+                    }
+                    return pageFile.slice(type, updatedPageId, count);
+                } else {
+                    // Get the data from the snapshot.
+                    return snapshot().slice(page, count);
+                }
+                
+            } else {
+                Integer update = getUpdates().get(page);
+                if (update == null) {
+                    update = allocator.alloc(count);
+                    
+                    if (type==SliceType.READ_WRITE) {
+                        ByteBuffer slice = snapshot().slice(page, count);
+                        try {
+                            pageFile.write(update, slice);
+                        } finally { 
+                            pageFile.unslice(slice);
+                        }
+                    }
+                    
+                    int end = page+count;
+                    for (int i = page; i < end; i++) {
+                        getUpdates().put(i, PAGE_ALLOCATED);
+                    }
+                    getUpdates().put(page, update);
+                    
+                    return pageFile.slice(type, update, count);
+                } else {
+                    switch (update) {
+                    case PAGE_FREED:
+                        throw new PagingException("You should never try to write a page that has been freed.");
+                    case PAGE_ALLOCATED:
+                        break;
+                    default:
+                        page = update;
+                    }
+                }
+                return pageFile.slice(type, page, count);
+                
+            }
+            
+        }
+        
+        public void unslice(ByteBuffer buffer) {
+            pageFile.unslice(buffer);
+        }
+
+        public void write(int page, Buffer buffer) throws IOPagingException {
+            Integer update = getUpdates().get(page);
+            if (update == null) {
+                // We are updating an existing page in the snapshot...
+                snapshot();
+                update = allocator.alloc(1);
+                getUpdates().put(page, update);
+                page = update;
+            } else {
+                switch (update) {
+                case PAGE_FREED:
+                    throw new PagingException("You should never try to write a page that has been freed.");
+                case PAGE_ALLOCATED:
+                    break;
+                default:
+                    page = update;
+                }
+            }
+            pageFile.write(page, buffer);
+        }
+
+
+        public void commit() throws IOPagingException {
+            boolean failed = true;
+            try {
+                if (updates!=null) {
+                    Update previousUpdate = snapshot==null ? null : snapshot.updates.get(0);
+                    ConcurrentPageFile.this.commit(previousUpdate, updates, cache);
+                }
+                failed = false;
+            } finally {
+                // Rollback if the commit fails.
+                if (failed) {
+                    freeAllocatedPages();
+                }
+                ConcurrentPageFile.this.releaseSnapshot(snapshot);
+                updates = null;
+                cache = null;
+                snapshot = null;
+            }
+        }
+
+        public void rollback() throws IOPagingException {
+            try {
+                if (updates!=null) {
+                    freeAllocatedPages();
+                }
+            } finally {
+                ConcurrentPageFile.this.releaseSnapshot(snapshot);
+                updates = null;
+                cache = null;
+                snapshot = null;
+            }
+        }
+
+        private void freeAllocatedPages() {
+            for (Entry<Integer, Integer> entry : updates.entrySet()) {
+                switch (entry.getValue()) {
+                case PAGE_FREED:
+                    // Don't need to do anything..
+                    break;
+                case PAGE_ALLOCATED:
+                    // The page itself was allocated in this tx, so free it directly.
+                    allocator.free(entry.getKey(), 1);
+                    break;
+                default:
+                    // Free the temp page that was allocated to hold the update.
+                    allocator.free(entry.getValue(), 1);
+                }
+            }
+        }
+
+        public Snapshot snapshot() {
+            if (snapshot == null) {
+                snapshot = acquireSnapshot();
+            }
+            return snapshot;
+        }
+
+        public boolean isReadOnly() {
+            return updates == null;
+        }
+
+        public HashMap<Integer, CacheUpdate> getCacheUpdates() {
+            if( cache==null ) {
+                cache = new HashMap<Integer, CacheUpdate>();
+            }
+            return cache;
+        }
+
+        private HashMap<Integer, Integer> getUpdates() {
+            if (updates == null) {
+                updates = new HashMap<Integer, Integer>();
+            }
+            return updates;
+        }
+
+        public int getPageSize() {
+            return pageFile.getPageSize();
+        }
+
+        public String toString() { 
+            int updatesSize = updates==null ? 0 : updates.size();
+            return "{ snapshot: "+this.snapshot+", updates: "+updatesSize+" }";
+        }
+
+        public int pages(int length) {
+            return pageFile.pages(length);
+        }
+
+        public void flush() {
+            ConcurrentPageFile.this.flush();
+        }
+
+    }
+
+    static class Update extends LinkedNode<Update> implements Serializable {
+        private static final long serialVersionUID = 9160865012544031094L;
+
+        transient final AtomicBoolean applied = new AtomicBoolean();
+        private final Redo redo;
+        
+        long base;
+        long head;
+        Snapshot snapshot;
+        
+        HashMap<Integer, Integer> updates;
+        transient HashMap<Integer, CacheUpdate> cache;
+
+        public Update(long version, HashMap<Integer, Integer> updates, HashMap<Integer, CacheUpdate> cache, Redo redo) {
+            this.redo = redo;
+            this.head = this.base = version;
+            this.updates = updates;
+            this.cache = cache;
+        }
+        
+        /**
+         * Merges previous updates that can be merged with this update. 
+         */
+        public void mergePrevious() {
+            Update prev = getPrevious();
+            while( prev!=null && prev.snapshot==null && prev.redo==redo ) {
+                
+                assert prev.head+1 == this.base;
+                this.base = prev.base;
+                
+                if( prev.updates!=null ) {
+                    if(this.updates!=null) {
+                        prev.updates.putAll(this.updates);
+                    }
+                    this.updates = prev.updates;
+                }
+                
+                if( prev.cache!=null ) {
+                    if( this.cache!=null ) {
+                        prev.cache.putAll(this.cache);
+                    }
+                    this.cache = prev.cache;
+                }
+                prev.unlink();
+                // Advance to the new previous node so the loop terminates.
+                prev = getPrevious();
+            }
+        }
+
+        
+        
+        public String toString() { 
+            int updateSize = updates==null ? 0 : updates.size();
+            int cacheSize = cache==null ? 0 : cache.size();
+            return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+", applied: "+applied+" }";
+        }
+        
+    }
+
+    /**
+     * @author chirino
+     */
+    private class Snapshot {
+
+        private final ArrayList<Update> updates;
+        private int references;
+
+        public Snapshot(ArrayList<Update> updates) {
+            this.updates = updates;
+        }
+
+        private int mapPageId(int page) {
+            for (Update update : updates) {
+                if( update.updates!=null ) {
+                    Integer updatedPage = update.updates.get(page);
+                    if (updatedPage != null) {
+                        switch (updatedPage) {
+                        case PAGE_FREED:
+                            throw new PagingException("You should never try to read page that has been freed.");
+                        case PAGE_ALLOCATED:
+                            break;
+                        default:
+                            page = updatedPage;
+                        }
+                        break;
+                    }
+                }
+            }
+            return page;
+        }
+
+        void read(int pageId, Buffer buffer) throws IOPagingException {
+            pageId = mapPageId(pageId);
+            pageFile.read(pageId, buffer);
+        }
+
+        public ByteBuffer slice(int pageId, int count) {
+            pageId = mapPageId(pageId);
+            return pageFile.slice(SliceType.READ, pageId, count);
+        }
+        
+        private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+            // See if any of the updates in the snapshot have an update of the 
+            // requested page...
+            for (Update update : updates) {
+                if( update.cache!=null ) {
+                    CacheUpdate cu = update.cache.get(pageId);
+                    if (cu != null) {
+                        return cu.<T>value();
+                    }
+                }
+            }
+            return readCache.cacheLoad(marshaller, pageId);
+        }
+
+        public String toString() { 
+            return "{ updates: "+this.updates.size()+", references: "+this.references+" }";
+        }
+    }
+
+    class ReadCache {
+        Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
+        
+        @SuppressWarnings("unchecked")
+        private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+            T rc = (T) map.get(pageId);
+            if( rc ==null ) {
+                rc = marshaller.load(pageFile, pageId);
+                map.put(pageId, rc);
+            }
+            return rc;
+        }        
+    }
+    
+    private final ReadCache readCache = new ReadCache();
+    
+    /**
+     * The redo log is composed of a linked list of redo batches.  A 
+     * redo batch stores the redo data for multiple updates.
+     * 
+     * @author chirino
+     */
+    static private class Redo implements Serializable {
+        private static final long serialVersionUID = 1188640492489990493L;
+        
+        /** the pageId that this redo batch is stored at */
+        private transient int page=-1;
+        private transient boolean recovered;
+        
+        private long base;
+        private long head;
+        
+        final HashMap<Integer, Integer> updates = new HashMap<Integer, Integer>();
+        transient final HashMap<Integer, CacheUpdate> cache = new HashMap<Integer, CacheUpdate>();
+        
+        /** points to a previous redo batch page */
+        public int previous=-1;
+
+        public String toString() { 
+            int count = updates==null ? 0 : updates.size();
+            return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
+        }
+
+        public int pageCount() {
+            int rc = 0;
+            if( updates!=null ) {
+                rc = updates.size();
+            }
+            if( cache!=null ) {
+                rc = cache.size();
+            }
+            return rc;
+        }
+        
+    }
+    
+    private class Header extends Struct {
+        public final UTF8String file_magic = new UTF8String(80);
+        public final Signed64 base_revision = new Signed64();
+        public final Signed32 page_size = new Signed32();
+        public final Signed32 free_list_page = new Signed32();
+        /** points at the latest redo page which might have been partially stored */ 
+        public final Signed32 redo_page = new Signed32();
+        /** points at the latest redo page which is guaranteed to be fully stored */
+        public final Signed32 synced_redo_page = new Signed32();
+        public final Signed64 checksum = new Signed64();
+        
+        public String toString() { 
+            return "{ base_revision: "+this.base_revision.get()+
+            ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
+            ", redo_page: "+redo_page.get()+", checksum: "+checksum.get()+
+            " }";
+        }
+    }
+
+    
+    /**
+     * A list of updates to the page file. The list head points to the most
+     * recent update.
+     */
+//    private final HashMap<Long, Update> updatesMap = new HashMap<Long, Update>();
+    private final LinkedNodeList<Update> updatesList = new LinkedNodeList<Update>();
+    
+    /**
+     * This is the next redo that will get logged.  It is currently being built.
+     */
+    Redo nextRedo;
+    
+    /**
+     * These are stored redos that are waiting for a file sync.  They may or may not survive 
+     * a failure.
+     */
+    private final LinkedList<Redo> unsyncedRedos = new LinkedList<Redo>();
+    
+    /**
+     * These are stored redos that have been file synced.  They should survive a failure.
+     */
+    private final LinkedList<Redo> syncedRedos = new LinkedList<Redo>();
+    
+    /**
+     * These are updates which have been applied but whose temp pages have not yet been 
+     * freed since they are being used by some snapshots.
+     */
+    private final LinkedList<Update> updatesWaitingCleanup = new LinkedList<Update>();
+    
+    private final Header header = new Header();
+
+    private final PageFile pageFile;
+    private final SimpleAllocator allocator;
+    private final MemoryMappedFile file;
+
+    /**
+     * This is the free page list at the base revision.  It does not track allocations in transactions
+     * or committed updates.  Only when the updates are squashed will this list be updated.
+     * 
+     * The main purpose of this list is to initialize the free list on recovery.
+     * 
+     * This does not track the space associated with redo batches and free lists.  On 
+     * recovery that space is discovered and tracked in the allocator.
+     */
+    private Ranges baseRevisionFreePages = new Ranges();
+    
+    public ConcurrentPageFile(PageFile pageFile) {
+        this.pageFile = pageFile;
+        this.file = pageFile.getFile();
+        this.allocator = pageFile.allocator();
+        ByteBuffer slice = file.slice(false, 0, FILE_HEADER_SIZE);
+        this.header.setByteBuffer(slice, slice.position());
+        this.updatesList.addLast(new Update(0, null, null, null));
+    }
+
+    public Transaction tx() {
+        return new ConcurrentTransaction();
+    }
+
+    /**
+     * Used to initialize a new file or to clear out the 
+     * contents of an existing file.
+     */
+    public void reset() {
+        updatesList.clear();
+        updatesList.addLast(new Update(0, null, null, null));
+        unsyncedRedos.clear();
+        updatesWaitingCleanup.clear();
+        allocator.clear(); 
+        baseRevisionFreePages.clear();
+        baseRevisionFreePages.add(0, allocator.getLimit());
+
+        // Initialize the file header..
+        this.header.setByteBufferPosition(0);
+        this.header.file_magic.set(MAGIC);
+        this.header.base_revision.set(0);
+        this.header.free_list_page.set(-1);
+        this.header.page_size.set(pageFile.getPageSize());
+        this.header.redo_page.set(-1);
+        replicateHeader();
+    }
+    
+    /**
+     * Loads an existing file and replays the redo
+     * logs to put it in a consistent state.
+     */
+    public void recover() {
+        
+        unsyncedRedos.clear();
+        updatesWaitingCleanup.clear();
+
+        this.header.setByteBufferPosition(0);
+        long baseRevision = header.base_revision.get();
+        updatesList.clear();
+        updatesList.addLast(new Update(baseRevision, null, null, null));
+
+        // Initialize the free page list.
+        int pageId = header.free_list_page.get();
+        if( pageId >= 0 ) {
+            baseRevisionFreePages = loadObject(pageId);
+            allocator.copy(baseRevisionFreePages);
+            Extent.unfree(pageFile, pageId);
+        } else {
+            allocator.clear(); 
+            baseRevisionFreePages.add(0, allocator.getLimit());
+        }
+        
+        // Load the redo batches.
+        pageId = header.redo_page.get();
+        while( pageId >= 0 ) {
+            Redo redo = loadObject(pageId); 
+            redo.page = pageId;
+            redo.recovered = true;
+            Extent.unfree(pageFile, pageId);
+            
+            pageId=-1;
+            if( baseRevision < redo.head ) {
+                
+                // add first since we are loading redo objects oldest to youngest
+                // but want to put them in the list youngest to oldest.
+                unsyncedRedos.addFirst(redo);
+                
+                if( baseRevision < redo.base ) {
+                    pageId=redo.previous;
+                }
+            }
+        }
+        
+        // Apply all the redos..
+        applyRedos();
+    }
+
+
+    /**
+     * Once this method returns, any previously committed transactions 
+     * are flushed to disk, ensuring that they will not be lost
+     * upon failure. 
+     */
+    public void flush() {
+        
+        // Write out the current redo if it has data...
+        Redo redo;
+        synchronized (this) {
+            redo = nextRedo;
+            nextRedo = null;
+        }
+        if( redo != null ) {
+            store(redo);
+        }
+
+        // Find out if there are unsynced redos...
+        redo = null;
+        synchronized (this) {
+            if( !unsyncedRedos.isEmpty() ) {
+                for (Redo r : unsyncedRedos) {
+                    syncedRedos.add(r);
+                    redo = r;
+                }
+            }
+        }
+        
+        // Yep.. we had some.. 
+        if( redo!=null ) {
+            // This is a slow operation..
+            file.sync();
+            // Update the header so that it knows about the redos that are 
+            // guaranteed to survive a failure.
+            header().synced_redo_page.set(redo.page);
+            replicateHeader();
+        }
+    }
+
+    private Header header() {
+        this.header.getByteBuffer().position(0);
+        this.header.setByteBufferPosition(0);
+        return this.header;
+    }
+    
+    public void store(Redo redo) {
+        
+        // Write any outstanding deferred cache updates...
+        if( redo.cache != null ) {
+            for (Entry<Integer, CacheUpdate> entry : redo.cache.entrySet()) {
+                CacheUpdate cu = entry.getValue();
+                List<Integer> allocatedPages = cu.store(pageFile);
+                for (Integer page : allocatedPages) {
+                    // add any allocated pages to the update list so that the free 
+                    // list gets properly adjusted.
+                    redo.updates.put(page, PAGE_ALLOCATED);
+                }
+            }
+        }
+
+        // Link it to the last redo.
+        // LinkedList.getLast() throws on an empty list, so guard it.
+        Redo last = unsyncedRedos.isEmpty() ? null : unsyncedRedos.getLast();
+        if( last!=null ) {
+            redo.previous = last.page; 
+        }
+        
+        // Store the redo.
+        redo.page = storeObject(redo);
+        synchronized (this) {
+            unsyncedRedos.add(redo);
+        }
+
+        // Update the header to know about the new redo page.
+        header().redo_page.set(redo.page);
+        replicateHeader();
+    }
+
+    /**
+     *  Frees up space by applying redos and releasing the pages that
+     *  the redo was stored on. 
+     */
+    public void applyRedos() {
+
+        // We can only apply redos which we know are not partially stored on disk
+        // and which hold revisions which are older than the oldest active snapshot.
+        ArrayList<Redo> redoList = new ArrayList<Redo>();
+        synchronized (this) {
+            long snapshotHeadRev = Long.MAX_VALUE;
+            Update cur = updatesList.getHead();
+            while( cur!=null ) {
+                if( cur.snapshot!=null ) {
+                    snapshotHeadRev = cur.head;
+                    break;
+                }
+                // Advance, otherwise this loop never terminates.
+                cur = cur.getNext();
+            }
+            
+            for (Iterator<Redo> i = this.unsyncedRedos.iterator(); i.hasNext();) {
+                Redo redo = i.next();
+                if (redo.base > snapshotHeadRev) {
+                    // we can't apply the rest of the updates, since a snapshot depends on 
+                    // the current base revision.
+                    // the rest of the updates will have incrementing revision numbers too.
+                    break;
+                }
+                redoList.add(redo);
+                i.remove();
+            }
+        }
+        
+        // Perhaps we can't do any work...
+        if( redoList.isEmpty() ) {
+            return;
+        }
+        
+        long baseRevision = header().base_revision.get();
+
+        for (Redo redo : redoList) {
+            // revision numbers should be sequentially increasing.
+            assert baseRevision+1==redo.base;
+            
+            for (Entry<Integer, Integer> entry : redo.updates.entrySet()) {
+                int key = entry.getKey();
+                int value = entry.getValue();
+                switch( value ) {
+                case PAGE_ALLOCATED:
+                    if( redo.recovered ) {
+                        allocator.unfree(key, 1);
+                    }
+                    baseRevisionFreePages.remove(key, 1);
+                    break;
+                case PAGE_FREED:
+                    if( redo.recovered ) {
+                        allocator.free(key, 1);
+                    }
+                    baseRevisionFreePages.add(key, 1);
+                    break;
+                default:
+                    if( redo.recovered ) {
+                        allocator.unfree(key, 1);
+                    }
+                    ByteBuffer slice = pageFile.slice(SliceType.READ, value, 1);
+                    try {
+                        pageFile.write(key, slice);
+                    } finally { 
+                        pageFile.unslice(slice);
+                    }
+                }
+            }
+            baseRevision = redo.base;
+        }
+
+        // Force to ensure all data is fully stored before the header
+        // starts making reference to new stuff.
+        file.sync();
+
+        Header h = header();
+        int previousFreeListPage = h.free_list_page.get();
+        h.free_list_page.set(storeObject(baseRevisionFreePages));
+        h.base_revision.set(baseRevision);
+        replicateHeader();
+        
+        // Release the previous free list.
+        if( previousFreeListPage>=0 ) {
+            Extent.free(pageFile, previousFreeListPage);
+        }
+        
+        // Free the space associated with the redo batches; the earlier
+        // isEmpty() check guarantees redoList has entries here.
+        for (Redo redo : redoList) {
+            Extent.free(pageFile, redo.page);
+        }
+    }
+
+    private int storeObject(Object value) {
+        try {
+            ExtentOutputStream eos = new ExtentOutputStream(pageFile);
+            ObjectOutputStream oos = new ObjectOutputStream(eos);
+            oos.writeObject(value);
+            oos.close();
+            return eos.getPage();
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T loadObject( int pageId ) {
+        try {
+            ExtentInputStream eis = new ExtentInputStream(pageFile, pageId);
+            ObjectInputStream ois = new ObjectInputStream(eis);
+            return (T) ois.readObject();
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        } catch (ClassNotFoundException e) {
+            throw new IOPagingException(e);
+        }
+    }
+    
+    private void replicateHeader() {
+        // Calculate the checksum of the header so that we can tell if the
+        // header is corrupted.
+        byte[] data = new byte[this.header.size() - 8];
+        file.read(0, data);
+        CRC32 checksum = new CRC32();
+        checksum.update(data);
+        this.header.checksum.set(checksum.getValue());
+
+        // Copy the header so we can survive a partial update.
+        ByteBuffer header = file.read(0, this.header.size());
+        file.write(FILE_HEADER_SIZE / 2, header);
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Transaction calls back to these methods...
+    // /////////////////////////////////////////////////////////////////
+    synchronized private Snapshot acquireSnapshot() {
+        Snapshot snapshot = updatesList.getTail().snapshot;
+        if (snapshot == null) {
+            ArrayList<Update> updates = updatesList.toArrayListReversed();
+            // Keep a local reference, otherwise the increment below would NPE.
+            snapshot = new Snapshot(updates);
+            updates.get(0).snapshot = snapshot;
+        }
+        snapshot.references++;
+        return snapshot;
+    }
+    
+    synchronized private void releaseSnapshot(Snapshot snapshot) {
+        if( snapshot!=null ) {
+            snapshot.references--;
+            if( snapshot.references==0 ) {
+                Update update = snapshot.updates.get(0);
+                update.snapshot=null;
+                Update next = update.getNext();
+                if( next !=null ) {
+                    next.mergePrevious();
+                }
+            }
+        }
+    }
+
+    /**
+     * Attempts to commit a set of page updates.
+     * 
+     * @param previousUpdate the update the committing transaction's snapshot was based on, or null if it had no snapshot
+     * @param pageUpdates the page updates to commit, mapping page ids to temp page ids or allocation markers
+     * @param cache the deferred cache updates to commit, or null if there are none
+     */
+    private void commit(Update previousUpdate, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, CacheUpdate> cache) {
+        
+        Redo fullRedo=null;
+        synchronized (this) {
+            // Perhaps a concurrent update came in before this one...
+            long rev;
+            if( previousUpdate!=null ) {
+                rev = previousUpdate.head;
+                Update concurrentUpdate = previousUpdate.getNext();
+                while( concurrentUpdate != null ) {
+                    // Yep.. there were concurrent updates.  
+                    // Make sure we don't have an update conflict.
+                    for (Integer page : pageUpdates.keySet()) {
+                        if( concurrentUpdate.updates.containsKey(page) ) {
+                            throw new OptimisticUpdateException();
+                        }
+                    }
+        
+                    rev = concurrentUpdate.head;
+                    concurrentUpdate = concurrentUpdate.getNext();
+                }
+            } else {
+                rev = updatesList.getTail().head;
+            }
+            rev++;
+
+            if( nextRedo == null ) {
+                nextRedo = new Redo();
+                nextRedo.base = rev;
+            }
+            
+            nextRedo.head = rev;
+            nextRedo.updates.putAll(pageUpdates);
+            if( cache!=null ) {
+                nextRedo.cache.putAll(cache);
+            }
+            
+            Update value = new Update(rev, pageUpdates, cache, nextRedo);
+            updatesList.addLast(value);
+            value.mergePrevious();
+            
+            if( nextRedo.pageCount() > 10 ) {
+                fullRedo = nextRedo;
+                nextRedo = null;
+            }
+        }
+        if( fullRedo!=null ) {
+            store(fullRedo);
+        }
+    }
+    
+    /**
+     * The suspend method is used to pause/stop access to the concurrent page file.
+     * Access can be restored using the {@link #resume()} method.    
+     * 
+     * @param reads if true, the suspend will also suspend read only transactions. 
+     * @param blocking if true, transactions will block until the {@link #resume()} method 
+     *          is called, otherwise they will receive errors.
+     * @param drain if true, in progress transactions are allowed to complete, otherwise they
+     *        also are suspended. 
+     */
+    public void suspend(boolean reads, boolean blocking, boolean drain) {
+    }
+
+    /**
+     * Resumes a previously suspended page file. 
+     */
+    public void resume() {
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+/**
+ * A factory which opens a {@link ConcurrentPageFile}, recovering an existing
+ * file or resetting a new one as needed.
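+ *
+ * A hedged usage sketch (the file property setter is assumed to be inherited
+ * from PageFileFactory, which is not part of this commit):
+ * <pre>
+ * ConcurrentPageFileFactory factory = new ConcurrentPageFileFactory();
+ * factory.setFile(new java.io.File("hawt.db"));  // assumed PageFileFactory setter
+ * factory.open();                                // recovers or resets the store
+ * ConcurrentPageFile pf = factory.getConcurrentPageFile();
+ * // ... use pf.tx() ...
+ * factory.close();
+ * </pre>
+ *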
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ConcurrentPageFileFactory extends PageFileFactory {
+
+    private ConcurrentPageFile concurrentPageFile;
+    
+    protected boolean drainOnClose;
+
+    public ConcurrentPageFile getConcurrentPageFile() {
+        return concurrentPageFile;
+    }
+    
+    public ConcurrentPageFileFactory() {
+        super.setHeaderSize(ConcurrentPageFile.HEADER_SIZE);
+    }
+    
+    @Override
+    public void setHeaderSize(int headerSize) {
+        throw new IllegalArgumentException("headerSize property cannot not be manually configured.");
+    }
+
+    public void open() {
+        if( file == null ) {
+            throw new IllegalArgumentException("file property not set");
+        }
+        boolean existed = file.isFile();
+        super.open();
+        if (concurrentPageFile == null) {
+            concurrentPageFile = new ConcurrentPageFile(getPageFile());
+            if( existed ) {
+                concurrentPageFile.recover();
+            } else {
+                concurrentPageFile.reset();
+            }
+        }
+    }
+    
+    public void close() {
+        if (concurrentPageFile != null) {
+            concurrentPageFile.suspend(true, false, drainOnClose);
+            concurrentPageFile.flush();
+            concurrentPageFile.applyRedos();
+            concurrentPageFile=null;
+        }
+        super.close();
+    }    
+    
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.nio.ByteBuffer;
+
+import javolution.io.Struct;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.api.Paged.SliceType;
+
+
+/**
+ * An extent is a sequence of adjacent pages which can be linked
+ * to subsequent extents.
+ * 
+ * Extents allow you to write large streams of data to a Paged object
+ * contiguously to avoid fragmentation.
+ * 
+ * The first page of the extent contains a header which specifies
+ * the size of the extent and the page id of the next extent that
+ * it is linked to.
+ *  
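+ * A hedged usage sketch using the stream wrappers added in this commit
+ * (data is an assumed byte[]):
+ * <pre>
+ * ExtentOutputStream out = new ExtentOutputStream(paged);
+ * out.write(data);                    // spills into linked extents as needed
+ * out.close();
+ * ExtentInputStream in = new ExtentInputStream(paged, out.getPage());
+ * </pre>
+ *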
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Extent {
+    
+    public final static byte MAGIC_VALUE = 'x'; 
+    
+    static private class Header extends Struct {
+        /**
+         * A constant prefix that can be used to identify the page type.
+         */
+        public final Signed8 magic = new Signed8();
+        /** 
+         * The number of bytes in the extent, including the header.
+         */
+        public final Signed32 length = new Signed32();
+        /**
+         * The page of the next extent or -1.
+         */
+        public final Signed32 next = new Signed32();
+    }
+
+    private final Extent.Header header = new Header();
+
+    private final Paged paged;
+    private final int page;
+    private ByteBuffer buffer;
+
+    private int bufferStartPosition;
+
+    public Extent(Paged paged, int page) {
+        this.paged = paged;
+        this.page = page;
+    }
+    
+    @Override
+    public String toString() {
+        int position = 0;
+        int limit=0;
+        if( buffer!=null ) {
+            position = buffer.position()-bufferStartPosition;
+            limit = buffer.limit()-bufferStartPosition;
+        }
+        return "{ page: "+page+", position: "+position+", limit: "+limit+" }";
+    }
+    
+    public void readOpen() {
+        buffer = paged.slice(SliceType.READ, page, 1);
+        header.setByteBuffer(buffer, buffer.position());
+        if( header.magic.get() != MAGIC_VALUE ) {
+            throw new IOPagingException("Invalid extent read request.  The requested page was not an extent: "+page);
+        }
+        
+        int length = header.length.get();
+        int pages = paged.pages(length);
+        if( pages > 1 ) {
+            paged.unslice(buffer);
+            buffer = paged.slice(SliceType.READ, page, pages);
+            header.setByteBuffer(buffer, buffer.position());
+        }
+        
+        bufferStartPosition = buffer.position();
+        buffer.position(bufferStartPosition+header.size());
+        buffer.limit(bufferStartPosition+length);
+    }
+
+    public void writeOpen(short size) {
+        buffer = paged.slice(SliceType.WRITE, page, size);
+        header.setByteBuffer(buffer, buffer.position());
+        bufferStartPosition = buffer.position();
+        buffer.position(bufferStartPosition+header.size());
+    }
+
+    public void writeCloseLinked(int next) {
+        int length = buffer.position()-bufferStartPosition;
+        header.magic.set(MAGIC_VALUE);
+        header.next.set(next);
+        header.length.set(length);
+        paged.unslice(buffer);
+    }
+
+    public void writeCloseEOF() {
+        int length = buffer.position()-bufferStartPosition;
+        // Capture the slice size before the buffer is released below.
+        int originalPages = paged.pages(buffer.limit()-bufferStartPosition);
+        header.magic.set(MAGIC_VALUE);
+        header.next.set(-1);
+        header.length.set(length);
+        paged.unslice(buffer);
+
+        int usedPages = paged.pages(length);
+        int remainingPages = originalPages-usedPages;
+        
+        // Release the unused pages.
+        if (remainingPages>0) {
+            paged.allocator().free(page+usedPages, remainingPages);
+        }
+    }
+    
+    public void readClose() {
+        paged.unslice(buffer);
+    }
+
+    boolean atEnd() {
+        return buffer.remaining() == 0;
+    }
+
+    /**
+     * @return true if the write fit into the extent.
+     */
+    public boolean write(byte b) {
+        if (atEnd()) {
+            return false;
+        }
+        buffer.put(b);
+        return true;
+    }
+
+    public boolean write(Buffer source) {
+        while (source.length > 0) {
+            if (atEnd()) {
+                return false;
+            }
+            int count = Math.min(buffer.remaining(), source.length);
+            buffer.put(source.data, source.offset, count);
+            source.offset += count;
+            source.length -= count;
+        }
+        return true;
+    }
+
+    public int read() {
+        return buffer.get() & 0xFF;
+    }
+
+    public void read(Buffer target) {
+        while (target.length > 0 && !atEnd()) {
+            int count = Math.min(buffer.remaining(), target.length);
+            buffer.get(target.data, target.offset, count);
+            target.offset += count;
+            target.length -= count;
+        }
+    }
+
+    public int getNext() {
+        return header.next.get();
+    }
+
+    
+    /**
+     * Frees the linked extents at the provided page id.
+     * 
+     * @param paged
+     * @param page
+     */
+    public static void freeLinked(Paged paged, int page) {
+        ByteBuffer buffer = paged.slice(SliceType.READ, page, 1);
+        int next;
+        try {
+            Header header = new Header();
+            header.setByteBuffer(buffer, buffer.position());
+            if( header.magic.get() != MAGIC_VALUE ) {
+                throw new IOPagingException("Invalid extent read request.  The requested page was not an extent: "+page);
+            }
+            next = header.next.get();
+        } finally {
+            // Release the slice, matching the pattern used in free() and unfree().
+            paged.unslice(buffer);
+        }
+        free(paged, next);
+    }
+    
+    /**
+     * Frees the extent at the provided page id.
+     * 
+     * @param paged
+     * @param page
+     */
+    public static void free(Paged paged, int page) {
+        while( page>=0 ) {
+            ByteBuffer buffer = paged.slice(SliceType.READ, page, 1);
+            try {
+                Header header = new Header();
+                header.setByteBuffer(buffer, buffer.position());
+                if( header.magic.get() != MAGIC_VALUE ) {
+                    throw new IOPagingException("Invalid extent read request.  The requested page was not an extent: "+page);
+                }
+
+                int next = header.next.get();
+                paged.allocator().free(page, paged.pages(header.length.get()));
+                page=next;
+            } finally {
+                paged.unslice(buffer);
+            }
+        }
+    }
+
+    /**
+     * Un-frees the extent at the provided page id.  Basically undoes
+     * a previous {@link #free(Paged, int)} operation.
+     * 
+     * @param paged
+     * @param page
+     */
+    public static void unfree(Paged paged, int page) {
+        while( page>=0 ) {
+            ByteBuffer buffer = paged.slice(SliceType.READ, page, 1);
+            try {
+                Header header = new Header();
+                header.setByteBuffer(buffer, buffer.position());
+                if( header.magic.get() != MAGIC_VALUE ) {
+                    throw new IOPagingException("Invalid extent read request.  The requested page was not an extent: "+page);
+                }
+                
+                int next = header.next.get();
+                paged.allocator().unfree(page, paged.pages(header.length.get()));
+                page=next;
+            } finally {
+                paged.unslice(buffer);
+            }
+        }
+    }
+
+    public int getPage() {
+        return page;
+    }
+    
+    public int getLength() {
+        return header.length.get();
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ * An InputStream which reads its data from an
+ * extent previously written with the {@link ExtentOutputStream}.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExtentInputStream extends InputStream {
+
+    private final Paged paged;
+    private Extent current;
+    private final int page;
+    private Ranges pages = new Ranges();
+
+    public ExtentInputStream(Paged paged, int page) {
+        this.paged = paged;
+        this.page = page;
+        current = new Extent(paged, page);
+        current.readOpen();
+        pages.add(current.getPage(), paged.pages(current.getLength()));
+    }
+    
+    @Override
+    public String toString() {
+        return "{ page: "+page+", current: "+current+" }";
+    }
+    
+
+    @Override
+    public int read() throws IOException {
+        if( current == null ) {
+            return -1;
+        }
+        if (current.atEnd()) {
+            int next = current.getNext();
+            if (next == -1) {
+                current.readClose();
+                current=null;
+                return -1;
+            }
+            current.readClose();
+            current = new Extent(paged, next);
+            current.readOpen();
+            pages.add(current.getPage(), paged.pages(current.getLength()));
+        }
+        return current.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        int rc=len;
+        Buffer buffer = new Buffer(b, off, len);
+        if( current == null ) {
+            // InputStream contract: signal end of stream with -1 rather than throwing.
+            return -1;
+        }
+        while (buffer.length > 0) {
+            if (current.atEnd()) {
+                int next = current.getNext();
+                if (next == -1) {
+                    current.readClose();
+                    current=null;
+                    break;
+                }
+                current.readClose();
+                current = new Extent(paged, next);
+                current.readOpen();
+                pages.add(current.getPage(), paged.pages(current.getLength()));
+            }
+            current.read(buffer);
+        }
+        rc-=buffer.length;
+        if ( rc==0 ) {
+            // Nothing was read, so signal end of stream.
+            return -1;
+        }
+        return rc;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if( current!=null ) {
+            current.readClose();
+            current=null;
+        }
+    }
+    
+    public Ranges getPages() {
+        return pages;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ * An OutputStream that writes data into a chain of page extents, allocating
+ * and linking a new extent whenever the current one fills up.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExtentOutputStream extends OutputStream {
+
+    private static final short DEFAULT_EXTENT_SIZE = 128; // 128 * 4k = .5MB
+    private final Paged paged;
+    private final short extentSize;
+    private final int page;
+    private Extent current;
+    private Ranges pages = new Ranges();
+    
+    
+    public ExtentOutputStream(Paged paged) {
+        this(paged, DEFAULT_EXTENT_SIZE);
+    }
+    
+    public ExtentOutputStream(Paged paged, short extentSize) {
+        this(paged, paged.allocator().alloc(extentSize), extentSize, extentSize);
+    }
+    
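+    /**
+     * Starts writing into an already allocated head extent that is
+     * {@code extentSize} pages long; extents allocated after it fills
+     * up will be {@code nextExtentSize} pages long.
+     */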
+    public ExtentOutputStream(Paged paged, int page, short extentSize, short nextExtentSize ) {
+        this.paged = paged;
+        this.extentSize = nextExtentSize;
+        this.page = page;
+        current = new Extent(paged, page);
+        current.writeOpen(extentSize);
+    }
+
+    @Override
+    public String toString() {
+        return "{ page: "+page+", extent size: "+extentSize+", current: "+current+" }";
+    }
+    
+    @Override
+    public void write(int b) throws IOException {
+        if (!current.write((byte) b)) {
+            int nextPageId = this.paged.allocator().alloc(extentSize);
+            current.writeCloseLinked(nextPageId);
+            pages.add(current.getPage(), paged.pages(current.getLength()));
+            current = new Extent(paged, nextPageId);
+            current.writeOpen(extentSize);
+            current.write((byte) b);
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+        Buffer buffer = new Buffer(b, off, len);
+        while (buffer.length > 0) {
+            if (!current.write(buffer)) {
+                int nextPageId = this.paged.allocator().alloc(extentSize);
+                current.writeCloseLinked(nextPageId);
+                pages.add(current.getPage(), paged.pages(current.getLength()));
+                current = new Extent(paged, nextPageId);
+                current.writeOpen(extentSize);
+            }
+        }
+    }
+
+    public short getExtentSize() {
+        return extentSize;
+    }
+
+    public int getPage() {
+        return page;
+    }
+
+    @Override
+    public void close(){
+        current.writeCloseEOF();
+        pages.add(current.getPage(), paged.pages(current.getLength()));
+    }
+    
+    public Ranges getPages() {
+        return pages;
+    }
+}
\ No newline at end of file
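
Usage sketch (not part of the patch): writing a buffer into a fresh extent
chain. The head page id returned here is what ExtentInputStream above needs
to read the data back; `paged` is assumed to be an open PageFile.

    import java.io.IOException;
    import org.apache.hawtdb.api.Paged;
    import org.apache.hawtdb.internal.page.ExtentOutputStream;

    class ExtentWriteSketch {
        // Writes data into a new extent chain and returns its head page id.
        static int writeAll(Paged paged, byte[] data) throws IOException {
            ExtentOutputStream out = new ExtentOutputStream(paged);
            try {
                out.write(data, 0, data.length);
            } finally {
                out.close(); // writeCloseEOF() marks the last extent as the end of the chain
            }
            return out.getPage();
        }
    }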

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.io.MemoryMappedFile;
+
+
+/**
+ * Provides Paged access to a MemoryMappedFile.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PageFile implements Paged {
+    
+    private final SimpleAllocator allocator;
+    private final short pageSize;
+    private final int headerSize;
+    private final MemoryMappedFile file;
+    
+    
+    @Override
+    public String toString() {
+        return "{ header size: "+headerSize+", page size: "+pageSize+", allocator: "+allocator+" }";
+    }
+
+    public PageFile(MemoryMappedFile file, short pageSize, int headerSize, int maxPages) {
+        this.file = file;
+        this.allocator = new SimpleAllocator(maxPages);
+        this.pageSize = pageSize;
+        this.headerSize = headerSize;
+    }
+
+    public SimpleAllocator allocator() {
+        return allocator;
+    }
+
+    public int getHeaderSize() {
+        return headerSize;
+    }
+
+    public MemoryMappedFile getFile() {
+        return file;
+    }
+
+    public void read(int pageId, Buffer buffer) {
+        file.read(offset(pageId), buffer);
+    }
+
+    public void write(int pageId, Buffer buffer) {
+        file.write(offset(pageId), buffer);
+    }
+
+    public void write(int pageId, ByteBuffer buffer) {
+        file.write(offset(pageId), buffer);
+    }
+
+    public ByteBuffer slice(SliceType type, int pageId, int size) {
+        assert size > 0;
+        return file.slice(type==SliceType.READ, offset(pageId), pageSize*size);
+    }
+
+    public void unslice(ByteBuffer buffer) {
+        file.unslice(buffer);
+    }
+    
+    
+    public long offset(long pageId) {
+        assert pageId >= 0;
+        return headerSize+(pageId*pageSize);
+    }
+
+    public int getPageSize() {
+        return pageSize;
+    }
+    
+    public int pages(int length) {
+        assert length >= 0;
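+        // Ceiling division: lengths 1..pageSize need 1 page; note that length 0 also yields 1.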
+        return ((length-1)/pageSize)+1;
+    }
+
+    public void flush() {
+        file.sync();
+    }
+
+    public <T> T get(EncoderDecoder<T> encoderDecoder, int page) {
+        return encoderDecoder.load(this, page);
+    }
+
+    public <T> void put(EncoderDecoder<T> encoderDecoder, int page, T value) {
+        encoderDecoder.store(this, page, value);
+    }
+
+    public <T> void remove(EncoderDecoder<T> encoderDecoder, int page) {
+        encoderDecoder.remove(this, page);
+    }
+
+}
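
A quick worked example of the offset/pages arithmetic above (plain Java,
assuming the default 4 KiB page size and no header; class name is
illustrative only):

    class PageMathSketch {
        public static void main(String[] args) {
            short pageSize = 4096;
            int headerSize = 0;
            // offset(): page 3 starts at headerSize + 3 * 4096 = 12288.
            long offset = headerSize + (3L * pageSize);
            // pages(): a 4097 byte record needs ((4097 - 1) / 4096) + 1 = 2 pages.
            int pages = ((4097 - 1) / pageSize) + 1;
            System.out.println(offset + " " + pages); // 12288 2
        }
    }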

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.internal.io.MemoryMappedFileFactory;
+
+/**
+ * Factory that opens a {@link PageFile} on top of a memory mapped file,
+ * validating the pageSize, maxPages, and headerSize properties.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PageFileFactory extends MemoryMappedFileFactory {
+
+    private PageFile pageFile;
+
+    protected int headerSize = 0;
+    protected short pageSize = 1024 * 4;
+    protected int maxPages = Integer.MAX_VALUE;
+
+    public PageFile getPageFile() {
+        return pageFile;
+    }
+
+    public void open() {
+        try {
+            super.open();
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+        if (pageFile == null) {
+            if( pageSize <= 0 ) {
+                throw new IllegalArgumentException("pageSize property must be greater than 0");
+            }
+            if( maxPages <= 0 ) {
+                throw new IllegalArgumentException("maxPages property must be greater than 0");
+            }
+            if( headerSize < 0 ) {
+                throw new IllegalArgumentException("headerSize property cannot be negative.");
+            }
+            try {
+                pageFile = new PageFile(getMemoryMappedFile(), pageSize, headerSize, maxPages);
+            } catch (IOException e) {
+                throw new IOPagingException(e);
+            }
+        }
+    } 
+    
+    public void close() {
+        pageFile = null;
+        super.close();
+    }
+
+    public int getHeaderSize() {
+        return headerSize;
+    }
+    public void setHeaderSize(int headerSize) {
+        this.headerSize = headerSize;
+    }
+
+    public short getPageSize() {
+        return pageSize;
+    }
+    public void setPageSize(short pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    public int getMaxPages() {
+        return maxPages;
+    }
+    public void setMaxPages(int maxPages) {
+        this.maxPages = maxPages;
+    }
+    
+}
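
Usage sketch (not part of the patch): opening a PageFile through the factory.
setFile(...) is assumed to be inherited from MemoryMappedFileFactory, which is
not shown in this diff.

    import java.io.File;
    import org.apache.hawtdb.internal.page.PageFile;
    import org.apache.hawtdb.internal.page.PageFileFactory;

    class OpenPageFileSketch {
        static PageFile open(File file) {
            PageFileFactory factory = new PageFileFactory();
            factory.setFile(file);            // assumed base-class property
            factory.setPageSize((short) (1024 * 4));
            factory.setHeaderSize(0);
            factory.open();                   // wraps IO failures in IOPagingException
            return factory.getPageFile();
        }
    }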

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.util.Iterator;
+
+import org.apache.hawtdb.api.Allocator;
+import org.apache.hawtdb.api.OutOfSpaceException;
+import org.apache.hawtdb.internal.util.Ranges;
+import org.apache.hawtdb.internal.util.Ranges.Range;
+
+
+/**
+ * Provides allocation management of pages.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SimpleAllocator implements Allocator {
+
+    private final Ranges freeRanges = new Ranges();
+    private int limit;
+
+    public SimpleAllocator(int max) {
+        this.limit = max;
+        freeRanges.add(0, max);
+    }
+
+    /**
+     * @see org.apache.hawtdb.api.Allocator#alloc(int)
+     */
+    synchronized public int alloc(int size) throws OutOfSpaceException {
+        for (Iterator<Range> i = freeRanges.iterator(); i.hasNext();) {
+            Range r = i.next();
+            if( r.size() >= size ) {
+                int rc = r.start;
+                freeRanges.remove(rc, size);
+                return rc;
+            }
+        }
+        throw new OutOfSpaceException();
+    }
+
+    
+    /**
+     * @see org.apache.hawtdb.api.Allocator#free(int, int)
+     */
+    synchronized public void free(int pageId, int count) {
+        freeRanges.add(pageId, count);
+    }
+
+    /**
+     * @see org.apache.hawtdb.api.Allocator#unfree(int, int)
+     */
+    synchronized public void unfree(int pageId, int count) {
+        freeRanges.remove(pageId, count);
+    }
+
+    synchronized public void clear() throws UnsupportedOperationException {
+        freeRanges.clear();
+        freeRanges.add(0, limit);
+    }
+
+    synchronized public void copy(Ranges freePages) throws UnsupportedOperationException {
+        freeRanges.copy(freePages);
+    }
+    
+    public int getLimit() {
+        return limit;
+    }
+    
+    public boolean isAllocated(int page) {
+        return !freeRanges.contains(page);
+    }
+
+    public Ranges getFreeRanges() {
+        return freeRanges;
+    }
+    
+    @Override
+    public String toString() {
+        return "{ free pages: "+freeRanges.toString()+" }";
+    }
+
+}
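
Behavior sketch (not part of the patch): alloc() is a first-fit scan over the
free ranges, so freed pages are reused from the lowest page id onward.

    import org.apache.hawtdb.internal.page.SimpleAllocator;

    class AllocatorSketch {
        public static void main(String[] args) {
            SimpleAllocator allocator = new SimpleAllocator(10); // pages 0..9 free
            int a = allocator.alloc(3);  // first fit: returns 0 (pages 0-2)
            int b = allocator.alloc(2);  // returns 3 (pages 3-4)
            allocator.free(a, 3);        // pages 0-2 go back on the free list
            System.out.println(allocator); // { free pages: [ 0-2, 5-9 ] }
        }
    }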

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	Implementations of paged IO.
+</p>
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Map.Entry;
+
+import org.apache.activemq.util.TreeMap;
+import org.apache.activemq.util.TreeMap.TreeEntry;
+
+/**
+ * Tracks numeric ranges.  Handy for keeping track of things like allocation or free lists.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final public class Ranges implements Serializable, Iterable<Ranges.Range> {
+    private static final long serialVersionUID = 8340484139329633582L;
+
+    final public static class Range implements Serializable {
+        private static final long serialVersionUID = -4904483630105365841L;
+        
+        public int start;
+        public int end;
+
+        public Range(int start, int end) {
+            this.start = start;
+            this.end = end;
+        }
+
+        public int size() {
+            return end - start;
+        }
+                
+        @Override
+        public String toString() {
+            if( start == end-1 ) {
+                return Integer.toString(start);
+            }
+            return start+"-"+(end-1);
+        }
+        
+        @Override
+        public boolean equals(Object obj) {
+            if( obj == this ) {
+                return true;
+            }
+            if( obj == null || obj.getClass()!=Range.class ) {
+                return false;
+            }
+            Range r = (Range)obj;
+            return start == r.start && end==r.end; 
+        }
+        
+        @Override
+        public int hashCode() {
+            return start*77+end;
+        }
+
+        public boolean contains(int value) {
+            return start <= value && value < end;
+        }
+    }
+
+    private final TreeMap<Integer, Range> ranges = new TreeMap<Integer, Range>();
+
+    public void add(int start) {
+        add(start, 1);
+    }
+    
+    public void add(int start, int length) {
+        int end = start+length;
+        
+        // look for entries starting from the end of the add range.
+        TreeEntry<Integer, Range> entry = ranges.floorEntry(end);
+        if( entry!=null ) {
+            while( entry!=null) {
+                Range range = entry.getValue();
+                TreeEntry<Integer, Range> curr = entry;
+                entry = entry.previous();
+                
+                // This entry ends strictly before the add range starts; nothing left to merge.
+                if( range.end < start ) {
+                    // we are done..
+                    break;
+                }
+                
+                // if the end of the range is not in the add range.
+                if( end < range.end  ) {
+                    // extend the length out..
+                    end = range.end;
+                }
+
+                if( start < range.start ) {
+                    // if the front of the range is in the add range.
+                    // just remove it..
+                    ranges.removeEntry(curr);
+                } else {
+                    // The front is not in the add range...
+                    // Then resize.. and we are done
+                    range.end = end;
+                    return;
+                }
+            }
+        }
+        
+        // put the new range in.
+        ranges.put(start, range(start, end));
+    }    
+    
+    public void remove(int start) {
+        remove(start, 1);
+    }
+    
+    public void remove(int start, int length) {
+        int end = start+length;
+        
+        // look for entries starting from the end of the remove range.
+        TreeEntry<Integer, Range> entry = ranges.lowerEntry(end);
+        while( entry!=null) {
+            Range range = entry.getValue();
+            TreeEntry<Integer, Range> curr = entry;
+            entry = entry.previous();
+            
+            // If tail of the range is not in the remove range.
+            if( range.end <= start ) {
+                // we are done..
+                break;
+            }
+            
+            // if the end of the range is not in the remove range.
+            if( end < range.end  ) {
+                // Then we need to add back the tail part.
+                ranges.put(end, range(end, range.end));
+            }
+
+            if( start <= range.start ) {
+                // if the front of the range is in the remove range.
+                // just remove it..
+                ranges.removeEntry(curr);
+                
+            } else {
+                // The front is not in the remove range...
+                // Then resize.. and we are done
+                range.end = start;
+                break;
+            }
+        }
+    }
+
+    public boolean contains(int value) {
+        TreeEntry<Integer, Range> entry = ranges.floorEntry(value);
+        if( entry == null ) {
+            return false;
+        }
+        return entry.getValue().contains(value);
+    }
+
+    
+    public void clear() {
+        ranges.clear();
+    }
+
+    public void copy(Ranges source) {
+        ranges.clear();
+        for (Entry<Integer, Range> entry : source.ranges.entrySet()) {
+            Range value = entry.getValue();
+            ranges.put(entry.getKey(), range(value.start, value.end));
+        }
+    }
+    
+    public int size() {
+        int rc=0;
+        TreeEntry<Integer, Range> entry = ranges.firstEntry();
+        while(entry!=null) {
+            rc += entry.getValue().size();
+            entry = entry.next();
+        }
+        return rc;
+    }
+
+    
+    static public Range range(int start, int end) {
+        return new Range(start, end);
+    }
+    
+    public ArrayList<Range> toArrayList() {
+        return new ArrayList<Range>(ranges.values());
+    }
+    
+    @Override
+    public String toString() {
+        StringBuilder sb  = new StringBuilder(20+(10*ranges.size()));
+        sb.append("[ ");
+        
+        boolean first=true;
+        for (Range r : this) {
+            if( !first ) {
+                sb.append(", ");
+            }
+            first=false;
+            sb.append(r);
+        }
+        
+        sb.append(" ]");
+        return sb.toString();
+    }
+    
+    public Iterator<Range> iterator() {
+        return ranges.values().iterator();
+    }
+
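+    /**
+     * Iterates the gaps: the sub-ranges of {@code mask} that are not covered
+     * by any range in this set.
+     */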
+    public Iterator<Range> iteratorNotInRange(final Range mask) {
+        
+        return new Iterator<Range>() {
+            
+            Iterator<Range> iter = ranges.values().iterator();
+            Range last = new Range(mask.start, mask.start);
+            Range next = null;
+
+            public boolean hasNext() {
+                if( next==null ) {
+                    while( last.end < mask.end && iter.hasNext() ) {
+                        Range r = iter.next();
+                        
+                        // skip over the initial ranges not within the mask
+                        if( r.end < last.end ) {
+                            continue;
+                        }
+                        
+                        // Handle the case where a range straddles the mask start position 
+                        if( r.start < last.end ) {
+                            // extend the last range out so that the next one starts at
+                            // the end of the range.
+                            last = new Range(last.start, r.end);
+                            continue;
+                        }
+
+                        if( r.start < mask.end ) {
+                            next = new Range(last.end, r.start);
+                        } else {
+                            next = new Range(last.end, mask.end);
+                        }
+                        break;
+                    }
+                }
+                return next!=null;
+            }
+
+            public Range next() {
+                if( !hasNext() ) {
+                    throw new NoSuchElementException();
+                }
+                last = next;
+                next=null;
+                return last;
+            }
+
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+    
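+    /** Expands an iteration of Ranges into the individual int values they contain. */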
+    static private final class ValueIterator implements Iterator<Integer>, Iterable<Integer> {
+        Iterator<Range> ranges;
+        Range range;
+        Integer next;
+        int last;
+
+        private ValueIterator(Iterator<Range> t) {
+            ranges = t;
+        }
+
+        public boolean hasNext() {
+            if( next==null ) {
+                if( range == null ) {
+                    if( ranges.hasNext() ) {
+                        range = ranges.next();
+                        next = range.start;
+                    } else {
+                        return false;
+                    }
+                } else {
+                    next = last+1;
+                }
+                if( next == (range.end-1) ) {
+                    range=null;
+                }
+            }
+            return next!=null;
+        }
+
+        public Integer next() {
+            if( !hasNext() ) {
+                throw new NoSuchElementException();
+            }
+            last = next;
+            next=null;
+            return last;
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+        public Iterator<Integer> iterator() {
+            return this;
+        }
+    }    
+    
+    public List<Integer> values() {
+        ArrayList<Integer> rc = new ArrayList<Integer>();
+        for (Integer i : new ValueIterator(iterator())) {
+            rc.add(i);
+        }
+        return rc;
+    }
+    
+    public Iterator<Integer> valueIterator() {
+       return new ValueIterator(iterator()); 
+    }
+
+    public Iterator<Integer> valuesIteratorNotInRange(Range r) {
+        return new ValueIterator(iteratorNotInRange(r)); 
+    }
+
+    public boolean isEmpty() {
+        return ranges.isEmpty();
+    }
+    
+}
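
Behavior sketch (not part of the patch): overlapping and adjacent ranges are
coalesced on add(), and remove() splits a range when it punches a hole in the
middle of one.

    import org.apache.hawtdb.internal.util.Ranges;

    class RangesSketch {
        public static void main(String[] args) {
            Ranges ranges = new Ranges();
            ranges.add(0, 5);            // [ 0-4 ]
            ranges.add(10, 5);           // [ 0-4, 10-14 ]
            ranges.add(5, 5);            // bridges the gap: [ 0-14 ]
            ranges.remove(3, 4);         // splits the run: [ 0-2, 7-14 ]
            System.out.println(ranges.contains(7)); // true
            System.out.println(ranges);             // [ 0-2, 7-14 ]
        }
    }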

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	Collections and utility classes.
+</p>
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties Thu Oct 15 17:04:11 2009
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for Eclipse testing; we want to see the log output on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# Console appender, used by the root logger above
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal;
+
+/**
+ * A named action that a test harness initializes and then runs against an
+ * {@link Actor}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Action<T extends Actor> {
+    void init(T actor);
+    void run(T actor) throws Exception;
+    String getName();
+}
\ No newline at end of file