You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/16 20:56:38 UTC
svn commit: r826036 [1/2] - in /activemq/sandbox/activemq-flow:
activemq-util/src/main/java/org/apache/activemq/util/list/
hawtdb/src/main/java/org/apache/hawtdb/internal/index/
hawtdb/src/main/java/org/apache/hawtdb/internal/page/ hawtdb/src/test/java...
Author: chirino
Date: Fri Oct 16 18:56:37 2009
New Revision: 826036
URL: http://svn.apache.org/viewvc?rev=826036&view=rev
Log:
ConcurrentPageFile is in better shape now; indexes are almost working.
Added:
activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentTransaction.java
Modified:
activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java
activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java
activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java
activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java
activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java
activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java
activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java
activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java
activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java
Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/LinkedNodeList.java Fri Oct 16 18:56:37 2009
@@ -17,13 +17,14 @@
package org.apache.activemq.util.list;
import java.util.ArrayList;
+import java.util.Iterator;
/**
* Provides a list of LinkedNode objects.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-public class LinkedNodeList<T extends LinkedNode<T>> {
+public class LinkedNodeList<T extends LinkedNode<T>> implements Iterable<T> {
T head;
int size;
@@ -48,6 +49,9 @@
}
public T getTail() {
+ if( head==null ) {
+ return null;
+ }
return head.prev;
}
@@ -167,4 +171,29 @@
}
return rc;
}
+
+ public Iterator<T> iterator() {
+ return new Iterator<T>() {
+ T next = getHead();
+ private T last;
+
+ public boolean hasNext() {
+ return next!=null;
+ }
+
+ public T next() {
+ last = next;
+ next = last.getNext();
+ return last;
+ }
+
+ public void remove() {
+ if( last==null ) {
+ throw new IllegalStateException();
+ }
+ last.unlink();
+ last=null;
+ }
+ };
+ }
}
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java Fri Oct 16 18:56:37 2009
@@ -59,6 +59,8 @@
private Marshaller<Key> keyMarshaller;
private Marshaller<Value> valueMarshaller;
+ private boolean deferredEncoding;
+ private Prefixer<Key> prefixer;
public BTreeIndex<Key, Value> create(Paged paged, int page) {
BTreeIndex<Key, Value> index = createInstance(paged, page);
@@ -79,7 +81,7 @@
if (valueMarshaller == null) {
throw new IllegalArgumentException("The key marshaller must be set before calling open");
}
- return new BTreeIndex<Key, Value>(paged, page, keyMarshaller, valueMarshaller, null);
+ return new BTreeIndex<Key, Value>(paged, page, this);
}
public Marshaller<Key> getKeyMarshaller() {
@@ -97,6 +99,23 @@
public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
this.valueMarshaller = valueMarshaller;
}
+
+ public boolean isDeferredEncoding() {
+ return deferredEncoding;
+ }
+
+ public void setDeferredEncoding(boolean deferredEncoding) {
+ this.deferredEncoding = deferredEncoding;
+ }
+
+ public Prefixer<Key> getPrefixer() {
+ return prefixer;
+ }
+
+ public void setPrefixer(Prefixer<Key> prefixer) {
+ this.prefixer = prefixer;
+ }
+
}
private final BTreeNode.BTreeNodeEncoderDecoder<Key, Value> PAGE_ENCODER_DECODER = new BTreeNode.BTreeNodeEncoderDecoder<Key, Value>(this);
@@ -106,13 +125,15 @@
private final Marshaller<Key> keyMarshaller;
private final Marshaller<Value> valueMarshaller;
private final Prefixer<Key> prefixer;
+ private final boolean deferredEncoding;
- private BTreeIndex(Paged paged, int page, Marshaller<Key> keyMarshaller, Marshaller<Value> valueMarshaller, Prefixer<Key> prefixer) {
+ public BTreeIndex(Paged paged, int page, Factory<Key, Value> factory) {
this.paged = paged;
this.page = page;
- this.keyMarshaller = keyMarshaller;
- this.valueMarshaller = valueMarshaller;
- this.prefixer = prefixer;
+ this.keyMarshaller = factory.getKeyMarshaller();
+ this.valueMarshaller = factory.getValueMarshaller();
+ this.deferredEncoding = factory.isDeferredEncoding();
+ this.prefixer = factory.getPrefixer();
}
public boolean containsKey(Key key) {
@@ -192,17 +213,30 @@
}
void storeNode(BTreeNode<Key, Value> node) {
- paged.put(PAGE_ENCODER_DECODER, node.getPage(), node);
+ if( deferredEncoding ) {
+ paged.put(PAGE_ENCODER_DECODER, node.getPage(), node);
+ } else {
+ PAGE_ENCODER_DECODER.store(paged, node.getPage(), node);
+ }
}
BTreeNode<Key, Value> loadNode(int page) {
- BTreeNode<Key, Value> node = paged.get(PAGE_ENCODER_DECODER, page);
+ BTreeNode<Key, Value> node;
+ if( deferredEncoding ) {
+ node = paged.get(PAGE_ENCODER_DECODER, page);
+ } else {
+ node = PAGE_ENCODER_DECODER.load(paged, page);
+ }
node.setPage(page);
return node;
}
void free( int page ) {
- paged.remove(PAGE_ENCODER_DECODER, page);
+ if( deferredEncoding ) {
+ paged.remove(PAGE_ENCODER_DECODER, page);
+ } else {
+ PAGE_ENCODER_DECODER.remove(paged, page);
+ }
paged.allocator().free(page, 1);
}
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java Fri Oct 16 18:56:37 2009
@@ -166,7 +166,13 @@
}
<Key,Value> int index(Key x) {
- return Math.abs(x.hashCode()%capacity);
+ try {
+ return Math.abs(x.hashCode()%capacity);
+ } catch (ArithmeticException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ throw e;
+ }
}
@Override
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFile.java Fri Oct 16 18:56:37 2009
@@ -16,20 +16,20 @@
*/
package org.apache.hawtdb.internal.page;
+import java.io.Externalizable;
import java.io.IOException;
+import java.io.ObjectInput;
import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
-import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.CRC32;
import javolution.io.Struct;
@@ -38,11 +38,9 @@
import org.apache.activemq.util.buffer.Buffer;
import org.apache.activemq.util.list.LinkedNode;
import org.apache.activemq.util.list.LinkedNodeList;
-import org.apache.hawtdb.api.Allocator;
import org.apache.hawtdb.api.EncoderDecoder;
import org.apache.hawtdb.api.IOPagingException;
import org.apache.hawtdb.api.OptimisticUpdateException;
-import org.apache.hawtdb.api.OutOfSpaceException;
import org.apache.hawtdb.api.Paged;
import org.apache.hawtdb.api.PagingException;
import org.apache.hawtdb.api.Transaction;
@@ -71,561 +69,471 @@
*/
public final class ConcurrentPageFile {
- private static final String MAGIC = "HawtDB:MVCC Page File:1.0\n";
+ private static final String MAGIC = "HawtDB:1.0\n";
private static final int FILE_HEADER_SIZE = 1024 * 4;
public static final int PAGE_ALLOCATED = -1;
public static final int PAGE_FREED = -2;
-// public static final int PAGE_CACHED_WRITE = -3;
public static final int HEADER_SIZE = 1024*4;
- static class CacheUpdate {
- private final int page;
- private Object value;
- private EncoderDecoder<?> marshaller;
-
- public CacheUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
- this.page = page;
- this.value = value;
- this.marshaller = marshaller;
- }
-
- public void reset(Object value, EncoderDecoder<?> marshaller) {
- this.value = value;
- this.marshaller = marshaller;
- }
+ /**
+ * The first 4K of the file is used to hold 2 copies of the header.
+ * Each copy is 2K big. The header is checksummed so that corruption
+ * can be detected.
+ */
+ static private class Header extends Struct {
+
+ /** Identifies the file format */
+ public final UTF8String magic = new UTF8String(32);
+ /** The oldest applied commit revision */
+ public final Signed64 base_revision = new Signed64();
+ /** The size of each page in the page file */
+ public final Signed32 page_size = new Signed32();
+ /** The page location of the free page list */
+ public final Signed32 free_list_page = new Signed32();
+ /** points at the latest redo page which is guaranteed to be fully stored */
+ public final Signed32 redo_page = new Signed32();
+ /** The page location of the latest redo page. Not guaranteed to be fully stored */
+ public final Signed32 unsynced_redo_page = new Signed32();
+
+ /** The size of all the previous fields */
+ private static final int USED_FIELDS_SIZE = 32 + 8 + 4 + 4 + 4 + 4;
- @SuppressWarnings("unchecked")
- <T> T value() {
- return (T) value;
- }
+ /** reserves header space for future use */
+ public final UTF8String reserved = new UTF8String((FILE_HEADER_SIZE/2)-(USED_FIELDS_SIZE+8));
- @SuppressWarnings("unchecked")
- public List<Integer> store(Paged paged) {
- return ((EncoderDecoder)marshaller).store(paged, page, value);
+ /** a checksum of all the previous fields. The reserved space
+ * positions this right at the end of a 2k block */
+ public final Signed64 checksum = new Signed64();
+
+ public String toString() {
+ return "{ base_revision: "+this.base_revision.get()+
+ ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
+ ", redo_page: "+unsynced_redo_page.get()+", checksum: "+checksum.get()+
+ " }";
}
}
-
+
/**
- * Transaction objects are NOT thread safe. Users of this object should
- * guard it from concurrent access.
+ * Tracks the page changes that were part of a commit.
+ *
+ * Commits can be merged, in that sense this then tracks range of commits.
*
* @author chirino
*/
- private final class ConcurrentTransaction implements Transaction {
- private HashMap<Integer, CacheUpdate> cache;
- private HashMap<Integer, Integer> updates;
- private Snapshot snapshot;
-
- private final Allocator txallocator = new Allocator() {
-
- public void free(int pageId, int count) {
- // TODO: this is not a very efficient way to handle allocation ranges.
- int end = pageId+count;
- for (int key = pageId; key < end; key++) {
- Integer previous = getUpdates().put(key, PAGE_FREED);
-
- // If it was an allocation that was done in this
- // tx, then we can directly release it.
- assert previous!=null;
- if( previous == PAGE_ALLOCATED) {
- getUpdates().remove(key);
- allocator.free(key, 1);
- }
- }
- }
-
- public int alloc(int count) throws OutOfSpaceException {
- int pageId = allocator.alloc(count);
- // TODO: this is not a very efficient way to handle allocation ranges.
- int end = pageId+count;
- for (int key = pageId; key < end; key++) {
- getUpdates().put(key, PAGE_ALLOCATED);
- }
- return pageId;
- }
+ static class Commit extends LinkedNode<Commit> {
- public void unfree(int pageId, int count) {
- throw new UnsupportedOperationException();
- }
-
- public void clear() throws UnsupportedOperationException {
- throw new UnsupportedOperationException();
- }
-
- public int getLimit() {
- return allocator.getLimit();
- }
-
- public boolean isAllocated(int page) {
- return allocator.isAllocated(page);
- }
-
- };
-
- public <T> T get(EncoderDecoder<T> marshaller, int page) {
- // Perhaps the page was updated in the current transaction...
- CacheUpdate rc = cache == null ? null : cache.get(page);
- if( rc != null ) {
- return rc.<T>value();
- }
-
- // No? Then ask the snapshot to load the object.
- return snapshot().cacheLoad(marshaller, page);
- }
-
- public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
- Integer update = getUpdates().get(page);
- if (update == null) {
- // This is the first time this transaction updates the page...
- snapshot();
- update = allocator.alloc(1);
- getUpdates().put(page, update);
- getCacheUpdates().put(page, new CacheUpdate(update, value, marshaller));
- } else {
- // We have updated it before...
- switch (update) {
- case PAGE_FREED:
- throw new PagingException("You should never try to write a page that has been freed.");
- case PAGE_ALLOCATED:
- getCacheUpdates().put(page, new CacheUpdate(page, value, marshaller));
- break;
- default:
- CacheUpdate cu = getCacheUpdates().get(page);
- if( cu == null ) {
- throw new PagingException("You should never try to store mix using the cached objects with normal page updates.");
- }
- cu.reset(value, marshaller);
- }
- }
- }
+ /** the redo that this commit is stored in */
+ private final Redo redo;
+ /** oldest revision in the commit range. */
+ private long base;
+ /** newest revision in the commit range, will match base if this only tracks one commit */
+ private long head;
+ /** set to the open snapshot whose head is this commit */
+ private Snapshot snapshot;
+ /** all the page updates that are part of the redo */
+ private HashMap<Integer, Integer> updates;
+ /** the deferred updates that need to be done in this redo */
+ private HashMap<Integer, DeferredUpdate> deferredUpdates;
- public <T> void remove(EncoderDecoder<T> marshaller, int page) {
- marshaller.remove(ConcurrentTransaction.this, page);
+ public Commit(long version, HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates, Redo redo) {
+ this.redo = redo;
+ this.head = this.base = version;
+ this.updates = updates;
+ this.deferredUpdates = deferredUpdates;
}
- public Allocator allocator() {
- return txallocator;
+ public String toString() {
+ int updateSize = updates==null ? 0 : updates.size();
+ int cacheSize = deferredUpdates==null ? 0 : deferredUpdates.size();
+ return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+" }";
}
- public void read(int pageId, Buffer buffer) throws IOPagingException {
-
- Integer updatedPageId = updates == null ? null : updates.get(pageId);
- if (updatedPageId != null) {
- switch (updatedPageId) {
- case PAGE_ALLOCATED:
- case PAGE_FREED:
- // TODO: Perhaps use a RuntimeException subclass.
- throw new PagingException("You should never try to read a page that has been allocated or freed.");
- default:
- // read back in the updated we had done.
- pageFile.read(updatedPageId, buffer);
+ public long commitCheck(HashMap<Integer, Integer> newUpdate) {
+ for (Integer page : newUpdate.keySet()) {
+ if( updates.containsKey( page ) ) {
+ throw new OptimisticUpdateException();
}
- } else {
- // Get the data from the snapshot.
- snapshot().read(pageId, buffer);
}
+ return head;
}
- public ByteBuffer slice(SliceType type, int page, int count) throws IOPagingException {
- //TODO: need to improve the design of ranged ops..
- if( type==SliceType.READ ) {
- Integer updatedPageId = updates == null ? null : updates.get(page);
- if (updatedPageId != null) {
- switch (updatedPageId) {
- case PAGE_ALLOCATED:
- case PAGE_FREED:
- throw new PagingException("You should never try to read a page that has been allocated or freed.");
- }
- return pageFile.slice(type, updatedPageId, count);
+ public void putAll(HashMap<Integer, Integer> udpates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+ if( udpates!=null ) {
+ if( this.updates == null ) {
+ this.updates = udpates;
} else {
- // Get the data from the snapshot.
- return snapshot().slice(page, count);
+ this.updates.putAll(udpates);
}
-
- } else {
- Integer update = getUpdates().get(page);
- if (update == null) {
- update = allocator.alloc(count);
-
- if (type==SliceType.READ_WRITE) {
- ByteBuffer slice = snapshot().slice(page, count);
- try {
- pageFile.write(update, slice);
- } finally {
- pageFile.unslice(slice);
- }
- }
-
- int end = page+count;
- for (int i = page; i < end; i++) {
- getUpdates().put(i, PAGE_ALLOCATED);
- }
- getUpdates().put(page, update);
-
- return pageFile.slice(type, update, count);
+ }
+ if( deferredUpdates!=null ) {
+ if( this.deferredUpdates == null ) {
+ this.deferredUpdates = deferredUpdates;
} else {
- switch (update) {
- case PAGE_FREED:
- throw new PagingException("You should never try to write a page that has been freed.");
- case PAGE_ALLOCATED:
- break;
- default:
- page = update;
- }
+ this.deferredUpdates.putAll(deferredUpdates);
}
- return pageFile.slice(type, page, count);
-
}
-
}
- public void unslice(ByteBuffer buffer) {
- pageFile.unslice(buffer);
+ }
+
+ /**
+ * Aggregates a group of commits so that they can be more efficiently operated against.
+ *
+ */
+ static private class Redo extends LinkedNode<Redo> implements Externalizable {
+ private static final long serialVersionUID = 1188640492489990493L;
+
+ /** the pageId that this redo batch is stored at */
+ private transient int page=-1;
+ /** points to a previous redo batch page */
+ public int previous=-1;
+ /** whether the redo was loaded by the {@link #recover} method */
+ private transient boolean recovered;
+ /** the commits stored in the redo */
+ private transient LinkedNodeList<Commit> commits = new LinkedNodeList<Commit>();
+ /** all the page updates that are part of the redo */
+ private ConcurrentHashMap<Integer, Integer> updates = new ConcurrentHashMap<Integer, Integer>();
+ /** the deferred updates that need to be done in this redo */
+ private transient ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates = new ConcurrentHashMap<Integer, DeferredUpdate>();
+ /** tracks how many snapshots are referencing the redo */
+ private int references;
+ /** set to the open snapshot whose head is before this redo */
+ private Snapshot prevSnapshot;
+ /** set to the open snapshot whose head is this redo */
+ private Snapshot snapshot;
+ /** the oldest commit in this redo */
+ public long base=-1;
+ /** the newest commit in this redo */
+ public long head;
+
+ @SuppressWarnings("unused")
+ public Redo() {
+ }
+
+ public Redo(long head) {
+ this.head = head;
}
- public void write(int page, Buffer buffer) throws IOPagingException {
- Integer update = getUpdates().get(page);
- if (update == null) {
- // We are updating an existing page in the snapshot...
- snapshot();
- update = allocator.alloc(1);
- getUpdates().put(page, update);
- page = update;
- } else {
- switch (update) {
- case PAGE_FREED:
- throw new PagingException("You should never try to write a page that has been freed.");
- case PAGE_ALLOCATED:
- break;
- default:
- page = update;
- }
- }
- pageFile.write(page, buffer);
+ public String toString() {
+ int count = updates==null ? 0 : updates.size();
+ return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(head);
+ out.writeLong(base);
+ out.writeInt(previous);
+ out.writeObject(updates);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ head = in.readLong();
+ base = in.readLong();
+ previous = in.readInt();
+ updates = (ConcurrentHashMap<Integer, Integer>) in.readObject();
+ }
- public void commit() throws IOPagingException {
- boolean failed = true;
- try {
- if (updates!=null) {
- Update previousUpdate = snapshot==null ? null : snapshot.updates.get(0);
- ConcurrentPageFile.this.commit(previousUpdate, updates, cache);
- }
- failed = false;
- } finally {
- // Rollback if the commit fails.
- if (failed) {
- freeAllocatedPages();
- }
- ConcurrentPageFile.this.releaseSnapshot(snapshot);
- updates = null;
- cache = null;
- snapshot = null;
- }
+ public int pageCount() {
+ int rc = 0;
+ rc = updates.size();
+ // TODO: we can probably get an idea of how many pages the deferred update will use.
+ // rc += deferredUpdates.size();
+ return rc;
}
- public void rollback() throws IOPagingException {
- try {
- if (updates!=null) {
- freeAllocatedPages();
+ public long commitCheck(HashMap<Integer, Integer> newUpdate) {
+ for (Integer page : newUpdate.keySet()) {
+ if( updates.containsKey( page ) ) {
+ throw new OptimisticUpdateException();
}
- } finally {
- ConcurrentPageFile.this.releaseSnapshot(snapshot);
- updates = null;
- cache = null;
- snapshot = null;
}
+ return head;
}
- private void freeAllocatedPages() {
- for (Entry<Integer, Integer> entry : updates.entrySet()) {
- switch (entry.getValue()) {
- case PAGE_FREED:
- // Don't need to do anything..
- break;
- case PAGE_ALLOCATED:
- default:
- // We need to free the page that was allocated for the
- // update..
- allocator.free(entry.getKey(), 1);
- }
+ public void putAll(HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+ if( updates!=null ) {
+ this.updates.putAll(updates);
}
- }
-
- public Snapshot snapshot() {
- if (snapshot == null) {
- snapshot = aquireSnapshot();
+ if( deferredUpdates!=null ) {
+ this.deferredUpdates.putAll(deferredUpdates);
}
- return snapshot;
}
- public boolean isReadOnly() {
- return updates == null;
- }
+ }
- public HashMap<Integer, CacheUpdate> getCacheUpdates() {
- if( cache==null ) {
- cache = new HashMap<Integer, CacheUpdate>();
- }
- return cache;
+ /**
+ * Provides a snapshot view of the page file.
+ *
+ * @author chirino
+ */
+ abstract class Snapshot {
+ /** The number of transactions that are holding this snapshot open */
+ protected int references;
+
+ public String toString() {
+ return "{ references: "+this.references+" }";
}
- private HashMap<Integer, Integer> getUpdates() {
- if (updates == null) {
- updates = new HashMap<Integer, Integer>();
- }
- return updates;
+ public int mapPageId(int page) {
+ return page;
}
-
- public int getPageSize() {
- return pageFile.getPageSize();
+
+ public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+ return readCache.cacheLoad(marshaller, page);
}
-
- public String toString() {
- int updatesSize = updates==null ? 0 : updates.size();
- return "{ snapshot: "+this.snapshot+", updates: "+updatesSize+" }";
+
+ public void read(int pageId, Buffer buffer) throws IOPagingException {
+ pageId = mapPageId(pageId);
+ pageFile.read(pageId, buffer);
}
- public int pages(int length) {
- return pageFile.pages(length);
+ public ByteBuffer slice(int pageId, int count) {
+ pageId = mapPageId(pageId);
+ return pageFile.slice(SliceType.READ, pageId, count);
}
+
+ abstract public Snapshot open();
+ abstract public void close();
+ abstract public long commitCheck(HashMap<Integer, Integer> pageUpdates);
+ }
+
+ class PreviousSnapshot extends Snapshot {
+ private final Redo redo;
- public void flush() {
- ConcurrentPageFile.this.flush();
+ public PreviousSnapshot(Redo redo) {
+ this.redo = redo;
+ }
+
+ @Override
+ public Snapshot open() {
+ references++;
+ if( references==1 ) {
+ redo.references++;
+ redo.prevSnapshot = this;
+ }
+ return this;
+ }
+
+ @Override
+ public void close() {
+ references--;
+ if( references==0 ) {
+ redo.references--;
+ redo.prevSnapshot = null;
+ }
+ }
+
+ public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
+ long rc = 0;
+ Redo cur = redo;
+ while (cur != null) {
+ rc = cur.commitCheck(pageUpdates);
+ cur = cur.getNext();
+ }
+ return rc;
}
}
- static class Update extends LinkedNode<Update> implements Serializable {
- private static final long serialVersionUID = 9160865012544031094L;
-
- transient final AtomicBoolean applied = new AtomicBoolean();
- private final Redo redo;
+ class RedosSnapshot extends Snapshot {
- long base;
- long head;
- Snapshot snapshot;
+ /** The snapshot will load page updates from the following redo list. */
+ protected final List<Redo> redosInSnapshot;
- HashMap<Integer, Integer> updates;
- transient HashMap<Integer, CacheUpdate> cache;
-
- public Update(long version, HashMap<Integer, Integer> updates, HashMap<Integer, CacheUpdate> cache, Redo redo) {
- this.redo = redo;
- this.head = this.base = version;
- this.updates = updates;
- this.cache = new HashMap<Integer, CacheUpdate>();
+ public RedosSnapshot() {
+ this.redosInSnapshot = snapshotRedos();
}
- /**
- * Merges previous updates that can be merged with this update.
- */
- public void mergePrevious() {
- prev = getPrevious();
- while( prev!=null && prev.snapshot==null && prev.redo==redo ) {
-
- assert prev.head+1 == this.base;
- this.base = prev.base;
-
- if( prev.updates!=null ) {
- if(this.updates!=null) {
- prev.updates.putAll(this.updates);
- }
- this.updates = prev.updates;
+ public Snapshot open() {
+ references++;
+ if( references==1 ) {
+ for (Redo redo : redosInSnapshot) {
+ redo.references++;
}
-
- if( prev.cache!=null ) {
- if( this.cache!=null ) {
- prev.cache.putAll(this.cache);
+ redosInSnapshot.get(0).snapshot = this;
+ }
+ return this;
+ }
+
+ public void close() {
+ references--;
+ if( references==0 ) {
+ for (Redo redo : redosInSnapshot) {
+ redo.references--;
+ }
+ redosInSnapshot.get(0).snapshot = null;
+ }
+ }
+
+ public int mapPageId(int page) {
+ // It may be in the redos..
+ for (Redo redo : redosInSnapshot) {
+ Integer updatedPage = redo.updates.get(page);
+ if (updatedPage != null) {
+ switch (updatedPage) {
+ case PAGE_FREED:
+ throw new PagingException("You should never try to read page that has been freed.");
+ case PAGE_ALLOCATED:
+ return page;
+ default:
+ return updatedPage;
}
- this.cache = prev.cache;
}
- prev.unlink();
}
+
+ return super.mapPageId(page);
}
-
+ public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+ for (Redo redo : redosInSnapshot) {
+ DeferredUpdate cu = redo.deferredUpdates.get(page);
+ if (cu != null) {
+ return cu.<T>value();
+ }
+ }
+ return super.cacheLoad(marshaller, page);
+ }
- public String toString() {
- int updateSize = updates==null ? 0 : updates.size();
- int cacheSize = cache==null ? 0 : cache.size();
- return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+", applied: "+applied+" }";
+ public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
+ long rc = 0;
+ Redo cur = redosInSnapshot.get(0).getNext();
+ while (cur != null) {
+ rc = cur.commitCheck(pageUpdates);
+ cur = cur.getNext();
+ }
+ return rc;
}
+
}
+
+ class CommitsSnapshot extends RedosSnapshot {
+ /** The snapshot will load page updates from the following commit and all its previous linked commits. */
+ private final Commit commit;
- /**
- * @author chirino
- */
- private class Snapshot {
-
- private final ArrayList<Update> updates;
- private int references;
-
- public Snapshot(ArrayList<Update> updates) {
- this.updates = updates;
+ public CommitsSnapshot(Commit commit) {
+ this.commit= commit;
}
- private int mapPageId(int page) {
- for (Update update : updates) {
- if( update.updates!=null ) {
- Integer updatedPage = update.updates.get(page);
- if (updatedPage != null) {
- switch (updatedPage) {
- case PAGE_FREED:
- throw new PagingException("You should never try to read page that has been freed.");
- case PAGE_ALLOCATED:
- break;
- default:
- page = updatedPage;
- }
- break;
- }
+ public Snapshot open() {
+ references++;
+ if( references==1 ) {
+ for (Redo redo : redosInSnapshot) {
+ redo.references++;
}
+ commit.redo.references++;
+ commit.snapshot = this;
}
- return page;
- }
-
- void read(int pageId, Buffer buffer) throws IOPagingException {
- pageId = mapPageId(pageId);
- pageFile.read(pageId, buffer);
- }
-
- public ByteBuffer slice(int pageId, int count) {
- pageId = mapPageId(pageId);
- return pageFile.slice(SliceType.READ, pageId, count);
+ return this;
}
- private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
- // See if any of the updates in the snapshot have an update of the
- // requested page...
- for (Update update : updates) {
- if( update.cache!=null ) {
- CacheUpdate cu = update.cache.get(pageId);
- if (cu != null) {
- return cu.<T>value();
- }
+ public void close() {
+ references--;
+ if( references==0 ) {
+ for (Redo redo : redosInSnapshot) {
+ redo.references--;
}
+ commit.redo.references--;
+ commit.snapshot = null;
}
- return readCache.cacheLoad(marshaller, pageId);
}
- public String toString() {
- return "{ updates: "+this.updates.size()+", references: "+this.references+" }";
- }
- }
- class ReadCache {
- Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
-
- @SuppressWarnings("unchecked")
- private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
- T rc = (T) map.get(pageId);
- if( rc ==null ) {
- rc = marshaller.load(pageFile, pageId);
- map.put(pageId, rc);
+ public int mapPageId(int page) {
+
+ // Check to see if it's in the current update list...
+ Commit update = this.commit;
+ while (update!=null) {
+ Integer updatedPage = update.updates.get(page);
+ if (updatedPage != null) {
+ switch (updatedPage) {
+ case PAGE_FREED:
+ throw new PagingException("You should never try to read page that has been freed.");
+ case PAGE_ALLOCATED:
+ return page;
+ default:
+ return updatedPage;
+ }
+ }
+ update = update.getPrevious();
}
- return rc;
- }
- }
-
- private final ReadCache readCache = new ReadCache();
-
- /**
- * The redo log is composed of a linked list of RedoBatch records. A
- * RedoBatch stores the redo data for multiple updates.
- *
- * @author chirino
- */
- static private class Redo implements Serializable {
- private static final long serialVersionUID = 1188640492489990493L;
-
- /** the pageId that this redo batch is stored at */
- private transient int page=-1;
- private transient boolean recovered;
-
- private long base;
- private long head;
-
- final HashMap<Integer, Integer> updates = new HashMap<Integer, Integer>();
- transient final HashMap<Integer, CacheUpdate> cache = new HashMap<Integer, CacheUpdate>();
-
- /** points to a previous redo batch page */
- public int previous=-1;
-
- public String toString() {
- int count = updates==null ? 0 : updates.size();
- return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
+
+ return super.mapPageId(page);
}
- public int pageCount() {
- int rc = 0;
- if( updates!=null ) {
- rc = updates.size();
+ public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+ // Check to see if it's in the current update list...
+ Commit update = this.commit;
+ while (update!=null) {
+ DeferredUpdate du = update.deferredUpdates.get(page);
+ if (du != null) {
+ return du.<T>value();
+ }
+ update = update.getPrevious();
}
- if( cache!=null ) {
- rc = cache.size();
+
+ return super.cacheLoad(marshaller, page);
+ }
+
+ public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
+ long rc = 0;
+
+ Commit next = this.commit.getNext();
+ while (next != null) {
+ rc = next.commitCheck(pageUpdates);
+ next = next.getNext();
+ }
+
+ Redo cur = this.commit.redo.getNext();
+ while (cur != null) {
+ rc = cur.commitCheck(pageUpdates);
+ cur = cur.getNext();
}
return rc;
}
}
- private class Header extends Struct {
- public final UTF8String file_magic = new UTF8String(80);
- public final Signed64 base_revision = new Signed64();
- public final Signed32 page_size = new Signed32();
- public final Signed32 free_list_page = new Signed32();
- /** points at the latest redo page which might have been partially stored */
- public final Signed32 redo_page = new Signed32();
- /** points at the latest redo page which is guaranteed to be fully stored */
- public final Signed32 synced_redo_page = new Signed32();
- public final Signed64 checksum = new Signed64();
-
- public String toString() {
- return "{ base_revision: "+this.base_revision.get()+
- ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
- ", redo_page: "+redo_page.get()+", checksum: "+checksum.get()+
- " }";
- }
- }
+ private final MemoryMappedFile file;
+ final SimpleAllocator allocator;
+ final PageFile pageFile;
+ private static final int updateBatchSize = 1024;
+
+ /** The header structure of the file */
+ private final Header header = new Header();
- /**
- * A list of updates to the page file. The list head points to the most
- * recent update.
- */
-// private final HashMap<Long, Update> updatesMap = new HashMap<Long, Update>();
- private final LinkedNodeList<Update> updatesList = new LinkedNodeList<Update>();
-
- /**
- * This is the next redo that will get logged. It is currently being built.
- */
- Redo nextRedo;
+ int lastRedoPage = -1;
- /**
- * These are stored redos that are waiting for a file sync. They may or may not survive
- * a failure.
- */
- private final LinkedList<Redo> unsyncedRedos = new LinkedList<Redo>();
+ private final LinkedNodeList<Redo> redos = new LinkedNodeList<Redo>();
- /**
- * These are stored redos that have been file synced. They should survive a failure.
- */
- private final LinkedList<Redo> syncedRedos = new LinkedList<Redo>();
+ //
+ // The following Redo objects point to linked nodes in the previous redo list.
+ // They are used to designate the state of the redo object.
+ //
- /**
- * These are updates which have been applied but who's temp pages have not yet been
- * freed since they are being used by some snapshots.
- */
- private final LinkedList<Update> updatesWaitingCleanup = new LinkedList<Update>();
+ /** The current redo that is currently being built */
+ Redo buildingRedo;
+ /** The stored redos. These might be recoverable. */
+ Redo storedRedos;
+ /** The synced redos. A file sync occurred after these redos were stored. */
+ Redo syncedRedos;
+ /** The performed redos. Updates are actually performed to the original page file. */
+ Redo performedRedos;
- private final Header header = new Header();
+ /** Used to cache read objects */
+ private ReadCache readCache = new ReadCache();
- private final PageFile pageFile;
- private final SimpleAllocator allocator;
- private final MemoryMappedFile file;
+ /** Mutex for data structures which are used during house keeping tasks like redo management. Once acquired, you can also acquire the TRANSACTION_MUTEX */
+ private final Object HOUSE_KEEPING_MUTEX = "HOUSE_KEEPING_MUTEX";
+
+ /** Mutex for data structures which transaction threads access. Never attempt to acquire the HOUSE_KEEPING_MUTEX once this mutex is acquired. */
+ private final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX";
+
/**
* This is the free page list at the base revision. It does not track allocations in transactions
@@ -644,139 +552,210 @@
this.allocator = pageFile.allocator();
ByteBuffer slice = file.slice(false, 0, FILE_HEADER_SIZE);
this.header.setByteBuffer(slice, slice.position());
- this.updatesList.addLast(new Update(0, null, null, null));
}
public Transaction tx() {
- return new ConcurrentTransaction();
+ return new ConcurrentTransaction(this);
+ }
+
+ /**
+ * Attempts to commit a set of page updates.
+ *
+ * @param updatedSnapshot
+ * @param pageUpdates
+ * @param deferredUpdates
+ */
+ void commit(Snapshot updatedSnapshot, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+
+ boolean fullRedo=false;
+ synchronized (TRANSACTION_MUTEX) {
+
+ // we need to figure out the revision id of the this commit...
+ long rev;
+ if( updatedSnapshot!=null ) {
+
+ // Lets check for an OptimisticUpdateException
+ // verify that the new commit's updates don't conflict with a commit that occurred
+ // subsequent to the snapshot that this commit started operating on.
+
+ // Note: every deferred update has an entry in the pageUpdates, so no need to
+ // check to see if that map also conflicts.
+ rev = updatedSnapshot.commitCheck(pageUpdates);
+
+ } else {
+ rev = buildingRedo.head;
+ }
+ rev++;
+
+ if( buildingRedo.base == -1 ) {
+ buildingRedo.base = rev;
+ }
+
+ buildingRedo.head = rev;
+
+ // TODO: This map merging has to be a bit CPU intensive.. need
+ // to look for ways to optimize it out.
+ buildingRedo.putAll(pageUpdates, deferredUpdates);
+
+ Commit last = buildingRedo.commits.getTail();
+ if( last==null || last.snapshot!=null ) {
+ last = new Commit(rev, pageUpdates, deferredUpdates, buildingRedo);
+ buildingRedo.commits.addLast(last);
+ } else {
+ // we can merge into the previous commit.
+ last.putAll(pageUpdates, deferredUpdates);
+ }
+
+ if( buildingRedo.pageCount() > updateBatchSize ) {
+ fullRedo = true;
+ }
+ }
+
+ if( fullRedo ) {
+ synchronized (HOUSE_KEEPING_MUTEX) {
+ storeRedos(false);
+ // TODO: do the following actions async.
+ syncRedos();
+ performRedos();
+ }
+ }
}
-
+
/**
* Used to initialize a new file or to clear out the
* contents of an existing file.
*/
public void reset() {
- updatesList.clear();
- updatesList.addLast(new Update(0, null, null, null));
- unsyncedRedos.clear();
- updatesWaitingCleanup.clear();
- allocator.clear();
- baseRevisionFreePages.clear();
- baseRevisionFreePages.add(0, allocator.getLimit());
-
- // Initialize the file header..
- this.header.setByteBufferPosition(0);
- this.header.file_magic.set(MAGIC);
- this.header.base_revision.set(0);
- this.header.free_list_page.set(-1);
- this.header.page_size.set(pageFile.getPageSize());
- this.header.redo_page.set(-1);
- replicateHeader();
- }
+ synchronized (HOUSE_KEEPING_MUTEX) {
+ redos.clear();
+ performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1);
+ redos.addFirst(buildingRedo);
+
+ lastRedoPage = -1;
+ readCache.clear();
+
+ allocator.clear();
+ baseRevisionFreePages.clear();
+ baseRevisionFreePages.add(0, allocator.getLimit());
+ // Initialize the file header..
+ Header h = header();
+ h.setByteBufferPosition(0);
+ h.magic.set(MAGIC);
+ h.base_revision.set(0);
+ h.free_list_page.set(-1);
+ h.page_size.set(pageFile.getPageSize());
+ h.reserved.set("");
+ h.unsynced_redo_page.set(-1);
+ replicateHeader();
+ }
+ }
/**
* Loads an existing file and replays the redo
* logs to put it in a consistent state.
*/
public void recover() {
-
- unsyncedRedos.clear();
- updatesWaitingCleanup.clear();
+ synchronized (HOUSE_KEEPING_MUTEX) {
- this.header.setByteBufferPosition(0);
- long baseRevision = header.base_revision.get();
- updatesList.clear();
- updatesList.addLast(new Update(baseRevision, null, null, null));
-
- // Initialize the free page list.
- int pageId = header.free_list_page.get();
- if( pageId >= 0 ) {
- baseRevisionFreePages = loadObject(pageId);
- allocator.copy(baseRevisionFreePages);
- Extent.unfree(pageFile, pageId);
- } else {
- allocator.clear();
- baseRevisionFreePages.add(0, allocator.getLimit());
- }
-
- // Load the redo batches.
- pageId = header.redo_page.get();
- while( pageId >= 0 ) {
- Redo redo = loadObject(pageId);
- redo.page = pageId;
- redo.recovered = true;
- Extent.unfree(pageFile, pageId);
+ redos.clear();
+ performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1);
+ redos.addFirst(buildingRedo);
+ lastRedoPage = -1;
+ readCache.clear();
+
+ Header h = header();
+ if( !MAGIC.equals( h.magic.get()) ) {
+ throw new PagingException("The file header is not of the expected type.");
+ }
- pageId=-1;
- if( baseRevision < redo.head ) {
-
- // add first since we are loading redo objects oldest to youngest
- // but want to put them in the list youngest to oldest.
- unsyncedRedos.addFirst(redo);
+ long baseRevision = h.base_revision.get();
+
+ // Initialize the free page list.
+ int pageId = h.free_list_page.get();
+ if( pageId >= 0 ) {
+ baseRevisionFreePages = loadObject(pageId);
+ allocator.copy(baseRevisionFreePages);
+ Extent.unfree(pageFile, pageId);
+ } else {
+ allocator.clear();
+ baseRevisionFreePages.add(0, allocator.getLimit());
+ }
+
+ // Load the redo batches.
+ pageId = h.unsynced_redo_page.get();
+ while( pageId >= 0 ) {
+ Redo redo = loadObject(pageId);
+ redo.page = pageId;
+ redo.recovered = true;
+ Extent.unfree(pageFile, pageId);
- if( baseRevision < redo.base ) {
- pageId=redo.previous;
+ if( buildingRedo.head == -1 ) {
+ buildingRedo.head = redo.head;
+ }
+
+ pageId=-1;
+ if( baseRevision < redo.head ) {
+
+ // add first since we are loading redo objects oldest to youngest
+ // but want to put them in the list youngest to oldest.
+ redos.addFirst(redo);
+ syncedRedos = redo;
+
+ if( baseRevision < redo.base ) {
+ pageId=redo.previous;
+ }
}
}
- }
-
- // Apply all the redos..
- applyRedos();
+
+ // Apply all the redos..
+ performRedos();
+ }
}
-
/**
* Once this method returns, any previously committed transactions
* are flushed and to the disk, ensuring that they will not be lost
- * upon failure.
+ * upon failure.
*/
public void flush() {
-
- // Write out the current redo if it has data...
- Redo redo;
- synchronized (this) {
- redo = nextRedo;
- nextRedo = null;
- }
- if( redo == null ) {
- store(redo);
- }
-
- // Find out if there are unsynced redos...
- redo = null;
- synchronized (this) {
- if( !unsyncedRedos.isEmpty() ) {
- for (Redo r : unsyncedRedos) {
- syncedRedos.add(r);
- redo = r;
- }
- }
+ synchronized (HOUSE_KEEPING_MUTEX) {
+ storeRedos(true);
+ syncRedos();
}
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ //
+ // Methods which transition redos through their life cycle states;
+ //
+ // building -> stored -> synced -> performed -> released
+ //
+ // The HOUSE_KEEPING_MUTEX must be acquired before being called.
+ //
+ // /////////////////////////////////////////////////////////////////
+
+ /**
+ * Attempts to perform a redo state change: building -> stored
+ */
+ private void storeRedos(boolean force) {
+ Redo redo;
- // Yep.. we had some..
- if( redo!=null ) {
- // This is a slow operation..
- file.sync();
- // Update the header so that it knows about the redos that are
- // guaranteed to survive a failure.
- header().synced_redo_page.set(redo.page);
- replicateHeader();
+ // We synchronized /w the transactions so that they see the state change.
+ synchronized (TRANSACTION_MUTEX) {
+ // Re-checking since storing the redo may not be needed.
+ if( (force && buildingRedo.base!=-1 ) || buildingRedo.pageCount() > updateBatchSize ) {
+ redo = buildingRedo;
+ buildingRedo = new Redo(redo.head);
+ redos.addLast(buildingRedo);
+ } else {
+ return;
+ }
}
- }
-
- private Header header() {
- this.header.getByteBuffer().position(0);
- this.header.setByteBufferPosition(0);
- Header h = this.header;
- return h;
- }
-
- public void store(Redo redo) {
// Write any outstanding deferred cache updates...
- if( redo.cache != null ) {
- for (Entry<Integer, CacheUpdate> entry : redo.cache.entrySet()) {
- CacheUpdate cu = entry.getValue();
+ if( redo.deferredUpdates != null ) {
+ for (Entry<Integer, DeferredUpdate> entry : redo.deferredUpdates.entrySet()) {
+ DeferredUpdate cu = entry.getValue();
List<Integer> allocatedPages = cu.store(pageFile);
for (Integer page : allocatedPages) {
// add any allocated pages to the update list so that the free
@@ -787,85 +766,154 @@
}
// Link it to the last redo.
- Redo last = unsyncedRedos.getLast();
- if( last!=null ) {
- redo.previous = last.page;
- }
+ redo.previous = lastRedoPage;
- // Store the redo.
- redo.page = storeObject(redo);
- synchronized (this) {
- unsyncedRedos.add(redo);
- }
+ // Store the redo record.
+ lastRedoPage = redo.page = storeObject(redo);
// Update the header to know about the new redo page.
- header().redo_page.set(redo.page);
+ header().unsynced_redo_page.set(redo.page);
replicateHeader();
}
-
+
/**
- * Frees up space by applying redos and releasing the pages that
- * the redo was stored on.
+ * Performs a file sync.
+ *
+ * This allows two types of redo state changes to occur:
+ * <ul>
+ * <li> stored -> synced
+ * <li> performed -> released
+ * </ul>
*/
- public void applyRedos() {
+ private void syncRedos() {
- // We can only apply redos which we know are not partially stored on disk
- // and which hold revisions which are older than the oldest active snapshot.
- ArrayList<Redo> redoList = new ArrayList<Redo>();
- synchronized (this) {
- long snapshotHeadRev = Long.MAX_VALUE;
- Update cur = updatesList.getHead();
- while( cur!=null ) {
- if( cur.snapshot!=null ) {
- snapshotHeadRev = cur.head;
- break;
- }
+ // This is a slow operation..
+ file.sync();
+ Header h = header();
+
+ // Update the base_revision with the last performed revision.
+ if (performedRedos!=syncedRedos) {
+ Redo lastPerformedRedo = syncedRedos.getPrevious();
+ h.base_revision.set(lastPerformedRedo.head);
+ }
+
+ // Were there some redos in the stored state?
+ if (storedRedos!=buildingRedo) {
+
+ // The last stored is actually synced now..
+ Redo lastStoredRedo = buildingRedo.getPrevious();
+ // Let the header know about it..
+ h.redo_page.set(lastStoredRedo.page);
+
+ // We synchronized /w the transactions so that they see the state change.
+ synchronized (TRANSACTION_MUTEX) {
+ // Transition stored -> synced.
+ storedRedos = buildingRedo;
+ }
+ }
+
+ // Once a redo has been performed, subsequently synced, and no longer referenced,
+ // it's allocated recovery space can be released.
+ while( performedRedos!=syncedRedos ) {
+ if( performedRedos.references!=0 ) {
+ break;
}
- for (Iterator<Redo> i = this.unsyncedRedos.iterator(); i.hasNext();) {
- Redo redo = i.next();
- if (redo.base > snapshotHeadRev) {
- // we can't apply the rest of the updates, since a snapshot depends on
- // the current base revision.
- // the rest of the updates will have incrementing revision numbers too.
+ // Free the update pages associated with the redo.
+ for (Entry<Integer, Integer> entry : performedRedos.updates.entrySet()) {
+ int key = entry.getKey();
+ int value = entry.getValue();
+
+ switch( value ) {
+ case PAGE_ALLOCATED:
+ // It was a new page that was written.. we don't need to
+ // free it.
+ break;
+ case PAGE_FREED:
+ // update freed a previous page.. now is when we actually free it.
+ allocator.free(key, 1);
break;
+ default:
+ // This updated the 'key' page, now we can release the 'value' page
+ // since it has been copied over the 'key' page and is no longer needed.
+ allocator.free(value, 1);
}
- redoList.add(redo);
- i.remove();
}
+
+ // Free the redo record itself.
+ Extent.free(pageFile, performedRedos.page);
+
+ // don't need to sync /w transactions since they don't use the performedRedos variable.
+ // Transition performed -> released
+ performedRedos = performedRedos.getNext();
+
+ // removes the released redo from the redo list.
+ performedRedos.getPrevious().unlink();
}
-
- // Perhaps we can't do any work...
- if( redoList.isEmpty() ) {
+
+ // Store the free list..
+ int previousFreeListPage = h.free_list_page.get();
+ h.free_list_page.set(storeObject(baseRevisionFreePages));
+ replicateHeader();
+
+ // Release the previous free list.
+ if (previousFreeListPage >= 0) {
+ Extent.free(pageFile, previousFreeListPage);
+ }
+ }
+
+ /**
+ * Attempts to perform a redo state change: synced -> performed
+ *
+ * Once a redo is performed, new snapshots will not reference
+ * the redo anymore.
+ */
+ public void performRedos() {
+
+ if( syncedRedos==storedRedos ) {
+ // There are no redos in the synced state for us to transition.
+ return;
+ }
+
+ // The last performed redo MIGHT still have an open snapshot.
+ // we can't transition from synced, until that snapshot closes.
+ Redo lastPerformed = syncedRedos.getPrevious();
+ if( lastPerformed!=null && lastPerformed.references!=0) {
return;
}
- long baseRevision = header().base_revision.get();
-
- for (Redo redo : redoList) {
- // revision numbers should be sequentially increasing.
- assert baseRevision+1==redo.base;
+ while( syncedRedos!=storedRedos ) {
- for (Entry<Integer, Integer> entry : redo.updates.entrySet()) {
+ // Performing the redo actually applies the updates to the original page locations.
+ for (Entry<Integer, Integer> entry : syncedRedos.updates.entrySet()) {
int key = entry.getKey();
int value = entry.getValue();
switch( value ) {
case PAGE_ALLOCATED:
- if( redo.recovered ) {
+ if( syncedRedos.recovered ) {
+ // If we are recovering, the allocator MIGHT not have this
+ // page as being allocated. This makes sure it's allocated so that
+ // a new transaction does not get this page and overwrite it in error.
allocator.unfree(key, 1);
}
+ // Update the persistent free list. This gets stored on the next sync.
baseRevisionFreePages.remove(key, 1);
break;
case PAGE_FREED:
- if( redo.recovered ) {
- allocator.free(key, 1);
- }
+ // The actual free gets done on the next file sync.
+ // Update the persistent free list. This gets stored on the next sync.
baseRevisionFreePages.add(key, 1);
break;
default:
- if( redo.recovered ) {
+ if( syncedRedos.recovered ) {
+ // If we are recovering, the allocator MIGHT not have this
+ // page as being allocated. This makes sure it's allocated so that
+ // a new transaction does not get this page and overwrite it in error.
allocator.unfree(key, 1);
}
+
+ // Perform the update by copying the updated 'redo page' to the original
+ // page location.
ByteBuffer slice = pageFile.slice(SliceType.READ, value, 1);
try {
pageFile.write(key, slice);
@@ -874,34 +922,103 @@
}
}
}
- baseRevision = redo.base;
+
+ // We synchronized /w the transactions so that they see the state change.
+ synchronized (TRANSACTION_MUTEX) {
+ // Transition synced -> performed
+ syncedRedos = syncedRedos.getNext();
+ }
+
+ lastPerformed = syncedRedos.getPrevious();
+ // We have to stop if the last redo performed has an open snapshot.
+ if( lastPerformed.references!=0 ) {
+ break;
+ }
}
-
+ }
+
+ // /////////////////////////////////////////////////////////////////
+ // Snapshot management
+ // /////////////////////////////////////////////////////////////////
+
+ Snapshot openSnapshot() {
+ synchronized(TRANSACTION_MUTEX) {
- // force to ensure all data is fully stored before the header
- // starts making reference to new stuff
- file.sync();
+ // Is it a partial redo snapshot??
+ // If there are commits in the next redo..
+ Snapshot snapshot;
+ Commit commit = buildingRedo.commits.getTail();
+ if( commit!=null ) {
+ snapshot = commit.snapshot != null ? commit.snapshot : new CommitsSnapshot(commit);
+ return snapshot.open();
+ }
-
- Header h = header();
- int previousFreeListPage = h.free_list_page.get();
- h.free_list_page.set(storeObject(baseRevisionFreePages));
- h.base_revision.set(baseRevision);
- replicateHeader();
-
- // Release the previous free list.
- if( previousFreeListPage>=0 ) {
- Extent.free(pageFile, previousFreeListPage);
+ // Perhaps this snapshot has to deal with full redos..
+ if( syncedRedos!=buildingRedo ) {
+ Redo lastRedo = buildingRedo.getPrevious();
+ snapshot = lastRedo.snapshot != null ? lastRedo.snapshot : new RedosSnapshot();
+ return snapshot.open();
+ }
+
+ // Then the snapshot does not have previous updates.
+ snapshot = buildingRedo.prevSnapshot != null ? buildingRedo.prevSnapshot : new PreviousSnapshot(buildingRedo);
+ return snapshot.open();
}
-
- // Free the space associated with the redo batches
- if( !redoList.isEmpty() ) {
- for (Redo redo : redoList) {
- Extent.free(pageFile, redo.page);
+ }
+
+ private List<Redo> snapshotRedos() {
+ if( syncedRedos!=buildingRedo ) {
+ ArrayList<Redo> rc = new ArrayList<Redo>(4);
+ Redo cur = buildingRedo.getPrevious();
+ while( true ) {
+ rc.add(cur);
+ if( cur == syncedRedos ) {
+ break;
+ }
+ cur = cur.getPrevious();
+ }
+ return rc;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ void closeSnapshot(Snapshot snapshot) {
+ if( snapshot!=null ) {
+ synchronized(TRANSACTION_MUTEX) {
+ snapshot.close();
}
}
}
+
+ // /////////////////////////////////////////////////////////////////
+ // TODO:
+ // /////////////////////////////////////////////////////////////////
+
+ /**
+ * The quiesce method is used to pause/stop access to the concurrent page file.
+ * Access can be restored using the {@link #resume()} method.
+ *
+ * @param reads if true, the suspend will also suspend read only transactions.
+ * @param blocking if true, transactions will block until the {@link #resume()} method
+ * is called, otherwise they will receive errors.
+ * @param drain if true, in progress transactions are allowed to complete, otherwise they
+ * also are suspended.
+ */
+ public void suspend(boolean reads, boolean blocking, boolean drain) {
+ }
+ /**
+ * Resumes a previously suspended page file.
+ */
+ public void resume() {
+ }
+
+
+ // /////////////////////////////////////////////////////////////////
+ // Helper methods
+ // /////////////////////////////////////////////////////////////////
+
private int storeObject(Object value) {
try {
ExtentOutputStream eos = new ExtentOutputStream(pageFile);
@@ -927,6 +1044,14 @@
}
}
+
+ private Header header() {
+ this.header.getByteBuffer().position(0);
+ this.header.setByteBufferPosition(0);
+ Header h = this.header;
+ return h;
+ }
+
private void replicateHeader() {
// Calculate the checksum of the header so that we can tell if the
// header is corrupted.
@@ -939,109 +1064,53 @@
// Copy the header so we can survive a partial update.
ByteBuffer header = file.read(0, this.header.size());
file.write(FILE_HEADER_SIZE / 2, header);
- }
-
+ }
+
// /////////////////////////////////////////////////////////////////
- // Transaction calls back to these methods...
+ // Simple Helper Classes
// /////////////////////////////////////////////////////////////////
- synchronized private Snapshot aquireSnapshot() {
- Snapshot snapshot = updatesList.getTail().snapshot;
- if (snapshot == null) {
- ArrayList<Update> updates = updatesList.toArrayListReversed();
- updates.get(0).snapshot = new Snapshot(updates);
+
+ class ReadCache {
+ private final Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
+
+ @SuppressWarnings("unchecked")
+ private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+ T rc = (T) map.get(pageId);
+ if( rc ==null ) {
+ rc = marshaller.load(pageFile, pageId);
+ map.put(pageId, rc);
+ }
+ return rc;
}
- snapshot.references++;
- return snapshot;
+
+ public void clear() {
+ map.clear();
+ }
}
- synchronized private void releaseSnapshot(Snapshot snapshot) {
- if( snapshot!=null ) {
- synchronized(this) {
- snapshot.references--;
- if( snapshot.references==0 ) {
- Update update = snapshot.updates.get(0);
- update.snapshot=null;
- Update next = update.getNext();
- if( next !=null ) {
- next.mergePrevious();
- }
- }
- }
+ static class DeferredUpdate {
+ private final int page;
+ private Object value;
+ private EncoderDecoder<?> marshaller;
+
+ public DeferredUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
+ this.page = page;
+ this.value = value;
+ this.marshaller = marshaller;
}
- }
- /**
- * Attempts to commit a set of page updates.
- *
- * @param previousUpdate
- * @param pageUpdates
- * @param cache
- */
- private void commit(Update previousUpdate, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, CacheUpdate> cache) {
-
- Redo fullRedo=null;
- synchronized (this) {
- // Perhaps a concurrent update came in before this one...
- long rev;
- if( previousUpdate!=null ) {
- rev = previousUpdate.head;
- Update concurrentUpdate = previousUpdate.getNext();
- while( concurrentUpdate != null ) {
- // Yep.. there were concurrent updates.
- // Make sure we don't don't have update conflict.
- for (Integer page : pageUpdates.keySet()) {
- if( concurrentUpdate.updates.containsKey(page) ) {
- throw new OptimisticUpdateException();
- }
- }
-
- rev = concurrentUpdate.head;
- concurrentUpdate = concurrentUpdate.getNext();
- }
- } else {
- rev = updatesList.getTail().head;
- }
- rev++;
+ public void reset(Object value, EncoderDecoder<?> marshaller) {
+ this.value = value;
+ this.marshaller = marshaller;
+ }
- if( nextRedo == null ) {
- nextRedo = new Redo();
- nextRedo.base = rev;
- }
-
- nextRedo.head = rev;
- nextRedo.updates.putAll(pageUpdates);
- nextRedo.cache.putAll(cache);
-
- Update value = new Update(rev, pageUpdates, cache, nextRedo);
- updatesList.addLast(value);
- value.mergePrevious();
-
- if( nextRedo.pageCount() > 10 ) {
- fullRedo = nextRedo;
- nextRedo = null;
- }
+ @SuppressWarnings("unchecked")
+ <T> T value() {
+ return (T) value;
}
- if( fullRedo!=null ) {
- store(fullRedo);
+
+ @SuppressWarnings("unchecked")
+ public List<Integer> store(Paged paged) {
+ return ((EncoderDecoder)marshaller).store(paged, page, value);
}
- }
-
- /**
- * The quiesce method is used to pause/stop access to the concurrent page file.
- * access can be restored using the {@link #resume()} method.
- *
- * @param reads if true, the suspend will also suspend read only transactions.
- * @param blocking if true, transactions will block until the {@link #resume()} method
- * is called, otherwise they will receive errors.
- * @param drain if true, in progress transactions are allowed to complete, otherwise they
- * also are suspended.
- */
- public void suspend(boolean reads, boolean blocking, boolean drain) {
- }
-
- /**
- * Resumes a previously suspended page file.
- */
- public void resume() {
- }
-}
+ }}
Modified: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java?rev=826036&r1=826035&r2=826036&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java (original)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/page/ConcurrentPageFileFactory.java Fri Oct 16 18:56:37 2009
@@ -59,7 +59,7 @@
if (concurrentPageFile != null) {
concurrentPageFile.suspend(true, false, drainOnClose);
concurrentPageFile.flush();
- concurrentPageFile.applyRedos();
+ concurrentPageFile.performRedos();
concurrentPageFile=null;
}
super.close();