You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/15 00:31:35 UTC
svn commit: r1409554 [1/2] - in /activemq/trunk:
activemq-core/src/test/java/org/apache/activemq/store/kahadb/disk/
activemq-kahadb-store/ activemq-kahadb-store/src/test/
activemq-kahadb-store/src/test/java/
activemq-kahadb-store/src/test/java/org/ act...
Author: tabish
Date: Wed Nov 14 23:31:34 2012
New Revision: 1409554
URL: http://svn.apache.org/viewvc?rev=1409554&view=rev
Log:
Move some of the KahaDB tests from core into the KahaDB store module
Added:
activemq/trunk/activemq-kahadb-store/src/test/
activemq/trunk/activemq-kahadb-store/src/test/java/
activemq/trunk/activemq-kahadb-store/src/test/java/org/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/
activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/util/SequenceSetTest.java (with props)
activemq/trunk/activemq-kahadb-store/src/test/resources/
activemq/trunk/activemq-kahadb-store/src/test/resources/log4j.properties (with props)
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/disk/
Modified:
activemq/trunk/activemq-kahadb-store/pom.xml
Modified: activemq/trunk/activemq-kahadb-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/pom.xml?rev=1409554&r1=1409553&r2=1409554&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/pom.xml (original)
+++ activemq/trunk/activemq-kahadb-store/pom.xml Wed Nov 14 23:31:34 2012
@@ -30,11 +30,8 @@
<name>ActiveMQ :: KahaDB Store</name>
<description>The ActiveMQ KahaDB Store Implementation</description>
- <properties>
- </properties>
-
<dependencies>
-
+
<!-- =============================== -->
<!-- Required Dependencies -->
<!-- =============================== -->
@@ -42,7 +39,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
</dependency>
-
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activeio-core</artifactId>
@@ -53,10 +49,6 @@
<artifactId>activemq-protobuf</artifactId>
<optional>false</optional>
</dependency>
- <dependency>
- <groupId>org.fusesource.mqtt-client</groupId>
- <artifactId>mqtt-client</artifactId>
- </dependency>
<!-- =============================== -->
<!-- Optional Dependencies -->
@@ -128,13 +120,6 @@
<artifactId>commons-net</artifactId>
</dependency>
- <!-- not really a dependency at all - just added optionally to get the generator working -->
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-openwire-generator</artifactId>
- <optional>true</optional>
- </dependency>
-
<!-- =============================== -->
<!-- Testing Dependencies -->
<!-- =============================== -->
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+/**
+ * {@link IndexBenchmark} specialization that benchmarks a {@link BTreeIndex} with
+ * String keys and Long values.
+ */
+public class BTreeIndexBenchMark extends IndexBenchmark {
+
+    /** Formatter for the numeric part of generated keys; configured in {@link #setUp()}. */
+    private NumberFormat nf;
+
+    /**
+     * Runs the inherited set-up, then prepares the key formatter: integers rendered
+     * with at least 10 digits and without grouping separators.
+     */
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(10);
+        nf.setGroupingUsed(false);
+    }
+
+    /**
+     * Allocates a page in its own transaction and creates a BTree index on that page.
+     *
+     * @return an index configured with the String key and Long value marshallers
+     */
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+
+        return index;
+    }
+
+    /**
+     * Prints the structure of the given index to standard output.
+     *
+     * @param index the index to dump; it is cast to {@link BTreeIndex}, so it must be one
+     */
+    @Override
+    protected void dumpIndex(Index<String, Long> index) throws IOException {
+        Transaction tx = pf.tx();
+        // Cast to the parameterized type rather than the raw BTreeIndex so the call stays type-checked.
+        ((BTreeIndex<String, Long>) index).printStructure(tx, System.out);
+    }
+
+    /**
+     * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+     * always insert to the end of the BTree.
+     */
+    @Override
+    protected String key(long i) {
+        return "a-long-message-id-like-key:" + nf.format(i);
+    }
+
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexBenchMark.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,582 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BTreeIndexTest extends IndexTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(BTreeIndexTest.class);
+ private NumberFormat nf;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ nf = NumberFormat.getIntegerInstance();
+ nf.setMinimumIntegerDigits(6);
+ nf.setGroupingUsed(false);
+ }
+
+ @Override
+ protected Index<String, Long> createIndex() throws Exception {
+
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ BTreeIndex<String, Long> index = new BTreeIndex<String,Long>(pf, id);
+ index.setKeyMarshaller(StringMarshaller.INSTANCE);
+ index.setValueMarshaller(LongMarshaller.INSTANCE);
+
+ return index;
+ }
+
+    /**
+     * Yeah, the current implementation does NOT try to balance the tree. Here is
+     * a test case showing that it gets out of balance.
+     *
+     * NOTE(review): the method name does not start with {@code test}, so presumably
+     * the test runner does not pick it up - confirm before relying on it.
+     *
+     * @throws Exception
+     */
+    public void disabled_testTreeBalancing() throws Exception {
+        createPageFileAndIndex(100);
+
+        // Parameterized cast, matching the other tests in this class (no raw type).
+        BTreeIndex<String, Long> index = (BTreeIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        doInsert(50);
+
+        int minLeafDepth = index.getMinLeafDepth(tx);
+        int maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertTrue("Tree is balanced", maxLeafDepth - minLeafDepth <= 1);
+
+        // Remove some of the data
+        doRemove(16);
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+
+        System.out.println("min:" + minLeafDepth);
+        System.out.println("max:" + maxLeafDepth);
+        // PrintWriter buffers its output; flush explicitly so the dump actually reaches stdout.
+        PrintWriter out = new PrintWriter(System.out);
+        index.printStructure(tx, out);
+        out.flush();
+
+        assertTrue("Tree is balanced", maxLeafDepth - minLeafDepth <= 1);
+
+        this.index.unload(tx);
+    }
+
+    /**
+     * Checks that the tree is one level deep when empty, deeper after 1000 inserts,
+     * and back to one level once the same 1000 entries have been removed.
+     */
+    public void testPruning() throws Exception {
+        createPageFileAndIndex(100);
+        final BTreeIndex<String, Long> btree = (BTreeIndex<String, Long>) this.index;
+
+        this.index.load(tx);
+        tx.commit();
+
+        // Empty tree: both depth measurements are expected to be 1.
+        int shallowest = btree.getMinLeafDepth(tx);
+        int deepest = btree.getMaxLeafDepth(tx);
+        assertEquals(1, shallowest);
+        assertEquals(1, deepest);
+
+        doInsert(1000);
+
+        shallowest = btree.getMinLeafDepth(tx);
+        deepest = btree.getMaxLeafDepth(tx);
+        assertTrue("Depth of tree grew", shallowest > 1);
+        assertTrue("Depth of tree grew", deepest > 1);
+
+        // Remove the data.
+        doRemove(1000);
+        shallowest = btree.getMinLeafDepth(tx);
+        deepest = btree.getMaxLeafDepth(tx);
+
+        assertEquals(1, shallowest);
+        assertEquals(1, deepest);
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    /**
+     * Inserts 1000 entries in reverse key order, cycles the index through unload/load,
+     * and verifies that iteration yields every entry in ascending key order.
+     */
+    public void testIteration() throws Exception {
+        final int count = 1000;
+
+        createPageFileAndIndex(500);
+        BTreeIndex<String, Long> index = (BTreeIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        // Insert in reverse order..
+        doInsertReverse(count);
+
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        exerciseAnotherIndex(tx);
+
+        // BTree should iterate it in sorted order.
+        int counter = 0;
+        for (Iterator<Map.Entry<String, Long>> i = index.iterator(tx); i.hasNext();) {
+            Map.Entry<String, Long> entry = i.next();
+            assertEquals(key(counter), entry.getKey());
+            assertEquals(counter, (long) entry.getValue());
+            counter++;
+        }
+        // Without this check an empty or truncated iteration would pass silently.
+        assertEquals("all inserted entries were iterated", count, counter);
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+
+    /**
+     * Populates the index with 1000 entries, cycles it through unload/load, and walks
+     * it with a visitor that accepts every key range. The test makes no assertions;
+     * it passes as long as the visit completes without throwing.
+     */
+    public void testVisitor() throws Exception {
+        createPageFileAndIndex(100);
+        final BTreeIndex<String, Long> btree = (BTreeIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        doInsert(1000);
+
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        // Visitor that is interested in every key range and ignores what it is handed.
+        final BTreeVisitor<String, Long> acceptEverything = new BTreeVisitor<String, Long>() {
+            public boolean isInterestedInKeysBetween(String first, String second) {
+                return true;
+            }
+
+            public void visit(List<String> keys, List<Long> values) {
+            }
+        };
+        btree.visit(tx, acceptEverything);
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+
+ public void testRandomRemove() throws Exception {
+
+ createPageFileAndIndex(100);
+ BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+ this.index.load(tx);
+
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
+ sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
+ sindex.setValueMarshaller(StringMarshaller.INSTANCE);
+ sindex.load(tx);
+
+ tx.commit();
+
+ final int count = 5000;
+
+ String payload = new String(new byte[2]);
+ for (int i = 0; i < count; i++) {
+ index.put(tx, key(i), (long)i);
+ sindex.put(tx, key(i), String.valueOf(i) + payload);
+ tx.commit();
+ }
+
+
+ Random rand = new Random(System.currentTimeMillis());
+ int i = 0, prev = 0;
+ while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) {
+ prev = i;
+ i = rand.nextInt(count);
+ try {
+ index.remove(tx, key(i));
+ sindex.remove(tx, key(i));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
+ }
+ }
+ }
+
+ public void testRandomAddRemove() throws Exception {
+
+ createPageFileAndIndex(1024);
+ BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+ this.index.load(tx);
+
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
+ sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
+ sindex.setValueMarshaller(StringMarshaller.INSTANCE);
+ sindex.load(tx);
+
+ tx.commit();
+
+ Random rand = new Random(System.currentTimeMillis());
+ final int count = 50000;
+
+ String payload = new String(new byte[200]);
+ for (int i = 0; i < count; i++) {
+ int insertIndex = rand.nextInt(count);
+ index.put(tx, key(insertIndex), (long)insertIndex);
+ sindex.put(tx, key(insertIndex), String.valueOf(insertIndex) + payload);
+ tx.commit();
+ }
+
+
+ int i = 0, prev = 0;
+ while (!index.isEmpty(tx) || !sindex.isEmpty(tx)) {
+ prev = i;
+ i = rand.nextInt(count);
+ try {
+ index.remove(tx, key(i));
+ sindex.remove(tx, key(i));
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("unexpected exception on " + i + ", prev: " + prev + ", ex: " + e);
+ }
+ }
+ }
+
+    /**
+     * Exercises a specific sequence of inserts, removes, clears and partial iterations,
+     * checking the first and last entries along the way and that the index has no last
+     * entry after the final clear.
+     */
+    public void testRemovePattern() throws Exception {
+        createPageFileAndIndex(100);
+        BTreeIndex<String, Long> index = (BTreeIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        final int count = 4000;
+        doInsert(count);
+
+        index.remove(tx, key(3697));
+        index.remove(tx, key(1566));
+
+        tx.commit();
+        index.clear(tx);
+        tx.commit();
+
+        doInsert(count);
+
+        // Drain an iterator that starts at key(1345); the entries themselves are not inspected.
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx, key(1345));
+        while (iterator.hasNext()) {
+            iterator.next();
+        }
+
+        // NOTE(review): doRemoveBackwards is defined outside this view; the assertion below
+        // implies it leaves 666 as the smallest remaining value - confirm.
+        doRemoveBackwards(666);
+        Map.Entry<String, Long> first = index.getFirst(tx);
+        // Expected value goes first so a failure message reads correctly.
+        assertEquals(Long.valueOf(666L), first.getValue());
+
+        // Strip 2000 entries off the tail, one transaction commit per removal.
+        for (int i = 0; i < 2000; i++) {
+            Map.Entry<String, Long> last = index.getLast(tx);
+            index.remove(tx, last.getKey());
+            tx.commit();
+        }
+
+        exerciseAnotherIndex(tx);
+
+        iterator = index.iterator(tx, key(100));
+        while (iterator.hasNext()) {
+            iterator.next();
+        }
+
+        Map.Entry<String, Long> last = index.getLast(tx);
+        assertEquals(Long.valueOf(1999L), last.getValue());
+        index.clear(tx);
+        assertNull(index.getLast(tx));
+    }
+
+    /**
+     * Stores large {@code HashSet<String>} values (200 strings of roughly 1 KB each)
+     * under 10 keys in a BTree with 4 KB pages, overwrites each key with a second
+     * set of the same size, and then checks that every key is still present and readable.
+     */
+    public void testLargeValue() throws Exception {
+        //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4 * 1024);
+        //pf.setEnablePageCaching(false);
+        pf.load();
+        tx = pf.tx();
+        final long rootPageId = tx.allocate().getPageId();
+        tx.commit();
+
+        final BTreeIndex<Long, HashSet<String>> btree =
+            new BTreeIndex<Long, HashSet<String>>(pf, rootPageId);
+        btree.setKeyMarshaller(LongMarshaller.INSTANCE);
+        btree.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
+        btree.load(tx);
+        tx.commit();
+
+        final String filler = new String(new byte[1024]);
+        final long messageCount = 10;
+        final int consumerCount = 200;
+
+        // Pass 0 writes consumer ids [0, 200); pass 1 overwrites with ids [200, 400).
+        // Each pass runs in its own transaction.
+        for (int pass = 0; pass < 2; pass++) {
+            tx = pf.tx();
+            final int firstConsumer = pass * consumerCount;
+            final int endConsumer = firstConsumer + consumerCount;
+            for (long message = 0; message < messageCount; message++) {
+                final HashSet<String> consumers = new HashSet<String>();
+                for (int consumer = firstConsumer; consumer < endConsumer; consumer++) {
+                    consumers.add(filler + "SOME TEXT" + consumer);
+                }
+                btree.put(tx, message, consumers);
+            }
+            tx.commit();
+        }
+
+        tx = pf.tx();
+        for (long message = 0; message < messageCount; message++) {
+            assertTrue(btree.containsKey(tx, message));
+            btree.get(tx, message);
+        }
+        tx.commit();
+    }
+
+    /**
+     * Stores a single 6 KB string in a BTree whose pages are 4 KB, exercises an
+     * unrelated index on the same page file, and checks that the value is read back
+     * with its full length.
+     */
+    public void testLargeValueOverflow() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4 * 1024);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        final long rootPageId = tx.allocate().getPageId();
+
+        final BTreeIndex<Long, String> btree = new BTreeIndex<Long, String>(pf, rootPageId);
+        btree.setKeyMarshaller(LongMarshaller.INSTANCE);
+        btree.setValueMarshaller(StringMarshaller.INSTANCE);
+        btree.load(tx);
+        tx.commit();
+
+        final int stringSize = 6 * 1024;
+        final long messageCount = 1;
+        tx = pf.tx();
+        final String oversized = new String(new byte[stringSize]);
+
+        for (long message = 0; message < messageCount; message++) {
+            btree.put(tx, message, oversized);
+        }
+        tx.commit();
+
+        exerciseAnotherIndex(tx);
+
+        tx = pf.tx();
+        for (long message = 0; message < messageCount; message++) {
+            assertTrue(btree.containsKey(tx, message));
+            final String stored = btree.get(tx, message);
+            assertEquals("len is as expected", stringSize, stored.length());
+        }
+        tx.commit();
+    }
+
+    /**
+     * Creates a separate {@link ListIndex} on a newly allocated page of the same page
+     * file, puts 10000 small entries into it, and clears it again.
+     *
+     * @param tx the transaction used for every operation; it is committed three times
+     */
+    public void exerciseAnotherIndex(Transaction tx) throws Exception {
+        final long headPageId = tx.allocate().getPageId();
+
+        final ListIndex<String, String> scratch = new ListIndex<String, String>(pf, headPageId);
+        scratch.setKeyMarshaller(StringMarshaller.INSTANCE);
+        scratch.setValueMarshaller(StringMarshaller.INSTANCE);
+        scratch.load(tx);
+        tx.commit();
+
+        final int entryCount = 10000;
+        final String suffix = new String(new byte[1]);
+
+        int n = 0;
+        while (n < entryCount) {
+            scratch.put(tx, key(n), Integer.toString(n).concat(suffix));
+            n++;
+        }
+        tx.commit();
+
+        scratch.clear(tx);
+        tx.commit();
+    }
+
+    /**
+     * Repeatedly fills a BTree with 5000 ever-increasing keys and then empties it,
+     * recording the number of in-use pages (page count minus free page count) after
+     * each round. Asserts that this number is the same after every round.
+     */
+    public void testIndexRepeatFillClearIncrementingPageReuse() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4 * 1024);
+        pf.load();
+
+        tx = pf.tx();
+        final long rootPageId = tx.allocate().getPageId();
+
+        final BTreeIndex<Long, String> btree = new BTreeIndex<Long, String>(pf, rootPageId);
+        btree.setKeyMarshaller(LongMarshaller.INSTANCE);
+        btree.setValueMarshaller(StringMarshaller.INSTANCE);
+        btree.load(tx);
+        tx.commit();
+
+        final int count = 5000;
+        final int reps = 2;
+        final long[] diffs = new long[reps];
+        final String payload = new String(new byte[50]);
+        long keyVal = 0;
+
+        LOG.info("PF diff:" + (pf.getPageCount() - pf.getFreePageCount()) + " pc:" + pf.getPageCount() + " f:" + pf.getFreePageCount() );
+
+        int round = 0;
+        while (round < reps) {
+            // Fill: one transaction per put, keys continue where the previous round stopped.
+            final long firstKey = keyVal;
+            for (int inserted = 0; inserted < count; inserted++) {
+                tx = pf.tx();
+                btree.put(tx, keyVal, payload);
+                keyVal++;
+                tx.commit();
+            }
+
+            // Empty: remove this round's keys and clear, all in a single transaction.
+            tx = pf.tx();
+            for (long k = firstKey; k < keyVal; k++) {
+                btree.remove(tx, k);
+            }
+            btree.clear(tx);
+            tx.commit();
+            diffs[round] = pf.getPageCount() - pf.getFreePageCount();
+
+            LOG.info("PF diff:" + (pf.getPageCount() - pf.getFreePageCount()) + " pc:" + pf.getPageCount() + " f:" + pf.getFreePageCount());
+            round++;
+        }
+
+        final String message = "diff is constant:" + Arrays.toString(diffs);
+        for (int i = 1; i < diffs.length; i++) {
+            assertEquals(message, diffs[0], diffs[i]);
+        }
+    }
+
+    /**
+     * Builds a {@link ListIndex} of 50 entries while repeatedly rewriting every existing
+     * entry, first with a steadily growing value and then with a short one. Afterwards
+     * verifies that every key is present, that the size matches, and that entries can be
+     * removed one by one with the size shrinking accordingly.
+     */
+    public void testListIndexConsistancyOverTime() throws Exception {
+
+        final int NUM_ITERATIONS = 50;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4 * 1024);
+        //pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, String> test = new ListIndex<String, String>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        int expectedListEntries = 0;
+        int nextSequenceId = 0;
+
+        LOG.info("Loading up the ListIndex with "+NUM_ITERATIONS+" entires and sparsely populating the sequences.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            test.add(tx, String.valueOf(expectedListEntries++), "AA");
+
+            for (int j = 0; j < expectedListEntries; j++) {
+
+                String sequenceSet = test.get(tx, String.valueOf(j));
+
+                for (int ix = 0; ix < NUM_ITERATIONS; ix++) {
+                    // String is immutable: concat() returns the longer string, so it must be
+                    // assigned back for the stored value to actually grow on each put.
+                    sequenceSet = sequenceSet.concat(String.valueOf(nextSequenceId++));
+                    test.put(tx, String.valueOf(j), sequenceSet);
+                }
+
+                // Read the grown value back; only the read itself is exercised here.
+                test.get(tx, String.valueOf(j));
+
+                // Overwrite the grown value with a short one, NUM_ITERATIONS - 1 times.
+                for (int ix = 0; ix < NUM_ITERATIONS - 1; ix++) {
+                    test.put(tx, String.valueOf(j), String.valueOf(j));
+                }
+            }
+        }
+
+        exerciseAnotherIndex(tx);
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.get(tx, String.valueOf(i)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(expectedListEntries, test.size());
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            LOG.debug("Size of ListIndex before removal of entry ["+i+"] is: " + test.size());
+
+            assertTrue("List should contain Key["+i+"]",test.containsKey(tx, String.valueOf(i)));
+            assertNotNull("List contents of Key["+i+"] should not be null", test.remove(tx, String.valueOf(i)));
+            LOG.debug("Size of ListIndex after removal of entry ["+i+"] is: " + test.size());
+
+            assertEquals(expectedListEntries - (i + 1), test.size());
+        }
+    }
+
+    /**
+     * Puts keys {@code count - 1} down to {@code 0} into the index under test, each
+     * mapped to its own number, committing the transaction after every put.
+     *
+     * @param count number of entries to insert; nothing happens when it is not positive
+     */
+    void doInsertReverse(int count) throws Exception {
+        int next = count;
+        while (next > 0) {
+            next--;
+            index.put(tx, key(next), (long) next);
+            tx.commit();
+        }
+    }
+ /**
+ * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+ * always insert to the end of the BTree.
+ */
+ @Override
+ protected String key(int i) {
+ return "key:"+nf.format(i);
+ }
+
+    /**
+     * Marshaller that stores a {@code HashSet<String>} as a length-prefixed block of
+     * Java-serialized bytes.
+     */
+    static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
+        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
+
+        /**
+         * Serializes the set into a byte array, then writes the array length as an int
+         * followed by the bytes.
+         *
+         * @param object the set to write
+         * @param dataOut destination of the length-prefixed payload
+         * @throws IOException if serialization or the write fails
+         */
+        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            try {
+                oout.writeObject(object);
+                oout.flush();
+            } finally {
+                // Close even when serialization fails.
+                oout.close();
+            }
+            byte[] data = baos.toByteArray();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+        }
+
+        /**
+         * Reads an int length, then that many bytes, and deserializes them back into a set.
+         *
+         * @param dataIn source positioned at a payload written by {@code writePayload}
+         * @return the deserialized set
+         * @throws IOException if reading fails or the serialized class cannot be found
+         */
+        @SuppressWarnings("unchecked") // the payload is only ever produced by writePayload above
+        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
+            int dataLen = dataIn.readInt();
+            byte[] data = new byte[dataLen];
+            dataIn.readFully(data);
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            ObjectInputStream oin = new ObjectInputStream(bais);
+            try {
+                return (HashSet<String>) oin.readObject();
+            } catch (ClassNotFoundException cfe) {
+                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
+                ioe.initCause(cfe);
+                throw ioe;
+            } finally {
+                oin.close();
+            }
+        }
+    }
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+/**
+ * {@link IndexBenchmark} specialization that benchmarks a {@link HashIndex} with
+ * String keys and Long values.
+ */
+public class HashIndexBenchMark extends IndexBenchmark {
+
+    /**
+     * Allocates a page in its own transaction and creates a hash index on that page.
+     *
+     * @return the new index, configured with String key and Long value marshallers
+     */
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        final Transaction allocation = pf.tx();
+        final long pageId = allocation.allocate().getPageId();
+        allocation.commit();
+
+        final HashIndex<String, Long> hashIndex = new HashIndex<String, Long>(pf, pageId);
+        hashIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        hashIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+        return hashIndex;
+    }
+
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexBenchMark.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+/**
+ * Runs the tests inherited from {@link IndexTestSupport} against a {@link HashIndex}
+ * with String keys and Long values.
+ */
+public class HashIndexTest extends IndexTestSupport {
+
+    /**
+     * Allocates a page through the current transaction, commits, and creates a hash
+     * index with a bin capacity of 12 on that page.
+     *
+     * @return the new index, configured with String key and Long value marshallers
+     */
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        final long pageId = tx.allocate().getPageId();
+        tx.commit();
+
+        final HashIndex<String, Long> hashIndex = new HashIndex<String, Long>(pf, pageId);
+        hashIndex.setBinCapacity(12);
+        hashIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        hashIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+        return hashIndex;
+    }
+
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/HashIndexTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * @author chirino
+ */
+public abstract class IndexBenchmark extends TestCase {
+
+ // Slower machines might need to make this bigger.
+ private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 5));
+ // How many times do we sample?
+ private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "" + 60 * 1000 / SAMPLE_DURATION));
+ // How many indexes will we be benchmarking concurrently?
+ private static final int INDEX_COUNT = Integer.parseInt(System.getProperty("INDEX_COUNT", "" + 1));
+ // Indexes tend to perform worse when they get big.. so how many items
+ // should we put into the index before we start sampling.
+ private static final int INDEX_PRE_LOAD_COUNT = Integer.parseInt(System.getProperty("INDEX_PRE_LOAD_COUNT", "" + 10000 / INDEX_COUNT));
+
+ protected File ROOT_DIR;
+ protected final HashMap<String, Index<String, Long>> indexes = new HashMap<String, Index<String, Long>>();
+ protected PageFile pf;
+
+ public void setUp() throws Exception {
+ ROOT_DIR = new File(IOHelper.getDefaultDataDirectory());
+ IOHelper.delete(ROOT_DIR);
+
+ pf = new PageFile(ROOT_DIR, getClass().getName());
+ pf.load();
+ }
+
+ protected void tearDown() throws Exception {
+ Transaction tx = pf.tx();
+ for (Index<?, ?> i : indexes.values()) {
+ try {
+ i.unload(tx);
+ } catch (Throwable ignore) {
+ }
+ }
+ tx.commit();
+ }
+
+ abstract protected Index<String, Long> createIndex() throws Exception;
+
+ synchronized private Index<String, Long> openIndex(String name) throws Exception {
+ Transaction tx = pf.tx();
+ Index<String, Long> index = indexes.get(name);
+ if (index == null) {
+ index = createIndex();
+ index.load(tx);
+ indexes.put(name, index);
+ }
+ tx.commit();
+ return index;
+ }
+
+ class Producer extends Thread {
+ private final String name;
+ AtomicBoolean shutdown = new AtomicBoolean();
+
+ public Producer(String name) {
+ super("Producer: " + name);
+ this.name = name;
+ }
+
+ public void shutdown() {
+ shutdown.set(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ Transaction tx = pf.tx();
+
+ Index<String,Long> index = openIndex(name);
+ long counter = 0;
+ while (!shutdown.get()) {
+ long c = counter;
+
+ String key = key(c);
+ index.put(tx, key, c);
+ tx.commit();
+ Thread.yield(); // This avoids consumer starvation..
+
+ onProduced(counter++);
+ }
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void onProduced(long counter) {
+ }
+ }
+
+ protected String key(long c) {
+ return "a-long-message-id-like-key-" + c;
+ }
+
+
+ class Consumer extends Thread {
+ private final String name;
+ AtomicBoolean shutdown = new AtomicBoolean();
+
+ public Consumer(String name) {
+ super("Consumer: " + name);
+ this.name = name;
+ }
+
+ public void shutdown() {
+ shutdown.set(true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ Transaction tx = pf.tx();
+
+ Index<String,Long> index = openIndex(name);
+ long counter = 0;
+ while (!shutdown.get()) {
+ long c = counter;
+ String key = key(c);
+
+ Long record = index.get(tx, key);
+ if (record != null) {
+ if( index.remove(tx, key) == null ) {
+ System.out.print("Remove failed...");
+ }
+ tx.commit();
+ onConsumed(counter++);
+ }
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void onConsumed(long counter) {
+ }
+ }
+
+ protected void dumpIndex(Index<String, Long> index) throws IOException {
+ }
+
+ public void testLoad() throws Exception {
+
+ final Producer producers[] = new Producer[INDEX_COUNT];
+ final Consumer consumers[] = new Consumer[INDEX_COUNT];
+ final CountDownLatch preloadCountDown = new CountDownLatch(INDEX_COUNT);
+ final AtomicLong producedRecords = new AtomicLong();
+ final AtomicLong consumedRecords = new AtomicLong();
+
+ System.out.println("Starting: " + INDEX_COUNT + " producers");
+ for (int i = 0; i < INDEX_COUNT; i++) {
+ producers[i] = new Producer("test-" + i) {
+ private boolean prelaodDone;
+
+ public void onProduced(long counter) {
+ if (!prelaodDone && counter >= INDEX_PRE_LOAD_COUNT) {
+ prelaodDone = true;
+ preloadCountDown.countDown();
+ }
+ producedRecords.incrementAndGet();
+ }
+ };
+ producers[i].start();
+ }
+
+ long start = System.currentTimeMillis();
+ System.out.println("Waiting for each producer create " + INDEX_PRE_LOAD_COUNT + " records before starting the consumers.");
+ preloadCountDown.await();
+ long end = System.currentTimeMillis();
+ System.out.println("Preloaded " + INDEX_PRE_LOAD_COUNT * INDEX_COUNT + " records at " + (INDEX_PRE_LOAD_COUNT * INDEX_COUNT * 1000f / (end - start)) + " records/sec");
+
+ System.out.println("Starting: " + INDEX_COUNT + " consumers");
+ for (int i = 0; i < INDEX_COUNT; i++) {
+ consumers[i] = new Consumer("test-" + i) {
+ public void onConsumed(long counter) {
+ consumedRecords.incrementAndGet();
+ }
+ };
+ consumers[i].start();
+ }
+
+ long sample_start = System.currentTimeMillis();
+ System.out.println("Taking " + SAMPLES + " performance samples every " + SAMPLE_DURATION + " ms");
+ System.out.println("time (s), produced, produce rate (r/s), consumed, consume rate (r/s), used memory (k)");
+ producedRecords.set(0);
+ consumedRecords.set(0);
+ for (int i = 0; i < SAMPLES; i++) {
+ start = System.currentTimeMillis();
+ Thread.sleep(SAMPLE_DURATION);
+ end = System.currentTimeMillis();
+ long p = producedRecords.getAndSet(0);
+ long c = consumedRecords.getAndSet(0);
+
+ long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+ System.out.println(((end-sample_start)/1000f)+", "+p+", "+(p * 1000f / (end - start)) + ", "+ c+", " + (c * 1000f / (end - start))+", "+(usedMemory/(1024)) );
+ }
+ System.out.println("Samples done... Shutting down the producers and consumers...");
+ for (int i = 0; i < INDEX_COUNT; i++) {
+ producers[i].shutdown();
+ consumers[i].shutdown();
+ }
+ for (int i = 0; i < INDEX_COUNT; i++) {
+ producers[i].join(1000 * 5);
+ consumers[i].join(1000 * 5);
+ }
+ System.out.println("Shutdown.");
+ }
+
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexBenchmark.java
------------------------------------------------------------------------------
svn:executable = *
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Shared scaffolding for index tests: manages a scratch directory, a
+ * {@link PageFile} and a {@link Transaction}, and provides insert / retrieve /
+ * remove helpers. Subclasses choose the index under test through
+ * {@link #createIndex()}.
+ */
+public abstract class IndexTestSupport extends TestCase {
+
+    private static final int COUNT = 10000;
+
+    protected Index<String,Long> index;
+    protected File directory;
+    protected PageFile pf;
+    protected Transaction tx;
+
+    /**
+     * Picks a time-stamped scratch directory and removes any leftover content.
+     *
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        directory = new File(IOHelper.getDefaultDataDirectory() + System.currentTimeMillis());
+        IOHelper.delete(directory);
+    }
+
+    /**
+     * Unloads and deletes the page file (when one was created) and removes the
+     * scratch directory. The directory is removed even if releasing the page
+     * file fails, so a failing test does not leave data behind.
+     *
+     * @throws Exception if releasing the page file fails
+     * @see junit.framework.TestCase#tearDown()
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if( pf!=null ) {
+                pf.unload();
+                pf.delete();
+            }
+        } finally {
+            try {
+                if (directory != null) {
+                    IOHelper.delete(directory);
+                }
+            } finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Creates and loads a page file with the given page size, opens the shared
+     * transaction and creates (but does not load) the index under test.
+     *
+     * @param pageSize page size handed to {@link PageFile#setPageSize}
+     */
+    protected void createPageFileAndIndex(int pageSize) throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(pageSize);
+        pf.load();
+        tx = pf.tx();
+        this.index = createIndex();
+    }
+
+    /**
+     * Creates the index implementation under test.
+     */
+    abstract protected Index<String, Long> createIndex() throws Exception;
+
+    /**
+     * Inserts, retrieves and removes {@code COUNT} entries, unloading and
+     * reloading the index between the phases.
+     */
+    public void testIndex() throws Exception {
+        createPageFileAndIndex(500);
+        this.index.load(tx);
+        tx.commit();
+        doInsert(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+        checkRetrieve(COUNT);
+        doRemove(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+        doInsert(COUNT);
+        doRemoveHalf(COUNT);
+        doInsertHalf(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+        checkRetrieve(COUNT);
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    /** Puts entries {@code key(i) -> i} for {@code 0 <= i < count}, committing after each put. */
+    void doInsert(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            index.put(tx, key(i), (long)i);
+            tx.commit();
+        }
+    }
+
+    /** Builds the key for entry {@code i}; subclasses may override the format. */
+    protected String key(int i) {
+        return "key:"+i;
+    }
+
+    /** Asserts that every key {@code key(0) .. key(count - 1)} maps to a non-null value. */
+    void checkRetrieve(int count) throws IOException {
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(tx, key(i));
+            assertNotNull("Key missing: "+key(i), item);
+        }
+    }
+
+    /** Removes the even-numbered entries, asserting each removal returns a value. */
+    void doRemoveHalf(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                assertNotNull("Expected remove to return value for index "+i, index.remove(tx, key(i)));
+                tx.commit();
+            }
+
+        }
+    }
+
+    /** Puts the even-numbered entries {@code key(i) -> i}, committing after each put. */
+    void doInsertHalf(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                index.put(tx, key(i), (long)i);
+                tx.commit();
+            }
+        }
+    }
+
+    /**
+     * Removes entries in ascending order, asserting each removal returns a
+     * value, then asserts that none of the keys can be retrieved any more.
+     */
+    void doRemove(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            assertNotNull("Expected remove to return value for index "+i, index.remove(tx, key(i)));
+            tx.commit();
+        }
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(tx, key(i));
+            assertNull(item);
+        }
+    }
+
+    /**
+     * Removes entries in descending order (without checking the removal
+     * result), then asserts that none of the keys can be retrieved any more.
+     */
+    void doRemoveBackwards(int count) throws Exception {
+        for (int i = count - 1; i >= 0; i--) {
+            index.remove(tx, key(i));
+            tx.commit();
+        }
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(tx, key(i));
+            assertNull(item);
+        }
+    }
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/IndexTestSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.index;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.text.NumberFormat;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.Sequence;
+import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ListIndexTest extends IndexTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(ListIndexTest.class);
+ private NumberFormat nf;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ nf = NumberFormat.getIntegerInstance();
+ nf.setMinimumIntegerDigits(6);
+ nf.setGroupingUsed(false);
+ }
+
+ @Override
+ protected Index<String, Long> createIndex() throws Exception {
+
+ long id = tx.allocate().getPageId();
+ tx.commit();
+
+ ListIndex<String, Long> index = new ListIndex<String, Long>(pf, id);
+ index.setKeyMarshaller(StringMarshaller.INSTANCE);
+ index.setValueMarshaller(LongMarshaller.INSTANCE);
+
+ return index;
+ }
+
+    /**
+     * Checks that {@code size()} tracks inserts, removals through the iterator
+     * (one at a time) and {@code clear}.
+     */
+    public void testSize() throws Exception {
+        createPageFileAndIndex(100);
+
+        final ListIndex<String, Long> list = (ListIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        final int total = 30;
+
+        tx = pf.tx();
+        doInsert(total);
+        tx.commit();
+        assertEquals("correct size", total, list.size());
+
+        // Drain through the iterator; the size must drop by one per removal.
+        tx = pf.tx();
+        int remaining = total;
+        for (Iterator<Map.Entry<String, Long>> it = index.iterator(tx); it.hasNext(); ) {
+            it.next();
+            it.remove();
+            remaining--;
+            assertEquals("correct size", remaining, list.size());
+        }
+        tx.commit();
+
+        // Refill, then empty the list in one go.
+        tx = pf.tx();
+        doInsert(total);
+        tx.commit();
+        assertEquals("correct size", total, list.size());
+
+        tx = pf.tx();
+        list.clear(tx);
+        assertEquals("correct size", 0, list.size());
+        tx.commit();
+    }
+
+    /**
+     * Checks that {@code put} on an existing key replaces the value without
+     * changing the size, and that {@code put} on a new key returns null and
+     * grows the list by one.
+     */
+    public void testPut() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        int count = 30;
+        tx = pf.tx();
+        doInsert(count);
+        tx.commit();
+        assertEquals("correct size", count, listIndex.size());
+
+        tx = pf.tx();
+        Long value = listIndex.get(tx, key(10));
+        assertNotNull(value);
+        listIndex.put(tx, key(10), Long.valueOf(1024));
+        tx.commit();
+
+        tx = pf.tx();
+        value = listIndex.get(tx, key(10));
+        // Fail with an assertion rather than a NullPointerException below.
+        assertNotNull("overwritten key must still be present", value);
+        assertEquals(1024L, value.longValue());
+        // assertEquals reports the actual size on failure, unlike assertTrue(a == b).
+        assertEquals("size unchanged by overwrite", count, listIndex.size());
+        tx.commit();
+
+        tx = pf.tx();
+        value = listIndex.put(tx, key(31), Long.valueOf(2048));
+        assertNull(value);
+        assertEquals("size grows by one for a new key", count + 1, listIndex.size());
+        tx.commit();
+    }
+
+    /**
+     * Checks that {@code addFirst} prepends: after appending keys 0..9 and
+     * prepending keys 10 and 11, iteration must yield 11, 10, 0, 1, ..., 9.
+     */
+    public void testAddFirst() throws Exception {
+        createPageFileAndIndex(100);
+
+        ListIndex<String, Long> listIndex = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        tx = pf.tx();
+
+        // put is add last
+        final int appended = 10;
+        doInsert(appended);
+        listIndex.addFirst(tx, key(10), (long) 10);
+        listIndex.addFirst(tx, key(11), (long) 11);
+
+        tx.commit();
+
+        tx = pf.tx();
+        Iterator<Map.Entry<String, Long>> iterator = index.iterator(tx);
+        // The two prepended entries come first, most recently added first.
+        assertEquals(key(11), iterator.next().getKey());
+        assertEquals(key(10), iterator.next().getKey());
+
+        // Every appended entry must follow, in insertion order. The previous
+        // loop bound stopped one entry early and left key(9) unverified.
+        int count = 0;
+        while (iterator.hasNext()) {
+            Map.Entry<String, Long> entry = iterator.next();
+            assertEquals(key(count), entry.getKey());
+            assertEquals(count, (long) entry.getValue());
+            count++;
+        }
+        assertEquals("all appended entries follow the prepended ones", appended, count);
+        tx.commit();
+    }
+
+    /**
+     * Checks page accounting: inserting uses extra pages, removing everything
+     * frees some pages, and re-inserting the same amount consumes the free
+     * pages again without growing the page count.
+     */
+    public void testPruning() throws Exception {
+        createPageFileAndIndex(100);
+
+        final ListIndex<String, Long> listIndex = (ListIndex<String, Long>) this.index;
+
+        this.index.load(tx);
+        tx.commit();
+
+        // Freshly loaded: a single page and nothing on the free list.
+        long pages = listIndex.getPageFile().getPageCount();
+        assertEquals(1, pages);
+
+        long freePages = listIndex.getPageFile().getFreePageCount();
+        assertEquals("No free pages", 0, freePages);
+
+        tx = pf.tx();
+        doInsert(20);
+        tx.commit();
+
+        pages = listIndex.getPageFile().getPageCount();
+        LOG.info("page count: " + pages);
+        assertTrue("used some pages", pages > 1);
+
+        // Remove the data again.
+        tx = pf.tx();
+        doRemove(20);
+        tx.commit();
+
+        freePages = listIndex.getPageFile().getFreePageCount();
+        LOG.info("FreePage count: " + freePages);
+        assertTrue("Some free pages " + freePages, freePages > 0);
+
+        LOG.info("add some more to use up free list");
+        tx = pf.tx();
+        doInsert(20);
+        tx.commit();
+
+        freePages = listIndex.getPageFile().getFreePageCount();
+        LOG.info("FreePage count: " + freePages);
+        assertEquals("no free pages " + freePages, 0, freePages);
+        assertEquals("Page count is static", pages, listIndex.getPageFile().getPageCount());
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    /**
+     * Fills the list with {@code addFirst} in descending key order, reloads
+     * the index and checks that iteration yields the keys in ascending order.
+     */
+    public void testIterationAddFirst() throws Exception {
+        createPageFileAndIndex(100);
+        final ListIndex<String, Long> listIndex = (ListIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        tx = pf.tx();
+        final int entryCount = 200;
+        // Prepending from the highest key down leaves key 0 at the head.
+        doInsertReverse(entryCount);
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        int seen = 0;
+        final Iterator<Map.Entry<String, Long>> entries = listIndex.iterator(tx);
+        while (entries.hasNext()) {
+            final Map.Entry<String, Long> entry = entries.next();
+            assertEquals(key(seen), entry.getKey());
+            assertEquals(seen, (long) entry.getValue());
+            seen++;
+        }
+        assertEquals("We iterated over all entries", entryCount, seen);
+
+        // Remove the data.
+        tx = pf.tx();
+        doRemove(entryCount);
+        tx.commit();
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+
+    /**
+     * Fills the list with {@code doInsert} (ascending key order), reloads the
+     * index and checks that iteration yields the keys in that same order.
+     */
+    public void testIteration() throws Exception {
+        createPageFileAndIndex(100);
+        final ListIndex<String, Long> listIndex = (ListIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        // Insert in ascending order.
+        final int entryCount = 200;
+        doInsert(entryCount);
+
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        int seen = 0;
+        final Iterator<Map.Entry<String, Long>> entries = listIndex.iterator(tx);
+        while (entries.hasNext()) {
+            final Map.Entry<String, Long> entry = entries.next();
+            assertEquals(key(seen), entry.getKey());
+            assertEquals(seen, (long) entry.getValue());
+            seen++;
+        }
+        assertEquals("We iterated over all entries", entryCount, seen);
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+
+    /**
+     * Removes randomly chosen keys (repeats included) until the list is empty
+     * and fails if any removal throws. The random seed is logged and included
+     * in the failure message so that a failing sequence can be replayed.
+     */
+    public void testRandomRemove() throws Exception {
+
+        createPageFileAndIndex(4*1024);
+        ListIndex<String, Long> index = ((ListIndex<String, Long>) this.index);
+        this.index.load(tx);
+        tx.commit();
+
+        final int count = 4000;
+        doInsert(count);
+
+        final long seed = System.currentTimeMillis();
+        LOG.info("testRandomRemove seed: " + seed);
+        Random rand = new Random(seed);
+        int i = 0, prev = 0;
+        while (!index.isEmpty(tx)) {
+            // Remember the previously removed key for the failure report.
+            prev = i;
+            i = rand.nextInt(count);
+            try {
+                index.remove(tx, key(i));
+            } catch (Exception e) {
+                e.printStackTrace();
+                fail("unexpected exception on " + i + ", prev: " + prev + ", seed: " + seed + ", ex: " + e);
+            }
+        }
+    }
+
+    /**
+     * Removes two specific keys (3697, then 1566) from a 4000-entry list; the
+     * test passes as long as neither removal throws.
+     */
+    public void testRemovePattern() throws Exception {
+        createPageFileAndIndex(100);
+        final ListIndex<String, Long> listIndex = (ListIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        final int entryCount = 4000;
+        doInsert(entryCount);
+
+        final String firstVictim = key(3697);
+        listIndex.remove(tx, firstVictim);
+        final String secondVictim = key(1566);
+        listIndex.remove(tx, secondVictim);
+    }
+
+    /**
+     * Appends 50000 entries, then removes them all through the iterator,
+     * logging the elapsed time and page counts of both phases.
+     */
+    public void testLargeAppendRemoveTimed() throws Exception {
+        createPageFileAndIndex(1024*4);
+        final ListIndex<String, Long> list = (ListIndex<String, Long>) this.index;
+        this.index.load(tx);
+        tx.commit();
+
+        final int entryCount = 50000;
+
+        long phaseStart = System.currentTimeMillis();
+        int next = 0;
+        while (next < entryCount) {
+            list.add(tx, key(next), (long) next);
+            tx.commit();
+            next++;
+        }
+        LOG.info("Time to add " + entryCount + ": " + (System.currentTimeMillis() - phaseStart) + " mills");
+        LOG.info("Page count: " + list.getPageFile().getPageCount());
+
+        phaseStart = System.currentTimeMillis();
+        tx = pf.tx();
+        int removed = 0;
+        for (Iterator<Map.Entry<String, Long>> it = index.iterator(tx); it.hasNext(); removed++) {
+            it.next();
+            it.remove();
+        }
+        tx.commit();
+        assertEquals("Removed all", entryCount, removed);
+        LOG.info("Time to remove " + entryCount + ": " + (System.currentTimeMillis() - phaseStart) + " mills");
+        LOG.info("Page count: " + list.getPageFile().getPageCount());
+        LOG.info("Page free count: " + list.getPageFile().getFreePageCount());
+    }
+
+ private int getMessageSize(int min, int max) {
+ return min + (int)(Math.random() * ((max - min) + 1));
+ }
+
+ public void testLargeValueOverflow() throws Exception {
+ pf = new PageFile(directory, getClass().getName());
+ pf.setPageSize(4*1024);
+ pf.setEnablePageCaching(false);
+ pf.setWriteBatchSize(1);
+ pf.load();
+ tx = pf.tx();
+ long id = tx.allocate().getPageId();
+
+ ListIndex<Long, String> test = new ListIndex<Long, String>(pf, id);
+ test.setKeyMarshaller(LongMarshaller.INSTANCE);
+ test.setValueMarshaller(StringMarshaller.INSTANCE);
+ test.load(tx);
+ tx.commit();
+
+ final long NUM_ADDITIONS = 32L;
+
+ LinkedList<Long> expected = new LinkedList<Long>();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; ++i) {
+ final int stringSize = getMessageSize(1, 4096);
+ String val = new String(new byte[stringSize]);
+ expected.add(Long.valueOf(stringSize));
+ test.add(tx, i, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; i++) {
+ String s = test.get(tx, i);
+ assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+ }
+ tx.commit();
+
+ expected.clear();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; ++i) {
+ final int stringSize = getMessageSize(1, 4096);
+ String val = new String(new byte[stringSize]);
+ expected.add(Long.valueOf(stringSize));
+ test.addFirst(tx, i+NUM_ADDITIONS, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; i++) {
+ String s = test.get(tx, i+NUM_ADDITIONS);
+ assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+ }
+ tx.commit();
+
+ expected.clear();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; ++i) {
+ final int stringSize = getMessageSize(1, 4096);
+ String val = new String(new byte[stringSize]);
+ expected.add(Long.valueOf(stringSize));
+ test.put(tx, i, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i = 0; i < NUM_ADDITIONS; i++) {
+ String s = test.get(tx, i);
+ assertEquals("string length did not match expected", expected.get((int)i), Long.valueOf(s.length()));
+ }
+ tx.commit();
+ }
+
+ void doInsertReverse(int count) throws Exception {
+ for (int i = count - 1; i >= 0; i--) {
+ ((ListIndex<String, Long>) index).addFirst(tx, key(i), (long) i);
+ tx.commit();
+ }
+ }
+
+ @Override
+ protected String key(int i) {
+ return "key:" + nf.format(i);
+ }
+
+    /**
+     * Grows a list of {@link SequenceSet} values one entry per round; in every
+     * round each existing entry gets 100 sequence ids added and 99 of them
+     * removed again, with the value rewritten after each change. Afterwards
+     * all keys must be present, and removing them in ascending order must
+     * shrink the size by one each time.
+     */
+    public void testListIndexConsistencyOverTime() throws Exception {
+
+        final int rounds = 100;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        final long headPageId = tx.allocate().getPageId();
+
+        final ListIndex<String, SequenceSet> list = new ListIndex<String, SequenceSet>(pf, headPageId);
+        list.setKeyMarshaller(StringMarshaller.INSTANCE);
+        list.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        list.load(tx);
+        tx.commit();
+
+        int entryTotal = 0;
+        int nextSeq = 0;
+
+        LOG.info("Loading up the ListIndex with "+rounds+" entires and sparsely populating the sequences.");
+
+        for (int round = 0; round < rounds; round++) {
+            list.add(tx, String.valueOf(entryTotal), new SequenceSet());
+            entryTotal++;
+
+            for (int entry = 0; entry < entryTotal; entry++) {
+                final String entryKey = String.valueOf(entry);
+
+                SequenceSet sequences = list.get(tx, entryKey);
+
+                // Add a run of ids, rewriting the stored value after each one.
+                int firstOfRun = nextSeq;
+                for (int n = 0; n < rounds; n++) {
+                    sequences.add(nextSeq++);
+                    list.put(tx, entryKey, sequences);
+                }
+
+                sequences = list.get(tx, entryKey);
+
+                // Remove all but the last id of that run again.
+                for (int n = 0; n < rounds - 1; n++) {
+                    sequences.remove(firstOfRun++);
+                    list.put(tx, entryKey, sequences);
+                }
+            }
+        }
+
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int k = 0; k < rounds; k++) {
+            assertTrue("List should contain Key["+k+"]",list.containsKey(tx, String.valueOf(k)));
+            assertNotNull("List contents of Key["+k+"] should not be null", list.get(tx, String.valueOf(k)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(entryTotal, list.size());
+
+        for (int k = 0; k < rounds; k++) {
+            LOG.debug("Size of ListIndex before removal of entry ["+k+"] is: " + list.size());
+
+            assertTrue("List should contain Key["+k+"]",list.containsKey(tx, String.valueOf(k)));
+            assertNotNull("List contents of Key["+k+"] should not be null", list.remove(tx, String.valueOf(k)));
+            LOG.debug("Size of ListIndex after removal of entry ["+k+"] is: " + list.size());
+
+            assertEquals(entryTotal - (k + 1), list.size());
+        }
+    }
+
+    /**
+     * Builds the same churned list of {@link SequenceSet} values as the other
+     * large-data tests (100 rounds, each adding 100 ids to every entry and
+     * removing 99 again), then removes the keys in descending order and checks
+     * the size after every removal.
+     */
+    public void testListLargeDataAddWithReverseRemove() throws Exception {
+
+        final int rounds = 100;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        final long headPageId = tx.allocate().getPageId();
+
+        final ListIndex<String, SequenceSet> list = new ListIndex<String, SequenceSet>(pf, headPageId);
+        list.setKeyMarshaller(StringMarshaller.INSTANCE);
+        list.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        list.load(tx);
+        tx.commit();
+
+        int entryTotal = 0;
+        int nextSeq = 0;
+
+        LOG.info("Loading up the ListIndex with "+rounds+" entries and sparsely populating the sequences.");
+
+        for (int round = 0; round < rounds; round++) {
+            list.add(tx, String.valueOf(entryTotal), new SequenceSet());
+            entryTotal++;
+
+            for (int entry = 0; entry < entryTotal; entry++) {
+                final String entryKey = String.valueOf(entry);
+
+                SequenceSet sequences = list.get(tx, entryKey);
+
+                // Add a run of ids, rewriting the stored value after each one.
+                int firstOfRun = nextSeq;
+                for (int n = 0; n < rounds; n++) {
+                    sequences.add(nextSeq++);
+                    list.put(tx, entryKey, sequences);
+                }
+
+                sequences = list.get(tx, entryKey);
+
+                // Remove all but the last id of that run again.
+                for (int n = 0; n < rounds - 1; n++) {
+                    sequences.remove(firstOfRun++);
+                    list.put(tx, entryKey, sequences);
+                }
+            }
+        }
+
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int k = 0; k < rounds; k++) {
+            assertTrue("List should contain Key["+k+"]",list.containsKey(tx, String.valueOf(k)));
+            assertNotNull("List contents of Key["+k+"] should not be null", list.get(tx, String.valueOf(k)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(entryTotal, list.size());
+
+        // Remove from the highest key down to key 0.
+        int k = rounds - 1;
+        while (k >= 0) {
+            LOG.debug("Size of ListIndex before removal of entry ["+k+"] is: " + list.size());
+
+            assertTrue("List should contain Key["+k+"]",list.containsKey(tx, String.valueOf(k)));
+            assertNotNull("List contents of Key["+k+"] should not be null", list.remove(tx, String.valueOf(k)));
+            LOG.debug("Size of ListIndex after removal of entry ["+k+"] is: " + list.size());
+
+            entryTotal--;
+            assertEquals(entryTotal, list.size());
+            k--;
+        }
+    }
+
+    /**
+     * Disabled timing scenario (the {@code x_} prefix keeps JUnit 3 from
+     * picking it up): fills the list with 200 entries that each hold one large
+     * contiguous sequence, then removes every second id from the LAST entry,
+     * rewriting the value after each removal, and logs the elapsed time.
+     */
+    public void x_testSplitPerformance() throws Exception {
+
+        final int NUM_ITERATIONS = 200;
+        final int RANGE = 200000;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        ListIndex<String, SequenceSet> test = new ListIndex<String, SequenceSet>(pf, id);
+        test.setKeyMarshaller(StringMarshaller.INSTANCE);
+        test.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        for (int i = 0; i < NUM_ITERATIONS; ++i) {
+            Sequence sequence = new Sequence(0);
+            sequence.setLast(RANGE);
+            SequenceSet sequenceSet = new SequenceSet();
+            sequenceSet.add(sequence);
+            test.add(tx, String.valueOf(i), sequenceSet);
+        }
+
+        long start = System.currentTimeMillis();
+
+        // overflow the value in the last sequence
+        // Read and write the SAME key: previously the value was read from entry
+        // NUM_ITERATIONS - 10 but written back under entry NUM_ITERATIONS - 1.
+        final String lastKey = String.valueOf(NUM_ITERATIONS - 1);
+        SequenceSet sequenceSet = test.get(tx, lastKey);
+        for (int i=0; i<RANGE; i+=2) {
+            sequenceSet.remove(i);
+            test.put(tx, lastKey, sequenceSet);
+        }
+        LOG.info("duration: " + (System.currentTimeMillis() - start));
+    }
+
+    /**
+     * Builds the same churned list of {@link SequenceSet} values as the other
+     * large-data tests (100 rounds, each adding 100 ids to every entry and
+     * removing 99 again), then removes the even keys in ascending order and
+     * the odd keys in descending order, checking the size after every removal
+     * and that the list ends up empty.
+     */
+    public void testListLargeDataAddAndNonSequentialRemove() throws Exception {
+
+        final int rounds = 100;
+
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        final long headPageId = tx.allocate().getPageId();
+
+        final ListIndex<String, SequenceSet> list = new ListIndex<String, SequenceSet>(pf, headPageId);
+        list.setKeyMarshaller(StringMarshaller.INSTANCE);
+        list.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+        list.load(tx);
+        tx.commit();
+
+        int entryTotal = 0;
+        int nextSeq = 0;
+
+        LOG.info("Loading up the ListIndex with "+rounds+" entires and sparsely populating the sequences.");
+
+        for (int round = 0; round < rounds; round++) {
+            list.add(tx, String.valueOf(entryTotal), new SequenceSet());
+            entryTotal++;
+
+            for (int entry = 0; entry < entryTotal; entry++) {
+                final String entryKey = String.valueOf(entry);
+
+                SequenceSet sequences = list.get(tx, entryKey);
+
+                // Add a run of ids, rewriting the stored value after each one.
+                int firstOfRun = nextSeq;
+                for (int n = 0; n < rounds; n++) {
+                    sequences.add(nextSeq++);
+                    list.put(tx, entryKey, sequences);
+                }
+
+                sequences = list.get(tx, entryKey);
+
+                // Remove all but the last id of that run again.
+                for (int n = 0; n < rounds - 1; n++) {
+                    sequences.remove(firstOfRun++);
+                    list.put(tx, entryKey, sequences);
+                }
+            }
+        }
+
+        LOG.info("Checking if Index has the expected number of entries.");
+
+        for (int k = 0; k < rounds; k++) {
+            assertTrue("List should contain Key["+k+"]",list.containsKey(tx, String.valueOf(k)));
+            assertNotNull("List contents of Key["+k+"] should not be null", list.get(tx, String.valueOf(k)));
+        }
+
+        LOG.info("Index has the expected number of entries.");
+
+        assertEquals(entryTotal, list.size());
+
+        // Even keys, lowest first.
+        int even = 0;
+        while (even < rounds) {
+            LOG.debug("Size of ListIndex before removal of entry ["+even+"] is: " + list.size());
+
+            assertTrue("List should contain Key["+even+"]",list.containsKey(tx, String.valueOf(even)));
+            assertNotNull("List contents of Key["+even+"] should not be null", list.remove(tx, String.valueOf(even)));
+            LOG.debug("Size of ListIndex after removal of entry ["+even+"] is: " + list.size());
+
+            entryTotal--;
+            assertEquals(entryTotal, list.size());
+            even += 2;
+        }
+
+        // Odd keys, highest first.
+        int odd = rounds - 1;
+        while (odd > 0) {
+            LOG.debug("Size of ListIndex before removal of entry ["+odd+"] is: " + list.size());
+
+            assertTrue("List should contain Key["+odd+"]",list.containsKey(tx, String.valueOf(odd)));
+            assertNotNull("List contents of Key["+odd+"] should not be null", list.remove(tx, String.valueOf(odd)));
+            LOG.debug("Size of ListIndex after removal of entry ["+odd+"] is: " + list.size());
+
+            entryTotal--;
+            assertEquals(entryTotal, list.size());
+            odd -= 2;
+        }
+
+        assertEquals(0, list.size());
+    }
+
+    /**
+     * Marshaller that stores a {@code HashSet<String>} as a length-prefixed
+     * blob produced by Java object serialization.
+     */
+    static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
+        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
+
+        /**
+         * Serializes {@code object} and writes it to {@code dataOut} as a
+         * 4-byte length followed by the serialized bytes.
+         *
+         * @throws IOException if serialization or the write fails
+         */
+        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oout = new ObjectOutputStream(baos);
+            try {
+                oout.writeObject(object);
+                oout.flush();
+            } finally {
+                // Close the stream even when writeObject fails.
+                oout.close();
+            }
+            byte[] data = baos.toByteArray();
+            dataOut.writeInt(data.length);
+            dataOut.write(data);
+        }
+
+        /**
+         * Reads a length-prefixed blob from {@code dataIn} and deserializes it.
+         *
+         * @return the deserialized set
+         * @throws IOException if the read fails, or wrapping the
+         *         {@link ClassNotFoundException} raised during deserialization
+         */
+        @SuppressWarnings("unchecked")
+        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
+            int dataLen = dataIn.readInt();
+            byte[] data = new byte[dataLen];
+            dataIn.readFully(data);
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            // NOTE(review): Java-native deserialization; only acceptable here
+            // because the bytes come from this test's own writePayload.
+            ObjectInputStream oin = new ObjectInputStream(bais);
+            try {
+                return (HashSet<String>) oin.readObject();
+            } catch (ClassNotFoundException cfe) {
+                throw new IOException("Failed to read HashSet<String>: " + cfe, cfe);
+            } finally {
+                oin.close();
+            }
+        }
+    }
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/index/ListIndexTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Tests writing to a {@link Journal}: writes with completion callbacks and
+ * writes using the boolean flag, checked both while the journal is running
+ * and after it has been closed.
+ */
+public class JournalTest extends TestCase {
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+    Journal dataManager;
+    File dir;
+
+    /** Creates the journal directory, then configures and starts a new journal in it. */
+    @Override
+    public void setUp() throws Exception {
+        dir = new File("target/tests/DataFileAppenderTest");
+        dir.mkdirs();
+        dataManager = new Journal();
+        dataManager.setDirectory(dir);
+        configure(dataManager);
+        dataManager.start();
+    }
+
+    /**
+     * Hook called from {@link #setUp()} before the journal is started.
+     * The default implementation leaves the journal untouched.
+     */
+    protected void configure(Journal dataManager) {
+    }
+
+    /** Closes the journal and deletes its directory. */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            dataManager.close();
+        } finally {
+            // Always remove the directory, even if close() fails, so that
+            // leftover journal files cannot affect the next test.
+            IOHelper.delete(dir);
+        }
+    }
+
+    /** Returns a callback that counts {@code latch} down once each time it runs. */
+    private static Runnable countDown(final CountDownLatch latch) {
+        return new Runnable() {
+            public void run() {
+                latch.countDown();
+            }
+        };
+    }
+
+    /** Every write callback must have run within five seconds of queueing the writes. */
+    public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i = 0; i < iterations; i++) {
+            dataManager.write(data, countDown(latch));
+        }
+        // at this point most probably dataManager.getInflightWrites().size() > 0
+        // as the Thread created in DataFileAppender.enqueue() may not have caught up.
+        assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS));
+    }
+
+    /** After close() no write may remain in flight and every callback must have run. */
+    public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        for (int i = 0; i < iterations; i++) {
+            dataManager.write(data, countDown(latch));
+        }
+        dataManager.close();
+        assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        assertEquals("none written", 0, latch.getCount());
+    }
+
+    /** Writes issued with the flag set to false must all be flushed by close(). */
+    public void testBatchWriteCompleteAfterClose() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i = 0; i < iterations; i++) {
+            dataManager.write(data, false);
+        }
+        dataManager.close();
+        assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
+    }
+
+    /**
+     * Queues four messages whose combined size equals
+     * {@link #DEFAULT_MAX_BATCH_SIZE} and waits for all callbacks.
+     */
+    public void testBatchWriteToMaxMessageSize() throws Exception {
+        final int iterations = 4;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        Runnable done = countDown(latch);
+        int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations;
+        byte[] message = new byte[messageSize];
+        ByteSequence data = new ByteSequence(message);
+
+        for (int i = 0; i < iterations; i++) {
+            dataManager.write(data, done);
+        }
+
+        // write may take some time
+        assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS));
+    }
+
+    /** With the flag set to true, no write may remain in flight once write() returns. */
+    public void testNoBatchWriteWithSync() throws Exception {
+        ByteSequence data = new ByteSequence("DATA".getBytes());
+        final int iterations = 10;
+        for (int i = 0; i < iterations; i++) {
+            dataManager.write(data, true);
+            assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        }
+    }
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/JournalTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java?rev=1409554&view=auto
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java (added)
+++ activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java Wed Nov 14 23:31:34 2012
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.page;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests {@link PageFile} through its {@link Transaction} API: storing, freeing
+ * and updating pages, page streams, and rollback of uncommitted stores.
+ */
+@SuppressWarnings("rawtypes")
+public class PageFileTest extends TestCase {
+
+    /**
+     * Loads every page the transaction iterates over with the string
+     * marshaller and returns the set of values read.
+     */
+    private static HashSet<String> readAllPages(Transaction tx) throws IOException {
+        HashSet<String> values = new HashSet<String>();
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            values.add(page.get());
+        }
+        return values;
+    }
+
+    /**
+     * Stores 100 string pages, frees the odd-numbered ones, updates the rest,
+     * and reloads the page file after each step to check what was persisted.
+     */
+    public void testCRUD() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < 100; i++) {
+            Page<String> page = tx.allocate();
+            assertEquals(Page.PAGE_FREE_TYPE, page.getType());
+
+            String value = "page:" + i;
+            expected.add(value);
+            page.set(value);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+            tx.commit();
+        }
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        assertEquals(expected, readAllPages(tx));
+
+        // Remove the odd records..
+        for (int i = 0; i < 100; i++) {
+            if (i % 2 == 0) {
+                // Even records are kept; skip to the next index.
+                continue;
+            }
+            expected.remove("page:" + i);
+        }
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            if (!expected.contains(page.get())) {
+                tx.free(page);
+            }
+        }
+        tx.commit();
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure the even records are still there..
+        assertEquals(expected, readAllPages(tx));
+
+        // Update the records...
+        HashSet<String> beforeUpdate = expected;
+        expected = new HashSet<String>();
+        for (String s : beforeUpdate) {
+            expected.add(s + ":updated");
+        }
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            page.set(page.get() + ":updated");
+            tx.store(page, StringMarshaller.INSTANCE, false);
+        }
+        tx.commit();
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure the updated records are still there..
+        assertEquals(expected, readAllPages(tx));
+
+        pf.unload();
+    }
+
+    /**
+     * Writes 10000 UTF strings through a page output stream, reloads the page
+     * file, and reads them back through a page input stream, expecting
+     * end-of-stream right after the last string.
+     */
+    public void testStreams() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        Transaction tx = pf.tx();
+        Page page = tx.allocate();
+        tx.commit();
+
+        OutputStream pos = tx.openOutputStream(page, true);
+        DataOutputStream os = new DataOutputStream(pos);
+        for (int i = 0; i < 10000; i++) {
+            os.writeUTF("Test string:" + i);
+        }
+
+        os.close();
+        tx.commit();
+
+        // Reload the page file.
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        InputStream pis = tx.openInputStream(page);
+        DataInputStream is = new DataInputStream(pis);
+        for (int i = 0; i < 10000; i++) {
+            assertEquals("Test string:" + i, is.readUTF());
+        }
+        assertEquals(-1, is.read());
+        is.close();
+
+        pf.unload();
+    }
+
+    /**
+     * Stores 100 pages, committing the even-numbered ones and rolling back
+     * the odd-numbered ones, then checks that only the committed values are
+     * found after a reload.
+     */
+    public void testAddRollback() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < 100; i++) {
+            Page<String> page = tx.allocate();
+            assertEquals(Page.PAGE_FREE_TYPE, page.getType());
+
+            String value = "page:" + i;
+            page.set(value);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+
+            // Rollback every other insert.
+            if (i % 2 == 0) {
+                expected.add(value);
+                tx.commit();
+            } else {
+                tx.rollback();
+            }
+
+        }
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        assertEquals(expected, readAllPages(tx));
+
+        // Unload the page file at the end, as the other tests do.
+        pf.unload();
+    }
+}
Propchange: activemq/trunk/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
------------------------------------------------------------------------------
svn:eol-style = native