You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/06 20:59:05 UTC
svn commit: r692707 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/ main/java/org/apache/kahadb/index/
main/java/org/apache/kahadb/store/ test/java/org/apache/kahadb/index/
Author: chirino
Date: Sat Sep 6 11:59:04 2008
New Revision: 692707
URL: http://svn.apache.org/viewvc?rev=692707&view=rev
Log:
Updated the Index interface so that a Transaction is passed to the load and unload methods.
This allows the initialization and shutdown to be done as part of a larger unit of work.
HashIndex resizing is now done in the context of a transaction, so it has been simplified tremendously, as
it does not need to worry about a partial resize operation occurring.
Also added BTreeVisitor class that I forgot to add in a previous commit.
Added:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java?rev=692707&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IntegerMarshaller.java Sat Sep 6 11:59:04 2008
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implementation of a Marshaller for a Integer
+ *
+ * @version $Revision: 1.2 $
+ */
+public class IntegerMarshaller implements Marshaller<Integer> {
+
+ public static final IntegerMarshaller INSTANCE = new IntegerMarshaller();
+
+ public void writePayload(Integer object, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(object);
+ }
+
+ public Integer readPayload(DataInput dataIn) throws IOException {
+ return dataIn.readInt();
+ }
+
+ public Class<Integer> getType() {
+ return Integer.class;
+ }
+}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java Sat Sep 6 11:59:04 2008
@@ -139,34 +139,28 @@
this(pageFile, page.getPageId());
}
- synchronized public void load() throws IOException {
+ synchronized public void load(Transaction tx) throws IOException {
if (loaded.compareAndSet(false, true)) {
LOG.debug("loading");
if( keyMarshaller == null ) {
- throw new IllegalArgumentException("The keyMarshaller must be set before loading the BTreeIndex");
+ throw new IllegalArgumentException("The key marshaller must be set before loading the BTreeIndex");
}
if( valueMarshaller == null ) {
- throw new IllegalArgumentException("The valueMarshaller must be set before loading the BTreeIndex");
+ throw new IllegalArgumentException("The value marshaller must be set before loading the BTreeIndex");
}
- Transaction tx = pageFile.tx();
final Page<BTreeNode<Key,Value>> p = tx.load(pageId, null);
if( p.getType() == Page.PAGE_FREE_TYPE ) {
// Need to initialize it..
- tx.execute(new Transaction.Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- root = createNode(p, null);
- storeNode(tx, root, true);
- }
- });
- pageFile.checkpoint();
+ root = createNode(p, null);
+ storeNode(tx, root, true);
} else {
root = loadNode(tx, pageId, null);
}
}
}
- synchronized public void unload() {
+ synchronized public void unload(Transaction tx) {
if (loaded.compareAndSet(true, false)) {
root=null;
}
Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=692707&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Sat Sep 6 11:59:04 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+/**
+ * Interface used to selectively visit the entries in a BTree.
+ *
+ * @param <Key>
+ * @param <Value>
+ */
+public interface BTreeVisitor<Key,Value> {
+
+ /**
+ * Do you want to visit the range of BTree entries between the first and and second key?
+ *
+ * @param first if null indicates the range of values before the second key.
+ * @param second if null indicates the range of values after the first key.
+ * @return true if you want to visit the values between the first and second key.
+ */
+ boolean isInterestedInKeysBetween(Key first, Key second);
+
+ /**
+ * The keys and values of a BTree leaf node.
+ *
+ * @param keys
+ * @param values
+ */
+ void visit(Key[] keys, Value[] values);
+
+}
\ No newline at end of file
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java Sat Sep 6 11:59:04 2008
@@ -41,10 +41,7 @@
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;
- public static final int INITIALIZING_STATE = 3;
- public static final int RESIZING_PHASE1_STATE = 4;
- public static final int RESIZING_PHASE2_STATE = 5;
private static final Log LOG = LogFactory.getLog(HashIndex.class);
@@ -91,9 +88,6 @@
private int binCapacity = DEFAULT_BIN_CAPACITY;
private int binsActive;
private int size;
- // While resizing, the following contains the new resize data.
- private int resizeCapacity;
- private long resizePageId;
public void read(DataInput is) throws IOException {
@@ -102,8 +96,6 @@
binCapacity = is.readInt();
size = is.readInt();
binsActive = is.readInt();
- resizePageId = is.readLong();
- resizeCapacity = is.readInt();
}
public void write(DataOutput os) throws IOException {
os.writeInt(state);
@@ -111,8 +103,6 @@
os.writeInt(binCapacity);
os.writeInt(size);
os.writeInt(binsActive);
- os.writeLong(resizePageId);
- os.writeInt(resizeCapacity);
}
static class Marshaller implements org.apache.kahadb.Marshaller<Metadata> {
@@ -154,127 +144,74 @@
this.pageId = pageId;
}
- public synchronized void load() {
- Transaction tx = pageFile.tx();
- try {
-
- if (loaded.compareAndSet(false, true)) {
- try {
- final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
- // Is this a brand new index?
- if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
- // We need to create the pages for the bins
- tx.execute(new Transaction.Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- Page binPage = tx.allocate(metadata.binCapacity);
- metadata.binPageId = binPage.getPageId();
- metadata.state = INITIALIZING_STATE;
- metadata.page = metadataPage;
- metadataPage.set(metadata);
- tx.store(metadataPage, metadataMarshaller, true);
- }
- });
- pageFile.checkpoint();
-
- // If failure happens now we can continue initializing the
- // the hash bins...
- } else {
-
- metadata = metadataPage.get();
- metadata.page = metadataPage;
-
- // If we did not have a clean shutdown...
- if (metadata.state == OPEN_STATE || metadata.state == RESIZING_PHASE1_STATE) {
- // Figure out the size and the # of bins that are
- // active. Yeah This loads the first page of every bin. :(
- // We might want to put this in the metadata page, but
- // then that page would be getting updated on every write.
- metadata.size = 0;
- for (int i = 0; i < metadata.binCapacity; i++) {
- int t = sizeOfBin(metadata.binPageId);
- if (t > 0) {
- metadata.binsActive++;
- }
- metadata.size += t;
- }
+ public synchronized void load(Transaction tx) throws IOException {
+ if (loaded.compareAndSet(false, true)) {
+ final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
+ // Is this a brand new index?
+ if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
+ // We need to create the pages for the bins
+ Page binPage = tx.allocate(metadata.binCapacity);
+ metadata.binPageId = binPage.getPageId();
+ metadata.page = metadataPage;
+ metadataPage.set(metadata);
+ clear(tx);
+
+ // If failure happens now we can continue initializing the
+ // the hash bins...
+ } else {
+
+ metadata = metadataPage.get();
+ metadata.page = metadataPage;
+
+ // If we did not have a clean shutdown...
+ if (metadata.state == OPEN_STATE ) {
+ // Figure out the size and the # of bins that are
+ // active. Yeah This loads the first page of every bin. :(
+ // We might want to put this in the metadata page, but
+ // then that page would be getting updated on every write.
+ metadata.size = 0;
+ for (int i = 0; i < metadata.binCapacity; i++) {
+ int t = sizeOfBin(tx, i);
+ if (t > 0) {
+ metadata.binsActive++;
}
+ metadata.size += t;
}
-
- if (metadata.state == INITIALIZING_STATE) {
- // TODO:
- // If a failure occurs mid way through us initializing the
- // bins.. will the page file still think we have the rest
- // of them previously allocated to us?
-
- tx.execute(new Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- clear(tx);
- }
- });
- }
-
- if (metadata.state == RESIZING_PHASE1_STATE) {
- // continue resize phase 1
- resizePhase1();
- }
- if (metadata.state == RESIZING_PHASE2_STATE) {
- // continue resize phase 1
- resizePhase2();
- }
-
- calcThresholds();
-
- metadata.state = OPEN_STATE;
- tx.execute(new Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- tx.store(metadataPage, metadataMarshaller, true);
- }
- });
- pageFile.checkpoint();
-
- LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
-
- } catch (IOException e) {
- throw new RuntimeException(e);
}
}
+
+ calcThresholds();
+
+ metadata.state = OPEN_STATE;
+ tx.store(metadataPage, metadataMarshaller, true);
- } finally {
- // All pending updates should have been committed by now.
- assert tx.isReadOnly();
+ LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
}
}
- private int sizeOfBin(long binPageId) {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public synchronized void unload() throws IOException {
+ public synchronized void unload(Transaction tx) throws IOException {
if (loaded.compareAndSet(true, false)) {
metadata.state = CLOSED_STATE;
- pageFile.tx().execute(new Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- tx.store(metadata.page, metadataMarshaller, true);
- }
- });
+ tx.store(metadata.page, metadataMarshaller, true);
}
}
+ private int sizeOfBin(Transaction tx, int index) throws IOException {
+ return getBin(tx, index).size();
+ }
+
public synchronized Value get(Transaction tx, Key key) throws IOException {
- load();
+ assertLoaded();
return getBin(tx, key).get(key);
}
public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
- // TODO: multiple loads is smelly..
- load();
+ assertLoaded();
return getBin(tx, key).containsKey(key);
}
synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
- // TODO: multiple loads is smelly..
- load();
+ assertLoaded();
HashBin<Key,Value> bin = getBin(tx, key);
int originalSize = bin.size();
@@ -293,15 +230,14 @@
if (metadata.binsActive >= this.increaseThreshold) {
newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
if(metadata.binCapacity!=newSize) {
- resize(newSize);
+ resize(tx, newSize);
}
}
return result;
}
synchronized public Value remove(Transaction tx, Key key) throws IOException {
- // TODO: multiple loads is smelly..
- load();
+ assertLoaded();
HashBin<Key,Value> bin = getBin(tx, key);
int originalSize = bin.size();
@@ -320,7 +256,7 @@
if (metadata.binsActive <= this.decreaseThreshold) {
newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
if(metadata.binCapacity!=newSize) {
- resize(newSize);
+ resize(tx, newSize);
}
}
return result;
@@ -328,8 +264,7 @@
public synchronized void clear(Transaction tx) throws IOException {
- // TODO: multiple loads is smelly..
- load();
+ assertLoaded();
for (int i = 0; i < metadata.binCapacity; i++) {
long pageId = metadata.binPageId + i;
clearBinAtPage(tx, pageId);
@@ -365,109 +300,80 @@
// Implementation Methods
// /////////////////////////////////////////////////////////////////
+ private void assertLoaded() throws IllegalStateException {
+ if( !loaded.get() ) {
+ throw new IllegalStateException("The HashIndex is not loaded");
+ }
+ }
+
public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
tx.store(bin.getPage(), hashBinMarshaller, true);
}
+ // While resizing, the following contains the new resize data.
- private void resize(final int newSize) throws IOException {
+ private void resize(Transaction tx, final int newSize) throws IOException {
LOG.debug("Resizing to: "+newSize);
- pageFile.tx().execute(new Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- metadata.state = RESIZING_PHASE1_STATE;
- metadata.resizeCapacity = newSize;
- metadata.resizePageId = tx.allocate(metadata.resizeCapacity).getPageId();
- tx.store(metadata.page, metadataMarshaller, true);
- }
- });
- pageFile.checkpoint();
- resizePhase1();
- resizePhase2();
- }
+ int resizeCapacity = newSize;
+ long resizePageId = tx.allocate(resizeCapacity).getPageId();
- private void resizePhase1() throws IOException {
// In Phase 1 we copy the data to the new bins..
- pageFile.tx().execute(new Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
-
- // Initialize the bins..
- for (int i = 0; i < metadata.resizeCapacity; i++) {
- long pageId = metadata.resizePageId + i;
- clearBinAtPage(tx, pageId);
- }
+ // Initialize the bins..
+ for (int i = 0; i < resizeCapacity; i++) {
+ long pageId = resizePageId + i;
+ clearBinAtPage(tx, pageId);
+ }
- metadata.binsActive = 0;
- // Copy the data from the old bins to the new bins.
- for (int i = 0; i < metadata.binCapacity; i++) {
-
- HashBin<Key,Value> bin = getBin(tx, i);
- for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
- HashBin<Key,Value> resizeBin = getResizeBin(tx, entry.getKey());
- resizeBin.put(entry.getKey(), entry.getValue());
- store(tx, resizeBin);
- if( resizeBin.size() == 1) {
- metadata.binsActive++;
- }
- }
+ metadata.binsActive = 0;
+ // Copy the data from the old bins to the new bins.
+ for (int i = 0; i < metadata.binCapacity; i++) {
+
+ HashBin<Key,Value> bin = getBin(tx, i);
+ for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
+ HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity);
+ resizeBin.put(entry.getKey(), entry.getValue());
+ store(tx, resizeBin);
+ if( resizeBin.size() == 1) {
+ metadata.binsActive++;
}
-
- // Now we can release the old data.
- metadata.state = RESIZING_PHASE2_STATE;
- tx.store(metadata.page, metadataMarshaller, true);
}
- });
- pageFile.checkpoint();
- }
-
- private void resizePhase2() throws IOException {
+ }
+
// In phase 2 we free the old bins and switch the the new bins.
- pageFile.tx().execute(new Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- for (int i = 0; i < metadata.binCapacity; i++) {
- long pageId = metadata.binPageId + i;
- clearBinAtPage(tx, pageId);
- }
- tx.free(metadata.binPageId, metadata.binCapacity);
-
- metadata.binCapacity = metadata.resizeCapacity;
- metadata.binPageId = metadata.resizePageId;
- metadata.resizeCapacity=0;
- metadata.resizePageId=0;
- metadata.state = OPEN_STATE;
- tx.store(metadata.page, metadataMarshaller, true);
- }
- });
-
- pageFile.checkpoint();
+ tx.free(metadata.binPageId, metadata.binCapacity);
+
+ metadata.binCapacity = resizeCapacity;
+ metadata.binPageId = resizePageId;
+ metadata.state = OPEN_STATE;
+ tx.store(metadata.page, metadataMarshaller, true);
calcThresholds();
- LOG.debug("Resizing done. New bins start at: "+metadata.binPageId);
+
+ LOG.debug("Resizing done. New bins start at: "+metadata.binPageId);
+ resizeCapacity=0;
+ resizePageId=0;
}
private void calcThresholds() {
increaseThreshold = (metadata.binCapacity * loadFactor)/100;
decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
}
-
- private HashBin<Key,Value> getResizeBin(Transaction tx, Key key) throws IOException {
- int i = indexFor(key, metadata.resizeCapacity);
- return getResizeBin(tx, i);
+
+ private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
+ return getBin(tx, key, metadata.binPageId, metadata.binCapacity);
}
- private HashBin<Key,Value> getResizeBin(Transaction tx, int i) throws IOException {
- Page<HashBin<Key, Value>> page = tx.load(metadata.resizePageId + i, hashBinMarshaller);
- HashBin<Key, Value> rc = page.get();
- rc.setPage(page);
- return rc;
+ private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
+ return getBin(tx, i, metadata.binPageId);
}
-
- private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
- int i = indexFor(key, metadata.binCapacity);
- return getBin(tx, i);
+
+ private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException {
+ int i = indexFor(key, capacity);
+ return getBin(tx, i, basePage);
}
- private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
- Page<HashBin<Key, Value>> page = tx.load(metadata.binPageId + i, hashBinMarshaller);
+ private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException {
+ Page<HashBin<Key, Value>> page = tx.load(basePage + i, hashBinMarshaller);
HashBin<Key, Value> rc = page.get();
rc.setPage(page);
return rc;
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java Sat Sep 6 11:59:04 2008
@@ -47,14 +47,14 @@
/**
* load indexes
*/
- void load() throws IOException;
+ void load(Transaction tx) throws IOException;
/**
* unload indexes
*
* @throws IOException
*/
- void unload() throws IOException;
+ void unload(Transaction tx) throws IOException;
/**
* clear the index
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Sat Sep 6 11:59:04 2008
@@ -227,7 +227,7 @@
}
metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
- metadata.destinations.load();
+ metadata.destinations.load(tx);
}
});
@@ -285,7 +285,7 @@
}
});
- metadata.destinations.unload();
+// metadata.destinations.unload(tx);
pageFile.unload();
metadata = new Metadata();
}
@@ -611,16 +611,16 @@
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx);
sd.messageIdIndex.clear(tx);
- sd.orderIndex.unload();
- sd.messageIdIndex.unload();
+ sd.orderIndex.unload(tx);
+ sd.messageIdIndex.unload(tx);
tx.free(sd.orderIndex.getPageId());
tx.free(sd.messageIdIndex.getPageId());
if (sd.subscriptions != null) {
sd.subscriptions.clear(tx);
sd.subscriptionAcks.clear(tx);
- sd.subscriptions.unload();
- sd.subscriptionAcks.unload();
+ sd.subscriptions.unload(tx);
+ sd.subscriptionAcks.unload(tx);
tx.free(sd.subscriptions.getPageId());
tx.free(sd.subscriptionAcks.getPageId());
}
@@ -842,22 +842,22 @@
// Configure the marshalers and load.
rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
- rc.orderIndex.load();
+ rc.orderIndex.load(tx);
rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
- rc.messageIdIndex.load();
+ rc.messageIdIndex.load(tx);
// If it was a topic...
if (topic) {
rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
- rc.subscriptions.load();
+ rc.subscriptions.load(tx);
rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
- rc.subscriptionAcks.load();
+ rc.subscriptionAcks.load(tx);
rc.ackLocations = new TreeMap<Location, HashSet<String>>();
rc.subscriptionCursors = new HashMap<String, Location>();
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Sat Sep 6 11:59:04 2008
@@ -61,7 +61,8 @@
createPageFileAndIndex(100);
BTreeIndex index = ((BTreeIndex)this.index);
- this.index.load();
+ this.index.load(tx);
+ tx.commit();
doInsert(50);
@@ -80,7 +81,7 @@
assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
- this.index.unload();
+ this.index.unload(tx);
}
public void testPruning() throws Exception {
@@ -88,7 +89,8 @@
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
- this.index.load();
+ this.index.load(tx);
+ tx.commit();
int minLeafDepth = index.getMinLeafDepth(tx);
int maxLeafDepth = index.getMaxLeafDepth(tx);
@@ -110,19 +112,23 @@
assertEquals(1, minLeafDepth);
assertEquals(1, maxLeafDepth);
- this.index.unload();
+ this.index.unload(tx);
+ tx.commit();
}
public void testIteration() throws Exception {
createPageFileAndIndex(100);
BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
- this.index.load();
+ this.index.load(tx);
+ tx.commit();
// Insert in reverse order..
doInsertReverse(1000);
- this.index.unload();
- this.index.load();
+ this.index.unload(tx);
+ tx.commit();
+ this.index.load(tx);
+ tx.commit();
// BTree should iterate it in sorted order.
int counter=0;
@@ -133,7 +139,8 @@
counter++;
}
- this.index.unload();
+ this.index.unload(tx);
+ tx.commit();
}
void doInsertReverse(int count) throws Exception {
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java Sat Sep 6 11:59:04 2008
@@ -57,23 +57,27 @@
}
protected void tearDown() throws Exception {
+ Transaction tx = pf.tx();
for (Index i : indexes.values()) {
try {
- i.unload();
+ i.unload(tx);
} catch (Throwable ignore) {
}
}
+ tx.commit();
}
abstract protected Index<String, Long> createIndex() throws Exception;
synchronized private Index<String, Long> openIndex(String name) throws Exception {
+ Transaction tx = pf.tx();
Index<String, Long> index = indexes.get(name);
if (index == null) {
index = createIndex();
- index.load();
+ index.load(tx);
indexes.put(name, index);
}
+ tx.commit();
return index;
}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java?rev=692707&r1=692706&r2=692707&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java Sat Sep 6 11:59:04 2008
@@ -67,22 +67,29 @@
public void testIndex() throws Exception {
createPageFileAndIndex(500);
-
- this.index.load();
+ this.index.load(tx);
+ tx.commit();
doInsert(COUNT);
- this.index.unload();
- this.index.load();
+ this.index.unload(tx);
+ tx.commit();
+ this.index.load(tx);
+ tx.commit();
checkRetrieve(COUNT);
doRemove(COUNT);
- this.index.unload();
- this.index.load();
+ this.index.unload(tx);
+ tx.commit();
+ this.index.load(tx);
+ tx.commit();
doInsert(COUNT);
doRemoveHalf(COUNT);
doInsertHalf(COUNT);
- this.index.unload();
- this.index.load();
+ this.index.unload(tx);
+ tx.commit();
+ this.index.load(tx);
+ tx.commit();
checkRetrieve(COUNT);
- this.index.unload();
+ this.index.unload(tx);
+ tx.commit();
}
void doInsert(int count) throws Exception {