You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/06/02 17:28:31 UTC

svn commit: r1130607 [2/2] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core...

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java Thu Jun  2 15:28:30 2011
@@ -33,18 +33,20 @@ import org.apache.kahadb.util.VariableMa
  * one overflowing Page of a PageFile.
  */
 public final class ListNode<Key,Value> {
+    private final static boolean ADD_FIRST = true;
+    private final static boolean ADD_LAST = false;
+    private final static long NOT_SET = -1;
 
     // The index that this node is part of.
     private final ListIndex<Key,Value> index;
-    // The parent node or null if this is the root node of the List
-    private ListNode<Key,Value> parent;
+
     // The page associated with this node
     private Page<ListNode<Key,Value>> page;
 
     protected LinkedNodeList<KeyValueEntry<Key, Value>> entries = new LinkedNodeList<KeyValueEntry<Key, Value>>();
 
     // The next page after this one.
-    private long next = -1;
+    private long next = NOT_SET;
 
     public int size(Transaction tx) {
         return entries.size();
@@ -95,9 +97,9 @@ public final class ListNode<Key,Value> {
         public ListNode<Key,Value> next() {
             ListNode<Key,Value> current = nextEntry;
             if( nextEntry !=null ) {
-                if (nextEntry.next != -1) {
+                if (nextEntry.next != NOT_SET) {
                     try {
-                        nextEntry = index.loadNode(tx, current.next, current);
+                        nextEntry = index.loadNode(tx, current.next);
                     } catch (IOException unexpected) {
                         IllegalStateException e = new IllegalStateException("failed to load next: " + current.next + ", reason: " + unexpected.getLocalizedMessage());
                         e.initCause(unexpected);
@@ -118,16 +120,16 @@ public final class ListNode<Key,Value> {
     private final class ListIterator implements Iterator<Entry<Key, Value>> {
 
         private final Transaction tx;
-        ListNode<Key,Value> current;
+        ListNode<Key,Value> current, prev;
         KeyValueEntry<Key, Value> nextEntry;
         KeyValueEntry<Key, Value>  toRemove;
 
-        private ListIterator(Transaction tx, ListNode<Key,Value> current, int nextIndex) throws IOException {
+        private ListIterator(Transaction tx, ListNode<Key,Value> current, long nextIndex) throws IOException {
             this.tx = tx;
             this.current = current;
             nextEntry = current.entries.getHead();
-            if (nextIndex > 0) {
-                for (int i=0; i<nextIndex; i++) {
+            if (nextIndex > 0 && nextEntry != null) {
+                for (long i=0; i<nextIndex; i++) {
                     nextEntry = nextEntry.getNext();
                     if (nextEntry == null) {
                         if (!nextFromNextListNode())
@@ -139,9 +141,10 @@ public final class ListNode<Key,Value> {
 
         private boolean nextFromNextListNode() {
             boolean haveNext = false;
-            if (current.getNext() != -1) {
+            if (current.getNext() != NOT_SET) {
                 try {
-                    current = index.loadNode(tx, current.getNext(), current);
+                    prev = current;
+                    current = index.loadNode(tx, current.getNext());
                 } catch (IOException unexpected) {
                     NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
                     e.initCause(unexpected);
@@ -172,7 +175,7 @@ public final class ListNode<Key,Value> {
                 throw new IllegalStateException("can only remove once, call next again");
             }
             try {
-                doRemove(tx, current, toRemove);
+                doRemove(tx, current, prev, toRemove);
                 index.onRemove();
                 toRemove = null;
             } catch (IOException unexpected) {
@@ -197,7 +200,7 @@ public final class ListNode<Key,Value> {
         }
 
         public void writePayload(ListNode<Key,Value> node, DataOutput os) throws IOException {
-            // Write the keys
+            os.writeLong(node.next);
             short count = (short)node.entries.size(); // cast may truncate value...
             if( count != node.entries.size() ) {
                 throw new IOException("short over flow, too many entries in list: " + node.entries.size());
@@ -215,6 +218,7 @@ public final class ListNode<Key,Value> {
         @SuppressWarnings("unchecked")
         public ListNode<Key,Value> readPayload(DataInput is) throws IOException {
             ListNode<Key,Value> node = new ListNode<Key,Value>(index);
+            node.next = is.readLong();
             final short size = is.readShort();
             for (short i = 0; i < size; i++) {
                 node.entries.addLast(
@@ -229,40 +233,26 @@ public final class ListNode<Key,Value> {
         this.index = index;
     }
 
-    public void setEmpty() {
-    }
-
-    public Value remove(Transaction tx, Key key) throws IOException {
-        Value result = null;
-        KeyValueEntry<Key, Value> entry = entries.getHead();
-        while (entry != null) {
-            if (entry.getKey().equals(key)) {
-                 result = entry.getValue();
-                 doRemove(tx, this, entry);
-                 break;
-            }
-            entry = entry.getNext();
-        }
-        return result;
-    }
-
-    private void doRemove(Transaction tx, ListNode current, KeyValueEntry<Key, Value> entry) throws IOException {
+    private void doRemove(final Transaction tx, final ListNode current, final ListNode prev, KeyValueEntry<Key, Value> entry) throws IOException {
         entry.unlink();
         if (current.entries.isEmpty()) {
                 if (current.getPageId() == index.getHeadPageId()) {
-                    if (current.getNext() != -1) {
+                    if (current.getNext() != NOT_SET) {
                         // new head
                         index.setHeadPageId(current.getNext());
                         tx.free(current.getPageId());
+                    } else {
+                        //  store current in empty state
+                        store(tx);
                     }
                 } else {
                     // need to unlink the node
-                    current.parent.setNext(current.getNext());
+                    prev.setNext(current.next);
+                    index.storeNode(tx, prev, false);
                     tx.free(current.getPageId());
-                    index.storeNode(tx, current.parent, false);
                 }
         } else {
-            store(tx, true);
+            store(tx);
         }
     }
 
@@ -271,7 +261,7 @@ public final class ListNode<Key,Value> {
             throw new IllegalArgumentException("Key cannot be null");
         }
         entries.addLast(new KeyValueEntry(key, value));
-        store(tx, false);
+        store(tx, ADD_LAST);
         return null;
     }
 
@@ -280,29 +270,30 @@ public final class ListNode<Key,Value> {
             throw new IllegalArgumentException("Key cannot be null");
         }
         entries.addFirst(new KeyValueEntry(key, value));
-        store(tx, true);
+        store(tx, ADD_FIRST);
         return null;
     }
 
     private void store(Transaction tx, boolean addFirst) throws IOException {
         try {
-            index.storeNode(tx, this, allowOverflow());
+            index.storeNode(tx, this, false);
         } catch ( Transaction.PageOverflowIOException e ) {
                 // If we get an overflow
                 split(tx, addFirst);
         }
     }
 
-    private boolean allowOverflow() {
-        return false;
+    private void store(Transaction tx) throws IOException {
+        index.storeNode(tx, this, false);
     }
 
     private void split(Transaction tx, boolean isAddFirst) throws IOException {
-        ListNode<Key, Value> extension = index.createNode(tx, this);
+        ListNode<Key, Value> extension = index.createNode(tx);
         if (isAddFirst) {
-            extension.setEntries(entries.getHead().splitAfter());
+            // head keeps the first entry, insert extension with the rest
             extension.setNext(this.getNext());
             this.setNext(extension.getPageId());
+            extension.setEntries(entries.getHead().splitAfter());
         }  else {
             index.setTailPageId(extension.getPageId());
             this.setNext(extension.getPageId());
@@ -345,7 +336,7 @@ public final class ListNode<Key,Value> {
         return entries.getTail();
     }
 
-    public Iterator<Entry<Key,Value>> iterator(final Transaction tx, int pos) throws IOException {
+    public Iterator<Entry<Key,Value>> iterator(final Transaction tx, long pos) throws IOException {
         return new ListIterator(tx, this, pos);
     }
 
@@ -386,14 +377,6 @@ public final class ListNode<Key,Value> {
         return page.getPageId();
     }
 
-    public ListNode<Key, Value> getParent() {
-        return parent;
-    }
-
-    public void setParent(ListNode<Key, Value> parent) {
-        this.parent = parent;
-    }
-
     public Page<ListNode<Key, Value>> getPage() {
         return page;
     }
@@ -412,7 +395,7 @@ public final class ListNode<Key,Value> {
     
     @Override
     public String toString() {
-        return "[ListNode "+ entries.toString() + "]";
+        return "[ListNode(" + page.getPageId() + "->" + next + ") " + entries.toString() + "]";
     }
 }
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Jun  2 15:28:30 2011
@@ -64,6 +64,7 @@ public class PageFile {
     // 4k Default page size.
     public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
     public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
+    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", ""+100));;
     private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
     private static final int PAGE_FILE_HEADER_SIZE=1024*4;
 
@@ -103,8 +104,8 @@ public class PageFile {
     // The cache of recently used pages.
     private boolean enablePageCaching=true;
     // How many pages will we keep in the cache?
-    private int pageCacheSize = 100;
-    
+    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
+
     // Should first log the page write to the recovery buffer? Avoids partial
     // page write failures..
     private boolean enableRecoveryFile=true;
@@ -129,7 +130,7 @@ public class PageFile {
     
     // Persistent settings stored in the page file. 
     private MetaData metaData;
-    
+
     /**
      * Use to keep track of updated pages which have not yet been committed.
      */
@@ -682,7 +683,7 @@ public class PageFile {
 
     public long getFreePageCount() {
         assertLoaded();
-        return freeList.size();
+        return freeList.rangeSize();
     }
 
     public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Thu Jun  2 15:28:30 2011
@@ -38,7 +38,7 @@ import org.apache.kahadb.util.SequenceSe
  * do multiple update operations in a single unit of work.
  */
 public class Transaction implements Iterable<Page> {
-    
+
     /**
      * The PageOverflowIOException occurs when a page write is requested
      * and it's data is larger than what would fit into a single page.
@@ -142,7 +142,7 @@ public class Transaction implements Iter
     /**
      * Frees up a previously allocated page so that it can be re-allocated again.
      * 
-     * @param page the page to free up
+     * @param pageId the page to free up
      * @throws IOException
      *         If an disk error occurred.
      * @throws IllegalStateException
@@ -155,7 +155,7 @@ public class Transaction implements Iter
     /**
      * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
      * 
-     * @param page the initial page of the sequence that will be getting freed
+     * @param pageId the initial page of the sequence that will be getting freed
      * @param count the number of pages in the sequence
      * 
      * @throws IOException
@@ -216,6 +216,8 @@ public class Transaction implements Iter
             }
 
             page.makeFree(getWriteTransactionId());
+            // ensure free page is visible while write is pending
+            pageFile.addToCache(page.copy());
 
             DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
             page.write(out);
@@ -451,7 +453,7 @@ public class Transaction implements Iter
                 }
 
                 if (page.getType() == Page.PAGE_FREE_TYPE) {
-                    throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
+                    throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
                 }
 
                 return page;
@@ -560,7 +562,6 @@ public class Transaction implements Iter
      * iterated.
      * 
      * @param includeFreePages - if true, free pages are included in the iteration
-     * @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Jun  2 15:28:30 2011
@@ -280,4 +280,14 @@ public class SequenceSet extends LinkedN
         return false;
     }
 
+    public long rangeSize() {
+        long result = 0;
+        Sequence sequence = getHead();
+        while (sequence != null) {
+            result += sequence.range();
+            sequence = sequence.getNext();
+        }
+        return result;
+    }
+
 }
\ No newline at end of file

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java Thu Jun  2 15:28:30 2011
@@ -228,39 +228,9 @@ public class ListIndexTest extends Index
         tx.commit();
     }
 
-    public void testVisitor() throws Exception {
-        createPageFileAndIndex(100);
-        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
-        this.index.load(tx);
-        tx.commit();
-
-        // Insert in reverse order..
-        doInsert(1000);
-
-        this.index.unload(tx);
-        tx.commit();
-        this.index.load(tx);
-        tx.commit();
-
-        // BTree should iterate it in sorted order.
-
-        /*index.visit(tx, new BTreeVisitor<String, Long>(){
-            public boolean isInterestedInKeysBetween(String first, String second) {
-                return true;
-            }
-            public void visit(List<String> keys, List<Long> values) {
-            }
-        });*/
-
-
-        this.index.unload(tx);
-        tx.commit();
-    }
-
-
     public void testRandomRemove() throws Exception {
 
-        createPageFileAndIndex(100);
+        createPageFileAndIndex(4*1024);
         ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
         this.index.load(tx);
         tx.commit();
@@ -295,21 +265,34 @@ public class ListIndexTest extends Index
         index.remove(tx, key(1566));
     }
 
-    public void testLargeAppendTimed() throws Exception {
-        createPageFileAndIndex(100);
+    public void testLargeAppendRemoveTimed() throws Exception {
+        createPageFileAndIndex(1024*4);
         ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
         this.index.load(tx);
         tx.commit();
         final int COUNT = 50000;
         long start = System.currentTimeMillis();
         for (int i = 0; i < COUNT; i++) {
-            //String test = new String("test" + i);
-            //ByteSequence bs = new ByteSequence(test.getBytes());
              listIndex.put(tx, key(i), (long) i);
              tx.commit();
         }
         LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
         LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
+
+        start = System.currentTimeMillis();
+        tx = pf.tx();
+        int removeCount = 0;
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
+        while (iterator.hasNext()) {
+            iterator.next();
+            iterator.remove();
+            removeCount++;
+        }
+        tx.commit();
+        assertEquals("Removed all", COUNT, removeCount);
+        LOG.info("Time to remove " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
+        LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
+        LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount());
     }
 
     void doInsertReverse(int count) throws Exception {