You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/15 00:31:35 UTC

svn commit: r1409554 [1/2] - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/store/kahadb/disk/ activemq-kahadb-store/ activemq-kahadb-store/src/test/ activemq-kahadb-store/src/test/java/ activemq-kahadb-store/src/test/java/org/ act...

Author: tabish
Date: Wed Nov 14 23:31:34 2012
New Revision: 1409554

URL: http://svn.apache.org/viewvc?rev=1409554&view=rev
Log:
Move some of the KahaDB tests from core into the KahaDB store module

Added:
    activemq/trunk/activemq-kahadb-store/src/test/
    activemq/trunk/activemq-kahadb-store/src/test/java/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/
    activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java   (with props)
    activemq/trunk/activemq-kahadb-store/src/test/resources/
    activemq/trunk/activemq-kahadb-store/src/test/resources/log4j.properties   (with props)
Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/disk/
Modified:
    activemq/trunk/activemq-kahadb-store/pom.xml

Modified: activemq/trunk/activemq-kahadb-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/pom.xml?rev=1409554&r1=1409553&r2=1409554&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/pom.xml (original)
+++ activemq/trunk/activemq-kahadb-store/pom.xml Wed Nov 14 23:31:34 2012
@@ -30,11 +30,8 @@
   <name>ActiveMQ :: KahaDB Store</name>
   <description>The ActiveMQ KahaDB Store Implementation</description>
 
-  <properties>
-  </properties>
-
   <dependencies>
-    
+
     <!-- =============================== -->
     <!-- Required Dependencies -->
     <!-- =============================== -->
@@ -42,7 +39,6 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-broker</artifactId>
     </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>activeio-core</artifactId>
@@ -53,10 +49,6 @@
       <artifactId>activemq-protobuf</artifactId>
       <optional>false</optional>
     </dependency>
-    <dependency>
-      <groupId>org.fusesource.mqtt-client</groupId>
-      <artifactId>mqtt-client</artifactId>
-    </dependency>
 
     <!-- =============================== -->
     <!-- Optional Dependencies           -->
@@ -128,13 +120,6 @@
       <artifactId>commons-net</artifactId>
     </dependency>
 
