You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/22 17:53:43 UTC
svn commit: r688106 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/page/ test/java/org/apache/kahadb/page/
Author: chirino
Date: Fri Aug 22 08:53:42 2008
New Revision: 688106
URL: http://svn.apache.org/viewvc?rev=688106&view=rev
Log:
Update the PageFile public interface so that all read/update operations have to be done via a Transaction object. Updated the
HashIndex to support doing index operations in the context of the Transaction. The idea is that multiple page and index updates
can be performed in a single Unit of Work controlled by the Transaction object. This should make higher-level complex
operations easier to recover.
Added:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Chunk.java Fri Aug 22 08:53:42 2008
@@ -108,7 +108,7 @@
public static class PageInputStream extends InputStream {
- private PageFile file;
+ private Transaction tx;
private Chunk chunk;
private int pos;
private int pageCount;
@@ -118,11 +118,11 @@
private int markPageCount;
private ChunkMarshaller marshaller;
- public PageInputStream(PageFile file, long pageId) throws IOException {
- this.file = file;
- this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+ public PageInputStream(Transaction tx, long pageId) throws IOException {
+ this.tx = tx;
+ this.marshaller = new ChunkMarshaller(tx.getPageFile().getPageContentSize()-HEADER_MAX_SIZE);
- Page page = file.load(pageId, marshaller);
+ Page page = tx.load(pageId, marshaller);
if( page.getType() != Page.CHUNK_TYPE ) {
throw new EOFException("Chunk stream does not exist at page: "+pageId);
}
@@ -151,7 +151,7 @@
}
private void fill() throws IOException {
- Page page = file.load(chunk.next, marshaller);
+ Page page = tx.load(chunk.next, marshaller);
if( page.getType() == Page.INVALID_TYPE ) {
throw new IOException("Invalid page: "+chunk.next);
}
@@ -225,16 +225,16 @@
static public class PageOutputStream extends ByteArrayOutputStream {
- private PageFile file;
private long pageId;
private ChunkMarshaller marshaller;
private int pageCount;
- private ArrayList<Page> pages;
+ private ArrayList<Page> pages;
+ private final Transaction tx;
- public PageOutputStream(PageFile file, long pageId) {
- this.file = file;
+ public PageOutputStream(Transaction tx, long pageId) {
+ this.tx = tx;
this.pageId = pageId;
- this.marshaller = new ChunkMarshaller(file.getPageContentSize()-HEADER_MAX_SIZE);
+ this.marshaller = new ChunkMarshaller(tx.getPageFile().getPageContentSize()-HEADER_MAX_SIZE);
}
@Override
@@ -261,7 +261,7 @@
pages = new ArrayList<Page>();
long p = pageId;
while( p >= 0 ) {
- Page page = file.load(p, marshaller);
+ Page page = tx.load(p, marshaller);
Chunk c = (Chunk)page.getData();
if( c!=null && !c.last ) {
p = c.next;
@@ -273,7 +273,7 @@
// Add more if needed.
while( pages.size() < chunks.size() ) {
- pages.add(file.allocate());
+ pages.add(tx.allocate());
}
// Update the page data.
@@ -285,6 +285,7 @@
if( !chunk.last ) {
chunk.next = pages.get(i+1).getPageId();
}
+ tx.write(page, marshaller);
}
// If there were extra pages.. Free them up.
@@ -292,10 +293,9 @@
Page page = pages.get(i);
page.setData(null);
page.setType(Page.FREE_TYPE);
+ tx.write(page, marshaller);
}
- file.write(pages, marshaller);
-
pageCount=chunks.size();
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java Fri Aug 22 08:53:42 2008
@@ -50,7 +50,7 @@
this.pageId = pageId;
}
- private void load() throws IOException {
+ private void load(Transaction tx) throws IOException {
data = new TreeMap<Comparable, Long>();
@@ -59,8 +59,7 @@
// we will still need to de-marshall from the stream.
// I think it will be better to make the bin a btree root.
-
- PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);
+ PageInputStream pis = new PageInputStream(tx, pageId);
DataInputStream is = new DataInputStream(pis);
try {
@@ -77,8 +76,8 @@
}
}
- public void store() throws IOException {
- PageOutputStream pos = new PageOutputStream(index.getPageFile(), pageId);
+ public void store(Transaction tx) throws IOException {
+ PageOutputStream pos = new PageOutputStream(tx, pageId);
DataOutputStream os = new DataOutputStream(pos);
if( data == null ) {
os.writeInt(0);
@@ -93,13 +92,13 @@
pageCount = pos.getPageCount();
}
- public int size() throws IOException {
+ public int size(Transaction tx) throws IOException {
if( data!=null ) {
return data.size();
} else {
// Peek at the page to see how many items it contains.
- PageInputStream pis = new PageInputStream(index.getPageFile(), pageId);
+ PageInputStream pis = new PageInputStream(tx, pageId);
DataInputStream is = new DataInputStream(pis);
int size = is.readInt();
is.close();
@@ -108,38 +107,38 @@
}
}
- public Long put(Comparable key, Long value) throws IOException {
+ public Long put(Transaction tx, Comparable key, Long value) throws IOException {
if( data==null ) {
- load();
+ load(tx);
}
Long rc = data.put(key, value);
if( !value.equals(rc) ) {
- store();
+ store(tx);
}
return rc;
}
- public Long find(Comparable key) throws IOException {
+ public Long find(Transaction tx, Comparable key) throws IOException {
if( data==null ) {
- load();
+ load(tx);
}
return data.get(key);
}
- public Map<Comparable, Long> getAll() throws IOException {
+ public Map<Comparable, Long> getAll(Transaction tx) throws IOException {
if( data==null ) {
- load();
+ load(tx);
}
return data;
}
- public Long remove(Comparable key) throws IOException {
+ public Long remove(Transaction tx, Comparable key) throws IOException {
if( data==null ) {
- load();
+ load(tx);
}
Long rc = data.remove(key);
if( rc!=null ) {
- store();
+ store(tx);
}
return rc;
}
@@ -148,4 +147,8 @@
return pageId;
}
+ public int getPageCount() {
+ return pageCount;
+ }
+
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java Fri Aug 22 08:53:42 2008
@@ -30,6 +30,9 @@
import org.apache.kahadb.impl.index.IndexManager;
import org.apache.kahadb.page.Chunk.PageInputStream;
import org.apache.kahadb.page.Chunk.PageOutputStream;
+import org.apache.kahadb.page.PageFile.PageFileTransaction;
+import org.apache.kahadb.page.Transaction.Closure;
+import org.apache.kahadb.page.Transaction.CallableClosure;
/**
* BTree implementation
@@ -108,101 +111,143 @@
}
public synchronized void load() {
- if (loaded.compareAndSet(false, true)) {
- try {
- Page page = pageFile.load(pageId, null);
-
- // Is this a brand new index?
- if (page.getType() == Page.FREE_TYPE) {
-
- // We need to create the pages for the bins
- Page binPage = pageFile.allocate(binCapacity);
- binPageId = binPage.getPageId();
- state = INITIALIZING_STATE;
- storeMetadata();
- pageFile.checkpoint();
-
- // If failure happens now we can continue initializing the
- // the hash bins...
-
- } else {
- // Lets load it's data
- loadMetadata();
-
- // If we did not have a clean shutdown...
- if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
- // Figure out the size and the # of bins that are
- // active. Yeah This loads the first page of every bin. :(
- // We might want to put this in the metadata page, but
- // then that page would be getting updated on every write.
- size = 0;
- for (int i = 0; i < binCapacity; i++) {
- HashBin hashBin = new HashBin(this, binPageId + i);
- int t = hashBin.size();
- if (t > 0) {
- binsActive++;
+ PageFileTransaction tx = pageFile.tx();
+ try {
+
+ if (loaded.compareAndSet(false, true)) {
+ try {
+ Page page = tx.load(pageId, null);
+
+ // Is this a brand new index?
+ if (page.getType() == Page.FREE_TYPE) {
+
+ // We need to create the pages for the bins
+ tx.execute(new Transaction.Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ Page binPage = tx.allocate(binCapacity);
+ binPageId = binPage.getPageId();
+ state = INITIALIZING_STATE;
+ storeMetadata(tx);
+ }
+ });
+ pageFile.checkpoint();
+
+ // If failure happens now we can continue initializing the
+ // the hash bins...
+
+ } else {
+ // Lets load it's data
+ loadMetadata(tx);
+
+ // If we did not have a clean shutdown...
+ if (state == OPEN_STATE || state == RESIZING_PHASE1_STATE) {
+ // Figure out the size and the # of bins that are
+ // active. Yeah This loads the first page of every bin. :(
+ // We might want to put this in the metadata page, but
+ // then that page would be getting updated on every write.
+ size = 0;
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(this, binPageId + i);
+ int t = hashBin.size(tx);
+ if (t > 0) {
+ binsActive++;
+ }
+ size += t;
}
- size += t;
}
}
- }
-
- if (state == INITIALIZING_STATE) {
- // TODO:
- // If a failure occurs mid way through us initializing the
- // bins.. will the page file still think we have the rest
- // of them previously allocated to us?
-
- for (int i = 0; i < binCapacity; i++) {
- HashBin hashBin = new HashBin(this, binPageId + i);
- hashBin.store();
+
+ if (state == INITIALIZING_STATE) {
+ // TODO:
+ // If a failure occurs mid way through us initializing the
+ // bins.. will the page file still think we have the rest
+ // of them previously allocated to us?
+
+ tx.execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(HashIndex.this, binPageId + i);
+ hashBin.store(tx);
+ }
+ }
+ });
+ size = 0;
+ binsActive = 0;
}
- size = 0;
- binsActive = 0;
+
+ if (state == RESIZING_PHASE1_STATE) {
+ // continue resize phase 1
+ resizePhase1();
+ }
+ if (state == RESIZING_PHASE2_STATE) {
+ // continue resize phase 2
+ resizePhase2();
+ }
+
+ calcThresholds();
+
+ state = OPEN_STATE;
+ tx.execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ storeMetadata(tx);
+ }
+ });
+ pageFile.checkpoint();
+
+ LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
-
- if (state == RESIZING_PHASE1_STATE) {
- // continue resize phase 1
- resizePhase1();
- }
- if (state == RESIZING_PHASE2_STATE) {
- // continue resize phase 1
- resizePhase2();
- }
-
- calcThresholds();
-
- state = OPEN_STATE;
- storeMetadata();
- pageFile.checkpoint();
-
- LOG.debug("HashIndex loaded. Using "+binCapacity+" bins starting at page "+binPageId);
-
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+
+ } finally {
+ // All pending updates should have been committed by now.
+ assert tx.isReadOnly();
}
}
public synchronized void unload() throws IOException {
if (loaded.compareAndSet(true, false)) {
state = CLOSED_STATE;
- storeMetadata();
+ pageFile.tx().execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ storeMetadata(tx);
+ }
+ });
}
}
- public synchronized StoreEntry get(Object key) throws IOException {
+ public synchronized StoreEntry get(final Object key) throws IOException {
+ return pageFile.tx().execute(new CallableClosure<StoreEntry,IOException>(){
+ public StoreEntry execute(Transaction tx) throws IOException {
+ return get(tx, key);
+ }
+ });
+ }
+
+ public synchronized StoreEntry get(Transaction tx, Object key) throws IOException {
+ // TODO: multiple loads is smelly..
load();
- Long result = getBin(key).find((Comparable)key);
+ Long result = getBin(key).find(tx, (Comparable)key);
return result != null ? indexManager.getIndex(result) : null;
}
- public synchronized void store(Object key, StoreEntry value) throws IOException {
+ public synchronized void store(final Object key, final StoreEntry value) throws IOException {
+ pageFile.tx().execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ store(tx, key, value);
+ }
+ });
+ }
+
+ public synchronized void store(Transaction tx, Object key, StoreEntry value) throws IOException {
+ // TODO: multiple loads is smelly..
load();
HashBin bin = getBin(key);
- if (bin.put((Comparable)key, value.getOffset()) == null) {
+ if (bin.put(tx, (Comparable)key, value.getOffset()) == null) {
this.size++;
- if (bin.size() == 1) {
+ if (bin.size(tx) == 1) {
binsActive++;
}
}
@@ -214,15 +259,24 @@
}
}
- public synchronized StoreEntry remove(Object key) throws IOException {
+ public synchronized StoreEntry remove(final Object key) throws IOException {
+ return pageFile.tx().execute(new CallableClosure<StoreEntry,IOException>(){
+ public StoreEntry execute(Transaction tx) throws IOException {
+ return remove(tx, key);
+ }
+ });
+ }
+
+ public synchronized StoreEntry remove(Transaction tx, Object key) throws IOException {
+ // TODO: multiple loads is smelly..
load();
StoreEntry result = null;
HashBin bin = getBin(key);
- Long offset = bin.remove((Comparable)key);
+ Long offset = bin.remove(tx, (Comparable)key);
if (offset != null) {
this.size--;
- if (bin.size() == 0) {
+ if (bin.size(tx) == 0) {
binsActive--;
}
result = this.indexManager.getIndex(offset);
@@ -241,12 +295,25 @@
public synchronized boolean containsKey(Object key) throws IOException {
return get(key) != null;
}
+
+ public synchronized boolean containsKey(Transaction tx, Object key) throws IOException {
+ return get(tx, key) != null;
+ }
public synchronized void clear() throws IOException {
+ pageFile.tx().execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ clear(tx);
+ }
+ });
+ }
+
+ public synchronized void clear(Transaction tx) throws IOException {
+ // TODO: multiple loads is smelly..
load();
for (int i = 0; i < binCapacity; i++) {
HashBin hashBin = new HashBin(this, binPageId + i);
- hashBin.store(); // A store before a load.. clears the data out.
+ hashBin.store(tx); // A store before a load.. clears the data out.
}
size = 0;
binsActive = 0;
@@ -261,8 +328,8 @@
// Implementation Methods
// /////////////////////////////////////////////////////////////////
- private void loadMetadata() throws IOException {
- PageInputStream pis = new PageInputStream(pageFile, pageId);
+ private void loadMetadata(Transaction tx) throws IOException {
+ PageInputStream pis = new PageInputStream(tx, pageId);
DataInputStream is = new DataInputStream(pis);
state = is.readInt();
binPageId = is.readLong();
@@ -274,8 +341,8 @@
is.close();
}
- private void storeMetadata() throws IOException {
- PageOutputStream pos = new PageOutputStream(pageFile, pageId);
+ private void storeMetadata(Transaction tx) throws IOException {
+ PageOutputStream pos = new PageOutputStream(tx, pageId);
DataOutputStream os = new DataOutputStream(pos);
os.writeInt(state);
os.writeLong(binPageId);
@@ -287,65 +354,76 @@
os.close();
}
- private void resize(int newSize) throws IOException {
-
+ private void resize(final int newSize) throws IOException {
LOG.debug("Resizing to: "+newSize);
-
- state = RESIZING_PHASE1_STATE;
- resizeCapacity = newSize;
- resizePageId = pageFile.allocate(resizeCapacity).getPageId();
- storeMetadata();
+ pageFile.tx().execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ state = RESIZING_PHASE1_STATE;
+ resizeCapacity = newSize;
+ resizePageId = tx.allocate(resizeCapacity).getPageId();
+ storeMetadata(tx);
+ }
+ });
pageFile.checkpoint();
+
resizePhase1();
resizePhase2();
}
private void resizePhase1() throws IOException {
// In Phase 1 we copy the data to the new bins..
-
- // Initialize the bins..
- for (int i = 0; i < resizeCapacity; i++) {
- HashBin bin = new HashBin(this, resizePageId + i);
- bin.store();
- }
+ pageFile.tx().execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+
+ // Initialize the bins..
+ for (int i = 0; i < resizeCapacity; i++) {
+ HashBin bin = new HashBin(HashIndex.this, resizePageId + i);
+ bin.store(tx);
+ }
- binsActive = 0;
- // Copy the data from the old bins to the new bins.
- for (int i = 0; i < binCapacity; i++) {
- HashBin bin = new HashBin(this, binPageId + i);
- for (Map.Entry<Comparable, Long> entry : bin.getAll().entrySet()) {
- HashBin resizeBin = getResizeBin(entry.getKey());
- resizeBin.put(entry.getKey(), entry.getValue());
- if( resizeBin.size() == 1) {
- binsActive++;
+ binsActive = 0;
+ // Copy the data from the old bins to the new bins.
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin bin = new HashBin(HashIndex.this, binPageId + i);
+ for (Map.Entry<Comparable, Long> entry : bin.getAll(tx).entrySet()) {
+ HashBin resizeBin = getResizeBin(entry.getKey());
+ resizeBin.put(tx, entry.getKey(), entry.getValue());
+ if( resizeBin.size(tx) == 1) {
+ binsActive++;
+ }
+ }
}
+
+ // Now we can release the old data.
+ state = RESIZING_PHASE2_STATE;
+ storeMetadata(tx);
}
- }
-
- // Now we can release the old data.
- state = RESIZING_PHASE2_STATE;
- storeMetadata();
+ });
pageFile.checkpoint();
}
private void resizePhase2() throws IOException {
- for (int i = 0; i < binCapacity; i++) {
- HashBin hashBin = new HashBin(this, binPageId + i);
- hashBin.store(); // A store before a load.. clears the data out.
- }
- pageFile.free(binPageId, binCapacity);
-
- binCapacity = resizeCapacity;
- binPageId = resizePageId;
- resizeCapacity=0;
- resizePageId=0;
- state = OPEN_STATE;
- storeMetadata();
+ // In phase 2 we free the old bins and switch to the new bins.
+ pageFile.tx().execute(new Closure<IOException>(){
+ public void execute(Transaction tx) throws IOException {
+ for (int i = 0; i < binCapacity; i++) {
+ HashBin hashBin = new HashBin(HashIndex.this, binPageId + i);
+ hashBin.store(tx); // A store before a load.. clears the data out.
+ }
+ tx.free(binPageId, binCapacity);
+
+ binCapacity = resizeCapacity;
+ binPageId = resizePageId;
+ resizeCapacity=0;
+ resizePageId=0;
+ state = OPEN_STATE;
+ storeMetadata(tx);
+ }
+ });
+
pageFile.checkpoint();
calcThresholds();
-
LOG.debug("Resizing done. New bins start at: "+binPageId);
-
}
private void calcThresholds() {
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Aug 22 08:53:42 2008
@@ -63,7 +63,7 @@
*
* @version $Revision: 1.1.1.1 $
*/
-public class PageFile implements Iterable<Page> {
+public class PageFile {
// 4k Default page size.
public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
@@ -201,6 +201,360 @@
}
+ /**
+ * Provides transaction update access to the PageFile. All operations that modify
+ * the PageFile are done via a Transaction.
+ */
+ class PageFileTransaction implements Transaction {
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#getPageFile()
+ */
+ public PageFile getPageFile() {
+ return PageFile.this;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#allocate()
+ */
+ public Page allocate() throws IOException {
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
+ }
+
+ Page page = null;
+
+ // We may need to create a new free page...
+ if(freeList.isEmpty()) {
+ page = new Page();
+ page.setPageId(nextFreePageId);
+ page.setType(Page.FREE_TYPE);
+ nextFreePageId ++;
+// LOG.debug("allocated: "+page.getPageId());
+ write(page, null);
+ }
+
+ long pageId = freeList.removeFirst();
+ page = new Page();
+ page.setPageId(pageId);
+ page.setType(Page.FREE_TYPE);
+ return page;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#allocate(int)
+ */
+ public Page allocate(int count) throws IOException {
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
+ }
+
+ Page page = null;
+ Sequence seq = freeList.removeFirstSequence(count);
+ if(seq==null) {
+
+ // We may need to create a new free page...
+ page = new Page();
+ int c=count;
+ while( c > 0 ) {
+ page.setPageId(nextFreePageId);
+ page.setType(Page.FREE_TYPE);
+ nextFreePageId ++;
+// LOG.debug("allocate writing: "+page.getPageId());
+ write(page, null);
+ c--;
+ if( page == null ) {
+ page = page;
+ }
+ }
+
+ seq = freeList.removeFirstSequence(count);
+ }
+
+ page = new Page();
+ page.setPageId(seq.getFirst());
+ page.setType(Page.FREE_TYPE);
+// LOG.debug("allocated: "+page.getPageId());
+ return page;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#free(org.apache.kahadb.page.Page)
+ */
+ public void free(Page page) throws IOException {
+ page.setType(Page.FREE_TYPE);
+ free(page.getPageId(), 1);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#free(long)
+ */
+ public void free(long pageId) throws IOException {
+ free(pageId, 1);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#free(org.apache.kahadb.page.Page, int)
+ */
+ public void free(Page page, int count) throws IOException {
+ page.setType(Page.FREE_TYPE);
+ free(page.getPageId(), count);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#free(long, int)
+ */
+ public void free(long pageId, int count) throws IOException {
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot free a page when the page file is not loaded");
+ }
+
+ Page page = new Page();
+ long initialId = pageId;
+ for (int i = 0; i < count; i++) {
+ page.setPageId(initialId+i);
+ page.setType(Page.FREE_TYPE);
+// LOG.debug("free: "+page.getPageId());
+ write(page, null);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#write(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+ */
+ public void write(Page page, Marshaller marshaller) throws IOException {
+
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
+ }
+
+ DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
+ page.setTxId(nextTxid.get());
+ page.write(dataOut, marshaller);
+ if (dataOut.size() > pageSize) {
+ throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+ }
+
+ page = page.copy();
+ Long key = page.getPageId();
+ addToCache(page);
+
+// LOG.debug("write: "+page.getPageId());
+
+ synchronized( writes ) {
+ // If it's not in the write cache...
+ PageWrite write = writes.get(key);
+ if( write==null ) {
+ write = new PageWrite(page, dataOut.getData());
+ writes.put(key, write);
+ } else {
+ write.setCurrent(page, dataOut.getData());
+ }
+
+ // Once we start approaching capacity, notify the writer to start writing
+ if( canStartWriteBatch() ) {
+ if( enableAsyncWrites ) {
+ writes.notify();
+ } else {
+ while( canStartWriteBatch() ) {
+ writeBatch(-1, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+ if( page.getType() == Page.FREE_TYPE ) {
+ removeFromCache(page);
+ freeList.add(page.getPageId());
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#load(long, org.apache.kahadb.Marshaller)
+ */
+ public Page load(long pageId, Marshaller marshaller) throws IOException {
+ Page page = new Page();
+ page.setPageId(pageId);
+ load(page, marshaller);
+ return page;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+ */
+ public void load(Page page, Marshaller marshaller) throws IOException {
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot load a page when the page file is not loaded");
+ }
+
+ // Can't load invalid offsets...
+ if (page.getPageId() < 0) {
+ page.setType(Page.INVALID_TYPE);
+ return;
+ }
+
+ // Try to load it from the cache first...
+ Page t = getFromCache(page.getPageId());
+ if (t != null) {
+ page.copy(t);
+ return;
+ }
+
+ // Read the page data
+ readFile.seek(toOffset(page.getPageId()));
+ readFile.readFully(readBuffer, 0, pageSize);
+ dataIn.restart(readBuffer);
+
+ // Unmarshall it.
+// LOG.debug("load: "+page.getPageId());
+ page.read(dataIn, marshaller);
+
+ // Cache it.
+ addToCache(page);
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#iterator()
+ */
+ public Iterator<Page> iterator() {
+ return iterator(false);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#iterator(boolean)
+ */
+ public Iterator<Page> iterator(final boolean includeFreePages) {
+
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+ }
+
+ return new Iterator<Page>() {
+ long nextId;
+ Page nextPage;
+ Page lastPage;
+
+ private void findNextPage() {
+ if( !loaded.get() ) {
+ throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+ }
+
+ if( nextPage!=null ) {
+ return;
+ }
+
+ try {
+ while( nextId < PageFile.this.nextFreePageId ) {
+ readFile.seek(toOffset(nextId));
+ readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
+ dataIn.restart(readBuffer);
+
+ Page page = new Page();
+ page.setPageId(nextId);
+ page.read(dataIn, null);
+
+ if( includeFreePages || page.getType()!=Page.FREE_TYPE ) {
+ nextPage = page;
+ return;
+ } else {
+ nextId++;
+ }
+ }
+ } catch (IOException e) {
+ }
+ }
+
+ public boolean hasNext() {
+ findNextPage();
+ return nextPage !=null;
+ }
+
+ public Page next() {
+ findNextPage();
+ if( nextPage !=null ) {
+ lastPage = nextPage;
+ nextPage=null;
+ nextId++;
+ return lastPage;
+ } else {
+ throw new NoSuchElementException();
+ }
+ }
+
+ public void remove() {
+ if( lastPage==null ) {
+ throw new IllegalStateException();
+ }
+ try {
+ free(lastPage);
+ lastPage=null;
+ } catch (IOException e) {
+ new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#commit()
+ */
+ public void commit() throws IOException {
+ }
+
+ /**
+ * Rolls back the transaction.
+ */
+ private void rollback() throws IOException {
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#execute(org.apache.kahadb.page.PageFile.Closure)
+ */
+ public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
+ boolean success=false;
+ try {
+ closure.execute(this);
+ success=true;
+ } finally {
+ if( success ) {
+ commit();
+ } else {
+ rollback();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#execute(org.apache.kahadb.page.PageFile.CallableClosure)
+ */
+ public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
+ boolean success=false;
+ try {
+ R rc = closure.execute(this);
+ success=true;
+ return rc;
+ } finally {
+ if( success ) {
+ commit();
+ } else {
+ rollback();
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.kahadb.page.ITransaction#isReadOnly()
+ */
+ public boolean isReadOnly() {
+ return false;
+ }
+ }
+
+ public PageFileTransaction tx() {
+ return new PageFileTransaction();
+ }
/**
* Creates a PageFile in the specified directory who's data files are named by name.
@@ -377,388 +731,7 @@
throw new IllegalStateException("Cannot unload the page file when it is not loaded");
}
}
-
-
- /**
- * Allocates a free page that you can write data to.
- *
- * @return a newly allocated page.
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public Page allocate() throws IOException {
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
- }
-
- Page page = null;
- // We may need to create a new free page...
- if(freeList.isEmpty()) {
- page = new Page();
- page.setPageId(nextFreePageId);
- page.setType(Page.FREE_TYPE);
- nextFreePageId ++;
-// LOG.debug("allocated: "+page.getPageId());
- write(page, null);
- }
-
- long pageId = freeList.removeFirst();
- page = new Page();
- page.setPageId(pageId);
- page.setType(Page.FREE_TYPE);
- return page;
- }
-
- /**
- * Allocates a block of free pages that you can write data to.
- *
- * @param count the number of sequential pages to allocate
- * @return the first page of the sequential set.
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public Page allocate(int count) throws IOException {
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
- }
-
- Page page = null;
- Sequence seq = freeList.removeFirstSequence(count);
- if(seq==null) {
-
- // We may need to create a new free page...
- page = new Page();
- int c=count;
- while( c > 0 ) {
- page.setPageId(nextFreePageId);
- page.setType(Page.FREE_TYPE);
- nextFreePageId ++;
-// LOG.debug("allocate writing: "+page.getPageId());
- write(page, null);
- c--;
- if( page == null ) {
- page = page;
- }
- }
-
- seq = freeList.removeFirstSequence(count);
- }
-
- page = new Page();
- page.setPageId(seq.getFirst());
- page.setType(Page.FREE_TYPE);
-// LOG.debug("allocated: "+page.getPageId());
- return page;
- }
-
- /**
- * Frees up a previously allocated page so that it can be re-allocated again.
- *
- * @param page the page to free up
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void free(Page page) throws IOException {
- page.setType(Page.FREE_TYPE);
- free(page.getPageId(), 1);
- }
-
- /**
- * Frees up a previously allocated page so that it can be re-allocated again.
- *
- * @param page the page to free up
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void free(long pageId) throws IOException {
- free(pageId, 1);
- }
-
- /**
- * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
- *
- * @param page the initial page of the sequence that will be getting freed
- * @param count the number of pages in the sequence
- *
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void free(Page page, int count) throws IOException {
- page.setType(Page.FREE_TYPE);
- free(page.getPageId(), count);
- }
-
- /**
- * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
- *
- * @param page the initial page of the sequence that will be getting freed
- * @param count the number of pages in the sequence
- *
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void free(long pageId, int count) throws IOException {
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot free a page when the page file is not loaded");
- }
-
- Page page = new Page();
- long initialId = pageId;
- for (int i = 0; i < count; i++) {
- page.setPageId(initialId+i);
- page.setType(Page.FREE_TYPE);
-// LOG.debug("free: "+page.getPageId());
- write(page, null);
- }
- }
-
- /**
- * Loads a page from disk.
- *
- * @param pageId
- * the id of the page to load
- * @param marshaller
- * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
- * @return The page with the given id
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public Page load(long pageId, Marshaller marshaller) throws IOException {
- Page page = new Page();
- page.setPageId(pageId);
- load(page, marshaller);
- return page;
- }
-
- /**
- * Loads a page from disk. If the page.pageId is not valid then then this method will set the page.type to
- * Page.INVALID_TYPE.
- *
- * @param page - The pageId field must be properly set
- * @param marshaller
- * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void load(Page page, Marshaller marshaller) throws IOException {
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot load a page when the page file is not loaded");
- }
-
- // Can't load invalid offsets...
- if (page.getPageId() < 0) {
- page.setType(Page.INVALID_TYPE);
- return;
- }
-
- // Try to load it from the cache first...
- Page t = getFromCache(page.getPageId());
- if (t != null) {
- page.copy(t);
- return;
- }
-
- // Read the page data
- readFile.seek(toOffset(page.getPageId()));
- readFile.readFully(readBuffer, 0, pageSize);
- dataIn.restart(readBuffer);
-
- // Unmarshall it.
-// LOG.debug("load: "+page.getPageId());
- page.read(dataIn, marshaller);
-
- // Cache it.
- addToCache(page);
- }
-
- /**
- * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are not included
- * in this iteration.
- *
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public Iterator<Page> iterator() {
- return iterator(false);
- }
-
- /**
- * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
- * iterated.
- *
- * @param includeFreePages - if true, free pages are included in the iteration
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public Iterator<Page> iterator(final boolean includeFreePages) {
-
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
- }
-
- return new Iterator<Page>() {
- long nextId;
- Page nextPage;
- Page lastPage;
-
- private void findNextPage() {
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
- }
-
- if( nextPage!=null ) {
- return;
- }
-
- try {
- while( nextId < PageFile.this.nextFreePageId ) {
- readFile.seek(toOffset(nextId));
- readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
- dataIn.restart(readBuffer);
-
- Page page = new Page();
- page.setPageId(nextId);
- page.read(dataIn, null);
-
- if( includeFreePages || page.getType()!=Page.FREE_TYPE ) {
- nextPage = page;
- return;
- } else {
- nextId++;
- }
- }
- } catch (IOException e) {
- }
- }
-
- public boolean hasNext() {
- findNextPage();
- return nextPage !=null;
- }
-
- public Page next() {
- findNextPage();
- if( nextPage !=null ) {
- lastPage = nextPage;
- nextPage=null;
- nextId++;
- return lastPage;
- } else {
- throw new NoSuchElementException();
- }
- }
-
- public void remove() {
- if( lastPage==null ) {
- throw new IllegalStateException();
- }
- try {
- free(lastPage);
- lastPage=null;
- } catch (IOException e) {
- new RuntimeException(e);
- }
- }
- };
- }
-
- /**
- * Updates multiple pages in a single unit of work.
- *
- * @param pages
- * the pages to write. The Pages object must be fully populated with a valid pageId, type, and data.
- * @param marshaller
- * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void write(Collection<Page> pages, ChunkMarshaller marshaller) throws IOException {
- // TODO: Need to update double buffer impl so that it handles a collection of writes. As is right now,
- // the pages in this write may be split across multiple write batches which means that they
- // will not get applied as a unit of work.
- for (Page page : pages) {
- write(page, marshaller);
- }
- }
-
- /**
- *
- * @param page
- * the page to write. The Page object must be fully populated with a valid pageId, type, and data.
- * @param marshaller
- * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
- * @throws IOException
- * If an disk error occurred.
- * @throws IllegalStateException
- * if the PageFile is not loaded
- */
- public void write(Page page, Marshaller marshaller) throws IOException {
-
- if( !loaded.get() ) {
- throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
- }
-
- DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
- page.setTxId(nextTxid.get());
- page.write(dataOut, marshaller);
- if (dataOut.size() > pageSize) {
- throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
- }
-
- page = page.copy();
- Long key = page.getPageId();
- addToCache(page);
-
-// LOG.debug("write: "+page.getPageId());
-
- synchronized( writes ) {
- // If it's not in the write cache...
- PageWrite write = writes.get(key);
- if( write==null ) {
- write = new PageWrite(page, dataOut.getData());
- writes.put(key, write);
- } else {
- write.setCurrent(page, dataOut.getData());
- }
-
- // Once we start approaching capacity, notify the writer to start writing
- if( canStartWriteBatch() ) {
- if( enableAsyncWrites ) {
- writes.notify();
- } else {
- while( canStartWriteBatch() ) {
- writeBatch(-1, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- if( page.getType() == Page.FREE_TYPE ) {
- removeFromCache(page);
- freeList.add(page.getPageId());
- }
-
- }
-
/**
* Flushes all write buffers to disk and returns the transaction id of the last write done to disk. The
* transaction id can be used for recovery purposes since it always incrementing.
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=688106&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Fri Aug 22 08:53:42 2008
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kahadb.Marshaller;
+
+/**
+ * The interface used to read/update a PageFile object. Using a transaction allows you to
+ * do multiple update operations in a single unit of work.
+ */
+interface Transaction extends Iterable<Page> {
+
+ /**
+ * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
+ *
+ * @param <T> The type of exceptions that operation will throw.
+ */
+ public interface Closure <T extends Throwable> {
+ public void execute(Transaction tx) throws T;
+ }
+
+ /**
+ * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
+ *
+ * @param <R> The type of result that the closure produces.
+ * @param <T> The type of exceptions that operation will throw.
+ */
+ public interface CallableClosure<R, T extends Throwable> {
+ public R execute(Transaction tx) throws T;
+ }
+
+ public PageFile getPageFile();
+
+ /**
+ * Allocates a free page that you can write data to.
+ *
+ * @return a newly allocated page.
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public Page allocate() throws IOException;
+
+ /**
+ * Allocates a block of free pages that you can write data to.
+ *
+ * @param count the number of sequential pages to allocate
+ * @return the first page of the sequential set.
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public Page allocate(int count) throws IOException;
+
+ /**
+ * Frees up a previously allocated page so that it can be re-allocated again.
+ *
+ * @param page the page to free up
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(Page page) throws IOException;
+
+ /**
+ * Frees up a previously allocated page so that it can be re-allocated again.
+ *
+ * @param page the page to free up
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(long pageId) throws IOException;
+
+ /**
+ * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+ *
+ * @param page the initial page of the sequence that will be getting freed
+ * @param count the number of pages in the sequence
+ *
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(Page page, int count) throws IOException;
+
+ /**
+ * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+ *
+ * @param page the initial page of the sequence that will be getting freed
+ * @param count the number of pages in the sequence
+ *
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void free(long pageId, int count) throws IOException;
+
+ /**
+ *
+ * @param page
+ * the page to write. The Page object must be fully populated with a valid pageId, type, and data.
+ * @param marshaller
+ * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void write(Page page, Marshaller marshaller) throws IOException;
+
+ /**
+ * Loads a page from disk.
+ *
+ * @param pageId
+ * the id of the page to load
+ * @param marshaller
+ * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
+ * @return The page with the given id
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public Page load(long pageId, Marshaller marshaller) throws IOException;
+
+ /**
+ * Loads a page from disk. If the page.pageId is not valid then then this method will set the page.type to
+ * Page.INVALID_TYPE.
+ *
+ * @param page - The pageId field must be properly set
+ * @param marshaller
+ * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
+ * @throws IOException
+ * If an disk error occurred.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public void load(Page page, Marshaller marshaller) throws IOException;
+
+ /**
+ * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are
+ * not included in this iteration.
+ *
+ * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
+ *
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public Iterator<Page> iterator();
+
+ /**
+ * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
+ * iterated.
+ *
+ * @param includeFreePages - if true, free pages are included in the iteration
+ * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
+ * @throws IllegalStateException
+ * if the PageFile is not loaded
+ */
+ public Iterator<Page> iterator(final boolean includeFreePages);
+
+ /**
+ * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
+ * with the transaction are written to disk or none will.
+ */
+ public void commit() throws IOException;
+
+ /**
+ * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
+ * If the closure throws an Exception, then the transaction is rolled back.
+ *
+ * @param <T>
+ * @param closure - the work to get exectued.
+ * @throws T if the closure throws it
+ * @throws IOException If the commit fails.
+ */
+ public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException;
+
+ /**
+ * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
+ * If the closure throws an Exception, then the transaction is rolled back.
+ *
+ * @param <T>
+ * @param closure - the work to get exectued.
+ * @throws T if the closure throws it
+ * @throws IOException If the commit fails.
+ */
+ public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException;
+
+ /**
+ *
+ * @return true if there are no uncommitted page file updates associated with this transaction.
+ */
+ public boolean isReadOnly();
+
+}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/ChunkTest.java Fri Aug 22 08:53:42 2008
@@ -37,9 +37,11 @@
pf.delete();
pf.load();
- long id = pf.allocate().getPageId();
+ Transaction tx = pf.tx();
+ long id = tx.allocate().getPageId();
+ tx.commit();
- PageOutputStream pos = new Chunk.PageOutputStream(pf, id);
+ PageOutputStream pos = new Chunk.PageOutputStream(tx, id);
DataOutputStream os = new DataOutputStream(pos);
for( int i=0; i < 10000; i++) {
os.writeUTF("Test string:"+i);
@@ -47,12 +49,14 @@
os.close();
System.out.println("Chuncks used: "+pos.getPageCount());
+ tx.commit();
// Reload the page file.
pf.unload();
pf.load();
+ tx = pf.tx();
- PageInputStream pis = new PageInputStream(pf, id);
+ PageInputStream pis = new PageInputStream(tx, id);
DataInputStream is = new DataInputStream(pis);
for( int i=0; i < 10000; i++) {
assertEquals("Test string:"+i, is.readUTF());
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java Fri Aug 22 08:53:42 2008
@@ -29,7 +29,12 @@
PageFile pf = new PageFile(root, name);
pf.load();
- HashIndex index = new HashIndex(indexManager, pf,pf.allocate().getPageId());
+
+ Transaction tx = pf.tx();
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ HashIndex index = new HashIndex(indexManager, pf, id);
index.setKeyMarshaller(Store.STRING_MARSHALLER);
// index.setEnableRecoveryBuffer(false);
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashTest.java Fri Aug 22 08:53:42 2008
@@ -51,7 +51,11 @@
pf.load();
indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
- this.hashIndex = new HashIndex(indexManager, pf, pf.allocate().getPageId());
+ Transaction tx = pf.tx();
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ this.hashIndex = new HashIndex(indexManager, pf, id);
this.hashIndex.setBinCapacity(12);
this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java?rev=688106&r1=688105&r2=688106&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java Fri Aug 22 08:53:42 2008
@@ -37,24 +37,27 @@
HashSet<String> expected = new HashSet<String>();
// Insert some data into the page file.
+ Transaction tx = pf.tx();
for( int i=0 ; i < 100; i++) {
- Page page = pf.allocate();
+ Page page = tx.allocate();
page.setType(TEST_TYPE);
String t = "page:"+i;
expected.add(t);
page.setData(t);
- pf.write(page, StringMarshaller.INSTANCE);
+ tx.write(page, StringMarshaller.INSTANCE);
+ tx.commit();
}
// Reload it...
pf.unload();
pf.load();
+ tx = pf.tx();
// Iterate it to make sure they are still there..
HashSet<String> actual = new HashSet<String>();
- for (Page page : pf) {
- pf.load(page, StringMarshaller.INSTANCE);
+ for (Page page : tx) {
+ tx.load(page, StringMarshaller.INSTANCE);
actual.add((String)page.getData());
}
assertEquals(expected, actual);
@@ -67,46 +70,49 @@
String t = "page:"+i;
expected.remove(t);
}
- for (Page page : pf) {
- pf.load(page, StringMarshaller.INSTANCE);
+ for (Page page : tx) {
+ tx.load(page, StringMarshaller.INSTANCE);
if( !expected.contains(page.getData()) ) {
- pf.free(page);
+ tx.free(page);
}
}
+ tx.commit();
// Reload it...
pf.unload();
pf.load();
+ tx = pf.tx();
// Iterate it to make sure the even records are still there..
actual.clear();
- for (Page page : pf) {
- pf.load(page, StringMarshaller.INSTANCE);
+ for (Page page : tx) {
+ tx.load(page, StringMarshaller.INSTANCE);
actual.add((String)page.getData());
}
assertEquals(expected, actual);
-
// Update the records...
HashSet<String> t = expected;
expected = new HashSet<String>();
for (String s : t) {
expected.add(s+":updated");
}
- for (Page page : pf) {
- pf.load(page, StringMarshaller.INSTANCE);
+ for (Page page : tx) {
+ tx.load(page, StringMarshaller.INSTANCE);
page.setData(page.getData()+":updated");
- pf.write(page, StringMarshaller.INSTANCE);
+ tx.write(page, StringMarshaller.INSTANCE);
}
-
+ tx.commit();
+
// Reload it...
pf.unload();
pf.load();
+ tx = pf.tx();
// Iterate it to make sure the updated records are still there..
actual.clear();
- for (Page page : pf) {
- pf.load(page, StringMarshaller.INSTANCE);
+ for (Page page : tx) {
+ tx.load(page, StringMarshaller.INSTANCE);
actual.add((String)page.getData());
}
assertEquals(expected, actual);