You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/06/02 17:28:31 UTC
svn commit: r1130607 [2/2] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/broker/region/
activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/
activemq-core...
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java Thu Jun 2 15:28:30 2011
@@ -33,18 +33,20 @@ import org.apache.kahadb.util.VariableMa
* one overflowing Page of a PageFile.
*/
public final class ListNode<Key,Value> {
+ private final static boolean ADD_FIRST = true;
+ private final static boolean ADD_LAST = false;
+ private final static long NOT_SET = -1;
// The index that this node is part of.
private final ListIndex<Key,Value> index;
- // The parent node or null if this is the root node of the List
- private ListNode<Key,Value> parent;
+
// The page associated with this node
private Page<ListNode<Key,Value>> page;
protected LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>();
// The next page after this one.
- private long next = -1;
+ private long next = NOT_SET;
public int size(Transaction tx) {
return entries.size();
@@ -95,9 +97,9 @@ public final class ListNode<Key,Value> {
public ListNode<Key,Value> next() {
ListNode<Key,Value> current = nextEntry;
if( nextEntry !=null ) {
- if (nextEntry.next != -1) {
+ if (nextEntry.next != NOT_SET) {
try {
- nextEntry = index.loadNode(tx, current.next, current);
+ nextEntry = index.loadNode(tx, current.next);
} catch (IOException unexpected) {
IllegalStateException e = new IllegalStateException("failed to load next: " + current.next + ", reason: " + unexpected.getLocalizedMessage());
e.initCause(unexpected);
@@ -118,16 +120,16 @@ public final class ListNode<Key,Value> {
private final class ListIterator implements Iterator<Entry<Key, Value>> {
private final Transaction tx;
- ListNode<Key,Value> current;
+ ListNode<Key,Value> current, prev;
KeyValueEntry<Key, Value> nextEntry;
KeyValueEntry<Key, Value> toRemove;
- private ListIterator(Transaction tx, ListNode<Key,Value> current, int nextIndex) throws IOException {
+ private ListIterator(Transaction tx, ListNode<Key,Value> current, long nextIndex) throws IOException {
this.tx = tx;
this.current = current;
nextEntry = current.entries.getHead();
- if (nextIndex > 0) {
- for (int i=0; i<nextIndex; i++) {
+ if (nextIndex > 0 && nextEntry != null) {
+ for (long i=0; i<nextIndex; i++) {
nextEntry = nextEntry.getNext();
if (nextEntry == null) {
if (!nextFromNextListNode())
@@ -139,9 +141,10 @@ public final class ListNode<Key,Value> {
private boolean nextFromNextListNode() {
boolean haveNext = false;
- if (current.getNext() != -1) {
+ if (current.getNext() != NOT_SET) {
try {
- current = index.loadNode(tx, current.getNext(), current);
+ prev = current;
+ current = index.loadNode(tx, current.getNext());
} catch (IOException unexpected) {
NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
e.initCause(unexpected);
@@ -172,7 +175,7 @@ public final class ListNode<Key,Value> {
throw new IllegalStateException("can only remove once, call next again");
}
try {
- doRemove(tx, current, toRemove);
+ doRemove(tx, current, prev, toRemove);
index.onRemove();
toRemove = null;
} catch (IOException unexpected) {
@@ -197,7 +200,7 @@ public final class ListNode<Key,Value> {
}
public void writePayload(ListNode<Key,Value> node, DataOutput os) throws IOException {
- // Write the keys
+ os.writeLong(node.next);
short count = (short)node.entries.size(); // cast may truncate value...
if( count != node.entries.size() ) {
throw new IOException("short over flow, too many entries in list: " + node.entries.size());
@@ -215,6 +218,7 @@ public final class ListNode<Key,Value> {
@SuppressWarnings("unchecked")
public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(index);
+ node.next = is.readLong();
final short size = is.readShort();
for (short i = 0; i < size; i++) {
node.entries.addLast(
@@ -229,40 +233,26 @@ public final class ListNode<Key,Value> {
this.index = index;
}
- public void setEmpty() {
- }
-
- public Value remove(Transaction tx, Key key) throws IOException {
- Value result = null;
- KeyValueEntry<Key, Value> entry = entries.getHead();
- while (entry != null) {
- if (entry.getKey().equals(key)) {
- result = entry.getValue();
- doRemove(tx, this, entry);
- break;
- }
- entry = entry.getNext();
- }
- return result;
- }
-
- private void doRemove(Transaction tx, ListNode current, KeyValueEntry<Key, Value> entry) throws IOException {
+ private void doRemove(final Transaction tx, final ListNode current, final ListNode prev, KeyValueEntry<Key, Value> entry) throws IOException {
entry.unlink();
if (current.entries.isEmpty()) {
if (current.getPageId() == index.getHeadPageId()) {
- if (current.getNext() != -1) {
+ if (current.getNext() != NOT_SET) {
// new head
index.setHeadPageId(current.getNext());
tx.free(current.getPageId());
+ } else {
+ // store current in empty state
+ store(tx);
}
} else {
// need to unlink the node
- current.parent.setNext(current.getNext());
+ prev.setNext(current.next);
+ index.storeNode(tx, prev, false);
tx.free(current.getPageId());
- index.storeNode(tx, current.parent, false);
}
} else {
- store(tx, true);
+ store(tx);
}
}
@@ -271,7 +261,7 @@ public final class ListNode<Key,Value> {
throw new IllegalArgumentException("Key cannot be null");
}
entries.addLast(new KeyValueEntry(key, value));
- store(tx, false);
+ store(tx, ADD_LAST);
return null;
}
@@ -280,29 +270,30 @@ public final class ListNode<Key,Value> {
throw new IllegalArgumentException("Key cannot be null");
}
entries.addFirst(new KeyValueEntry(key, value));
- store(tx, true);
+ store(tx, ADD_FIRST);
return null;
}
private void store(Transaction tx, boolean addFirst) throws IOException {
try {
- index.storeNode(tx, this, allowOverflow());
+ index.storeNode(tx, this, false);
} catch ( Transaction.PageOverflowIOException e ) {
// If we get an overflow
split(tx, addFirst);
}
}
- private boolean allowOverflow() {
- return false;
+ private void store(Transaction tx) throws IOException {
+ index.storeNode(tx, this, false);
}
private void split(Transaction tx, boolean isAddFirst) throws IOException {
- ListNode<Key, Value> extension = index.createNode(tx, this);
+ ListNode<Key, Value> extension = index.createNode(tx);
if (isAddFirst) {
- extension.setEntries(entries.getHead().splitAfter());
+ // head keeps the first entry, insert extension with the rest
extension.setNext(this.getNext());
this.setNext(extension.getPageId());
+ extension.setEntries(entries.getHead().splitAfter());
} else {
index.setTailPageId(extension.getPageId());
this.setNext(extension.getPageId());
@@ -345,7 +336,7 @@ public final class ListNode<Key,Value> {
return entries.getTail();
}
- public Iterator<Entry<Key,Value>> iterator(final Transaction tx, int pos) throws IOException {
+ public Iterator<Entry<Key,Value>> iterator(final Transaction tx, long pos) throws IOException {
return new ListIterator(tx, this, pos);
}
@@ -386,14 +377,6 @@ public final class ListNode<Key,Value> {
return page.getPageId();
}
- public ListNode<Key, Value> getParent() {
- return parent;
- }
-
- public void setParent(ListNode<Key, Value> parent) {
- this.parent = parent;
- }
-
public Page<ListNode<Key, Value>> getPage() {
return page;
}
@@ -412,7 +395,7 @@ public final class ListNode<Key,Value> {
@Override
public String toString() {
- return "[ListNode "+ entries.toString() + "]";
+ return "[ListNode(" + page.getPageId() + "->" + next + ") " + entries.toString() + "]";
}
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Jun 2 15:28:30 2011
@@ -64,6 +64,7 @@ public class PageFile {
// 4k Default page size.
public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
+ public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", ""+100));;
private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
private static final int PAGE_FILE_HEADER_SIZE=1024*4;
@@ -103,8 +104,8 @@ public class PageFile {
// The cache of recently used pages.
private boolean enablePageCaching=true;
// How many pages will we keep in the cache?
- private int pageCacheSize = 100;
-
+ private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
+
// Should first log the page write to the recovery buffer? Avoids partial
// page write failures..
private boolean enableRecoveryFile=true;
@@ -129,7 +130,7 @@ public class PageFile {
// Persistent settings stored in the page file.
private MetaData metaData;
-
+
/**
* Use to keep track of updated pages which have not yet been committed.
*/
@@ -682,7 +683,7 @@ public class PageFile {
public long getFreePageCount() {
assertLoaded();
- return freeList.size();
+ return freeList.rangeSize();
}
public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Thu Jun 2 15:28:30 2011
@@ -38,7 +38,7 @@ import org.apache.kahadb.util.SequenceSe
* do multiple update operations in a single unit of work.
*/
public class Transaction implements Iterable<Page> {
-
+
/**
* The PageOverflowIOException occurs when a page write is requested
* and it's data is larger than what would fit into a single page.
@@ -142,7 +142,7 @@ public class Transaction implements Iter
/**
* Frees up a previously allocated page so that it can be re-allocated again.
*
- * @param page the page to free up
+ * @param pageId the page to free up
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
@@ -155,7 +155,7 @@ public class Transaction implements Iter
/**
* Frees up a previously allocated sequence of pages so that it can be re-allocated again.
*
- * @param page the initial page of the sequence that will be getting freed
+ * @param pageId the initial page of the sequence that will be getting freed
* @param count the number of pages in the sequence
*
* @throws IOException
@@ -216,6 +216,8 @@ public class Transaction implements Iter
}
page.makeFree(getWriteTransactionId());
+ // ensure free page is visible while write is pending
+ pageFile.addToCache(page.copy());
DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
page.write(out);
@@ -451,7 +453,7 @@ public class Transaction implements Iter
}
if (page.getType() == Page.PAGE_FREE_TYPE) {
- throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
+ throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
}
return page;
@@ -560,7 +562,6 @@ public class Transaction implements Iter
* iterated.
*
* @param includeFreePages - if true, free pages are included in the iteration
- * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Jun 2 15:28:30 2011
@@ -280,4 +280,14 @@ public class SequenceSet extends LinkedN
return false;
}
+ public long rangeSize() {
+ long result = 0;
+ Sequence sequence = getHead();
+ while (sequence != null) {
+ result += sequence.range();
+ sequence = sequence.getNext();
+ }
+ return result;
+ }
+
}
\ No newline at end of file
Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java Thu Jun 2 15:28:30 2011
@@ -228,39 +228,9 @@ public class ListIndexTest extends Index
tx.commit();
}
- public void testVisitor() throws Exception {
- createPageFileAndIndex(100);
- ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
- this.index.load(tx);
- tx.commit();
-
- // Insert in reverse order..
- doInsert(1000);
-
- this.index.unload(tx);
- tx.commit();
- this.index.load(tx);
- tx.commit();
-
- // BTree should iterate it in sorted order.
-
- /*index.visit(tx, new BTreeVisitor<String, Long>(){
- public boolean isInterestedInKeysBetween(String first, String second) {
- return true;
- }
- public void visit(List<String> keys, List<Long> values) {
- }
- });*/
-
-
- this.index.unload(tx);
- tx.commit();
- }
-
-
public void testRandomRemove() throws Exception {
- createPageFileAndIndex(100);
+ createPageFileAndIndex(4*1024);
ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
this.index.load(tx);
tx.commit();
@@ -295,21 +265,34 @@ public class ListIndexTest extends Index
index.remove(tx, key(1566));
}
- public void testLargeAppendTimed() throws Exception {
- createPageFileAndIndex(100);
+ public void testLargeAppendRemoveTimed() throws Exception {
+ createPageFileAndIndex(1024*4);
ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
this.index.load(tx);
tx.commit();
final int COUNT = 50000;
long start = System.currentTimeMillis();
for (int i = 0; i < COUNT; i++) {
- //String test = new String("test" + i);
- //ByteSequence bs = new ByteSequence(test.getBytes());
listIndex.put(tx, key(i), (long) i);
tx.commit();
}
LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
+
+ start = System.currentTimeMillis();
+ tx = pf.tx();
+ int removeCount = 0;
+ Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
+ while (iterator.hasNext()) {
+ iterator.next();
+ iterator.remove();
+ removeCount++;
+ }
+ tx.commit();
+ assertEquals("Removed all", COUNT, removeCount);
+ LOG.info("Time to remove " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
+ LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
+ LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount());
}
void doInsertReverse(int count) throws Exception {