-    <!-- not really a dependency at all - just added optionally to get the generator working -->
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-openwire-generator</artifactId>
-      <optional>true</optional>
-    </dependency>
-
     <!-- =============================== -->
     <!-- Testing Dependencies            -->
     <!-- =============================== -->

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+public class BTreeIndexBenchMark extends IndexBenchmark {
+
+    private NumberFormat nf;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(10);
+        nf.setGroupingUsed(false);
+    }
+    
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+    
+    @Override
+    protected void dumpIndex(Index<String, Long> index) throws IOException {
+        Transaction tx = pf.tx();
+        ((BTreeIndex)index).printStructure(tx, System.out);
+    }
+
+    /**
+     * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+     * always insert to the end of the BTree.  
+     */
+    @Override
+    protected String key(long i) {
+        return "a-long-message-id-like-key:"+nf.format(i);
+    }
+
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BTreeIndexTest extends IndexTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(BTreeIndexTest.class);
+    private NumberFormat nf;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(6);
+        nf.setGroupingUsed(false);
+    }
+    
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String,Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+    /**
+     * Yeah, the current implementation does NOT try to balance the tree.  Here is 
+     * a test case showing that it gets out of balance.  
+     * 
+     * @throws Exception
+     */
+    public void disabled_testTreeBalancing() throws Exception {
+        createPageFileAndIndex(100);
+
+        BTreeIndex index = ((BTreeIndex)this.index);
+        this.index.load(tx);
+        tx.commit();
+        
+        doInsert(50);
+        
+        int minLeafDepth = index.getMinLeafDepth(tx);
+        int maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+        // Remove some of the data
+        doRemove(16);
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+
+        System.out.println( "min:"+minLeafDepth );
+        System.out.println( "max:"+maxLeafDepth );
+        index.printStructure(tx, new PrintWriter(System.out));
+
+        assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+        this.index.unload(tx);
+    }
+    
+    public void testPruning() throws Exception {
+        createPageFileAndIndex(100);
+
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+
+        this.index.load(tx);
+        tx.commit();
+     
+        int minLeafDepth = index.getMinLeafDepth(tx);
+        int maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertEquals(1, minLeafDepth);
+        assertEquals(1, maxLeafDepth);
+        
+        doInsert(1000);
+        
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertTrue("Depth of tree grew", minLeafDepth > 1);
+        assertTrue("Depth of tree grew", maxLeafDepth > 1);
+
+        // Remove the data.
+        doRemove(1000);
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+
+        assertEquals(1, minLeafDepth);
+        assertEquals(1, maxLeafDepth);
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    public void testIteration() throws Exception {
+        createPageFileAndIndex(500);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+        tx.commit();
+          
+        // Insert in reverse order..
+        doInsertReverse(1000);
+        
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        exerciseAnotherIndex(tx);
+
+        // BTree should iterate it in sorted order.
+        int counter=0;
+        for (Iterator<Map.Entry<String,Long>> i = index.iterator(tx); i.hasNext();) {
+            Map.Entry<String,Long> entry = (Map.Entry<String,Long>)i.next();
+            assertEquals(key(counter),entry.getKey());
+            assertEquals(counter,(long)entry.getValue());
+            counter++;
+        }
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+    
+    
+    public void testVisitor() throws Exception {
+        createPageFileAndIndex(100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+        tx.commit();
+          
+        // Insert in reverse order..
+        doInsert(1000);
+        
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        // BTree should iterate it in sorted order.
+        
+        index.visit(tx, new BTreeVisitor<String, Long>(){
+            public boolean isInterestedInKeysBetween(String first, String second) {
+                return true;
+            }
+            public void visit(List<String> keys, List<Long> values) {
+            }
+        });
+        
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+
+    public void testRandomRemove() throws Exception {
+
+        createPageFileAndIndex(100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
+        sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
+        sindex.setValueMarshaller(StringMarshaller.INSTANCE);
+        sindex.load(tx);
+
+        tx.commit();
+
+        final int count = 5000;
+
+        String payload = new String(new byte[2]);
+        for (int i = 0; i < count; i++) {
+            index.put(tx, key(i), (long)i);
+            sindex.put(tx, key(i), String.valueOf(i) + payload);
+            tx.commit();
+        }
+
+
+        Random rand = new Random(System.currentTimeMillis());
+        int i = 0, prev = 0;
+        while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) {
+            prev = i;
+            i = rand.nextInt(count);
+            try {
+                index.remove(tx, key(i));
+                sindex.remove(tx, key(i));
+            } catch (Exception e) {
+                e.printStackTrace();
+                fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
+            }
+        }
+    }
+
+    public void testRandomAddRemove() throws Exception {
+
+        createPageFileAndIndex(1024);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
+        sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
+        sindex.setValueMarshaller(StringMarshaller.INSTANCE);
+        sindex.load(tx);
+
+        tx.commit();
+
+        Random rand = new Random(System.currentTimeMillis());
+        final int count = 50000;
+
+        String payload = new String(new byte[200]);
+        for (int i = 0; i < count; i++) {
+            int insertIndex = rand.nextInt(count);
+            index.put(tx, key(insertIndex), (long)insertIndex);
+            sindex.put(tx, key(insertIndex), String.valueOf(insertIndex) + payload);
+            tx.commit();
+        }
+
+
+        int i = 0, prev = 0;
+        while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) {
+            prev = i;
+            i = rand.nextInt(count);
+            try {
+                index.remove(tx, key(i));
+                sindex.remove(tx, key(i));
+            } catch (Exception e) {
+                e.printStackTrace();
+                fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
+            }
+        }
+    }
+
+    public void testRemovePattern() throws Exception {
+        createPageFileAndIndex(100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        final int count = 4000;
+        doInsert(count);
+
+        index.remove(tx, key(3697));
+        index.remove(tx, key(1566));
+
+        tx.commit();
+        index.clear(tx);
+        tx.commit();
+
+        doInsert(count);
+
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx, key(1345));
+        while (iterator.hasNext()) {
+            Map.Entry<String, Long> val = iterator.next();
+        }
+
+        doRemoveBackwards(666);
+        Map.Entry<String, Long> first = index.getFirst(tx);
+        assertEquals(first.getValue(), Long.valueOf(666L));
+
+        for (int i=0; i<2000; i++) {
+            Map.Entry<String, Long> last = index.getLast(tx);
+            index.remove(tx, last.getKey());
+            tx.commit();
+        }
+
+        exerciseAnotherIndex(tx);
+
+        iterator = index.iterator(tx, key(100));
+        while (iterator.hasNext()) {
+            Map.Entry<String, Long> val = iterator.next();
+        }
+
+        Map.Entry<String, Long> last = index.getLast(tx);
+        assertEquals(last.getValue(), Long.valueOf(1999L));
+        index.clear(tx);
+        assertNull(index.getLast(tx));
+    }
+
+    public void testLargeValue() throws Exception {
+        //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        //pf.setEnablePageCaching(false);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<Long, HashSet<String>> test = new BTreeIndex<Long, HashSet<String>>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        tx =  pf.tx();
+        String val = new String(new byte[1024]);
+        final long numMessages = 10;
+        final int numConsumers = 200;
+
+        for (long i=0; i<numMessages; i++) {
+            HashSet<String> hs = new HashSet<String>();
+            for (int j=0; j<numConsumers;j++) {
+                hs.add(val + "SOME TEXT" + j);
+            }
+            test.put(tx, i, hs);
+        }
+        tx.commit();
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            HashSet<String> hs = new HashSet<String>();
+            for (int j=numConsumers; j<numConsumers*2;j++) {
+                hs.add(val + "SOME TEXT" + j);
+            }
+            test.put(tx, i, hs);
+        }
+
+        tx.commit();
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            assertTrue(test.containsKey(tx, i));
+            test.get(tx, i);
+        }
+        tx.commit();
+    }
+
+    public void testLargeValueOverflow() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        BTreeIndex<Long, String> test = new BTreeIndex<Long, String>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final int stringSize = 6*1024;
+        tx =  pf.tx();
+        String val = new String(new byte[stringSize]);
+        final long numMessages = 1;
+
+        for (long i=0; i<numMessages; i++) {
+            test.put(tx, i, val);
+        }
+        tx.commit();
+
+        exerciseAnotherIndex(tx);
+
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            assertTrue(test.containsKey(tx, i));
+            String s = test.get(tx, i);
+            assertEquals("len is as expected", stringSize, s.length());
+        }
+        tx.commit();
+    }
+
+    public void exerciseAnotherIndex(Transaction tx) throws Exception {
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, String> test = new ListIndex<String, String>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final int count = 10000;
+
+        String payload = new String(new byte[1]);
+        for (int i = 0; i < count; i++) {
+            test.put(tx, key(i), String.valueOf(i) + payload);
+        }
+        tx.commit();
+
+        test.clear(tx);
+        tx.commit();
+    }
+
+    public void testIndexRepeatFillClearIncrementingPageReuse() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.load();
+
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        BTreeIndex<Long, String> test = new BTreeIndex<Long, String>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final int count = 5000;
+        final int reps = 2;
+        final long[] diffs = new long[reps];
+        long keyVal = 0;
+        final String payload = new String(new byte[50]);
+
+        LOG.info("PF diff:" + (pf.getPageCount() - pf.getFreePageCount()) + " pc:" + pf.getPageCount() + " f:" + pf.getFreePageCount() );
+
+        for (int i=0; i<reps; i++) {
+
+            for (int j = 0; j < count; j++) {
+                tx = pf.tx();
+                test.put(tx, keyVal++, payload);
+                tx.commit();
+            }
+
+            tx = pf.tx();
+            for (long k = keyVal - count; k < keyVal; k++) {
+                test.remove(tx, k);
+            }
+            test.clear(tx);
+            tx.commit();
+            diffs[i] = pf.getPageCount() - pf.getFreePageCount();
+
+            LOG.info("PF diff:" + (pf.getPageCount() - pf.getFreePageCount()) + " pc:" + pf.getPageCount() + " f:" + pf.getFreePageCount());
+        }
+        for (int i=1; i<diffs.length; i++) {
+            assertEquals("diff is constant:" + Arrays.toString(diffs), diffs[0],diffs[i]);
+        }
+    }
+
+    public void testListIndexConsistancyOverTime() throws Exception {
+
+        final int NUM_ITERATIONS = 50;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        //pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, String> test = new ListIndex<String, String>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        int expectedListEntries = 0;
+        int nextSequenceId = 0;
+
+        LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entires and sparsely populating the sequences.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            test.add(tx, String.valueOf(expectedListEntries++), new String("AA"));
+
+            for (int j = 0; j < expectedListEntries; j++) {
+
+                String sequenceSet = test.get(tx, String.valueOf(j));
+
+                int startSequenceId = nextSequenceId;
+                for (int ix = 0; ix < NUM_ITERATIONS; ix++) {
+                    sequenceSet.concat(String.valueOf(nextSequenceId++));
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+
+                sequenceSet = test.get(tx, String.valueOf(j));
+
+                for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) {
+                    //sequenceSet.remove(startSequenceId++);
+                    test.put(tx, String.valueOf(j), String.valueOf(j));
+                }
+            }
+        }
+
+        exerciseAnotherIndex(tx);
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(expectedListEntries, test.size());
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
+
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
+            LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
+
+            assertEquals(expectedListEntries - (i + 1), test.size());
+        }
+    }
+
+    void doInsertReverse(int count) throws Exception {
+        for (int i = count-1; i >= 0; i--) {
+            index.put(tx, key(i), (long)i);
+            tx.commit();
+        }
+    }
+    /**
+     * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+     * always insert to the end of the BTree.  
+     */
+    @Override
+    protected String key(int i) {
+        return "key:"+nf.format(i);
+    }
+
+    static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
+        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
+
+        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            oout.writeObject(object);
+            oout.flush();
+            oout.close();
+            byte[] data = baos.toByteArray();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+        }
+
+        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
+            int dataLen = dataIn.readInt();
+            byte[] data = new byte[dataLen];
+            dataIn.readFully(data);
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            ObjectInputStream oin = new ObjectInputStream(bais);
+            try {
+                return (HashSet<String>) oin.readObject();
+            } catch (ClassNotFoundException cfe) {
+                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
+                ioe.initCause(cfe);
+                throw ioe;
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+public class HashIndexBenchMark extends IndexBenchmark {
+
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        HashIndex<String, Long> index = new HashIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+public class HashIndexTest extends IndexTestSupport {
+
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        HashIndex<String, Long> index = new HashIndex<String,Long>(pf, id);
+        index.setBinCapacity(12);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * @author chirino
+ */
+public abstract class IndexBenchmark extends TestCase {
+
+    // Slower machines might need to make this bigger.
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 5));
+    // How many times do we sample?
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "" + 60 * 1000 / SAMPLE_DURATION));
+    // How many indexes will we be benchmarking concurrently?
+    private static final int INDEX_COUNT = Integer.parseInt(System.getProperty("INDEX_COUNT", "" + 1));
+    // Indexes tend to perform worse when they get big.. so how many items
+    // should we put into the index before we start sampling.
+    private static final int INDEX_PRE_LOAD_COUNT = Integer.parseInt(System.getProperty("INDEX_PRE_LOAD_COUNT", "" + 10000 / INDEX_COUNT));
+
+    protected File ROOT_DIR;
+    protected final HashMap<String, Index<String, Long>> indexes = new HashMap<String, Index<String, Long>>();
+    protected PageFile pf;
+
+    public void setUp() throws Exception {
+        ROOT_DIR = new File(IOHelper.getDefaultDataDirectory());
+        IOHelper.delete(ROOT_DIR);
+
+        pf = new PageFile(ROOT_DIR, getClass().getName());
+        pf.load();
+    }
+
+    protected void tearDown() throws Exception {
+        Transaction tx = pf.tx();
+        for (Index<?, ?> i : indexes.values()) {
+            try {
+                i.unload(tx);
+            } catch (Throwable ignore) {
+            }
+        }
+        tx.commit();
+    }
+
+    abstract protected Index<String, Long> createIndex() throws Exception;
+
+    synchronized private Index<String, Long> openIndex(String name) throws Exception {
+        Transaction tx = pf.tx();
+        Index<String, Long> index = indexes.get(name);
+        if (index == null) {
+            index = createIndex();
+            index.load(tx);
+            indexes.put(name, index);
+        }
+        tx.commit();
+        return index;
+    }
+
+    class Producer extends Thread {
+        private final String name;
+        AtomicBoolean shutdown = new AtomicBoolean();
+
+        public Producer(String name) {
+            super("Producer: " + name);
+            this.name = name;
+        }
+
+        public void shutdown() {
+            shutdown.set(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+
+                Transaction tx = pf.tx();
+
+                Index<String,Long> index = openIndex(name);
+                long counter = 0;
+                while (!shutdown.get()) {
+                    long c = counter;
+
+                    String key = key(c);
+                    index.put(tx, key, c);
+                    tx.commit();
+                    Thread.yield(); // This avoids consumer starvation..
+
+                    onProduced(counter++);
+                }
+
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void onProduced(long counter) {
+        }
+    }
+
+    protected String key(long c) {
+        return "a-long-message-id-like-key-" + c;
+    }
+
+
+    class Consumer extends Thread {
+        private final String name;
+        AtomicBoolean shutdown = new AtomicBoolean();
+
+        public Consumer(String name) {
+            super("Consumer: " + name);
+            this.name = name;
+        }
+
+        public void shutdown() {
+            shutdown.set(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+                Transaction tx = pf.tx();
+
+                Index<String,Long> index = openIndex(name);
+                long counter = 0;
+                while (!shutdown.get()) {
+                    long c = counter;
+                    String key = key(c);
+
+                    Long record = index.get(tx, key);
+                    if (record != null) {
+                        if( index.remove(tx, key) == null ) {
+                            System.out.print("Remove failed...");
+                        }
+                        tx.commit();
+                        onConsumed(counter++);
+                    }
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void onConsumed(long counter) {
+        }
+    }
+
+    protected void dumpIndex(Index<String, Long> index) throws IOException {
+    }
+
+    public void testLoad() throws Exception {
+
+        final Producer producers[] = new Producer[INDEX_COUNT];
+        final Consumer consumers[] = new Consumer[INDEX_COUNT];
+        final CountDownLatch preloadCountDown = new CountDownLatch(INDEX_COUNT);
+        final AtomicLong producedRecords = new AtomicLong();
+        final AtomicLong consumedRecords = new AtomicLong();
+
+        System.out.println("Starting: " + INDEX_COUNT + " producers");
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            producers[i] = new Producer("test-" + i) {
+                private boolean prelaodDone;
+
+                public void onProduced(long counter) {
+                    if (!prelaodDone && counter >= INDEX_PRE_LOAD_COUNT) {
+                        prelaodDone = true;
+                        preloadCountDown.countDown();
+                    }
+                    producedRecords.incrementAndGet();
+                }
+            };
+            producers[i].start();
+        }
+
+        long start = System.currentTimeMillis();
+        System.out.println("Waiting for each producer create " + INDEX_PRE_LOAD_COUNT + " records before starting the consumers.");
+        preloadCountDown.await();
+        long end = System.currentTimeMillis();
+        System.out.println("Preloaded " + INDEX_PRE_LOAD_COUNT * INDEX_COUNT + " records at " + (INDEX_PRE_LOAD_COUNT * INDEX_COUNT * 1000f / (end - start)) + " records/sec");
+
+        System.out.println("Starting: " + INDEX_COUNT + " consumers");
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            consumers[i] = new Consumer("test-" + i) {
+                public void onConsumed(long counter) {
+                    consumedRecords.incrementAndGet();
+                }
+            };
+            consumers[i].start();
+        }
+
+        long sample_start = System.currentTimeMillis();
+        System.out.println("Taking " + SAMPLES + " performance samples every " + SAMPLE_DURATION + " ms");
+        System.out.println("time (s), produced, produce rate (r/s), consumed, consume rate (r/s), used memory (k)");
+        producedRecords.set(0);
+        consumedRecords.set(0);
+        for (int i = 0; i < SAMPLES; i++) {
+            start = System.currentTimeMillis();
+            Thread.sleep(SAMPLE_DURATION);
+            end = System.currentTimeMillis();
+            long p = producedRecords.getAndSet(0);
+            long c = consumedRecords.getAndSet(0);
+
+            long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+            System.out.println(((end-sample_start)/1000f)+", "+p+", "+(p * 1000f / (end - start)) + ", "+ c+", " + (c * 1000f / (end - start))+", "+(usedMemory/(1024)) );
+        }
+        System.out.println("Samples done... Shutting down the producers and consumers...");
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            producers[i].shutdown();
+            consumers[i].shutdown();
+        }
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            producers[i].join(1000 * 5);
+            consumers[i].join(1000 * 5);
+        }
+        System.out.println("Shutdown.");
+    }
+
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Test a HashIndex
+ */
+public abstract class IndexTestSupport extends TestCase {
+
+    private static final int COUNT = 10000;
+
+    protected Index<String,Long> index;
+    protected File directory;
+    protected PageFile pf;
+    protected Transaction tx;
+
+    /**
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        directory = new File(IOHelper.getDefaultDataDirectory() + System.currentTimeMillis());
+        IOHelper.delete(directory);
+    }
+
+    protected void tearDown() throws Exception {
+        if( pf!=null ) {
+            pf.unload();
+            pf.delete();
+        }
+        if (directory != null) {
+            IOHelper.delete(directory);
+        }
+    }
+
+    protected void createPageFileAndIndex(int pageSize) throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(pageSize);
+        pf.load();
+        tx = pf.tx();
+        this.index = createIndex();
+    }
+
+    abstract protected Index<String, Long> createIndex() throws Exception;
+
+    public void testIndex() throws Exception {
+        createPageFileAndIndex(500);
+        this.index.load(tx);
+        tx.commit();
+        doInsert(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+        checkRetrieve(COUNT);
+        doRemove(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+        doInsert(COUNT);
+        doRemoveHalf(COUNT);
+        doInsertHalf(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+        checkRetrieve(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    void doInsert(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            index.put(tx, key(i), (long)i);
+            tx.commit();
+        }
+    }
+
+    protected String key(int i) {
+        return "key:"+i;
+    }
+
+    void checkRetrieve(int count) throws IOException {
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(tx, key(i));
+            assertNotNull("Key missing: "+key(i), item);
+        }
+    }
+
+    void doRemoveHalf(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                assertNotNull("Expected remove to return value for index "+i, index.remove(tx, key(i)));
+                tx.commit();
+            }
+
+        }
+    }
+
+    void doInsertHalf(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                index.put(tx, key(i), (long)i);
+                tx.commit();
+            }
+        }
+    }
+
+    void doRemove(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            assertNotNull("Expected remove to return value for index "+i, index.remove(tx, key(i)));
+            tx.commit();
+        }
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(tx, key(i));
+            assertNull(item);
+        }
+    }
+
+    void doRemoveBackwards(int count) throws Exception {
+        for (int i = count - 1; i >= 0; i--) {
+            index.remove(tx, key(i));
+            tx.commit();
+        }
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(tx, key(i));
+            assertNull(item);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.text.NumberFormat;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.Sequence;
+import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ListIndexTest extends IndexTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(ListIndexTest.class);
+    private NumberFormat nf;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(6);
+        nf.setGroupingUsed(false);
+    }
+
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        ListIndex<String, Long> index = new ListIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+
+        return index;
+    }
+
+    public void testSize() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        int count = 30;
+        tx = pf.tx();
+        doInsert(count);
+        tx.commit();
+        assertEquals("correct size", count, listIndex.size());
+
+        tx = pf.tx();
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
+        while (iterator.hasNext()) {
+            iterator.next();
+            iterator.remove();
+            assertEquals("correct size", --count, listIndex.size());
+        }
+        tx.commit();
+
+        count = 30;
+        tx = pf.tx();
+        doInsert(count);
+        tx.commit();
+        assertEquals("correct size", count, listIndex.size());
+
+        tx = pf.tx();
+        listIndex.clear(tx);
+        assertEquals("correct size", 0, listIndex.size());
+        tx.commit();
+    }
+
+    public void testPut() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        int count = 30;
+        tx = pf.tx();
+        doInsert(count);
+        tx.commit();
+        assertEquals("correct size", count, listIndex.size());
+
+        tx = pf.tx();
+        Long value = listIndex.get(tx, key(10));
+        assertNotNull(value);
+        listIndex.put(tx, key(10), Long.valueOf(1024));
+        tx.commit();
+
+        tx = pf.tx();
+        value = listIndex.get(tx, key(10));
+        assertEquals(1024L, value.longValue());
+        assertTrue(listIndex.size() == 30);
+        tx.commit();
+
+        tx = pf.tx();
+        value = listIndex.put(tx, key(31), Long.valueOf(2048));
+        assertNull(value);
+        assertTrue(listIndex.size() == 31);
+        tx.commit();
+    }
+
+    public void testAddFirst() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        tx = pf.tx();
+
+        // put is add last
+        doInsert(10);
+        listIndex.addFirst(tx, key(10), (long) 10);
+        listIndex.addFirst(tx, key(11), (long) 11);
+
+        tx.commit();
+
+        tx = pf.tx();
+        int counter = 11;
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
+        assertEquals(key(counter), iterator.next().getKey());
+        counter--;
+        assertEquals(key(counter), iterator.next().getKey());
+        counter--;
+        int count = 0;
+        while (iterator.hasNext() && count < counter) {
+            Map.Entry<String, Long> entry = (Map.Entry<String, Long>) iterator.next();
+            assertEquals(key(count), entry.getKey());
+            assertEquals(count, (long) entry.getValue());
+            count++;
+        }
+        tx.commit();
+    }
+
+    public void testPruning() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
+
+        this.index.load(tx);
+        tx.commit();
+
+        long pageCount = index.getPageFile().getPageCount();
+        assertEquals(1, pageCount);
+
+        long freePageCount = index.getPageFile().getFreePageCount();
+        assertEquals("No free pages", 0, freePageCount);
+
+        tx = pf.tx();
+        doInsert(20);
+        tx.commit();
+
+        pageCount = index.getPageFile().getPageCount();
+        LOG.info("page count: " + pageCount);
+        assertTrue("used some pages", pageCount > 1);
+
+        tx = pf.tx();
+        // Remove the data.
+        doRemove(20);
+
+        tx.commit();
+
+        freePageCount = index.getPageFile().getFreePageCount();
+
+        LOG.info("FreePage count: " + freePageCount);
+        assertTrue("Some free pages " + freePageCount, freePageCount > 0);
+
+
+        LOG.info("add some more to use up free list");
+        tx = pf.tx();
+        doInsert(20);
+        tx.commit();
+
+        freePageCount = index.getPageFile().getFreePageCount();
+
+        LOG.info("FreePage count: " + freePageCount);
+        assertEquals("no free pages " + freePageCount, 0, freePageCount);
+        assertEquals("Page count is static", pageCount,  index.getPageFile().getPageCount());
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    public void testIterationAddFirst() throws Exception {
+        createPageFileAndIndex(100);
+        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        tx = pf.tx();
+        final int entryCount = 200;
+        // Insert in reverse order..
+        doInsertReverse(entryCount);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        int counter = 0;
+        for (Iterator<Map.Entry<String, Long>> i = index.iterator(tx); i.hasNext(); ) {
+            Map.Entry<String, Long> entry = (Map.Entry<String, Long>) i.next();
+            assertEquals(key(counter), entry.getKey());
+            assertEquals(counter, (long) entry.getValue());
+            counter++;
+        }
+         assertEquals("We iterated over all entries", entryCount, counter);
+
+        tx = pf.tx();
+        // Remove the data.
+        doRemove(entryCount);
+        tx.commit();
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+
+    public void testIteration() throws Exception {
+        createPageFileAndIndex(100);
+        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        // Insert in reverse order..
+        final int entryCount = 200;
+        doInsert(entryCount);
+
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        int counter = 0;
+        for (Iterator<Map.Entry<String, Long>> i = index.iterator(tx); i.hasNext(); ) {
+            Map.Entry<String, Long> entry = (Map.Entry<String, Long>) i.next();
+            assertEquals(key(counter), entry.getKey());
+            assertEquals(counter, (long) entry.getValue());
+            counter++;
+        }
+        assertEquals("We iterated over all entries", entryCount, counter);
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    public void testRandomRemove() throws Exception {
+
+        createPageFileAndIndex(4*1024);
+        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        final int count = 4000;
+        doInsert(count);
+
+        Random rand = new Random(System.currentTimeMillis());
+        int i = 0, prev = 0;
+        while (!index.isEmpty(tx)) {
+            prev = i;
+            i = rand.nextInt(count);
+            try {
+                index.remove(tx, key(i));
+            } catch (Exception e) {
+                e.printStackTrace();
+                fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
+            }
+        }
+    }
+
+    public void testRemovePattern() throws Exception {
+        createPageFileAndIndex(100);
+        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        final int count = 4000;
+        doInsert(count);
+
+        index.remove(tx, key(3697));
+        index.remove(tx, key(1566));
+    }
+
+    public void testLargeAppendRemoveTimed() throws Exception {
+        createPageFileAndIndex(1024*4);
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+        final int COUNT = 50000;
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < COUNT; i++) {
+             listIndex.add(tx, key(i), (long) i);
+             tx.commit();
+        }
+        LOG.info("Time to add " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
+        LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
+
+        start = System.currentTimeMillis();
+        tx = pf.tx();
+        int removeCount = 0;
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
+        while (iterator.hasNext()) {
+            iterator.next();
+            iterator.remove();
+            removeCount++;
+        }
+        tx.commit();
+        assertEquals("Removed all", COUNT, removeCount);
+        LOG.info("Time to remove " + COUNT + ": " + (System.currentTimeMillis() - start) + " mills");
+        LOG.info("Page count: " + listIndex.getPageFile().getPageCount());
+        LOG.info("Page free count: " + listIndex.getPageFile().getFreePageCount());
+    }
+
+    private int getMessageSize(int min, int max) {
+        return min + (int)(Math.random() * ((max - min) + 1));
+    }
+
+    public void testLargeValueOverflow() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<Long, String> test = new ListIndex<Long, String>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final long NUM_ADDITIONS = 32L;
+
+        LinkedList<Long> expected = new LinkedList<Long>();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; ++i) {
+            final int stringSize = getMessageSize(1, 4096);
+            String val = new String(new byte[stringSize]);
+            expected.add(Long.valueOf(stringSize));
+            test.add(tx, i, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; i++) {
+            String s = test.get(tx, i);
+            assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+        }
+        tx.commit();
+
+        expected.clear();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; ++i) {
+            final int stringSize = getMessageSize(1, 4096);
+            String val = new String(new byte[stringSize]);
+            expected.add(Long.valueOf(stringSize));
+            test.addFirst(tx, i+NUM_ADDITIONS, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; i++) {
+            String s = test.get(tx, i+NUM_ADDITIONS);
+            assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+        }
+        tx.commit();
+
+        expected.clear();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; ++i) {
+            final int stringSize = getMessageSize(1, 4096);
+            String val = new String(new byte[stringSize]);
+            expected.add(Long.valueOf(stringSize));
+            test.put(tx, i, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i = 0; i < NUM_ADDITIONS; i++) {
+            String s = test.get(tx, i);
+            assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+        }
+        tx.commit();
+    }
+
+    void doInsertReverse(int count) throws Exception {
+        for (int i = count - 1; i >= 0; i--) {
+            ((ListIndex<String, Long>) index).addFirst(tx, key(i), (long) i);
+            tx.commit();
+        }
+    }
+
+    @Override
+    protected String key(int i) {
+        return "key:" + nf.format(i);
+    }
+
+    public void testListIndexConsistencyOverTime() throws Exception {
+
+        final int NUM_ITERATIONS = 100;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, SequenceSet> test = new ListIndex<String, SequenceSet>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        int expectedListEntries = 0;
+        int nextSequenceId = 0;
+
+        LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entires and sparsely populating the sequences.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            test.add(tx, String.valueOf(expectedListEntries++), new SequenceSet());
+
+            for (int j = 0; j < expectedListEntries; j++) {
+
+                SequenceSet sequenceSet = test.get(tx, String.valueOf(j));
+
+                int startSequenceId = nextSequenceId;
+                for (int ix = 0; ix < NUM_ITERATIONS; ix++) {
+                    sequenceSet.add(nextSequenceId++);
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+
+                sequenceSet = test.get(tx, String.valueOf(j));
+
+                for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) {
+                    sequenceSet.remove(startSequenceId++);
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+            }
+        }
+
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(expectedListEntries, test.size());
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
+
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
+            LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
+
+            assertEquals(expectedListEntries - (i + 1), test.size());
+        }
+    }
+
+    public void testListLargeDataAddWithReverseRemove() throws Exception {
+
+        final int NUM_ITERATIONS = 100;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, SequenceSet> test = new ListIndex<String, SequenceSet>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        int expectedListEntries = 0;
+        int nextSequenceId = 0;
+
+        LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entries and sparsely populating the sequences.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            test.add(tx, String.valueOf(expectedListEntries++), new SequenceSet());
+
+            for (int j = 0; j < expectedListEntries; j++) {
+
+                SequenceSet sequenceSet = test.get(tx, String.valueOf(j));
+
+                int startSequenceId = nextSequenceId;
+                for (int ix = 0; ix < NUM_ITERATIONS; ix++) {
+                    sequenceSet.add(nextSequenceId++);
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+
+                sequenceSet = test.get(tx, String.valueOf(j));
+
+                for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) {
+                    sequenceSet.remove(startSequenceId++);
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+            }
+        }
+
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(expectedListEntries, test.size());
+
+        for (int i = NUM_ITERATIONS - 1; i >= 0; --i) {
+            LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
+
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
+            LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
+
+            assertEquals(--expectedListEntries, test.size());
+        }
+    }
+
+    public void x_testSplitPerformance() throws Exception {
+
+        final int NUM_ITERATIONS = 200;
+        final int RANGE = 200000;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, SequenceSet> test = new ListIndex<String, SequenceSet>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            Sequence sequence = new Sequence(0);
+            sequence.setLast(RANGE);
+            SequenceSet sequenceSet  = new SequenceSet();
+            sequenceSet.add(sequence);
+            test.add(tx, String.valueOf(i), sequenceSet);
+        }
+
+        long start = System.currentTimeMillis();
+
+        // overflow the value in the last sequence
+        SequenceSet sequenceSet = test.get(tx, String.valueOf(NUM_ITERATIONS - 10));
+        for (int i=0; i<RANGE; i+=2) {
+            sequenceSet.remove(i);
+            test.put(tx, String.valueOf(NUM_ITERATIONS -1), sequenceSet);
+        }
+        LOG.info("duration: " + (System.currentTimeMillis() - start));
+    }
+
+    public void testListLargeDataAddAndNonSequentialRemove() throws Exception {
+
+        final int NUM_ITERATIONS = 100;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, SequenceSet> test = new ListIndex<String, SequenceSet>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        int expectedListEntries = 0;
+        int nextSequenceId = 0;
+
+        LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entires and sparsely populating the sequences.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            test.add(tx, String.valueOf(expectedListEntries++), new SequenceSet());
+
+            for (int j = 0; j < expectedListEntries; j++) {
+
+                SequenceSet sequenceSet = test.get(tx, String.valueOf(j));
+
+                int startSequenceId = nextSequenceId;
+                for (int ix = 0; ix < NUM_ITERATIONS; ix++) {
+                    sequenceSet.add(nextSequenceId++);
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+
+                sequenceSet = test.get(tx, String.valueOf(j));
+
+                for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) {
+                    sequenceSet.remove(startSequenceId++);
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+            }
+        }
+
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(expectedListEntries, test.size());
+
+        for (int i = 0; i < NUM_ITERATIONS; i += 2) {
+            LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
+
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
+            LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
+
+            assertEquals(--expectedListEntries, test.size());
+        }
+
+        for (int i = NUM_ITERATIONS - 1; i > 0; i -= 2) {
+            LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
+
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
+            LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
+
+            assertEquals(--expectedListEntries, test.size());
+        }
+
+        assertEquals(0, test.size());
+    }
+
+    static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
+        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
+
+        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            oout.writeObject(object);
+            oout.flush();
+            oout.close();
+            byte[] data = baos.toByteArray();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+        }
+
+        @SuppressWarnings("unchecked")
+        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
+            int dataLen = dataIn.readInt();
+            byte[] data = new byte[dataLen];
+            dataIn.readFully(data);
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            ObjectInputStream oin = new ObjectInputStream(bais);
+            try {
+                return (HashSet<String>) oin.readObject();
+            } catch (ClassNotFoundException cfe) {
+                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
+                ioe.initCause(cfe);
+                throw ioe;
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+
+public class JournalTest extends TestCase {
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+    Journal dataManager;
+    File dir;
+
+    @Override
+    public void setUp() throws Exception {
+        dir = new File("target/tests/DataFileAppenderTest");
+        dir.mkdirs();
+        dataManager = new Journal();
+        dataManager.setDirectory(dir);
+        configure(dataManager);
+        dataManager.start();
+    }
+
+    protected void configure(Journal dataManager) {
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        dataManager.close();
+        IOHelper.delete(dir);
+    }
+
+    public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i=0; i < iterations; i++) {
+            dataManager.write(data, new Runnable() {
+                public void run() {
+                    latch.countDown();
+                }
+            });
+        }
+        // at this point most probably dataManager.getInflightWrites().size() >= 0
+        // as the Thread created in DataFileAppender.enqueue() may not have caught up.
+        assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS));
+    }
+
+    public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, new Runnable() {
+                public void run() {
+                    latch.countDown();
+                }
+            });
+        }
+        dataManager.close();
+        assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        assertEquals("none written", 0, latch.getCount());
+    }
+
+    public void testBatchWriteCompleteAfterClose() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, false);
+        }
+        dataManager.close();
+        assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
+    }
+
+    public void testBatchWriteToMaxMessageSize() throws Exception {
+        final int iterations = 4;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        Runnable done = new Runnable() {
+            public void run() {
+                latch.countDown();
+            }
+        };
+        int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations;
+        byte[] message = new byte[messageSize];
+        ByteSequence data = new ByteSequence(message);
+
+        for (int i=0; i< iterations; i++) {
+            dataManager.write(data, done);
+        }
+
+        // write may take some time
+        assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS));
+    }
+
+    public void testNoBatchWriteWithSync() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, true);
+            assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        }
+    }
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+import junit.framework.TestCase;
+
+@SuppressWarnings("rawtypes")
+public class PageFileTest extends TestCase {
+
+    public void testCRUD() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < 100; i++) {
+            Page<String> page = tx.allocate();
+            assertEquals(Page.PAGE_FREE_TYPE, page.getType());
+
+            String t = "page:" + i;
+            expected.add(t);
+            page.set(t);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+            tx.commit();
+        }
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        HashSet<String> actual = new HashSet<String>();
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            actual.add(page.get());
+        }
+        assertEquals(expected, actual);
+
+        // Remove the odd records..
+        for (int i = 0; i < 100; i++) {
+            if (i % 2 == 0) {
+                break;
+            }
+            String t = "page:" + i;
+            expected.remove(t);
+        }
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            if (!expected.contains(page.get())) {
+                tx.free(page);
+            }
+        }
+        tx.commit();
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure the even records are still there..
+        actual.clear();
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            actual.add((String)page.get());
+        }
+        assertEquals(expected, actual);
+
+        // Update the records...
+        HashSet<String> t = expected;
+        expected = new HashSet<String>();
+        for (String s : t) {
+            expected.add(s + ":updated");
+        }
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            page.set(page.get() + ":updated");
+            tx.store(page, StringMarshaller.INSTANCE, false);
+        }
+        tx.commit();
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure the updated records are still there..
+        actual.clear();
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            actual.add(page.get());
+        }
+        assertEquals(expected, actual);
+
+        pf.unload();
+    }
+
+    public void testStreams() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        Transaction tx = pf.tx();
+        Page page = tx.allocate();
+        tx.commit();
+
+        OutputStream pos = tx.openOutputStream(page, true);
+        DataOutputStream os = new DataOutputStream(pos);
+        for (int i = 0; i < 10000; i++) {
+            os.writeUTF("Test string:" + i);
+        }
+
+        os.close();
+        tx.commit();
+
+        // Reload the page file.
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        InputStream pis = tx.openInputStream(page);
+        DataInputStream is = new DataInputStream(pis);
+        for (int i = 0; i < 10000; i++) {
+            assertEquals("Test string:" + i, is.readUTF());
+        }
+        assertEquals(-1, is.read());
+        is.close();
+
+        pf.unload();
+    }
+
+    public void testAddRollback() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < 100; i++) {
+            Page<String> page = tx.allocate();
+            assertEquals(Page.PAGE_FREE_TYPE, page.getType());
+
+            String t = "page:" + i;
+            page.set(t);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+
+            // Rollback every other insert.
+            if (i % 2 == 0) {
+                expected.add(t);
+                tx.commit();
+            } else {
+                tx.rollback();
+            }
+
+        }
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        HashSet<String> actual = new HashSet<String>();
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            actual.add(page.get());
+        }
+        assertEquals(expected, actual);
+    }
+}

Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native