You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/15 19:04:15 UTC
svn commit: r825564 [4/5] - in /activemq/sandbox/activemq-flow: ./
activemq-util/src/main/java/org/apache/activemq/util/buffer/ hawtdb/
hawtdb/src/ hawtdb/src/main/ hawtdb/src/main/java/
hawtdb/src/main/java/org/ hawtdb/src/main/java/org/apache/ hawtdb...
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,1047 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CRC32;
+
+import javolution.io.Struct;
+
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+import org.apache.hawtdb.api.Allocator;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.api.OptimisticUpdateException;
+import org.apache.hawtdb.api.OutOfSpaceException;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.api.PagingException;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.api.Paged.SliceType;
+import org.apache.hawtdb.internal.io.MemoryMappedFile;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ * Provides concurrent page file access via Multiversion concurrency control
+ * (MVCC).
+ *
+ * Once a transaction begins working against the data, it acquires a snapshot of
+ * all the data in the page file. This snapshot is used to provide the
+ * transaction with a consistent view of the data in spite of it being
+ * concurrently modified by other transactions.
+ *
+ * When a transaction does a page update, the update is stored in a temporary
+ * page location. Subsequent reads of the original page will result in page read
+ * of the temporary page. If the transaction rolls back, the temporary pages are
+ * freed. If the transaction commits, the page updates are assigned the next
+ * snapshot version number and the update gets queued so that it can be applied
+ * atomically at a later time.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public final class ConcurrentPageFile {
+
+ // Magic string written to the file_magic header field by reset().
+ private static final String MAGIC = "HawtDB:MVCC Page File:1.0\n";
+ // Number of bytes sliced from the start of the file for the header; a copy of
+ // the header is written at half this offset by replicateHeader().
+ private static final int FILE_HEADER_SIZE = 1024 * 4;
+
+ // Marker values stored in place of a page id in the page update maps.
+ // PAGE_ALLOCATED: the page was allocated by the update.
+ public static final int PAGE_ALLOCATED = -1;
+ // PAGE_FREED: the page was freed by the update.
+ public static final int PAGE_FREED = -2;
+// public static final int PAGE_CACHED_WRITE = -3;
+ // Header size that ConcurrentPageFileFactory passes to PageFileFactory.setHeaderSize().
+ public static final int HEADER_SIZE = 1024*4;
+
+ static class CacheUpdate {
+ private final int page;
+ private Object value;
+ private EncoderDecoder<?> marshaller;
+
+ public CacheUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
+ this.page = page;
+ this.value = value;
+ this.marshaller = marshaller;
+ }
+
+ public void reset(Object value, EncoderDecoder<?> marshaller) {
+ this.value = value;
+ this.marshaller = marshaller;
+ }
+
+ @SuppressWarnings("unchecked")
+ <T> T value() {
+ return (T) value;
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<Integer> store(Paged paged) {
+ return ((EncoderDecoder)marshaller).store(paged, page, value);
+ }
+ }
+
+ /**
+ * Transaction objects are NOT thread safe. Users of this object should
+ * guard it from concurrent access.
+ *
+ * @author chirino
+ */
+ private final class ConcurrentTransaction implements Transaction {
+ private HashMap<Integer, CacheUpdate> cache;
+ private HashMap<Integer, Integer> updates;
+ private Snapshot snapshot;
+
+ /**
+  * Allocator view handed out to users of this transaction. Allocations are
+  * delegated to the page file allocator and recorded in the transaction's
+  * update map so they can be undone on rollback; frees are only recorded
+  * (as PAGE_FREED) unless the page was allocated by this same transaction.
+  */
+ private final Allocator txallocator = new Allocator() {
+
+     public void free(int pageId, int count) {
+         // TODO: this is not a very efficient way to handle allocation ranges.
+         int end = pageId + count;
+         for (int key = pageId; key < end; key++) {
+             Integer previous = getUpdates().put(key, PAGE_FREED);
+
+             // previous is null when the page was not touched by this
+             // transaction before (the common case when freeing a page that
+             // already existed). The original code unboxed it
+             // unconditionally, which failed with a NullPointerException
+             // (or an AssertionError when assertions were enabled).
+             if (previous != null && previous.intValue() == PAGE_ALLOCATED) {
+                 // It was an allocation done in this tx, so we can
+                 // directly release it.
+                 getUpdates().remove(key);
+                 allocator.free(key, 1);
+             }
+             // NOTE(review): when previous is a temporary page id (the page
+             // was rewritten and then freed in the same tx) that temporary
+             // page is no longer referenced by the update map - confirm
+             // whether it should be released here.
+         }
+     }
+
+     public int alloc(int count) throws OutOfSpaceException {
+         int pageId = allocator.alloc(count);
+         // TODO: this is not a very efficient way to handle allocation ranges.
+         int end = pageId + count;
+         for (int key = pageId; key < end; key++) {
+             getUpdates().put(key, PAGE_ALLOCATED);
+         }
+         return pageId;
+     }
+
+     public void unfree(int pageId, int count) {
+         throw new UnsupportedOperationException();
+     }
+
+     public void clear() throws UnsupportedOperationException {
+         throw new UnsupportedOperationException();
+     }
+
+     public int getLimit() {
+         return allocator.getLimit();
+     }
+
+     public boolean isAllocated(int page) {
+         return allocator.isAllocated(page);
+     }
+
+ };
+
+ /**
+  * Loads the object stored at a page. An object put by this transaction
+  * wins; otherwise the transaction's snapshot is asked to load it.
+  *
+  * @param marshaller used by the snapshot to load the object
+  * @param page the page the object is stored at
+  * @return the object for the page
+  */
+ public <T> T get(EncoderDecoder<T> marshaller, int page) {
+     if (cache != null) {
+         CacheUpdate pending = cache.get(page);
+         if (pending != null) {
+             // The page was updated in the current transaction.
+             return pending.<T>value();
+         }
+     }
+     // Not updated here, so the snapshot has to supply it.
+     return snapshot().cacheLoad(marshaller, page);
+ }
+
+ /**
+ * Records an object level update of a page in this transaction. The value
+ * is kept in the transaction's cache update map together with the
+ * marshaller; it is not written to the page file by this method.
+ *
+ * @param marshaller the marshaller remembered for storing the value later
+ * @param page the page being updated
+ * @param value the new value for the page
+ * @throws PagingException if the page was freed by this transaction, or was
+ * previously updated without going through the cache
+ */
+ public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
+ Integer update = getUpdates().get(page);
+ if (update == null) {
+ // This is the first time this transaction updates the page...
+ // Make sure a snapshot is held, then allocate a temporary page that the
+ // update is mapped to.
+ snapshot();
+ update = allocator.alloc(1);
+ getUpdates().put(page, update);
+ getCacheUpdates().put(page, new CacheUpdate(update, value, marshaller));
+ } else {
+ // We have updated it before...
+ switch (update) {
+ case PAGE_FREED:
+ throw new PagingException("You should never try to write a page that has been freed.");
+ case PAGE_ALLOCATED:
+ // The page was allocated by this transaction, so the update targets the
+ // page itself rather than a temporary page.
+ getCacheUpdates().put(page, new CacheUpdate(page, value, marshaller));
+ break;
+ default:
+ // The page is already mapped to a temporary page; reuse its cache entry.
+ CacheUpdate cu = getCacheUpdates().get(page);
+ if( cu == null ) {
+ throw new PagingException("You should never try to store mix using the cached objects with normal page updates.");
+ }
+ cu.reset(value, marshaller);
+ }
+ }
+ }
+
+ /**
+ * Delegates removal of the object at the given page to the marshaller,
+ * passing this transaction as the paged object to operate on.
+ */
+ public <T> void remove(EncoderDecoder<T> marshaller, int page) {
+ marshaller.remove(ConcurrentTransaction.this, page);
+ }
+
+ /**
+ * @return the transaction scoped allocator (txallocator) of this transaction.
+ */
+ public Allocator allocator() {
+ return txallocator;
+ }
+
+ public void read(int pageId, Buffer buffer) throws IOPagingException {
+
+ Integer updatedPageId = updates == null ? null : updates.get(pageId);
+ if (updatedPageId != null) {
+ switch (updatedPageId) {
+ case PAGE_ALLOCATED:
+ case PAGE_FREED:
+ // TODO: Perhaps use a RuntimeException subclass.
+ throw new PagingException("You should never try to read a page that has been allocated or freed.");
+ default:
+ // read back in the updated we had done.
+ pageFile.read(updatedPageId, buffer);
+ }
+ } else {
+ // Get the data from the snapshot.
+ snapshot().read(pageId, buffer);
+ }
+ }
+
+ /**
+ * Returns a slice of the page file for a range of pages, as seen by this
+ * transaction.
+ *
+ * For READ slices the range is served from the temporary page recorded for
+ * the first page, or from the snapshot when this transaction has no update
+ * for it. For other slice types a temporary range is allocated on first
+ * access (and, for READ_WRITE, pre-filled from the snapshot) and the slice
+ * is taken over that range.
+ *
+ * @param type the kind of access requested
+ * @param page the first page of the range
+ * @param count the number of pages in the range
+ * @throws PagingException if the first page was allocated or freed (READ) or
+ * freed (other types) by this transaction
+ */
+ public ByteBuffer slice(SliceType type, int page, int count) throws IOPagingException {
+ //TODO: need to improve the design of ranged ops..
+ if( type==SliceType.READ ) {
+ // Only the first page of the range is looked up in the update map.
+ Integer updatedPageId = updates == null ? null : updates.get(page);
+ if (updatedPageId != null) {
+ switch (updatedPageId) {
+ case PAGE_ALLOCATED:
+ case PAGE_FREED:
+ throw new PagingException("You should never try to read a page that has been allocated or freed.");
+ }
+ return pageFile.slice(type, updatedPageId, count);
+ } else {
+ // Get the data from the snapshot.
+ return snapshot().slice(page, count);
+ }
+
+ } else {
+ Integer update = getUpdates().get(page);
+ if (update == null) {
+ // First write access to this range: allocate a temporary range for it.
+ update = allocator.alloc(count);
+
+ if (type==SliceType.READ_WRITE) {
+ // The caller wants to see the current content, so copy it from the
+ // snapshot into the temporary range first.
+ ByteBuffer slice = snapshot().slice(page, count);
+ try {
+ pageFile.write(update, slice);
+ } finally {
+ pageFile.unslice(slice);
+ }
+ }
+
+ // Every page of the range is marked PAGE_ALLOCATED, then the entry for
+ // the first page is replaced with the start of the temporary range.
+ int end = page+count;
+ for (int i = page; i < end; i++) {
+ getUpdates().put(i, PAGE_ALLOCATED);
+ }
+ getUpdates().put(page, update);
+
+ return pageFile.slice(type, update, count);
+ } else {
+ switch (update) {
+ case PAGE_FREED:
+ throw new PagingException("You should never try to write a page that has been freed.");
+ case PAGE_ALLOCATED:
+ // Page allocated by this transaction: slice it directly.
+ break;
+ default:
+ // Page already mapped to a temporary page: slice that one.
+ page = update;
+ }
+ }
+ return pageFile.slice(type, page, count);
+
+ }
+
+ }
+
+ /**
+ * Hands a buffer obtained from slice() back to the underlying page file.
+ */
+ public void unslice(ByteBuffer buffer) {
+ pageFile.unslice(buffer);
+ }
+
+ /**
+  * Writes a page on behalf of this transaction. The first write to a page
+  * this transaction has not touched yet goes to a newly allocated temporary
+  * page; later writes reuse that page. Pages allocated by this transaction
+  * are written in place.
+  *
+  * @param page the page being written
+  * @param buffer the data handed to the underlying write
+  * @throws PagingException if the page was freed by this transaction
+  */
+ public void write(int page, Buffer buffer) throws IOPagingException {
+     Integer update = getUpdates().get(page);
+     int target;
+     if (update == null) {
+         // We are updating an existing page in the snapshot...
+         snapshot();
+         update = allocator.alloc(1);
+         getUpdates().put(page, update);
+         target = update;
+     } else if (update == PAGE_FREED) {
+         throw new PagingException("You should never try to write a page that has been freed.");
+     } else if (update == PAGE_ALLOCATED) {
+         target = page;
+     } else {
+         target = update;
+     }
+     pageFile.write(target, buffer);
+ }
+
+
+ /**
+ * Commits the updates recorded by this transaction, if any, by handing them
+ * to the enclosing ConcurrentPageFile. Whether or not the commit succeeds,
+ * the snapshot is released and the transaction state is cleared so the
+ * object starts from a clean slate.
+ */
+ public void commit() throws IOPagingException {
+ boolean failed = true;
+ try {
+ if (updates!=null) {
+ // Index 0 of the snapshot's update list is the update the snapshot was
+ // created on (see aquireSnapshot()).
+ Update previousUpdate = snapshot==null ? null : snapshot.updates.get(0);
+ ConcurrentPageFile.this.commit(previousUpdate, updates, cache);
+ }
+ failed = false;
+ } finally {
+ // Rollback if the commit fails.
+ if (failed) {
+ freeAllocatedPages();
+ }
+ ConcurrentPageFile.this.releaseSnapshot(snapshot);
+ updates = null;
+ cache = null;
+ snapshot = null;
+ }
+ }
+
+ /**
+ * Discards the updates recorded by this transaction: pages allocated for
+ * them are freed, the snapshot is released and the transaction state is
+ * cleared.
+ */
+ public void rollback() throws IOPagingException {
+ try {
+ if (updates!=null) {
+ freeAllocatedPages();
+ }
+ } finally {
+ // Always release the snapshot, even if freeing pages failed.
+ ConcurrentPageFile.this.releaseSnapshot(snapshot);
+ updates = null;
+ cache = null;
+ snapshot = null;
+ }
+ }
+
+ /**
+  * Returns to the allocator every page this transaction allocated, so that
+  * a rollback (or failed commit) does not leak them.
+  *
+  * The update map holds three kinds of entries: PAGE_FREED (nothing was
+  * allocated), PAGE_ALLOCATED (the key itself was allocated) and a
+  * temporary page id (the value was allocated to hold the new content of
+  * the key). The original code freed the key in the last case, which
+  * released the still-live original page and leaked the temporary one.
+  */
+ private void freeAllocatedPages() {
+     for (Entry<Integer, Integer> entry : updates.entrySet()) {
+         int value = entry.getValue();
+         if (value == PAGE_FREED) {
+             // Don't need to do anything..
+             continue;
+         }
+         if (value == PAGE_ALLOCATED) {
+             // The page itself was allocated by this transaction.
+             allocator.free(entry.getKey(), 1);
+         } else {
+             // Free the temporary page that was allocated for the update.
+             allocator.free(value, 1);
+         }
+     }
+ }
+
+ /**
+ * @return the snapshot this transaction works against, acquiring one from
+ * the page file the first time it is needed.
+ */
+ public Snapshot snapshot() {
+ if (snapshot == null) {
+ snapshot = aquireSnapshot();
+ }
+ return snapshot;
+ }
+
+ /**
+ * @return true while the transaction has not created its update map yet.
+ */
+ public boolean isReadOnly() {
+ return updates == null;
+ }
+
+ /**
+ * @return the map of object level (cache) updates keyed by page, created
+ * on first use.
+ */
+ public HashMap<Integer, CacheUpdate> getCacheUpdates() {
+ if( cache==null ) {
+ cache = new HashMap<Integer, CacheUpdate>();
+ }
+ return cache;
+ }
+
+ /**
+ * @return the map of page updates keyed by page, created on first use. A
+ * value is either PAGE_ALLOCATED, PAGE_FREED or the id of the page that
+ * holds the updated content.
+ */
+ private HashMap<Integer, Integer> getUpdates() {
+ if (updates == null) {
+ updates = new HashMap<Integer, Integer>();
+ }
+ return updates;
+ }
+
+ /**
+ * @return the page size reported by the underlying page file.
+ */
+ public int getPageSize() {
+ return pageFile.getPageSize();
+ }
+
+ /**
+ * @return a short description holding the snapshot and the number of page updates.
+ */
+ public String toString() {
+ int updatesSize = updates==null ? 0 : updates.size();
+ return "{ snapshot: "+this.snapshot+", updates: "+updatesSize+" }";
+ }
+
+ /**
+ * Delegates to the underlying page file's pages(length).
+ */
+ public int pages(int length) {
+ return pageFile.pages(length);
+ }
+
+ /**
+ * Flushes the enclosing ConcurrentPageFile.
+ */
+ public void flush() {
+ ConcurrentPageFile.this.flush();
+ }
+
+ }
+
+ /**
+  * A committed set of page updates covering the revisions base..head. Updates
+  * are chained in the page file's update list; an update that has a snapshot
+  * attached is being read by at least one transaction.
+  */
+ static class Update extends LinkedNode<Update> implements Serializable {
+     private static final long serialVersionUID = 9160865012544031094L;
+
+     transient final AtomicBoolean applied = new AtomicBoolean();
+     private final Redo redo;
+
+     long base;
+     long head;
+     Snapshot snapshot;
+
+     HashMap<Integer, Integer> updates;
+     transient HashMap<Integer, CacheUpdate> cache;
+
+     /**
+      * @param version the revision this update starts and ends at
+      * @param updates page updates of the commit, may be null
+      * @param cache object level updates of the commit, may be null
+      * @param redo the redo batch the commit was recorded in, may be null
+      */
+     public Update(long version, HashMap<Integer, Integer> updates, HashMap<Integer, CacheUpdate> cache, Redo redo) {
+         this.redo = redo;
+         this.head = this.base = version;
+         this.updates = updates;
+         // The original code stored a new empty map here and silently
+         // dropped the cache updates it was given, so Snapshot.cacheLoad()
+         // could never see objects committed through put().
+         this.cache = cache;
+     }
+
+     /**
+      * Merges previous updates that can be merged with this update: those
+      * that have no snapshot attached and belong to the same redo batch.
+      * Entries of this (newer) update win over entries of the merged one.
+      */
+     public void mergePrevious() {
+         // Use a local rather than assigning to the link field of the same
+         // name, which the original did (that field presumably comes from
+         // LinkedNode - TODO confirm); overwriting it with the result of
+         // getPrevious() could damage the list links.
+         Update prev = getPrevious();
+         while (prev != null && prev.snapshot == null && prev.redo == redo) {
+
+             assert prev.head + 1 == this.base;
+             this.base = prev.base;
+
+             if (prev.updates != null) {
+                 if (this.updates != null) {
+                     prev.updates.putAll(this.updates);
+                 }
+                 this.updates = prev.updates;
+             }
+
+             if (prev.cache != null) {
+                 if (this.cache != null) {
+                     prev.cache.putAll(this.cache);
+                 }
+                 this.cache = prev.cache;
+             }
+             prev.unlink();
+             // Continue with whatever now precedes this update.
+             prev = getPrevious();
+         }
+     }
+
+     public String toString() {
+         int updateSize = updates == null ? 0 : updates.size();
+         int cacheSize = cache == null ? 0 : cache.size();
+         return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+", applied: "+applied+" }";
+     }
+
+ }
+
+ /**
+ * A read view of the page file made of a list of updates. Lookups walk the
+ * list in order and stop at the first update that mentions the requested
+ * page; the list is built by aquireSnapshot() from the reversed update list.
+ * The references counter is maintained by aquireSnapshot() and
+ * releaseSnapshot().
+ *
+ * @author chirino
+ */
+ private class Snapshot {
+
+ private final ArrayList<Update> updates;
+ private int references;
+
+ public Snapshot(ArrayList<Update> updates) {
+ this.updates = updates;
+ }
+
+ /**
+ * Translates a page id to the page that holds its content in this snapshot.
+ *
+ * @return the mapped page of the first update that mentions the page, or
+ * the page itself when no update remaps it
+ * @throws PagingException if that update freed the page
+ */
+ private int mapPageId(int page) {
+ for (Update update : updates) {
+ if( update.updates!=null ) {
+ Integer updatedPage = update.updates.get(page);
+ if (updatedPage != null) {
+ switch (updatedPage) {
+ case PAGE_FREED:
+ throw new PagingException("You should never try to read page that has been freed.");
+ case PAGE_ALLOCATED:
+ // Allocated in that update: the content lives at the page itself.
+ break;
+ default:
+ page = updatedPage;
+ }
+ // Only the first update that mentions the page is considered.
+ break;
+ }
+ }
+ }
+ return page;
+ }
+
+ /**
+ * Reads a page after mapping it through this snapshot's updates.
+ */
+ void read(int pageId, Buffer buffer) throws IOPagingException {
+ pageId = mapPageId(pageId);
+ pageFile.read(pageId, buffer);
+ }
+
+ /**
+ * Returns a READ slice starting at the mapped location of the given page.
+ */
+ public ByteBuffer slice(int pageId, int count) {
+ pageId = mapPageId(pageId);
+ return pageFile.slice(SliceType.READ, pageId, count);
+ }
+
+ /**
+ * Loads the object of a page: from the cache updates of this snapshot's
+ * updates when one of them has it, otherwise through the shared read cache.
+ */
+ private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+ // See if any of the updates in the snapshot have an update of the
+ // requested page...
+ for (Update update : updates) {
+ if( update.cache!=null ) {
+ CacheUpdate cu = update.cache.get(pageId);
+ if (cu != null) {
+ return cu.<T>value();
+ }
+ }
+ }
+ return readCache.cacheLoad(marshaller, pageId);
+ }
+
+ public String toString() {
+ return "{ updates: "+this.updates.size()+", references: "+this.references+" }";
+ }
+ }
+
+ /**
+  * Cache of objects loaded from the page file, keyed by page id. The backing
+  * map is an LRUCache created with the value 1024, wrapped in a
+  * synchronized map.
+  */
+ class ReadCache {
+     Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
+
+     /**
+      * Returns the cached object of a page, asking the marshaller to load
+      * it from the page file (and remembering the result) on a miss.
+      */
+     @SuppressWarnings("unchecked")
+     private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+         Object cached = map.get(pageId);
+         if (cached != null) {
+             return (T) cached;
+         }
+         T loaded = marshaller.load(pageFile, pageId);
+         map.put(pageId, loaded);
+         return loaded;
+     }
+ }
+
+ private final ReadCache readCache = new ReadCache();
+
+ /**
+  * The redo log is composed of a linked list of Redo records. A
+  * Redo stores the redo data for multiple updates (revisions base..head)
+  * and points at the page of the redo that was stored before it.
+  *
+  * @author chirino
+  */
+ static private class Redo implements Serializable {
+     private static final long serialVersionUID = 1188640492489990493L;
+
+     /** the pageId that this redo batch is stored at */
+     private transient int page=-1;
+     /** set by recover() on redos that were loaded from the file */
+     private transient boolean recovered;
+
+     private long base;
+     private long head;
+
+     final HashMap<Integer, Integer> updates = new HashMap<Integer, Integer>();
+     transient final HashMap<Integer, CacheUpdate> cache = new HashMap<Integer, CacheUpdate>();
+
+     /** points to a previous redo batch page */
+     public int previous=-1;
+
+     public String toString() {
+         int count = updates==null ? 0 : updates.size();
+         return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
+     }
+
+     /**
+      * @return the number of entries recorded in this redo: page updates
+      *         plus cache updates. The original code assigned the cache
+      *         size over the updates size, so redos holding only plain
+      *         page updates always reported 0 and never reached the size
+      *         threshold checked in commit().
+      */
+     public int pageCount() {
+         int rc = 0;
+         if( updates!=null ) {
+             rc += updates.size();
+         }
+         if( cache!=null ) {
+             rc += cache.size();
+         }
+         return rc;
+     }
+
+ }
+
+ /**
+ * Layout of the file header. The struct is bound to a slice of the start
+ * of the file in the ConcurrentPageFile constructor.
+ */
+ private class Header extends Struct {
+ /** identifies the file type; reset() sets it to MAGIC */
+ public final UTF8String file_magic = new UTF8String(80);
+ /** the revision that has been applied to the page file */
+ public final Signed64 base_revision = new Signed64();
+ public final Signed32 page_size = new Signed32();
+ /** page the base revision free list is stored at, or -1 */
+ public final Signed32 free_list_page = new Signed32();
+ /** points at the latest redo page which might have been partially stored */
+ public final Signed32 redo_page = new Signed32();
+ /** points at the latest redo page which is guaranteed to be fully stored */
+ public final Signed32 synced_redo_page = new Signed32();
+ /** CRC32 of the header bytes, set by replicateHeader() */
+ public final Signed64 checksum = new Signed64();
+
+ // NOTE(review): synced_redo_page is not part of the string below.
+ public String toString() {
+ return "{ base_revision: "+this.base_revision.get()+
+ ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
+ ", redo_page: "+redo_page.get()+", checksum: "+checksum.get()+
+ " }";
+ }
+ }
+
+
+ /**
+ * A list of updates to the page file. New updates are appended with
+ * addLast, so the list tail is the most recent update.
+ */
+// private final HashMap<Long, Update> updatesMap = new HashMap<Long, Update>();
+ private final LinkedNodeList<Update> updatesList = new LinkedNodeList<Update>();
+
+ /**
+ * This is the next redo that will get logged. It is currently being built.
+ */
+ Redo nextRedo;
+
+ /**
+ * These are stored redos that are waiting for a file sync. They may or may not survive
+ * a failure.
+ */
+ private final LinkedList<Redo> unsyncedRedos = new LinkedList<Redo>();
+
+ /**
+ * These are stored redos that have been file synced. They should survive a failure.
+ */
+ private final LinkedList<Redo> syncedRedos = new LinkedList<Redo>();
+
+ /**
+ * These are updates which have been applied but whose temp pages have not yet been
+ * freed since they are being used by some snapshots.
+ */
+ private final LinkedList<Update> updatesWaitingCleanup = new LinkedList<Update>();
+
+ /** The header struct, bound to the start of the file by the constructor. */
+ private final Header header = new Header();
+
+ private final PageFile pageFile;
+ private final SimpleAllocator allocator;
+ private final MemoryMappedFile file;
+
+ /**
+ * This is the free page list at the base revision. It does not track allocations in transactions
+ * or committed updates. Only when the updates are squashed will this list be updated.
+ *
+ * The main purpose of this list is to initialize the free list on recovery.
+ *
+ * This does not track the space associated with redo batches and free lists. On
+ * recovery that space is discovered and tracked in the allocator.
+ */
+ private Ranges baseRevisionFreePages = new Ranges();
+
+ /**
+ * Creates a concurrent view on top of a page file. The header struct is
+ * bound to the first FILE_HEADER_SIZE bytes of the file and the update list
+ * is seeded with an empty update at revision 0. Callers are expected to
+ * follow up with reset() or recover() (see ConcurrentPageFileFactory.open()).
+ *
+ * @param pageFile the page file to operate on
+ */
+ public ConcurrentPageFile(PageFile pageFile) {
+ this.pageFile = pageFile;
+ this.file = pageFile.getFile();
+ this.allocator = pageFile.allocator();
+ ByteBuffer slice = file.slice(false, 0, FILE_HEADER_SIZE);
+ this.header.setByteBuffer(slice, slice.position());
+ this.updatesList.addLast(new Update(0, null, null, null));
+ }
+
+ /**
+ * @return a new transaction against this page file. Transaction objects
+ * are not thread safe.
+ */
+ public Transaction tx() {
+ return new ConcurrentTransaction();
+ }
+
+ /**
+  * Used to initialize a new file or to clear out the
+  * contents of an existing file.
+  *
+  * All in-memory state (update list, pending and stored redos, allocator,
+  * base revision free list) is reset and a fresh header is written.
+  */
+ public void reset() {
+     updatesList.clear();
+     updatesList.addLast(new Update(0, null, null, null));
+     unsyncedRedos.clear();
+     // Redo state of the previous content must not survive the reset
+     // either; the original code left these two untouched.
+     syncedRedos.clear();
+     nextRedo = null;
+     updatesWaitingCleanup.clear();
+     allocator.clear();
+     baseRevisionFreePages.clear();
+     baseRevisionFreePages.add(0, allocator.getLimit());
+
+     // Initialize the file header..
+     this.header.setByteBufferPosition(0);
+     this.header.file_magic.set(MAGIC);
+     this.header.base_revision.set(0);
+     this.header.free_list_page.set(-1);
+     this.header.page_size.set(pageFile.getPageSize());
+     this.header.redo_page.set(-1);
+     // -1 means "no page", like for redo_page and free_list_page. It was
+     // previously left at whatever value the file happened to contain.
+     this.header.synced_redo_page.set(-1);
+     replicateHeader();
+ }
+
+ /**
+ * Loads an existing file and replays the redo
+ * logs to put it in a consistent state.
+ *
+ * The base revision and free list are read from the header, the chain of
+ * redo batches is loaded starting at the header's redo_page, and the loaded
+ * redos are then applied with applyRedos().
+ */
+ public void recover() {
+
+ unsyncedRedos.clear();
+ updatesWaitingCleanup.clear();
+
+ this.header.setByteBufferPosition(0);
+ long baseRevision = header.base_revision.get();
+ updatesList.clear();
+ updatesList.addLast(new Update(baseRevision, null, null, null));
+
+ // Initialize the free page list.
+ int pageId = header.free_list_page.get();
+ if( pageId >= 0 ) {
+ baseRevisionFreePages = loadObject(pageId);
+ allocator.copy(baseRevisionFreePages);
+ // The free list does not track the space it is stored on itself, so mark
+ // that extent as in use.
+ Extent.unfree(pageFile, pageId);
+ } else {
+ allocator.clear();
+ // NOTE(review): unlike reset(), baseRevisionFreePages is not cleared
+ // before the full range is added.
+ baseRevisionFreePages.add(0, allocator.getLimit());
+ }
+
+ // Load the redo batches.
+ pageId = header.redo_page.get();
+ while( pageId >= 0 ) {
+ Redo redo = loadObject(pageId);
+ redo.page = pageId;
+ redo.recovered = true;
+ Extent.unfree(pageFile, pageId);
+
+ // Stop unless the checks below decide that an older redo is needed too.
+ pageId=-1;
+ if( baseRevision < redo.head ) {
+
+ // add first since we are loading redo objects youngest to oldest
+ // (following the previous links) but want the list ordered
+ // oldest to youngest.
+ unsyncedRedos.addFirst(redo);
+
+ if( baseRevision < redo.base ) {
+ pageId=redo.previous;
+ }
+ }
+ }
+
+ // Apply all the redos..
+ applyRedos();
+ }
+
+
+ /**
+  * Once this method returns, any previously committed transactions
+  * are flushed to the disk, ensuring that they will not be lost
+  * upon failure.
+  */
+ public void flush() {
+
+     // Write out the current redo if it has data...
+     Redo redo;
+     synchronized (this) {
+         redo = nextRedo;
+         nextRedo = null;
+     }
+     // The original test was inverted (redo == null): a pending redo was
+     // silently dropped, and with no pending redo store(null) failed with
+     // a NullPointerException.
+     if( redo != null ) {
+         store(redo);
+     }
+
+     // Find out if there are unsynced redos...
+     redo = null;
+     synchronized (this) {
+         if( !unsyncedRedos.isEmpty() ) {
+             for (Redo r : unsyncedRedos) {
+                 syncedRedos.add(r);
+                 redo = r;
+             }
+         }
+     }
+
+     // Yep.. we had some..
+     if( redo!=null ) {
+         // This is a slow operation..
+         file.sync();
+         // Update the header so that it knows about the redos that are
+         // guaranteed to survive a failure. redo is the last (most
+         // recently stored) entry of the unsynced list.
+         header().synced_redo_page.set(redo.page);
+         replicateHeader();
+     }
+ }
+
+ /**
+  * @return the header struct after rewinding both its byte buffer and its
+  *         struct position to 0.
+  */
+ private Header header() {
+     this.header.getByteBuffer().position(0);
+     this.header.setByteBufferPosition(0);
+     return this.header;
+ }
+
+ /**
+  * Writes a redo batch to the page file: deferred cache updates are stored
+  * first, then the redo is linked to the previously stored redo, serialized,
+  * queued as unsynced and referenced from the header.
+  *
+  * @param redo the redo batch to store; must not be null
+  */
+ public void store(Redo redo) {
+
+     // Write any outstanding deferred cache updates...
+     if( redo.cache != null ) {
+         for (Entry<Integer, CacheUpdate> entry : redo.cache.entrySet()) {
+             CacheUpdate cu = entry.getValue();
+             List<Integer> allocatedPages = cu.store(pageFile);
+             for (Integer page : allocatedPages) {
+                 // add any allocated pages to the update list so that the free
+                 // list gets properly adjusted.
+                 redo.updates.put(page, PAGE_ALLOCATED);
+             }
+         }
+     }
+
+     // Link it to the last redo. LinkedList.getLast() throws
+     // NoSuchElementException on an empty list (it never returns null), so
+     // the list has to be tested for emptiness; the list is guarded by
+     // this object's lock everywhere else, so read it under the lock too.
+     synchronized (this) {
+         if( !unsyncedRedos.isEmpty() ) {
+             redo.previous = unsyncedRedos.getLast().page;
+         }
+     }
+
+     // Store the redo.
+     redo.page = storeObject(redo);
+     synchronized (this) {
+         unsyncedRedos.add(redo);
+     }
+
+     // Update the header to know about the new redo page.
+     header().redo_page.set(redo.page);
+     replicateHeader();
+ }
+
+ /**
+  * Frees up space by applying redos and releasing the pages that
+  * the redo was stored on.
+  *
+  * Applying a redo copies every temporary page over the page it replaces and
+  * folds allocations and frees into the base revision free list. Afterwards
+  * the new free list and base revision are recorded in the header.
+  */
+ public void applyRedos() {
+
+     // We can only apply redos which we know are not partially stored on disk
+     // and which hold revisions which are older than the oldest active snapshot.
+     ArrayList<Redo> redoList = new ArrayList<Redo>();
+     synchronized (this) {
+         long snapshotHeadRev = Long.MAX_VALUE;
+         // Walk from the oldest update until one with a snapshot is found.
+         // The original loop never advanced cur and so never terminated
+         // when the first update had no snapshot.
+         for (Update cur = updatesList.getHead(); cur != null; cur = cur.getNext()) {
+             if( cur.snapshot!=null ) {
+                 snapshotHeadRev = cur.head;
+                 break;
+             }
+         }
+
+         for (Iterator<Redo> i = this.unsyncedRedos.iterator(); i.hasNext();) {
+             Redo redo = i.next();
+             if (redo.base > snapshotHeadRev) {
+                 // we can't apply the rest of the updates, since a snapshot depends on
+                 // the current base revision.
+                 // the rest of the updates will have incrementing revision numbers too.
+                 break;
+             }
+             redoList.add(redo);
+             i.remove();
+         }
+     }
+
+     // Perhaps we can't do any work...
+     if( redoList.isEmpty() ) {
+         return;
+     }
+
+     long baseRevision = header().base_revision.get();
+
+     for (Redo redo : redoList) {
+         // revision numbers should be sequentially increasing.
+         assert baseRevision+1==redo.base;
+
+         for (Entry<Integer, Integer> entry : redo.updates.entrySet()) {
+             int key = entry.getKey();
+             int value = entry.getValue();
+             switch( value ) {
+             case PAGE_ALLOCATED:
+                 if( redo.recovered ) {
+                     allocator.unfree(key, 1);
+                 }
+                 baseRevisionFreePages.remove(key, 1);
+                 break;
+             case PAGE_FREED:
+                 if( redo.recovered ) {
+                     allocator.free(key, 1);
+                 }
+                 baseRevisionFreePages.add(key, 1);
+                 break;
+             default:
+                 if( redo.recovered ) {
+                     allocator.unfree(key, 1);
+                 }
+                 // Copy the temporary page over the page it replaces.
+                 ByteBuffer slice = pageFile.slice(SliceType.READ, value, 1);
+                 try {
+                     pageFile.write(key, slice);
+                 } finally {
+                     pageFile.unslice(slice);
+                 }
+             }
+         }
+         // A redo covers the revisions base..head, so the file is now at
+         // the redo's head revision. (Using redo.base here would break the
+         // sequence assertion above for any redo spanning several commits.)
+         baseRevision = redo.head;
+     }
+
+
+     // force to ensure all data is fully stored before the header
+     // starts making reference to new stuff
+     file.sync();
+
+
+     Header h = header();
+     int previousFreeListPage = h.free_list_page.get();
+     h.free_list_page.set(storeObject(baseRevisionFreePages));
+     h.base_revision.set(baseRevision);
+     replicateHeader();
+
+     // Release the previous free list.
+     if( previousFreeListPage>=0 ) {
+         Extent.free(pageFile, previousFreeListPage);
+     }
+
+     // Free the space associated with the redo batches
+     for (Redo redo : redoList) {
+         Extent.free(pageFile, redo.page);
+     }
+ }
+
+ /**
+  * Serializes an object with Java serialization into an extent of the page
+  * file.
+  *
+  * @param value the object to write
+  * @return the page reported by the extent output stream
+  * @throws IOPagingException wrapping any IOException raised while writing
+  */
+ private int storeObject(Object value) {
+     try {
+         ExtentOutputStream extent = new ExtentOutputStream(pageFile);
+         ObjectOutputStream out = new ObjectOutputStream(extent);
+         out.writeObject(value);
+         out.close();
+         int page = extent.getPage();
+         return page;
+     } catch (IOException e) {
+         throw new IOPagingException(e);
+     }
+ }
+
+ /**
+  * Reads back an object that was written with storeObject().
+  *
+  * @param pageId the page the extent input stream is opened at
+  * @return the deserialized object, cast to the type the caller expects
+  * @throws IOPagingException wrapping any IOException or
+  *         ClassNotFoundException raised while reading
+  */
+ @SuppressWarnings("unchecked")
+ private <T> T loadObject( int pageId ) {
+     try {
+         ExtentInputStream extent = new ExtentInputStream(pageFile, pageId);
+         ObjectInputStream in = new ObjectInputStream(extent);
+         Object loaded = in.readObject();
+         return (T) loaded;
+     } catch (IOException e) {
+         throw new IOPagingException(e);
+     } catch (ClassNotFoundException e) {
+         throw new IOPagingException(e);
+     }
+ }
+
+ /**
+ * Stamps the header with a checksum and writes a second copy of it into
+ * the file, at half of FILE_HEADER_SIZE.
+ */
+ private void replicateHeader() {
+ // Calculate the checksum of the header so that we can tell if the
+ // header is corrupted.
+ // The last 8 bytes are left out of the checksum input.
+ // NOTE(review): presumably those are the checksum field itself, which is
+ // declared last in Header - confirm against the Struct layout.
+ byte[] data = new byte[this.header.size() - 8];
+ file.read(0, data);
+ CRC32 checksum = new CRC32();
+ checksum.update(data);
+ this.header.checksum.set(checksum.getValue());
+
+ // Copy the header so we can survive a partial update.
+ ByteBuffer header = file.read(0, this.header.size());
+ file.write(FILE_HEADER_SIZE / 2, header);
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Transaction calls back to these methods...
+ // /////////////////////////////////////////////////////////////////
+ /**
+  * Returns the snapshot attached to the most recent update, creating it
+  * when that update has none yet, and counts one more reference on it.
+  * Every call must be balanced by a releaseSnapshot() call.
+  */
+ synchronized private Snapshot aquireSnapshot() {
+     Snapshot snapshot = updatesList.getTail().snapshot;
+     if (snapshot == null) {
+         ArrayList<Update> updates = updatesList.toArrayListReversed();
+         // Keep a reference to the new snapshot: the original only stored
+         // it on the update, left the local null and then failed with a
+         // NullPointerException on the reference count below.
+         snapshot = new Snapshot(updates);
+         updates.get(0).snapshot = snapshot;
+     }
+     snapshot.references++;
+     return snapshot;
+ }
+
+ /**
+  * Drops one reference to a snapshot. When the last reference goes away the
+  * snapshot is detached from its update and the following update, if any,
+  * gets the chance to merge with its predecessors.
+  *
+  * @param snapshot the snapshot to release, may be null
+  */
+ synchronized private void releaseSnapshot(Snapshot snapshot) {
+     if (snapshot == null) {
+         return;
+     }
+
+     snapshot.references--;
+     if (snapshot.references != 0) {
+         return;
+     }
+
+     // Index 0 is the update the snapshot was attached to.
+     Update owner = snapshot.updates.get(0);
+     owner.snapshot = null;
+     Update newer = owner.getNext();
+     if (newer != null) {
+         newer.mergePrevious();
+     }
+ }
+
+ /**
+  * Attempts to commit a set of page updates.
+  *
+  * The updates get the next revision number, are recorded in the redo that
+  * is currently being built and are appended to the update list. Once the
+  * redo holds more than 10 entries it is stored to the page file.
+  *
+  * @param previousUpdate the update the committing transaction's snapshot
+  *        was taken on, or null if it never acquired a snapshot
+  * @param pageUpdates the page updates of the transaction
+  * @param cache the object level updates of the transaction, may be null
+  * @throws OptimisticUpdateException if an update committed after
+  *         previousUpdate touched one of the same pages
+  */
+ private void commit(Update previousUpdate, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, CacheUpdate> cache) {
+
+     Redo fullRedo=null;
+     synchronized (this) {
+         // Perhaps a concurrent update came in before this one...
+         long rev;
+         if( previousUpdate!=null ) {
+             rev = previousUpdate.head;
+             Update concurrentUpdate = previousUpdate.getNext();
+             while( concurrentUpdate != null ) {
+                 // Yep.. there were concurrent updates.
+                 // Make sure we don't have an update conflict. An update
+                 // without a page map cannot conflict with anything.
+                 if( concurrentUpdate.updates!=null ) {
+                     for (Integer page : pageUpdates.keySet()) {
+                         if( concurrentUpdate.updates.containsKey(page) ) {
+                             throw new OptimisticUpdateException();
+                         }
+                     }
+                 }
+
+                 rev = concurrentUpdate.head;
+                 concurrentUpdate = concurrentUpdate.getNext();
+             }
+         } else {
+             rev = updatesList.getTail().head;
+         }
+         rev++;
+
+         if( nextRedo == null ) {
+             nextRedo = new Redo();
+             nextRedo.base = rev;
+         }
+
+         nextRedo.head = rev;
+         nextRedo.updates.putAll(pageUpdates);
+         // A transaction that only used write()/slice() has no cache map;
+         // HashMap.putAll(null) throws a NullPointerException.
+         if( cache!=null ) {
+             nextRedo.cache.putAll(cache);
+         }
+
+         Update value = new Update(rev, pageUpdates, cache, nextRedo);
+         updatesList.addLast(value);
+         value.mergePrevious();
+
+         if( nextRedo.pageCount() > 10 ) {
+             fullRedo = nextRedo;
+             nextRedo = null;
+         }
+     }
+     // Storing is done outside of the lock.
+     if( fullRedo!=null ) {
+         store(fullRedo);
+     }
+ }
+
+ /**
+ * The suspend method is used to pause/stop access to the concurrent page file.
+ * Access can be restored using the {@link #resume()} method.
+ *
+ * NOTE: the method body is currently empty; none of the behavior described
+ * here is implemented yet.
+ *
+ * @param reads if true, the suspend will also suspend read only transactions.
+ * @param blocking if true, transactions will block until the {@link #resume()} method
+ * is called, otherwise they will receive errors.
+ * @param drain if true, in progress transactions are allowed to complete, otherwise they
+ * also are suspended.
+ */
+ public void suspend(boolean reads, boolean blocking, boolean drain) {
+ }
+
+ /**
+ * Resumes a previously suspended page file.
+ *
+ * NOTE: the method body is currently empty.
+ */
+ public void resume() {
+ }
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+/**
+ * Factory which opens a page file and wraps it in a {@link ConcurrentPageFile}.
+ * A file that already existed is recovered on open, a new one is reset. The
+ * header size of the underlying page file is fixed to
+ * {@link ConcurrentPageFile#HEADER_SIZE} and cannot be configured.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ConcurrentPageFileFactory extends PageFileFactory {
+
+    private ConcurrentPageFile concurrentPageFile;
+
+    /** passed as the drain argument of suspend() when closing */
+    protected boolean drainOnClose;
+
+    /**
+     * @return the concurrent page file, or null while the factory is not open.
+     */
+    public ConcurrentPageFile getConcurrentPageFile() {
+        return concurrentPageFile;
+    }
+
+    public ConcurrentPageFileFactory() {
+        super.setHeaderSize(ConcurrentPageFile.HEADER_SIZE);
+    }
+
+    /**
+     * Always fails: the header size is dictated by the concurrent page file.
+     *
+     * @throws IllegalArgumentException always
+     */
+    @Override
+    public void setHeaderSize(int headerSize) {
+        throw new IllegalArgumentException("headerSize property cannot be manually configured.");
+    }
+
+    /**
+     * Opens the page file and creates the concurrent page file on top of it.
+     *
+     * @throws IllegalArgumentException if the file property has not been set
+     */
+    @Override
+    public void open() {
+        if( file == null ) {
+            throw new IllegalArgumentException("file property not set");
+        }
+        // Has to be checked before the file gets opened.
+        boolean existed = file.isFile();
+        super.open();
+        if (concurrentPageFile == null) {
+            concurrentPageFile = new ConcurrentPageFile(getPageFile());
+            if( existed ) {
+                concurrentPageFile.recover();
+            } else {
+                concurrentPageFile.reset();
+            }
+        }
+    }
+
+    /**
+     * Suspends, flushes and applies the redos of the concurrent page file and
+     * then closes the underlying page file. The underlying file is closed
+     * even when flushing fails, so that it is not leaked.
+     */
+    @Override
+    public void close() {
+        try {
+            if (concurrentPageFile != null) {
+                concurrentPageFile.suspend(true, false, drainOnClose);
+                concurrentPageFile.flush();
+                concurrentPageFile.applyRedos();
+            }
+        } finally {
+            concurrentPageFile = null;
+            super.close();
+        }
+    }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Extent.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.nio.ByteBuffer;
+
+import javolution.io.Struct;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.api.Paged.SliceType;
+
+
+/**
+ * An extent is a sequence of adjacent pages which can be linked
+ * to subsequent extents.
+ *
+ * Extents allow you to write large streams of data to a Paged object
+ * contiguously to avoid fragmentation.
+ *
+ * The first page of the extent contains a header which specifies
+ * the size of the extent and the page id of the next extent that
+ * it is linked to.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Extent {
+
+    /** Magic byte stored in the first header field of every extent. */
+    public final static byte MAGIC_VALUE = 'x';
+
+    /**
+     * Layout of the header located at the start of the extent's first page.
+     */
+    static private class Header extends Struct {
+        /**
+         * A constant prefix that can be used to identify the page type.
+         */
+        public final Signed8 magic = new Signed8();
+        /**
+         * The number of bytes in the extent, including the header.
+         */
+        public final Signed32 length = new Signed32();
+        /**
+         * The page of the next extent or -1.
+         */
+        public final Signed32 next = new Signed32();
+    }
+
+    private final Extent.Header header = new Header();
+
+    private final Paged paged;
+    private final int page;
+    /** The slice currently opened by readOpen/writeOpen. */
+    private ByteBuffer buffer;
+
+    /** Position of {@link #buffer} at the time it was sliced, i.e. where the header starts. */
+    private int bufferStartPosition;
+
+    /**
+     * @param paged the paged object the extent lives in
+     * @param page the first page of the extent
+     */
+    public Extent(Paged paged, int page) {
+        this.paged = paged;
+        this.page = page;
+    }
+
+    @Override
+    public String toString() {
+        int position = 0;
+        int limit = 0;
+        if (buffer != null) {
+            position = buffer.position() - bufferStartPosition;
+            limit = buffer.limit() - bufferStartPosition;
+        }
+        return "{ page: "+page+", position: "+position+", limit: "+limit+" }";
+    }
+
+    /**
+     * Opens the extent for reading and positions the buffer just past the header.
+     *
+     * @throws IOPagingException if the page does not start with {@link #MAGIC_VALUE}
+     */
+    public void readOpen() {
+        buffer = paged.slice(SliceType.READ, page, 1);
+        header.setByteBuffer(buffer, buffer.position());
+        if (header.magic.get() != MAGIC_VALUE) {
+            throw new IOPagingException("Invalid extent read request. The requested page was not an extent: "+page);
+        }
+
+        int length = header.length.get();
+        int pages = paged.pages(length);
+        if (pages > 1) {
+            // The extent spans several pages: re-slice so the whole extent is visible.
+            paged.unslice(buffer);
+            buffer = paged.slice(SliceType.READ, page, pages);
+            header.setByteBuffer(buffer, buffer.position());
+        }
+
+        bufferStartPosition = buffer.position();
+        buffer.position(bufferStartPosition + header.size());
+        buffer.limit(bufferStartPosition + length);
+    }
+
+    /**
+     * Opens the extent for writing and positions the buffer just past the header.
+     *
+     * @param size the number of pages to slice for this extent
+     */
+    public void writeOpen(short size) {
+        buffer = paged.slice(SliceType.WRITE, page, size);
+        header.setByteBuffer(buffer, buffer.position());
+        bufferStartPosition = buffer.position();
+        buffer.position(bufferStartPosition + header.size());
+    }
+
+    /**
+     * Writes the header, linking this extent to the extent at {@code next},
+     * and releases the write slice.
+     *
+     * @param next the first page of the following extent
+     */
+    public void writeCloseLinked(int next) {
+        int length = buffer.position() - bufferStartPosition;
+        header.magic.set(MAGIC_VALUE);
+        header.next.set(next);
+        header.length.set(length);
+        paged.unslice(buffer);
+    }
+
+    /**
+     * Writes the header marking this extent as the last one of the chain
+     * (next = -1), releases the write slice and frees the pages of the
+     * slice that were not used.
+     */
+    public void writeCloseEOF() {
+        int length = buffer.position() - bufferStartPosition;
+        header.magic.set(MAGIC_VALUE);
+        header.next.set(-1);
+        header.length.set(length);
+
+        // Work out the page counts while the slice is still held.
+        int originalPages = paged.pages(buffer.limit() - bufferStartPosition);
+        int usedPages = paged.pages(length);
+        int remainingPages = originalPages - usedPages;
+
+        // Release the slice exactly once (it used to be released twice).
+        paged.unslice(buffer);
+
+        // Release un-used pages.
+        if (remainingPages > 0) {
+            paged.allocator().free(page + usedPages, remainingPages);
+        }
+    }
+
+    /** Releases the slice opened by {@link #readOpen()}. */
+    public void readClose() {
+        paged.unslice(buffer);
+    }
+
+    /** @return true if no bytes remain between the buffer position and its limit. */
+    boolean atEnd() {
+        return buffer.remaining() == 0;
+    }
+
+    /**
+     * @return true if the write fit into the extent.
+     */
+    public boolean write(byte b) {
+        if (atEnd()) {
+            return false;
+        }
+        buffer.put(b);
+        return true;
+    }
+
+    /**
+     * Writes as much of {@code source} as fits, advancing its offset and
+     * reducing its length by the number of bytes consumed.
+     *
+     * @return true if all of {@code source} fit into the extent.
+     */
+    public boolean write(Buffer source) {
+        while (source.length > 0) {
+            if (atEnd()) {
+                return false;
+            }
+            int count = Math.min(buffer.remaining(), source.length);
+            buffer.put(source.data, source.offset, count);
+            source.offset += count;
+            source.length -= count;
+        }
+        return true;
+    }
+
+    /** @return the next byte of the extent as an unsigned value (0-255). */
+    public int read() {
+        return buffer.get() & 0xFF;
+    }
+
+    /**
+     * Fills {@code target} with as many bytes as are available, advancing its
+     * offset and reducing its length by the number of bytes copied.
+     */
+    public void read(Buffer target) {
+        while (target.length > 0 && !atEnd()) {
+            int count = Math.min(buffer.remaining(), target.length);
+            buffer.get(target.data, target.offset, count);
+            target.offset += count;
+            target.length -= count;
+        }
+    }
+
+    /** @return the page of the next extent as stored in the header. */
+    public int getNext() {
+        return header.next.get();
+    }
+
+    /**
+     * Binds a fresh header to the start of {@code buffer} and checks its magic value.
+     *
+     * @throws IOPagingException if the buffer does not start with {@link #MAGIC_VALUE}
+     */
+    private static Header readHeader(ByteBuffer buffer, int page) {
+        Header header = new Header();
+        header.setByteBuffer(buffer, buffer.position());
+        if (header.magic.get() != MAGIC_VALUE) {
+            throw new IOPagingException("Invalid extent read request. The requested page was not an extent: "+page);
+        }
+        return header;
+    }
+
+    /**
+     * Frees the linked extents at the provided page id.
+     *
+     * @param paged
+     * @param page
+     */
+    public static void freeLinked(Paged paged, int page) {
+        int next;
+        ByteBuffer buffer = paged.slice(SliceType.READ, page, 1);
+        try {
+            next = readHeader(buffer, page).next.get();
+        } finally {
+            // The slice used to be leaked here; release it like free()/unfree() do.
+            paged.unslice(buffer);
+        }
+        free(paged, next);
+    }
+
+    /**
+     * Frees the extent at the provided page id.
+     *
+     * @param paged
+     * @param page
+     */
+    public static void free(Paged paged, int page) {
+        while (page >= 0) {
+            ByteBuffer buffer = paged.slice(SliceType.READ, page, 1);
+            try {
+                Header header = readHeader(buffer, page);
+                int next = header.next.get();
+                paged.allocator().free(page, paged.pages(header.length.get()));
+                page = next;
+            } finally {
+                paged.unslice(buffer);
+            }
+        }
+    }
+
+    /**
+     * Un-frees the extent at the provided page id. Basically undoes
+     * a previous {@link #free(Paged, int)} operation.
+     *
+     * @param paged
+     * @param page
+     */
+    public static void unfree(Paged paged, int page) {
+        while (page >= 0) {
+            ByteBuffer buffer = paged.slice(SliceType.READ, page, 1);
+            try {
+                Header header = readHeader(buffer, page);
+                int next = header.next.get();
+                paged.allocator().unfree(page, paged.pages(header.length.get()));
+                page = next;
+            } finally {
+                paged.unslice(buffer);
+            }
+        }
+    }
+
+    /** @return the first page of this extent. */
+    public int getPage() {
+        return page;
+    }
+
+    /** @return the length field of the header. */
+    public int getLength() {
+        return header.length.get();
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentInputStream.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ * An InputStream which reads its data from an
+ * extent previously written with the {@link ExtentOutputStream}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExtentInputStream extends InputStream {
+
+    private final Paged paged;
+    /** The extent currently being read; {@code null} once the chain is exhausted or the stream is closed. */
+    private Extent current;
+    /** First page of the first extent, kept for {@link #toString()}. */
+    private final int page;
+    /** Page ranges of every extent opened so far. */
+    private Ranges pages = new Ranges();
+
+    /**
+     * Opens the extent chain starting at {@code page} for reading.
+     *
+     * @param paged the paged object to read from
+     * @param page the first page of the first extent
+     */
+    public ExtentInputStream(Paged paged, int page) {
+        this.paged = paged;
+        this.page = page;
+        current = new Extent(paged, page);
+        current.readOpen();
+        pages.add(current.getPage(), paged.pages(current.getLength()));
+    }
+
+    @Override
+    public String toString() {
+        return "{ page: "+page+", current: "+current+" }";
+    }
+
+    /**
+     * Makes sure {@link #current} has unread bytes, following the extent
+     * chain as needed. Exhausted extents are closed on the way.
+     *
+     * @return false if the end of the chain has been reached
+     */
+    private boolean ensureReadable() {
+        while (current != null && current.atEnd()) {
+            // Read the link before the slice backing the header is released.
+            int next = current.getNext();
+            current.readClose();
+            if (next == -1) {
+                current = null;
+                break;
+            }
+            current = new Extent(paged, next);
+            current.readOpen();
+            pages.add(current.getPage(), paged.pages(current.getLength()));
+        }
+        return current != null;
+    }
+
+    /**
+     * @return the next byte (0-255), or -1 at the end of the extent chain.
+     */
+    @Override
+    public int read() throws IOException {
+        if (!ensureReadable()) {
+            return -1;
+        }
+        return current.read();
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b}.
+     *
+     * @return the number of bytes read, 0 if {@code len} is 0, or -1 if the
+     *         end of the extent chain was reached before any byte was read
+     *         (as required by {@link InputStream#read(byte[], int, int)};
+     *         this method used to throw {@link java.io.EOFException} instead).
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        Buffer buffer = new Buffer(b, off, len);
+        while (buffer.length > 0 && ensureReadable()) {
+            current.read(buffer);
+        }
+        int rc = len - buffer.length;
+        return rc == 0 ? -1 : rc;
+    }
+
+    /** Releases the extent currently open, if any. Safe to call more than once. */
+    @Override
+    public void close() throws IOException {
+        if (current != null) {
+            current.readClose();
+            current = null;
+        }
+    }
+
+    /** @return the page ranges of the extents opened so far. */
+    public Ranges getPages() {
+        return pages;
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ExtentOutputStream.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExtentOutputStream extends OutputStream {
+
+    private static final short DEFAULT_EXTENT_SIZE = 128; // 128 * 4k = .5MB
+    private final Paged paged;
+    /** Number of pages allocated for every extent after the first one. */
+    private final short extentSize;
+    /** First page of the first extent. */
+    private final int page;
+    /** The extent currently being written. */
+    private Extent current;
+    /** Page ranges of every extent that has been closed so far. */
+    private Ranges pages = new Ranges();
+    /** Set by {@link #close()} so that closing twice does not release the extent twice. */
+    private boolean closed;
+
+
+    /** Allocates a first extent of the default size and opens it for writing. */
+    public ExtentOutputStream(Paged paged) {
+        this(paged, DEFAULT_EXTENT_SIZE);
+    }
+
+    /** Allocates a first extent of {@code extentSize} pages; later extents use the same size. */
+    public ExtentOutputStream(Paged paged, short extentSize) {
+        this(paged, paged.allocator().alloc(extentSize), extentSize, extentSize);
+    }
+
+    /**
+     * @param paged the paged object to write to
+     * @param page the first page of the (already allocated) first extent
+     * @param extentSize number of pages of the first extent
+     * @param nextExtentSize number of pages allocated for each following extent
+     */
+    public ExtentOutputStream(Paged paged, int page, short extentSize, short nextExtentSize ) {
+        this.paged = paged;
+        this.extentSize = nextExtentSize;
+        this.page = page;
+        current = new Extent(paged, page);
+        current.writeOpen(extentSize);
+    }
+
+    @Override
+    public String toString() {
+        return "{ page: "+page+", extent size: "+extentSize+", current: "+current+" }";
+    }
+
+    /**
+     * Allocates the next extent, closes the current one linked to it,
+     * records the pages it used and opens the new extent for writing.
+     */
+    private void rollOver() {
+        int nextPageId = this.paged.allocator().alloc(extentSize);
+        current.writeCloseLinked(nextPageId);
+        pages.add(current.getPage(), paged.pages(current.getLength()));
+        current = new Extent(paged, nextPageId);
+        current.writeOpen(extentSize);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (!current.write((byte) b)) {
+            rollOver();
+            current.write((byte) b);
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) {
+        Buffer buffer = new Buffer(b, off, len);
+        while (buffer.length > 0) {
+            // Extent.write consumes what fits; roll over for the remainder.
+            if (!current.write(buffer)) {
+                rollOver();
+            }
+        }
+    }
+
+    public short getExtentSize() {
+        return extentSize;
+    }
+
+    public int getPage() {
+        return page;
+    }
+
+    /**
+     * Closes the current extent as the end of the chain and records its
+     * pages. Calling this method again has no effect.
+     */
+    @Override
+    public void close(){
+        if (closed) {
+            return;
+        }
+        closed = true;
+        current.writeCloseEOF();
+        pages.add(current.getPage(), paged.pages(current.getLength()));
+    }
+
+    /** @return the page ranges of the extents closed so far. */
+    public Ranges getPages() {
+        return pages;
+    }
+}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.io.MemoryMappedFile;
+
+
+/**
+ * Provides Paged access to a MemoryMappedFile.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PageFile implements Paged {
+
+ /** Tracks which pages are free; created with the maxPages limit given to the constructor. */
+ private final SimpleAllocator allocator;
+ /** Size of a page in bytes. */
+ private final short pageSize;
+ /** Number of bytes at the start of the file that precede page 0. */
+ private final int headerSize;
+ /** The memory mapped file all reads, writes and slices are delegated to. */
+ private final MemoryMappedFile file;
+
+
+ @Override
+ public String toString() {
+ return "{ header size: "+headerSize+", page size: "+pageSize+", allocator: "+allocator+" }";
+ }
+
+ /**
+ * @param file the memory mapped file backing the pages
+ * @param pageSize the size of a page in bytes
+ * @param headerSize the number of bytes reserved before page 0
+ * @param maxPages the limit handed to the {@link SimpleAllocator}
+ */
+ public PageFile(MemoryMappedFile file, short pageSize, int headerSize, int maxPages) {
+ this.file = file;
+ this.allocator = new SimpleAllocator(maxPages);
+ this.pageSize = pageSize;
+ this.headerSize = headerSize;
+ }
+
+ /** @return the allocator that tracks the free pages of this file. */
+ public SimpleAllocator allocator() {
+ return allocator;
+ }
+
+ public int getHeaderSize() {
+ return headerSize;
+ }
+
+ public MemoryMappedFile getFile() {
+ return file;
+ }
+
+ /** Reads from the file into the buffer, starting at the file offset of the given page. */
+ public void read(int pageId, Buffer buffer) {
+ file.read(offset(pageId), buffer);
+ }
+
+ /** Writes the buffer to the file, starting at the file offset of the given page. */
+ public void write(int pageId, Buffer buffer) {
+ file.write(offset(pageId), buffer);
+ }
+
+ /** Writes the byte buffer to the file, starting at the file offset of the given page. */
+ public void write(int pageId, ByteBuffer buffer) {
+ file.write(offset(pageId), buffer);
+ }
+
+ /**
+ * Obtains a slice of the file covering {@code size} pages starting at {@code pageId}.
+ * The slice is requested as a read slice when {@code type} is {@link SliceType#READ}.
+ * NOTE(review): pageSize*size is computed in int arithmetic - confirm callers never
+ * request enough pages to overflow it.
+ */
+ public ByteBuffer slice(SliceType type, int pageId, int size) {
+ assert size > 0;
+ return file.slice(type==SliceType.READ, offset(pageId), pageSize*size);
+ }
+
+ /** Hands a buffer previously obtained from {@link #slice} back to the file. */
+ public void unslice(ByteBuffer buffer) {
+ file.unslice(buffer);
+ }
+
+
+ /**
+ * @return the file offset of the page: the header size plus pageId times the page size.
+ * The parameter is a long, so the multiplication is done in long arithmetic.
+ */
+ public long offset(long pageId) {
+ assert pageId >= 0;
+ return headerSize+(pageId*pageSize);
+ }
+
+ public int getPageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return the number of pages needed to hold {@code length} bytes (rounded up).
+ * Note that with integer division a length of 0 yields 1 when the page size is greater than 1.
+ */
+ public int pages(int length) {
+ assert length >= 0;
+ return ((length-1)/pageSize)+1;
+ }
+
+ /** Syncs the underlying memory mapped file. */
+ public void flush() {
+ file.sync();
+ }
+
+ /** Loads the value stored at the page by delegating to the encoder/decoder. */
+ public <T> T get(EncoderDecoder<T> encoderDecoder, int page) {
+ return encoderDecoder.load(this, page);
+ }
+
+ /** Stores the value at the page by delegating to the encoder/decoder. */
+ public <T> void put(EncoderDecoder<T> encoderDecoder, int page, T value) {
+ encoderDecoder.store(this, page, value);
+ }
+
+ /** Removes the value at the page by delegating to the encoder/decoder. */
+ public <T> void remove(EncoderDecoder<T> encoderDecoder, int page) {
+ encoderDecoder.remove(this, page);
+ }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFileFactory.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.IOException;
+
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.internal.io.MemoryMappedFileFactory;
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class PageFileFactory extends MemoryMappedFileFactory {
+
+    /** The page file created by {@link #open()}; {@code null} while closed. */
+    private PageFile pageFile;
+
+    /** Number of bytes reserved at the start of the file, before page 0. */
+    protected int headerSize = 0;
+    /** Size of a page in bytes. */
+    protected short pageSize = 1024 * 4;
+    /** Maximum number of pages handed to the page file's allocator. */
+    protected int maxPages = Integer.MAX_VALUE;
+
+    /**
+     * @return the page file created by {@link #open()}, or {@code null} if
+     *         the factory is not currently open.
+     */
+    public PageFile getPageFile() {
+        return pageFile;
+    }
+
+    /**
+     * Opens the underlying memory mapped file and creates the page file if
+     * it has not been created yet.
+     *
+     * The configuration is validated before the file is opened, so that an
+     * invalid setting does not leave a freshly opened file behind.
+     *
+     * @throws IllegalArgumentException if pageSize or maxPages is not
+     *         positive, or headerSize is negative
+     * @throws IOPagingException if the underlying file cannot be opened
+     */
+    @Override
+    public void open() {
+        if (pageFile == null) {
+            if (pageSize <= 0) {
+                throw new IllegalArgumentException("pageSize property must be greater than 0");
+            }
+            if (maxPages <= 0) {
+                throw new IllegalArgumentException("maxPages property must be greater than 0");
+            }
+            if (headerSize < 0) {
+                throw new IllegalArgumentException("headerSize property cannot be negative.");
+            }
+        }
+        try {
+            super.open();
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+        if (pageFile == null) {
+            try {
+                pageFile = new PageFile(getMemoryMappedFile(), pageSize, headerSize, maxPages);
+            } catch (IOException e) {
+                throw new IOPagingException(e);
+            }
+        }
+    }
+
+    /** Drops the page file and closes the underlying memory mapped file. */
+    @Override
+    public void close() {
+        pageFile = null;
+        super.close();
+    }
+
+    public int getHeaderSize() {
+        return headerSize;
+    }
+    public void setHeaderSize(int headerSize) {
+        this.headerSize = headerSize;
+    }
+
+    public short getPageSize() {
+        return pageSize;
+    }
+    public void setPageSize(short pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    public int getMaxPages() {
+        return maxPages;
+    }
+    public void setMaxPages(int maxPages) {
+        this.maxPages = maxPages;
+    }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.util.Iterator;
+
+import org.apache.hawtdb.api.Allocator;
+import org.apache.hawtdb.api.OutOfSpaceException;
+import org.apache.hawtdb.internal.util.Ranges;
+import org.apache.hawtdb.internal.util.Ranges.Range;
+
+
+/**
+ * This class provides allocation management of pages.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class SimpleAllocator implements Allocator {
+
+    /** The page ranges that are currently free. */
+    private final Ranges freeRanges = new Ranges();
+    /** The page limit given at construction; {@link #clear()} restores the free list to [0, limit). */
+    private int limit;
+
+    /**
+     * Creates an allocator in which pages 0 (inclusive) to {@code max}
+     * (exclusive) are free.
+     */
+    public SimpleAllocator(int max) {
+        this.limit = max;
+        freeRanges.add(0, max);
+    }
+
+    /**
+     * Takes {@code size} pages from the start of the first free range that
+     * is large enough.
+     *
+     * @return the first page of the allocated run
+     * @throws OutOfSpaceException if no free range holds {@code size} pages
+     */
+    synchronized public int alloc(int size) throws OutOfSpaceException {
+        for (Range candidate : freeRanges) {
+            if (candidate.size() < size) {
+                continue;
+            }
+            int firstPage = candidate.start;
+            freeRanges.remove(firstPage, size);
+            return firstPage;
+        }
+        throw new OutOfSpaceException();
+    }
+
+
+    /**
+     * Returns {@code count} pages starting at {@code pageId} to the free list.
+     */
+    synchronized public void free(int pageId, int count) {
+        freeRanges.add(pageId, count);
+    }
+
+    /**
+     * Takes {@code count} pages starting at {@code pageId} back out of the
+     * free list.
+     */
+    synchronized public void unfree(int pageId, int count) {
+        freeRanges.remove(pageId, count);
+    }
+
+    /** Resets the free list so that every page below the limit is free. */
+    synchronized public void clear() throws UnsupportedOperationException {
+        freeRanges.clear();
+        freeRanges.add(0, limit);
+    }
+
+    /** Replaces the free list with a copy of {@code freePages}. */
+    synchronized public void copy(Ranges freePages) throws UnsupportedOperationException {
+        freeRanges.copy(freePages);
+    }
+
+    public int getLimit() {
+        return limit;
+    }
+
+    /** @return true if {@code page} is not in the free list. */
+    public boolean isAllocated(int page) {
+        boolean free = freeRanges.contains(page);
+        return !free;
+    }
+
+    /** @return the live free list (not a copy). */
+    public Ranges getFreeRanges() {
+        return freeRanges;
+    }
+
+    @Override
+    public String toString() {
+        return "{ free pages: "+freeRanges.toString()+" }";
+    }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+ Implementations of paged IO.
+</p>
+
+</body>
+</html>
Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/Ranges.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.util;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Map.Entry;
+
+import org.apache.activemq.util.TreeMap;
+import org.apache.activemq.util.TreeMap.TreeEntry;
+
+/**
+ * Tracks numeric ranges. Handy for keeping track of things like allocation or free lists.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final public class Ranges implements Serializable, Iterable<Ranges.Range> {
+ private static final long serialVersionUID = 8340484139329633582L;
+
+ final public static class Range implements Serializable {
+ private static final long serialVersionUID = -4904483630105365841L;
+
+ public int start;
+ public int end;
+
+ public Range(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ public int size() {
+ return end - start;
+ }
+
+ @Override
+ public String toString() {
+ if( start == end-1 ) {
+ return Integer.toString(start);
+ }
+ return start+"-"+(end-1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if( obj == this ) {
+ return true;
+ }
+ if( obj == null || obj.getClass()!=Range.class ) {
+ return false;
+ }
+ Range r = (Range)obj;
+ return start == r.start && end==r.end;
+ }
+
+ @Override
+ public int hashCode() {
+ return start*77+end;
+ }
+
+ public boolean contains(int value) {
+ return start <= value && value < end;
+ }
+ }
+
+ private final TreeMap<Integer, Range> ranges = new TreeMap<Integer, Range>();
+
+ public void add(int start) {
+ add(start, 1);
+ }
+
+ /**
+ * Adds the values start (inclusive) to start+length (exclusive), merging
+ * the new range with any existing ranges it overlaps or touches.
+ *
+ * The existing entries are walked backwards, beginning with the entry
+ * whose key is the greatest one not above the end of the added range.
+ */
+ public void add(int start, int length) {
+ int end = start+length;
+
+ // look for entries starting from the end of the add range.
+ TreeEntry<Integer, Range> entry = ranges.floorEntry(end);
+ if( entry!=null ) {
+ while( entry!=null) {
+ Range range = entry.getValue();
+ TreeEntry<Integer, Range> curr = entry;
+ // Step to the previous entry first, since curr may be removed below.
+ entry = entry.previous();
+
+ // If tail of the range is not in the add range.
+ // (a range ending exactly at start is still merged)
+ if( range.end < start ) {
+ // we are done..
+ break;
+ }
+
+ // if the end of the range is not in the add range.
+ if( end < range.end ) {
+ // extend the length out..
+ end = range.end;
+ }
+
+ if( start < range.start ) {
+ // if the front of the range is in the add range.
+ // just remove it..
+ ranges.removeEntry(curr);
+ } else {
+ // The front is not in the add range...
+ // Then resize.. and we are done
+ range.end = end;
+ return;
+ }
+ }
+ }
+
+ // put the new range in.
+ // (every existing range it covered was removed above)
+ ranges.put(start, range(start, end));
+ }
+
+ public void remove(int start) {
+ remove(start, 1);
+ }
+
+ /**
+ * Removes the values start (inclusive) to start+length (exclusive) from
+ * the tracked ranges, trimming or splitting existing ranges as needed.
+ *
+ * The existing entries are walked backwards, beginning with the entry
+ * whose key is the greatest one strictly below the end of the removed range.
+ */
+ public void remove(int start, int length) {
+ int end = start+length;
+
+ // look for entries starting from the end of the remove range.
+ TreeEntry<Integer, Range> entry = ranges.lowerEntry(end);
+ while( entry!=null) {
+ Range range = entry.getValue();
+ TreeEntry<Integer, Range> curr = entry;
+ // Step to the previous entry first, since curr may be removed below.
+ entry = entry.previous();
+
+ // If tail of the range is not in the remove range.
+ if( range.end <= start ) {
+ // we are done..
+ break;
+ }
+
+ // if the end of the range is not in the remove range.
+ if( end < range.end ) {
+ // Then we need to add back the tail part.
+ ranges.put(end, range(end, range.end));
+ }
+
+ if( start <= range.start ) {
+ // if the front of the range is in the remove range.
+ // just remove it..
+ ranges.removeEntry(curr);
+
+ } else {
+ // The front is not in the remove range...
+ // Then resize.. and we are done
+ range.end = start;
+ break;
+ }
+ }
+ }
+
+ public boolean contains(int value) {
+ TreeEntry<Integer, Range> entry = ranges.floorEntry(value);
+ if( entry == null ) {
+ return false;
+ }
+ return entry.getValue().contains(value);
+ }
+
+
+ public void clear() {
+ ranges.clear();
+ }
+
+ public void copy(Ranges source) {
+ ranges.clear();
+ for (Entry<Integer, Range> entry : source.ranges.entrySet()) {
+ Range value = entry.getValue();
+ ranges.put(entry.getKey(), range(value.start, value.end));
+ }
+ }
+
+ public int size() {
+ int rc=0;
+ TreeEntry<Integer, Range> entry = ranges.firstEntry();
+ while(entry!=null) {
+ rc += entry.getValue().size();
+ entry = entry.next();
+ }
+ return rc;
+ }
+
+
+ static public Range range(int start, int end) {
+ return new Range(start, end);
+ }
+
+ public ArrayList<Range> toArrayList() {
+ return new ArrayList<Range>(ranges.values());
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(20+(10*ranges.size()));
+ sb.append("[ ");
+
+ boolean first=true;
+ for (Range r : this) {
+ if( !first ) {
+ sb.append(", ");
+ }
+ first=false;
+ sb.append(r);
+ }
+
+ sb.append(" ]");
+ return sb.toString();
+ }
+
+ public Iterator<Range> iterator() {
+ return ranges.values().iterator();
+ }
+
+ /**
+ * Returns an iterator that, judging by its name and the code below, is meant to
+ * produce the ranges inside {@code mask} that are not covered by the tracked ranges.
+ * The returned iterator does not support {@code remove()}.
+ *
+ * NOTE(review): after a gap is returned, {@code last} is set to that gap, so the
+ * following gap starts at the end of the previous gap (the start of the tracked
+ * range that closed it) rather than at the end of that tracked range. Nothing is
+ * produced for the space between the final tracked range and {@code mask.end}, and
+ * an empty range can be produced when a tracked range starts exactly at
+ * {@code last.end}. Confirm against callers and tests whether this is intended.
+ */
+ public Iterator<Range> iteratorNotInRange(final Range mask) {
+
+ return new Iterator<Range>() {
+
+ Iterator<Range> iter = ranges.values().iterator();
+ // Starts as an empty range at mask.start; later holds the range most recently returned.
+ Range last = new Range(mask.start, mask.start);
+ // The range computed by hasNext() and not yet handed out by next().
+ Range next = null;
+
+ public boolean hasNext() {
+ if( next==null ) {
+ while( last.end < mask.end && iter.hasNext() ) {
+ Range r = iter.next();
+
+ // skip over the initial ranges not within the mask
+ if( r.end < last.end ) {
+ continue;
+ }
+
+ // Handle the case where a range straddles the mask start position
+ if( r.start < last.end ) {
+ // extend the last range out so that the next one starts at
+ // the end of the range.
+ last = new Range(last.start, r.end);
+ continue;
+ }
+
+ // r starts at or after last.end: the result runs from last.end up to
+ // r.start, or up to mask.end when r starts at or beyond the mask end.
+ if( r.start < mask.end ) {
+ next = new Range(last.end, r.start);
+ } else {
+ next = new Range(last.end, mask.end);
+ }
+ break;
+ }
+ }
+ return next!=null;
+ }
+
+ public Range next() {
+ if( !hasNext() ) {
+ throw new NoSuchElementException();
+ }
+ last = next;
+ next=null;
+ return last;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Iterates over the individual int values covered by a sequence of ranges.
+ * It is its own {@link Iterable}, so it can only be traversed once.
+ *
+ * NOTE(review): the end-of-range check compares against {@code range.end-1}, so a
+ * range with {@code start == end} would never match and values would keep being
+ * produced past it - confirm empty ranges are never supplied.
+ */
+ static private final class ValueIterator implements Iterator<Integer>, Iterable<Integer> {
+ // Source of the ranges to expand.
+ Iterator<Range> ranges;
+ // The range currently being expanded; null when a new range must be fetched.
+ Range range;
+ // The value computed by hasNext() and not yet handed out by next().
+ Integer next;
+ // The value most recently returned by next().
+ int last;
+
+ private ValueIterator(Iterator<Range> t) {
+ ranges = t;
+ }
+
+ public boolean hasNext() {
+ if( next==null ) {
+ if( range == null ) {
+ if( ranges.hasNext() ) {
+ range = ranges.next();
+ next = range.start;
+ } else {
+ return false;
+ }
+ } else {
+ next = last+1;
+ }
+ // The last value of the range was reached: drop the range so the
+ // following call moves on to the next one.
+ if( next == (range.end-1) ) {
+ range=null;
+ }
+ }
+ return next!=null;
+ }
+
+ public Integer next() {
+ if( !hasNext() ) {
+ throw new NoSuchElementException();
+ }
+ last = next;
+ next=null;
+ return last;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Iterator<Integer> iterator() {
+ return this;
+ }
+ }
+
+ public List<Integer> values() {
+ ArrayList<Integer> rc = new ArrayList<Integer>();
+ for (Integer i : new ValueIterator(iterator())) {
+ rc.add(i);
+ }
+ return rc;
+ }
+
+ public Iterator<Integer> valueIterator() {
+ return new ValueIterator(iterator());
+ }
+
+ public Iterator<Integer> valuesIteratorNotInRange(Range r) {
+ return new ValueIterator(iteratorNotInRange(r));
+ }
+
+ public boolean isEmpty() {
+ return ranges.isEmpty();
+ }
+
+}
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+ Collections and utility classes
+</p>
+
+</body>
+</html>
Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/util/package.html
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/eclipse-resources/log4j.properties Thu Oct 15 17:04:11 2009
@@ -0,0 +1,36 @@
+## ------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ------------------------------------------------------------------------
+
+#
+# The logging properties used for Eclipse testing. We want to see log output (INFO and above) on the console.
+#
+log4j.rootLogger=INFO, out
+
+
+
+# CONSOLE appender (attached to the root logger above)
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender (defined here but not attached to the root logger)
+log4j.appender.fout=org.apache.log4j.FileAppender
+log4j.appender.fout.layout=org.apache.log4j.PatternLayout
+log4j.appender.fout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.fout.file=target/amq-testlog.log
+log4j.appender.fout.append=true
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Action.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal;
+
+/**
+ * A named operation that is applied to an {@link Actor} of type {@code T}.
+ * <p>
+ * NOTE(review): the code that drives actions is not visible here; that
+ * {@code init} is invoked before {@code run} is an assumption to confirm.
+ *
+ * @param <T> the actor type this action operates on
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Action<T extends Actor> {
+ /**
+  * Receives the actor ahead of execution (presumably for setup; confirm against the caller).
+  *
+  * @param actor the actor this action will operate on
+  */
+ void init(T actor);
+ /**
+  * Executes the action against the given actor.
+  *
+  * @param actor the actor to operate on
+  * @throws Exception if the action fails
+  */
+ void run(T actor) throws Exception;
+ /**
+  * @return the name of this action
+  */
+ String getName();
+}
\ No newline at end of file