You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/15 19:04:15 UTC

svn commit: r825564 [5/5] - in /activemq/sandbox/activemq-flow: ./ activemq-util/src/main/java/org/apache/activemq/util/buffer/ hawtdb/ hawtdb/src/ hawtdb/src/main/ hawtdb/src/main/java/ hawtdb/src/main/java/org/ hawtdb/src/main/java/org/apache/ hawtdb...

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/ActionActor.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal;
+
+/**
+ * An {@link Actor} whose unit of work is delegated to a pluggable
+ * {@link Action}: every pass of the actor loop invokes the action's
+ * <code>run</code> method with this actor, viewed as an <code>A</code>.
+ *
+ * @param <A> the actor type handed to the action.  Subclasses are expected to
+ *            bind it to their own type, because this actor is cast to
+ *            <code>A</code> without a runtime check.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ActionActor<A extends Actor> extends Actor {
+
+    private Action<A> action;
+
+    /**
+     * Creates an unnamed actor without an action.  An action has to be supplied
+     * through {@link #setAction(Action)} before the actor can do any work.
+     */
+    public ActionActor() {
+    }
+
+    /**
+     * Creates a named actor and initializes the given action against it.
+     *
+     * @param name   the actor name, handed to the {@link Actor} constructor
+     * @param action the action to run; must not be null
+     * @throws NullPointerException if <code>action</code> is null
+     */
+    public ActionActor(String name, Action<A> action) {
+        super(name);
+        bind(action);
+    }
+
+    /**
+     * Views this actor as the type the action expects.  The cast is unchecked
+     * because <code>A</code> is erased at runtime.
+     */
+    @SuppressWarnings("unchecked")
+    private A cast() {
+        return (A) this;
+    }
+
+    /**
+     * Stores the action and lets it initialize itself against this actor.  The
+     * null check runs first so a rejected action never replaces the current one.
+     */
+    private void bind(Action<A> newAction) {
+        if (newAction == null) {
+            throw new NullPointerException("action must not be null");
+        }
+        this.action = newAction;
+        newAction.init(cast());
+    }
+
+    /**
+     * Runs the configured action once against this actor.
+     *
+     * @throws IllegalStateException if no action has been configured yet
+     * @throws Exception whatever the action itself throws
+     */
+    @Override
+    public void run() throws Exception {
+        Action<A> current = action;
+        if (current == null) {
+            throw new IllegalStateException("No action has been set on actor: " + getName());
+        }
+        current.run(cast());
+    }
+
+    /**
+     * @return the action currently bound to this actor, or null if none was set
+     */
+    public Action<A> getAction() {
+        return action;
+    }
+
+    /**
+     * Replaces the action and initializes the new one against this actor.
+     *
+     * @param action the action to run; must not be null
+     * @throws NullPointerException if <code>action</code> is null
+     */
+    public void setAction(Action<A> action) {
+        bind(action);
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Actor.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A worker that repeatedly invokes {@link #run()} on its own thread until it
+ * is asked to stop or <code>run()</code> throws an exception.
+ * <p>
+ * The life cycle is tracked in {@link #state} and moves through
+ * STOPPED, STARTING, RUNNING, STOPPING and back to STOPPED.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class Actor {
+
+    /** Life-cycle states of an actor. */
+    public static enum ActorState {
+        STOPPED,
+        STARTING,
+        RUNNING,
+        STOPPING,
+    }
+
+    protected String name;
+    protected Thread thread;
+    protected AtomicReference<Actor.ActorState> state = new AtomicReference<Actor.ActorState>(ActorState.STOPPED);
+
+    public Actor() {
+    }
+
+    public Actor(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Launches the worker thread if the actor is STOPPED; in any other state the
+     * call does nothing.  The thread keeps calling {@link #run()} while the state
+     * is RUNNING, prints the stack trace of an exception thrown by
+     * <code>run()</code>, and always leaves the actor STOPPED when it exits.
+     */
+    public void start() {
+        if (!state.compareAndSet(ActorState.STOPPED, ActorState.STARTING)) {
+            return;
+        }
+        boolean launched = false;
+        try {
+            Runnable loop = new Runnable() {
+                public void run() {
+                    try {
+                        // This transition fails when stop() was called while the
+                        // actor was still STARTING; the work loop is then skipped.
+                        if (state.compareAndSet(ActorState.STARTING, ActorState.RUNNING)) {
+                            while (state.get() == ActorState.RUNNING) {
+                                Actor.this.run();
+                            }
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    } finally {
+                        state.set(ActorState.STOPPED);
+                    }
+                }
+            };
+            // Thread rejects a null name, so an unnamed actor gets the default one.
+            thread = (name != null) ? new Thread(loop, name) : new Thread(loop);
+            thread.start();
+            launched = true;
+        } finally {
+            if (!launched) {
+                // The thread could not be created or started: do not stay STARTING.
+                state.set(ActorState.STOPPED);
+            }
+        }
+    }
+
+    /**
+     * Asks the worker to stop after the current {@link #run()} call returns.
+     * Does not wait for the thread, and does nothing if the actor is already
+     * STOPPING or STOPPED.
+     */
+    public void stop() {
+        for (;;) {
+            ActorState current = state.get();
+            if (current != ActorState.STARTING && current != ActorState.RUNNING) {
+                return;
+            }
+            // Retry when the worker changed the state between the read and the swap.
+            if (state.compareAndSet(current, ActorState.STOPPING)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Requests a stop and blocks until the worker thread has terminated.
+     * Returns right away if the actor was never started or if it is called from
+     * the worker thread itself, which cannot join itself.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting
+     */
+    public void waitForStop() throws InterruptedException {
+        stop();
+        Thread worker = thread;
+        if (worker != null && worker != Thread.currentThread()) {
+            worker.join();
+        }
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the live state holder (not a snapshot)
+     */
+    public AtomicReference<Actor.ActorState> getState() {
+        return state;
+    }
+
+    /**
+     * One unit of work; called over and over by the worker thread while the
+     * actor is RUNNING.  Throwing an exception ends the loop.
+     */
+    abstract protected void run() throws Exception;
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/Benchmarker.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.activemq.metric.Period;
+
+/**
+ * Runs a group of {@link Actor}s for a fixed number of sampling periods and
+ * prints the rate summary of a set of counters after each period.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class Benchmarker {
+
+    /**
+     * An {@link Action} that counts its own outcomes: each call to
+     * {@link #run(Actor)} that returns from {@link #execute(Actor)} normally
+     * increments {@link #success}, and each call that throws increments
+     * {@link #failed}.
+     */
+    public static abstract class BenchmarkAction<A extends Actor> implements Action<A> {
+        public final MetricCounter success = new MetricCounter();
+        public final MetricCounter failed = new MetricCounter();
+        protected final String name;
+
+        /**
+         * @param name label of the benchmark; the two counters are named
+         *             <code>name + " success"</code> and <code>name + " failed"</code>
+         */
+        public BenchmarkAction(String name) {
+            this.name = name;
+            success.setName(name+" success");
+            failed.setName(name+" failed");
+        }
+
+        /** No initialization is needed by default; subclasses may override. */
+        public void init(A actor) {
+        }
+
+        /**
+         * Executes the benchmarked work once and records the outcome.  Failures
+         * are deliberately not rethrown so that one failed iteration does not end
+         * the actor loop; they only show up in the {@link #failed} counter.
+         */
+        final public void run(final A actor) throws Exception {
+            try {
+                execute(actor);
+                success.increment();
+            } catch (Throwable e) {
+                failed.increment();
+            }
+        }
+
+        /** The work being measured. */
+        abstract protected void execute(A actor) throws Exception;
+
+        public String getName() {
+            return name;
+        }
+
+    }
+
+    /** Number of sampling periods reported per benchmark. */
+    int samples = 3;
+    /** Length of one sampling period, in milliseconds (passed to Thread.sleep). */
+    int period = 1000 * 5;
+    String name;
+
+    /**
+     * Starts the actors, reports the metrics while they run, then stops the
+     * actors and waits for their threads to finish.
+     * <p>
+     * Only actors whose <code>start()</code> call returned are stopped and
+     * waited for, so a failure while starting one actor still shuts down the
+     * ones that are already running instead of leaving their threads behind.
+     *
+     * @param actors  the actors to run for the duration of the benchmark
+     * @param metrics the counters to report and reset after every period
+     * @throws Exception if starting an actor fails or the calling thread is
+     *                   interrupted while sampling or waiting
+     */
+    public void benchmark(ArrayList<? extends Actor> actors, ArrayList<? extends MetricCounter> metrics) throws Exception {
+        List<Actor> started = new ArrayList<Actor>(actors.size());
+        try {
+            for (Actor actor : actors) {
+                actor.start();
+                started.add(actor);
+            }
+            displayRates(metrics);
+        } finally {
+            // Signal every actor first so they wind down in parallel, then join.
+            for (Actor actor : started) {
+                actor.stop();
+            }
+            for (Actor actor : started) {
+                actor.waitForStop();
+            }
+        }
+    }
+
+    /**
+     * Sleeps for {@link #getPeriod()} milliseconds {@link #getSamples()} times;
+     * after each sleep prints the rate summary of every metric for that period
+     * and resets the metric.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    protected void displayRates(List<? extends MetricCounter> metrics) throws InterruptedException {
+        System.out.println("Gathering rates for: " + getName());
+        for (int i = 0; i < samples; i++) {
+            Period p = new Period();
+            Thread.sleep(period);
+            for (MetricCounter metric : metrics) {
+                System.out.println(metric.getRateSummary(p));
+                metric.reset();
+            }
+        }
+    }
+
+    public int getSamples() {
+        return samples;
+    }
+
+    public void setSamples(int samples) {
+        this.samples = samples;
+    }
+
+    public int getPeriod() {
+        return period;
+    }
+
+    public void setPeriod(int period) {
+        this.period = period;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/MapBenchmark.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction;
+
+//import clojure.lang.IPersistentMap;
+//import clojure.lang.PersistentHashMap;
+
+/**
+ * Benchmarks map backed "node" implementations that absorb batches of
+ * updates.  The Clojure based variants are kept here commented out.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MapBenchmark {
+
+    /** Operations every benchmarked node implementation provides. */
+    static interface Node {
+        /** Creates a node of the same kind wrapping the given updates. */
+        public Node create(Map<Long, Long> updates);
+        /** Combines this node with the given node. */
+        public void merge(Node node);
+        /** Looks up the value held for the key. */
+        public Long get(Long i);
+        /** Removes all entries held by this node. */
+        public void clear();
+    }
+
+    /** A {@link Node} that guards a plain map with the node's own monitor. */
+    static class SynchornizedMapNode implements Node {
+        Map<Long, Long> updates;
+
+        public SynchornizedMapNode() {
+        }
+
+        public SynchornizedMapNode(Map<Long, Long> updates) {
+            this.updates = updates;
+        }
+
+        public Node create(Map<Long, Long> updates) {
+            return new SynchornizedMapNode(updates);
+        }
+
+        /**
+         * Copies this node's entries into the other node's map and then adopts
+         * that map, so both nodes share a single map afterwards.  The other
+         * node's monitor is acquired before this node's.
+         */
+        public void merge(Node n) {
+            final SynchornizedMapNode other = (SynchornizedMapNode) n;
+            synchronized (other) {
+                synchronized (this) {
+                    final Map<Long, Long> target = other.updates;
+                    target.putAll(updates);
+                    updates = target;
+                }
+            }
+        }
+
+        public Long get(Long i) {
+            synchronized (this) {
+                return updates.get(i);
+            }
+        }
+
+        public void clear() {
+            synchronized (this) {
+                updates.clear();
+            }
+        }
+
+    }
+
+//    @Test
+//    public void SynchornizedMapNode() throws Exception {
+//        nodeTest(new SynchornizedMapNode());
+//    }
+//    
+//    static class ClojureMapNode implements Node {
+//        volatile IPersistentMap updates;
+//        
+//        public ClojureMapNode() {
+//        }
+//        
+//        public ClojureMapNode(Map<Long, Long> updates) {
+//            this.updates = PersistentHashMap.create(updates);
+//        }
+//        
+//        public Node create(Map<Long, Long> updates) {
+//            return new SynchornizedMapNode(updates);
+//        } 
+//
+//        public void merge(Node n) {
+//            ClojureMapNode node = (ClojureMapNode) n;
+//            this.updates = (IPersistentMap) node.updates.cons(this.updates);
+//        }
+//
+//        synchronized public Long get(Long i) {
+//            return (Long) updates.valAt(i);
+//        }
+//
+//        synchronized public void clear() {
+//            updates = PersistentHashMap.EMPTY;
+//        }
+//
+//    }
+
+//    @Test
+//    public void ClojureMapNode() throws Exception {
+//        nodeTest(new ClojureMapNode());
+//    }
+
+    /**
+     * Runs five actors against one shared node.  Every iteration merges 100
+     * single-entry nodes into the shared node, reads the 100 keys back and then
+     * clears the node.
+     *
+     * @param type prototype used to create the shared node and the update
+     *             nodes; its class name labels the benchmark
+     */
+    public void nodeTest(final Node type) throws Exception {
+        final Node shared = type.create(new HashMap<Long, Long>());
+        final String label = type.getClass().getName();
+        benchmark(5, new BenchmarkAction<MapActor>(label) {
+            @Override
+            protected void execute(MapActor actor) {
+                // Merge phase.
+                for (long key = 0; key < 100; key++) {
+                    Map<Long, Long> single = new HashMap<Long, Long>();
+                    single.put(key, key);
+                    shared.merge(type.create(single));
+                }
+
+                // Read phase.
+                for (long key = 0; key < 100; key++) {
+                    shared.get(key);
+                }
+
+                shared.clear();
+            }
+        });
+    }
+
+//    
+//    @Test
+//    public void clojureMapManyReaders() throws Exception {
+//        IPersistentMap x = PersistentHashMap.EMPTY;
+//        for (long i = 0; i < 1000; i++) {
+//            x = x.assoc(i, i);
+//        }
+//        final IPersistentMap map = x;
+//        benchmark(5, new BenchmarkAction<MapActor>("ManyReaders:"+PersistentHashMap.class.getName()) {
+//            @Override
+//            protected void execute(MapActor actor) throws Exception {
+//                // Get them all..
+//                for (long j = 0; j < 10; j++) {
+//                    for (long i = 0; i < 1000; i++) {
+//                        map.valAt(i);
+//                    }
+//                }
+//                // Do a bunch of misses
+//                for (long i = 0; i < 1000; i++) {
+//                    map.valAt(-i);
+//                }
+//            }
+//        });
+//    }
+//    
+//    @Test
+//    public void javaMap() throws Exception {
+//        benchmark(1, new BenchmarkAction<MapActor>(HashMap.class.getName()) {
+//            @Override
+//            protected void execute(MapActor actor) {
+//                Map<Long, Long> map = new HashMap<Long, Long>();
+//                for (long i = 0; i < 1000; i++) {
+//                    map.put(i, i);
+//                }
+//                // Get them all..
+//                for (long j = 0; j < 10; j++) {
+//                    for (long i = 0; i < 1000; i++) {
+//                        map.get(i);
+//                    }
+//                }
+//                // Do a bunch of misses
+//                for (long i = 0; i < 1000; i++) {
+//                    map.get(-i);
+//                }
+//                // remove 1/2
+//                for (long i = 0; i < 1000; i++) {
+//                    if ((i % 2) == 0) {
+//                        map.remove(i);
+//                    }
+//                }
+//                // Remove 1/2, miss 1/2 /w misses.
+//                for (long i = 0; i < 1000; i++) {
+//                    map.remove(i);
+//                }
+//            }
+//        });
+//    }
+//
+//    @Test
+//    public void clojureMap() throws Exception {
+//        benchmark(1, new BenchmarkAction<MapActor>(PersistentHashMap.class.getName()) {
+//            @Override
+//            protected void execute(MapActor actor) throws Exception {
+//                IPersistentMap map = PersistentHashMap.EMPTY;
+//                for (long i = 0; i < 1000; i++) {
+//                    map = map.assoc(i, i);
+//                }
+//                // Get them all..
+//                for (long j = 0; j < 10; j++) {
+//                    for (long i = 0; i < 1000; i++) {
+//                        map.valAt(i);
+//                    }
+//                }
+//                // Do a bunch of misses
+//                for (long i = 0; i < 1000; i++) {
+//                    map.valAt(-i);
+//                }
+//                // remove 1/2
+//                for (long i = 0; i < 1000; i++) {
+//                    if ((i % 2) == 0) {
+//                        map = map.without(i);
+//                    }
+//                }
+//                // Remove 1/2, miss 1/2 /w misses.
+//                for (long i = 0; i < 1000; i++) {
+//                    map = map.without(i);
+//                }
+//            }
+//        });
+//    }
+//    
+    /** Actor type driven by the map benchmarks. */
+    static class MapActor extends ActionActor<MapActor> {
+    }
+
+    /**
+     * Runs the action on <code>count</code> actors and reports its success and
+     * failure counters.
+     */
+    private void benchmark(int count, BenchmarkAction<MapActor> action) throws Exception {
+        Benchmarker runner = new Benchmarker();
+        runner.setName(action.getName());
+        ArrayList<MapActor> actors = createActors(count, action);
+        ArrayList<MetricCounter> metrics = createMetrics(action);
+        runner.benchmark(actors, metrics);
+    }
+
+    /** Collects the success and failure counters of the action, in that order. */
+    protected ArrayList<MetricCounter> createMetrics(BenchmarkAction<MapActor> action) {
+        ArrayList<MetricCounter> counters = new ArrayList<MetricCounter>();
+        counters.add(action.success);
+        counters.add(action.failed);
+        return counters;
+    }
+
+    /**
+     * Creates <code>count</code> actors named "actor:0", "actor:1", ... that all
+     * share the given action.
+     */
+    protected ArrayList<MapActor> createActors(int count, Action<MapActor> action) {
+        ArrayList<MapActor> result = new ArrayList<MapActor>();
+        int index = 0;
+        while (index < count) {
+            MapActor worker = new MapActor();
+            worker.setName("actor:"+index);
+            worker.setAction(action);
+            result.add(worker);
+            index++;
+        }
+        return result;
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.marshaller.FixedBufferMarshaller;
+import org.apache.activemq.util.marshaller.LongMarshaller;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.index.BTreeIndex;
+import org.apache.hawtdb.internal.index.BTreeIndex.Factory;
+
+/**
+ * {@link IndexBenchmark} specialization that supplies a {@link BTreeIndex}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BTreeIndexBenchmark extends IndexBenchmark {
+
+    /**
+     * Builds a BTree index with long keys on a freshly allocated page; values
+     * use a FixedBufferMarshaller sized to <code>DATA.length</code>.
+     * <p>
+     * The page has just been allocated and holds no index yet, so the index is
+     * created on it rather than opened, matching how BTreeIndexTest treats new
+     * pages.
+     *
+     * @param tx the transaction used to allocate the page and create the index
+     * @return the newly created index
+     */
+    protected Index<Long, Buffer> createIndex(Transaction tx) {
+        Factory<Long, Buffer> factory = new BTreeIndex.Factory<Long, Buffer>();
+        factory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        factory.setValueMarshaller(new FixedBufferMarshaller(DATA.length));
+        return factory.create(tx, tx.allocator().alloc(1));
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.PrintWriter;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.util.marshaller.LongMarshaller;
+import org.apache.activemq.util.marshaller.StringMarshaller;
+import org.apache.hawtdb.api.IndexVisitor;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.internal.index.BTreeIndex;
+import org.apache.hawtdb.internal.index.BTreeIndex.Factory;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * {@link BTreeIndex} tests built on the IndexTestSupport fixture.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class BTreeIndexTest extends IndexTestSupport {
+
+    private NumberFormat nf;
+
+    /**
+     * Prepares the formatter used by {@link #key(int)}: at least six integer
+     * digits and no grouping separators.  The method is public because JUnit 4
+     * rejects {@link Before} methods that are not.
+     */
+    @Before
+    public void setUp() throws Exception {
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(6);
+        nf.setGroupingUsed(false);
+    }
+
+    /**
+     * Creates a new index on a freshly allocated page when <code>page</code> is
+     * -1; otherwise opens the index stored at that page.
+     */
+    @Override
+    protected Index<String, Long> createIndex(int page) {
+        Factory<String,Long> factory = new Factory<String,Long>();
+        factory.setKeyMarshaller(StringMarshaller.INSTANCE);
+        factory.setValueMarshaller(LongMarshaller.INSTANCE);
+        if( page==-1 ) {
+            return factory.create(tx, tx.allocator().alloc(1));
+        } else {
+            return factory.open(tx, page);
+        }
+    }
+
+    /**
+     * Yeah, the current implementation does NOT try to balance the tree.  Here is
+     * a test case showing that it gets out of balance.  It carries no
+     * {@link Test} annotation, which keeps it disabled.
+     *
+     * @throws Exception
+     */
+    public void disabled_testTreeBalancing() throws Exception {
+        createPageFileAndIndex((short) 100);
+
+        BTreeIndex<String, Long> index = ((BTreeIndex<String, Long>)this.index);
+
+        doInsert(50);
+
+        int minLeafDepth = index.getMinLeafDepth();
+        int maxLeafDepth = index.getMaxLeafDepth();
+        assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+        // Remove some of the data
+        doRemove(16);
+        minLeafDepth = index.getMinLeafDepth();
+        maxLeafDepth = index.getMaxLeafDepth();
+
+        System.out.println( "min:"+minLeafDepth );
+        System.out.println( "max:"+maxLeafDepth );
+        index.printStructure(new PrintWriter(System.out));
+
+        assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+        tx.commit();
+    }
+
+    /**
+     * Checks that the tree deepens while entries are inserted and shrinks back
+     * to a leaf depth of one once all of them are removed again.
+     */
+    @Test
+    public void testPruning() throws Exception {
+        createPageFileAndIndex((short)100);
+
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+
+        int minLeafDepth = index.getMinLeafDepth();
+        int maxLeafDepth = index.getMaxLeafDepth();
+        assertEquals(1, minLeafDepth);
+        assertEquals(1, maxLeafDepth);
+
+        doInsert(1000);
+
+        minLeafDepth = index.getMinLeafDepth();
+        maxLeafDepth = index.getMaxLeafDepth();
+        assertTrue("Depth of tree grew", minLeafDepth > 1);
+        assertTrue("Depth of tree grew", maxLeafDepth > 1);
+
+        // Remove the data.
+        doRemove(1000);
+        minLeafDepth = index.getMinLeafDepth();
+        maxLeafDepth = index.getMaxLeafDepth();
+
+        assertEquals(1, minLeafDepth);
+        assertEquals(1, maxLeafDepth);
+    }
+
+    /**
+     * Inserts keys in descending order and checks that iteration returns every
+     * entry in ascending key order.
+     */
+    @Test
+    public void testIteration() throws Exception {
+        final int count = 1000;
+        createPageFileAndIndex((short)100);
+
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+
+        // Insert in reverse order..
+        doInsertReverse(count);
+
+        reloadIndex();
+        tx.commit();
+
+        // BTree should iterate it in sorted order.
+        int counter=0;
+        for (Map.Entry<String,Long> entry : index) {
+            assertEquals(key(counter),entry.getKey());
+            assertEquals(counter,(long)entry.getValue());
+            counter++;
+        }
+        // Without this check an iterator that returns nothing would pass.
+        assertEquals(count, counter);
+    }
+
+    /**
+     * Walks the whole index with a visitor that accepts every key range and is
+     * never satiated.  Nothing is asserted; the test only requires the visit
+     * to complete without throwing.
+     */
+    @Test
+    public void testVisitor() throws Exception {
+        createPageFileAndIndex((short)100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+
+        // Populate the index; note this uses doInsert, not the reverse-order variant.
+        doInsert(1000);
+
+        reloadIndex();
+        tx.commit();
+
+        index.visit(new IndexVisitor<String, Long>(){
+            public boolean isInterestedInKeysBetween(String first, String second) {
+                return true;
+            }
+            public void visit(List<String> keys, List<Long> values) {
+            }
+            public boolean isSatiated() {
+                return false;
+            }
+        });
+
+    }
+
+    /**
+     * Inserts keys <code>count-1</code> down to 0, committing after each put.
+     */
+    void doInsertReverse(int count) throws Exception {
+        for (int i = count-1; i >= 0; i--) {
+            index.put(key(i), (long)i);
+            tx.commit();
+        }
+    }
+
+    /**
+     * Overriding so that this generates keys that are the worst case for the BTree. Keys that
+     * always insert to the end of the BTree.
+     */
+    @Override
+    protected String key(int i) {
+        return "key:"+nf.format(i);
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.marshaller.FixedBufferMarshaller;
+import org.apache.activemq.util.marshaller.LongMarshaller;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.index.HashIndex.Factory;
+
+
+/**
+ * {@link IndexBenchmark} specialization that supplies a {@link HashIndex}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class HashIndexBenchmark extends IndexBenchmark {
+
+    /**
+     * Builds a hash index with long keys on a freshly allocated page; values
+     * use a FixedBufferMarshaller sized to <code>DATA.length</code>.
+     * <p>
+     * The page has just been allocated and holds no index yet, so the index is
+     * created on it rather than opened, matching how HashIndexTest treats new
+     * pages.
+     *
+     * @param tx the transaction used to allocate the page and create the index
+     * @return the newly created index
+     */
+    protected Index<Long, Buffer> createIndex(Transaction tx) {
+        Factory<Long, Buffer> factory = new Factory<Long, Buffer>();
+        factory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        factory.setValueMarshaller(new FixedBufferMarshaller(DATA.length));
+        return factory.create(tx, tx.allocator().alloc(1));
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import org.apache.activemq.util.marshaller.LongMarshaller;
+import org.apache.activemq.util.marshaller.StringMarshaller;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.internal.index.HashIndex.Factory;
+
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class HashIndexTest extends IndexTestSupport {
+
+    @Override
+    protected Index<String, Long> createIndex(int page) {
+        Factory<String,Long> factory = new Factory<String,Long>();
+        factory.setKeyMarshaller(StringMarshaller.INSTANCE);
+        factory.setValueMarshaller(LongMarshaller.INSTANCE);
+        if( page==-1 ) {
+            return factory.create(tx, tx.allocator().alloc(1));
+        } else {
+            return factory.open(tx, page);
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import java.util.Random;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.Action;
+import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction;
+import org.apache.hawtdb.internal.page.ConcurrentPageFile;
+import org.apache.hawtdb.internal.page.TransactionActor;
+import org.apache.hawtdb.internal.page.TransactionBenchmarker;
+import org.junit.Test;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class IndexBenchmark {
+    
+    static final public byte[] DATA = new byte[8];
+
+    class IndexActor extends TransactionActor<IndexActor> {
+        public Random random;
+        public Index<Long, Buffer> index;
+        
+        public void setName(String name) {
+            super.setName(name);
+            this.random = new Random(name.hashCode());
+        }
+        
+        @Override
+        public void setTx(Transaction tx) {
+            super.setTx(tx);
+            index = createIndex(tx);
+        }
+    }
+    
+    TransactionBenchmarker<IndexActor> benchmark = new TransactionBenchmarker<IndexActor>() {
+        protected IndexActor createActor(ConcurrentPageFile pageFile, Action<IndexActor> action, int i) {
+            return new IndexActor();
+        };
+    };
+
+    
+    @Test
+    public void insert() throws Exception {
+        benchmark.benchmark(1, new BenchmarkAction<IndexActor>("insert") {
+            long counter=0;
+            @Override
+            protected void execute(IndexActor actor) {
+                actor.index.put(counter++, new Buffer(DATA));
+            }
+        });
+    }
+
+
+    abstract protected Index<Long, Buffer> createIndex(Transaction tx);
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexTestSupport.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.page.ConcurrentPageFile;
+import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory;
+import org.junit.After;
+import org.junit.Test;
+
+
+/**
+ * Base class for index tests. It manages a {@link ConcurrentPageFile} stored
+ * under {@code target/test-data} and runs a sequence of insert, retrieve and
+ * remove operations against the index supplied by {@link #createIndex(int)}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public abstract class IndexTestSupport {
+
+    private static final int COUNT = 10000;
+
+    private ConcurrentPageFileFactory pff;
+    private ConcurrentPageFile pf;
+    protected Index<String,Long> index;
+    protected Transaction tx;
+
+    /**
+     * Creates the page file factory, pointing it at a file named after the
+     * concrete test class.
+     */
+    protected ConcurrentPageFileFactory createConcurrentPageFileFactory() {
+        ConcurrentPageFileFactory rc = new ConcurrentPageFileFactory();
+        rc.setFile(new File("target/test-data/" + getClass().getName() + ".db"));
+        return rc;
+    }
+
+    /**
+     * Closes the page file factory if a page file was opened.
+     *
+     * The method is public because JUnit 4 only accepts public lifecycle
+     * methods; a non-public {@code @After} method makes the runner reject the
+     * whole test class.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if( pf!=null ) {
+            pff.close();
+            // Clear both references so the guard above stays consistent.
+            pf = null;
+            pff = null;
+        }
+    }
+
+    /**
+     * Supplies the index under test.
+     *
+     * @param page the page of a previously created index, or {@code -1} when
+     *             called from {@link #createPageFileAndIndex(short)} to get a
+     *             brand new index
+     */
+    abstract protected Index<String,Long> createIndex(int page);
+
+    /**
+     * Deletes any previous test file, opens a page file using the given page
+     * size, starts a transaction and creates a new index in it.
+     */
+    public void createPageFileAndIndex(short pageSize) throws Exception {
+        pff = createConcurrentPageFileFactory();
+        pff.setPageSize(pageSize);
+        pff.getFile().delete();
+        pff.open();
+        pf = pff.getConcurrentPageFile();
+        tx = pf.tx();
+        index = createIndex(-1);
+    }
+
+    /**
+     * Closes and re-opens the page file, then re-opens the index from the
+     * page it was stored on using a new transaction.
+     */
+    protected void reloadAll() {
+        int page = index.getPage();
+        pff.close();
+        pff.open();
+        pf = pff.getConcurrentPageFile();
+        tx = pf.tx();
+        index = createIndex(page);
+    }
+
+    /** Commits the current transaction and re-opens the index from its page. */
+    protected void reloadIndex() {
+        int page = index.getPage();
+        tx.commit();
+        index = createIndex(page);
+    }
+
+    @Test
+    public void testIndexOperations() throws Exception {
+        createPageFileAndIndex((short) 500);
+        reloadIndex();
+        doInsert(COUNT);
+        reloadIndex();
+        tx.commit();
+        checkRetrieve(COUNT);
+        doRemove(COUNT);
+        reloadIndex();
+        tx.commit();
+        doInsert(COUNT);
+        doRemoveHalf(COUNT);
+        doInsertHalf(COUNT);
+        reloadIndex();
+        tx.commit();
+        checkRetrieve(COUNT);
+    }
+
+    /** Puts keys {@code 0..count-1} into the index and commits. */
+    void doInsert(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            index.put(key(i), (long)i);
+        }
+        tx.commit();
+    }
+
+    /** Builds the index key used for entry {@code i}. */
+    protected String key(int i) {
+        return "key:"+i;
+    }
+
+    /** Asserts that every key in {@code 0..count-1} has a value in the index. */
+    void checkRetrieve(int count) throws IOException {
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(key(i));
+            assertNotNull("Key missing: "+key(i), item);
+        }
+    }
+
+    /** Removes the even-numbered keys, asserting each was present, and commits. */
+    void doRemoveHalf(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                assertNotNull("Expected remove to return value for index "+i, index.remove(key(i)));
+            }
+        }
+        tx.commit();
+    }
+
+    /** Puts the even-numbered keys back into the index and commits. */
+    void doInsertHalf(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            if (i % 2 == 0) {
+                index.put(key(i), (long)i);
+            }
+        }
+        tx.commit();
+    }
+
+    /**
+     * Removes keys {@code 0..count-1} in ascending order, asserting each was
+     * present, commits, and then asserts none of them can be found anymore.
+     */
+    void doRemove(int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            assertNotNull("Expected remove to return value for index "+i, index.remove(key(i)));
+        }
+        tx.commit();
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(key(i));
+            assertNull(item);
+        }
+    }
+
+    /**
+     * Removes keys {@code count-1..0} in descending order, commits, and then
+     * asserts none of them can be found anymore.
+     */
+    void doRemoveBackwards(int count) throws Exception {
+        for (int i = count - 1; i >= 0; i--) {
+            index.remove(key(i));
+        }
+        tx.commit();
+        for (int i = 0; i < count; i++) {
+            Long item = index.get(key(i));
+            assertNull(item);
+        }
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/io/MemoryMappedFileTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hawtdb.internal.io.MemoryMappedFile;
+import org.junit.Assert;
+
+
+/**
+ * Verifies basic write/read round trips through a {@link MemoryMappedFile}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MemoryMappedFileTest {
+
+    /**
+     * Writes one page of generated data at offset 0 and at the start of page
+     * {@code LAST_PAGE}, then reads both locations back and spot checks the
+     * repeating a..z pattern.
+     */
+    @org.junit.Test
+    public void basicOps() throws IOException {
+        File file = new File("target/foo.data");
+        file.delete();
+
+        MemoryMappedFile mmf = new MemoryMappedFile(file, 1024*1024*100);
+        try {
+            int PAGE_SIZE = 1024*4;
+            int LAST_PAGE = 100;
+
+            byte expect[] = createData(PAGE_SIZE);
+
+            mmf.write(0, expect);
+            mmf.write(LAST_PAGE *PAGE_SIZE, expect);
+
+            // Validate data on the first page.
+            byte actual[] = new byte[PAGE_SIZE];
+            mmf.read(0, actual);
+            Assert.assertEquals('a', actual[0]);
+            Assert.assertEquals('a', actual[26]);
+            Assert.assertEquals('z', actual[26+25]);
+
+            // Validate data on the last written page (page LAST_PAGE).
+            actual = new byte[PAGE_SIZE];
+            mmf.read(PAGE_SIZE*LAST_PAGE, actual);
+            Assert.assertEquals('a', actual[0]);
+            Assert.assertEquals('a', actual[26]);
+            Assert.assertEquals('z', actual[26+25]);
+
+            mmf.sync();
+        } finally {
+            // Always release the file, even when an assertion above fails.
+            mmf.close();
+        }
+    }
+
+    /** Returns {@code size} bytes filled with the repeating sequence 'a'..'z'. */
+    private byte[] createData(int size) {
+        byte[] rc = new byte[size];
+        for (int i = 0; i < rc.length; i++) {
+            rc[i] = (byte) ('a'+(i%26));
+        }
+        return rc;
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/journal/JournalTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.internal.journal.Journal;
+
+/**
+ * Tests batched and synchronous writes against a {@link Journal} stored in a
+ * scratch directory that is created in {@link #setUp()} and removed again in
+ * {@link #tearDown()}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class JournalTest extends TestCase {
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+    Journal dataManager;
+    File dir;
+
+    /** Creates the scratch directory, then configures and starts the journal. */
+    @Override
+    public void setUp() throws Exception {
+        dir = new File("target/tests/DataFileAppenderTest");
+        dir.mkdirs();
+        dataManager = new Journal();
+        dataManager.setDirectory(dir);
+        configure(dataManager);
+        dataManager.start();
+    }
+
+    /** Hook for subclasses to adjust the journal before it is started. */
+    protected void configure(Journal dataManager) {
+    }
+
+    /** Closes the journal and removes the scratch directory. */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            dataManager.close();
+        } finally {
+            // Clean up even when close() fails so the files do not leak into
+            // the next test.
+            deleteFilesInDirectory(dir);
+            dir.delete();
+        }
+    }
+
+    /** Recursively deletes everything below {@code directory}. */
+    private void deleteFilesInDirectory(File directory) {
+        File[] files = directory.listFiles();
+        if (files == null) {
+            // listFiles() returns null when the path is not a directory or an
+            // I/O error occurs; there is nothing to delete in that case.
+            return;
+        }
+        for (File f : files) {
+            if (f.isDirectory()) {
+                deleteFilesInDirectory(f);
+            }
+            f.delete();
+        }
+    }
+
+    /** Every write callback must have run within the timeout. */
+    public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        Buffer data = new Buffer("DATA".getBytes());
+        for (int i=0; i < iterations; i++) {
+            dataManager.write(data, new Runnable() {
+                public void run() {
+                    latch.countDown();
+                }
+            });
+        }
+        // at this point most probably dataManager.getInflightWrites().size() >= 0
+        // as the Thread created in DataFileAppender.enqueue() may not have caught up.
+        assertTrue("queued data is written", latch.await(5, TimeUnit.SECONDS));
+    }
+
+    /** After close() no write may be in flight and every callback must have run. */
+    public void testBatchWriteCallbackCompleteAfterClose() throws Exception {
+        final int iterations = 10;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        Buffer data = new Buffer("DATA".getBytes());
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, new Runnable() {
+                public void run() {
+                    latch.countDown();
+                }
+            });
+        }
+        dataManager.close();
+        assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        assertEquals("none written", 0, latch.getCount());
+    }
+
+    /** After close() no write issued with the sync flag off may be in flight. */
+    public void testBatchWriteCompleteAfterClose() throws Exception {
+        Buffer data = new Buffer("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, false);
+        }
+        dataManager.close();
+        assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
+    }
+
+    /**
+     * Writes messages whose combined size equals DEFAULT_MAX_BATCH_SIZE and
+     * expects every callback to run within the timeout.
+     */
+    public void testBatchWriteToMaxMessageSize() throws Exception {
+        final int iterations = 4;
+        final CountDownLatch latch = new CountDownLatch(iterations);
+        Runnable done = new Runnable() {
+            public void run() {
+                latch.countDown();
+            }
+        };
+        int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations;
+        byte[] message = new byte[messageSize];
+        Buffer data = new Buffer(message);
+
+        for (int i=0; i< iterations; i++) {
+            dataManager.write(data, done);
+        }
+
+        // write may take some time
+        assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS));
+    }
+
+    /** A write issued with the sync flag on must leave nothing in flight when it returns. */
+    public void testNoBatchWriteWithSync() throws Exception {
+        Buffer data = new Buffer("DATA".getBytes());
+        final int iterations = 10;
+        for (int i=0; i<iterations; i++) {
+            dataManager.write(data, true);
+            assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
+        }
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.api.OptimisticUpdateException;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.page.ConcurrentPageFile;
+import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory;
+import org.apache.hawtdb.internal.page.ExtentInputStream;
+import org.apache.hawtdb.internal.page.ExtentOutputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests transactional reads, writes, allocation and rollback against a
+ * {@link ConcurrentPageFile} stored under {@code target/test-data}.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ConcurrentPageFileTest {
+
+    private ConcurrentPageFileFactory pff;
+
+    private ConcurrentPageFile pf;
+
+    /** Creates the factory, pointing it at a file named after the concrete test class. */
+    protected ConcurrentPageFileFactory createConcurrentPageFileFactory() {
+        ConcurrentPageFileFactory rc = new ConcurrentPageFileFactory();
+        rc.setFile(new File("target/test-data/" + getClass().getName() + ".db"));
+        return rc;
+    }
+
+    /** Deletes any previous test file and opens a new page file. */
+    @Before
+    public void setUp() throws Exception {
+        pff = createConcurrentPageFileFactory();
+        pff.getFile().delete();
+        pff.open();
+        pf = pff.getConcurrentPageFile();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        pff.close();
+    }
+
+    /** Closes and re-opens the page file. */
+    protected void reload() {
+        pff.close();
+        pff.open();
+        pf = pff.getConcurrentPageFile();
+    }
+
+    /**
+     * Allocates one page and stores {@code value} on it.
+     *
+     * @return the id of the allocated page
+     */
+    protected int store(Paged tx, String value) throws IOException {
+        int pageId = tx.allocator().alloc(1);
+        store(tx, pageId, value);
+        return pageId;
+    }
+
+    /**
+     * Writes {@code value} to {@code page} in {@link DataOutputStream#writeUTF}
+     * format. An {@link IOException} is rethrown as {@link IOPagingException}.
+     */
+    protected void store(Paged tx, int page, String value) {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream os = new DataOutputStream(baos);
+            os.writeUTF(value);
+            os.close();
+            tx.write(page, new Buffer(baos.toByteArray()));
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+    }
+
+    /**
+     * Reads {@code page} into a buffer of one page size and decodes the string
+     * stored with {@link #store(Paged, int, String)}. An {@link IOException}
+     * is rethrown as {@link IOPagingException}.
+     */
+    protected String load(Paged paged, int page) {
+        try {
+            Buffer buffer = new Buffer(pff.getPageSize());
+            paged.read(page, buffer);
+            ByteArrayInputStream bais = new ByteArrayInputStream(buffer.data, buffer.offset, buffer.length);
+            DataInputStream is = new DataInputStream(bais);
+            return is.readUTF();
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+    }
+
+    /**
+     * {@link EncoderDecoder} that delegates to the test's own load/store
+     * helpers. It reports no extra pages from store and does nothing on remove.
+     */
+    private final class StringEncoderDecoder implements EncoderDecoder<String> {
+        public String load(Paged paged, int page) {
+            return ConcurrentPageFileTest.this.load(paged, page);
+        }
+        public List<Integer> store(Paged paged, int page, String value) {
+            ConcurrentPageFileTest.this.store(paged, page, value);
+            return Collections.emptyList();
+        }
+        public void remove(Paged paged, int page) {
+        }
+    }
+
+    @Test
+    public void cacheAPI() throws IOException, ClassNotFoundException {
+
+        // Setup some pages that will be getting updated.
+        Transaction tx = pf.tx();
+        StringEncoderDecoder ENCODER = new StringEncoderDecoder();
+        tx.put(ENCODER, tx.allocator().alloc(1), "Hello");
+        tx.put(ENCODER, tx.allocator().alloc(1), "World");
+        tx.commit();
+
+        reload();
+        tx = pf.tx();
+
+        assertEquals("Hello", tx.get(ENCODER, 0));
+        assertEquals("World", tx.get(ENCODER, 1));
+
+    }
+
+
+    @Test
+    public void cacheAPIConflictingUpdateFails() throws IOException, ClassNotFoundException {
+
+        // Setup some pages that will be getting updated.
+        Transaction tx1 = pf.tx();
+        StringEncoderDecoder ENCODER = new StringEncoderDecoder();
+        tx1.put(ENCODER, tx1.allocator().alloc(1), "Hello");
+        tx1.put(ENCODER, tx1.allocator().alloc(1), "World");
+        tx1.commit();
+
+        tx1.put(ENCODER, 0, "Change 1");
+
+        // Now commit a change to page 0
+        Transaction tx2 = pf.tx();
+        assertEquals("Hello", tx2.get(ENCODER, 0));  // We don't see tx1's change...
+        tx2.put(ENCODER, 0, "Change 2");
+        assertEquals("Change 2", tx2.get(ENCODER, 0)); // We can see our own change..
+        tx2.commit();
+
+        // Tx1 still does not see tx2's change...
+        assertEquals("Change 1", tx1.get(ENCODER, 0));
+
+        try {
+            tx1.commit();
+            fail("expected OptimisticUpdateException");
+        } catch (OptimisticUpdateException expected) {
+        }
+
+    }
+
+    @Test
+    public void conflictingUpdateFails() throws IOException, ClassNotFoundException {
+
+        // Setup some pages that will be getting updated.
+        Transaction tx1 = pf.tx();
+        assertEquals(0, store(tx1, "Hello"));
+        assertEquals(1, store(tx1, "World"));
+        tx1.commit();
+
+        // Start a transaction that updates page 0
+        tx1 = pf.tx();
+        store(tx1, 0, "Change 1");
+
+        // Now commit a change to page 0
+        Transaction tx2 = pf.tx();
+        assertEquals("Hello", load(tx2, 0)); // We don't see tx1's change...
+        store(tx2, 0, "Change 2");
+        assertEquals("Change 2", load(tx2, 0)); // We can see our own change..
+        tx2.commit();
+
+        // Tx1 still does not see tx2's change...
+        assertEquals("Change 1", load(tx1, 0));
+
+        try {
+            tx1.commit();
+            fail("expected OptimisticUpdateException");
+        } catch (OptimisticUpdateException expected) {
+        }
+
+    }
+
+    @Test
+    public void pagesNotDirectlyUpdated() throws IOException, ClassNotFoundException {
+        // New allocations get stored in the final positions.
+        Transaction tx = pf.tx();
+        assertEquals(0, store(tx, "Hello"));
+        assertEquals(1, store(tx, "World"));
+
+        // It should be on the page file already..
+        assertEquals("Hello", load(pff.getPageFile(), 0));
+        assertEquals("World", load(pff.getPageFile(), 1));
+        tx.commit();
+
+        // Apply the updates.
+        pf.flush();
+        pf.applyRedos();
+
+        // Should still be there..
+        assertEquals("Hello", load(pff.getPageFile(), 0));
+        assertEquals("World", load(pff.getPageFile(), 1));
+
+        // Update the existing pages..
+        store(tx, 0, "Good");
+        store(tx, 1, "Bye");
+        tx.commit();
+
+        // A subsequent transaction can read the update.
+        assertEquals("Good", load(tx, 0));
+        assertEquals("Bye", load(tx, 1));
+        tx.commit();
+
+        // But the pages should not be updated until the transaction gets
+        // applied.
+        assertEquals("Hello", load(pff.getPageFile(), 0));
+        assertEquals("World", load(pff.getPageFile(), 1));
+
+        // Apply them
+        pf.flush();
+        pf.applyRedos();
+
+        // We should see them now.
+        assertEquals("Good", load(pff.getPageFile(), 0));
+        assertEquals("Bye", load(pff.getPageFile(), 1));
+    }
+
+    @Test
+    public void crudOperations() throws IOException, ClassNotFoundException {
+        int COUNT = 10;
+
+        ArrayList<Integer> allocations = new ArrayList<Integer>();
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < COUNT; i++) {
+
+            int page = tx.allocator().alloc(1);
+            // Since the file is empty.. allocations should occur sequentially
+            assertEquals(i, page);
+
+            allocations.add(page);
+            String value = "page:" + i;
+            store(tx, page, value);
+            expected.add(value);
+            tx.commit();
+        }
+
+        // Reload it.. .
+        reload();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        HashSet<String> actual = new HashSet<String>();
+        for (Integer page : allocations) {
+            actual.add(load(tx, page));
+        }
+        assertEquals(expected, actual);
+
+        // Remove the odd records..
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                // Keep the even records. This must skip rather than leave the
+                // loop: a break here exits at i == 0 and removes nothing.
+                continue;
+            }
+            String t = "page:" + i;
+            expected.remove(t);
+        }
+        // Iterate over a copy because pages are removed from allocations.
+        for (Integer page : new ArrayList<Integer>(allocations)) {
+            String t = load(tx, page);
+            if (!expected.contains(t)) {
+                tx.allocator().free(page, 1);
+                allocations.remove(page);
+            }
+        }
+        tx.commit();
+
+        // Reload it...
+        reload();
+        tx = pf.tx();
+
+        // Iterate it to make sure the even records are still there..
+        actual.clear();
+        for (Integer page : allocations) {
+            String t = load(tx, page);
+            actual.add(t);
+        }
+        assertEquals(expected, actual);
+
+        // Update the records...
+        HashSet<String> t = expected;
+        expected = new HashSet<String>();
+        for (String s : t) {
+            expected.add(s + ":updated");
+        }
+        for (Integer page : allocations) {
+            String value = load(tx, page);
+            store(tx, page, value + ":updated");
+        }
+        tx.commit();
+
+        // Reload it...
+        reload();
+        tx = pf.tx();
+
+        // Iterate it to make sure the updated records are still there..
+        actual.clear();
+        for (Integer page : allocations) {
+            String value = load(tx, page);
+            actual.add(value);
+        }
+        assertEquals(expected, actual);
+
+    }
+
+    @Test
+    public void testExtentStreams() throws IOException {
+        Transaction tx = pf.tx();
+        ExtentOutputStream eos = new ExtentOutputStream(tx);
+        DataOutputStream os = new DataOutputStream(eos);
+        for (int i = 0; i < 10000; i++) {
+            os.writeUTF("Test string:" + i);
+        }
+        os.close();
+        int page = eos.getPage();
+        tx.commit();
+
+        // Reload the page file.
+        reload();
+        tx = pf.tx();
+
+        ExtentInputStream eis = new ExtentInputStream(tx, page);
+        DataInputStream is = new DataInputStream(eis);
+        for (int i = 0; i < 10000; i++) {
+            assertEquals("Test string:" + i, is.readUTF());
+        }
+        assertEquals(-1, is.read());
+        is.close();
+    }
+
+    @Test
+    public void testAddRollback() throws IOException, ClassNotFoundException {
+
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < 100; i++) {
+            String t = "page:" + i;
+            int pageId = store(tx, t);
+
+            // Rollback every other insert.
+            if (i % 2 == 0) {
+                // Rolled back tx's should have their allocated pages
+                // released..
+                assertEquals(i / 2, pageId);
+                expected.add(t);
+                tx.commit();
+            } else {
+                tx.rollback();
+            }
+
+        }
+
+        // Reload it...
+        reload();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        HashSet<String> actual = new HashSet<String>();
+        for (int i = 0; i < 100; i++) {
+            if (i % 2 == 0) {
+                String t = load(tx, i / 2);
+                actual.add(t);
+            }
+        }
+        assertEquals(expected, actual);
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ExtentTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hawtdb.internal.page.ExtentInputStream;
+import org.apache.hawtdb.internal.page.ExtentOutputStream;
+import org.apache.hawtdb.internal.page.PageFile;
+import org.apache.hawtdb.internal.page.PageFileFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Round-trips data through {@link ExtentOutputStream} and
+ * {@link ExtentInputStream} on a {@link PageFile}, closing and re-opening the
+ * page file between the write and the read.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class ExtentTest {
+
+    /** Number of strings written to, and read back from, the extent. */
+    private static final int STRING_COUNT = 10000;
+
+    private PageFileFactory pff;
+    private PageFile paged;
+
+    /**
+     * Builds the factory used by the test. The backing file is named after the
+     * concrete test class and lives under target/test-data.
+     */
+    protected PageFileFactory createPageFileFactory() {
+        PageFileFactory factory = new PageFileFactory();
+        // Mapping segment size of three pages.
+        factory.setMappingSegementSize(factory.getPageSize() * 3);
+        factory.setFile(new File("target/test-data/" + getClass().getName() + ".db"));
+        return factory;
+    }
+
+    /** Deletes any file left over from an earlier run, then opens a fresh one. */
+    @Before
+    public void setUp() throws Exception {
+        pff = createPageFileFactory();
+        pff.getFile().delete();
+        openPageFile();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        pff.close();
+    }
+
+    /** Closes the factory and opens it again on the same file. */
+    protected void reload() {
+        pff.close();
+        openPageFile();
+    }
+
+    /** Opens the factory and picks up the page file it exposes. */
+    private void openPageFile() {
+        pff.open();
+        paged = pff.getPageFile();
+    }
+
+    /** The i-th string stored in the extent. */
+    private static String testString(int i) {
+        return "Test string:" + i;
+    }
+
+    /**
+     * Writes STRING_COUNT UTF strings through an extent output stream, reloads
+     * the page file, and verifies the same strings followed by end of stream
+     * are read back through an extent input stream.
+     */
+    @Test
+    public void testExtentStreams() throws IOException {
+        ExtentOutputStream extentOut = new ExtentOutputStream(paged);
+        DataOutputStream out = new DataOutputStream(extentOut);
+        for (int i = 0; i < STRING_COUNT; i++) {
+            out.writeUTF(testString(i));
+        }
+        out.close();
+
+        // On a fresh file the stream is expected to report page 0.
+        int firstPage = extentOut.getPage();
+        assertEquals(0, firstPage);
+
+        // Reload the page file.
+        reload();
+
+        ExtentInputStream extentIn = new ExtentInputStream(paged, firstPage);
+        DataInputStream in = new DataInputStream(extentIn);
+        for (int i = 0; i < STRING_COUNT; i++) {
+            assertEquals(testString(i), in.readUTF());
+        }
+        // Nothing may follow the last string.
+        assertEquals(-1, in.read());
+        in.close();
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionActor.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.ActionActor;
+
+/**
+ * An {@link ActionActor} that also carries the {@link Transaction} it works
+ * with.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransactionActor<A extends TransactionActor<A>> extends ActionActor<A> {
+
+    /** Transaction handed to this actor through {@link #setTx(Transaction)}. */
+    private Transaction transaction;
+
+    /** Returns the transaction last passed to {@link #setTx(Transaction)}. */
+    public Transaction tx() {
+        return transaction;
+    }
+
+    /** Sets the transaction this actor works with. */
+    public void setTx(Transaction tx) {
+        transaction = tx;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmark.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.util.Random;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.Action;
+import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction;
+import org.apache.hawtdb.internal.page.ConcurrentPageFile;
+import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory;
+import org.apache.hawtdb.internal.page.TransactionBenchmarker.Callback;
+import org.junit.Test;
+
+/**
+ * Benchmarks single page writes and reads issued through transactions on a
+ * page file that a setup callback has pre-populated with pages.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransactionBenchmark {
+
+    /** Number of pages the setup callback allocates and writes before a run. */
+    private static final int INITIAL_PAGE_COUNT = 1024 * 100;
+
+    /** Byte array wrapped in a Buffer for every page write and read call. */
+    private static final byte[] THE_DATA = new byte[1024 * 3];
+
+    /** Actor that owns a random generator seeded from the actor's name. */
+    static class RandomTxActor extends TransactionActor<RandomTxActor> {
+        public Random random;
+
+        @Override
+        public void setName(String name) {
+            super.setName(name);
+            // Same name => same seed => same sequence of random page picks.
+            this.random = new Random(name.hashCode());
+        }
+    }
+
+    TransactionBenchmarker<RandomTxActor> benchmark = new TransactionBenchmarker<RandomTxActor>() {
+        @Override
+        protected RandomTxActor createActor(ConcurrentPageFile pageFile, Action<RandomTxActor> action, int i) {
+            return new RandomTxActor();
+        }
+    };
+
+    // NOTE(review): the append benchmark below was committed disabled; the
+    // reason is not recorded here.
+//    @Test
+//    public void append() throws Exception {
+//        benchmark.benchmark(1, new BenchmarkAction<RandomTxActor>("append") {
+//            @Override
+//            protected void execute(RandomTxActor actor) {
+//                int page = actor.tx().allocator().alloc(1);
+//                actor.tx().write(page, new Buffer(THE_DATA));
+//                actor.tx().commit();
+//            }
+//        });
+//    }
+
+    /**
+     * Benchmarks one actor repeatedly writing THE_DATA to a randomly chosen
+     * page and committing.
+     */
+    @Test
+    public void update() throws Exception {
+        preallocate(INITIAL_PAGE_COUNT);
+        benchmark.benchmark(1, new BenchmarkAction<RandomTxActor>("update") {
+            @Override
+            protected void execute(RandomTxActor actor) {
+                // Assumes the preallocated pages have ids 0..INITIAL_PAGE_COUNT-1
+                // -- TODO confirm against the allocator.
+                int page = actor.random.nextInt(INITIAL_PAGE_COUNT);
+                actor.tx().write(page, new Buffer(THE_DATA));
+                actor.tx().commit();
+            }
+        });
+    }
+
+    /**
+     * Benchmarks one actor repeatedly issuing a read for a randomly chosen
+     * page and committing.
+     */
+    @Test
+    public void read() throws Exception {
+        preallocate(INITIAL_PAGE_COUNT);
+        benchmark.benchmark(1, new BenchmarkAction<RandomTxActor>("read") {
+            @Override
+            protected void execute(RandomTxActor actor) {
+                int page = actor.random.nextInt(INITIAL_PAGE_COUNT);
+                actor.tx().read(page, new Buffer(THE_DATA));
+                actor.tx().commit();
+            }
+        });
+    }
+
+    /**
+     * Registers a setup callback that, in a single transaction, allocates
+     * pageCount pages one at a time and writes THE_DATA to each of them.
+     *
+     * @param pageCount number of pages to allocate and write
+     */
+    private void preallocate(final int pageCount) {
+        benchmark.setSetup(new Callback() {
+            public void run(ConcurrentPageFileFactory pff) throws Exception {
+                Transaction tx = pff.getConcurrentPageFile().tx();
+                for (int i = 0; i < pageCount; i++) {
+                    int page = tx.allocator().alloc(1);
+                    tx.write(page, new Buffer(THE_DATA));
+                }
+                tx.commit();
+            }
+        });
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/page/TransactionBenchmarker.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.apache.activemq.metric.MetricCounter;
+import org.apache.hawtdb.internal.Action;
+import org.apache.hawtdb.internal.Benchmarker;
+import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction;
+import org.apache.hawtdb.internal.page.ConcurrentPageFile;
+import org.apache.hawtdb.internal.page.ConcurrentPageFileFactory;
+
+/**
+ * Runs a {@link BenchmarkAction} with a number of {@link TransactionActor}s
+ * against a freshly created page file, with optional setup and tear down
+ * callbacks around the run.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class TransactionBenchmarker<A extends TransactionActor<A>> {
+
+    /** Hook that is handed the open page file factory. */
+    public interface Callback {
+        public void run(ConcurrentPageFileFactory pff) throws Exception;
+    }
+
+    private Callback setup;
+    private Callback tearDown;
+
+    public Callback getSetup() {
+        return setup;
+    }
+
+    public void setSetup(Callback setup) {
+        this.setup = setup;
+    }
+
+    public Callback getTearDown() {
+        return tearDown;
+    }
+
+    public void setTearDown(Callback tearDown) {
+        this.tearDown = tearDown;
+    }
+
+    /**
+     * Deletes and re-creates the benchmark's page file, runs the setup
+     * callback (if any), benchmarks the action with actorCount actors, and
+     * finally runs the tear down callback (if any) and closes the factory.
+     * The factory is closed even when the tear down callback throws.
+     *
+     * @param actorCount number of actors to create
+     * @param action     action each actor executes; also names the benchmark
+     */
+    public void benchmark(int actorCount, BenchmarkAction<A> action) throws Exception {
+        ConcurrentPageFileFactory factory = new ConcurrentPageFileFactory();
+        // The file is named after the runtime class of this benchmarker.
+        factory.setFile(new File("target/test-data/" + getClass().getName() + ".db"));
+        factory.getFile().delete();
+        factory.open();
+        try {
+            invoke(setup, factory);
+            ConcurrentPageFile pageFile = factory.getConcurrentPageFile();
+            Benchmarker runner = new Benchmarker();
+            runner.setName(action.getName());
+            ArrayList<A> actors = createActors(pageFile, actorCount, action);
+            runner.benchmark(actors, createMetrics(action));
+        } finally {
+            try {
+                invoke(tearDown, factory);
+            } finally {
+                factory.close();
+            }
+        }
+    }
+
+    /** Runs the callback against the factory; a null callback is skipped. */
+    private static void invoke(Callback callback, ConcurrentPageFileFactory factory) throws Exception {
+        if (callback != null) {
+            callback.run(factory);
+        }
+    }
+
+    /** Returns the action's success and failed counters, in that order. */
+    protected ArrayList<MetricCounter> createMetrics(BenchmarkAction<A> action) {
+        ArrayList<MetricCounter> counters = new ArrayList<MetricCounter>();
+        counters.add(action.success);
+        counters.add(action.failed);
+        return counters;
+    }
+
+    /**
+     * Creates count actors named "actor:0", "actor:1", ... Each one is given
+     * the action and its own transaction obtained from the page file.
+     */
+    protected ArrayList<A> createActors(ConcurrentPageFile pageFile, int count, Action<A> action) {
+        ArrayList<A> created = new ArrayList<A>();
+        for (int index = 0; index < count; index++) {
+            A next = createActor(pageFile, action, index);
+            next.setName("actor:" + index);
+            next.setAction(action);
+            next.setTx(pageFile.tx());
+            created.add(next);
+        }
+        return created;
+    }
+
+    /**
+     * Factory method for a single actor. The default creates a plain
+     * TransactionActor and relies on an unchecked cast, so benchmarks that
+     * bind A to a subclass need to override it.
+     */
+    @SuppressWarnings("unchecked")
+    protected A createActor(ConcurrentPageFile pageFile, Action<A> action, int i) {
+        return (A) new TransactionActor();
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/java/org/apache/hawtdb/internal/util/RangesTest.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.util;
+
+import static org.apache.hawtdb.internal.util.Ranges.range;
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+import org.apache.hawtdb.internal.util.Ranges;
+import org.apache.hawtdb.internal.util.Ranges.Range;
+import org.junit.Test;
+
+/**
+ * Checks how {@link Ranges} merges and splits ranges as they are added and
+ * removed.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class RangesTest {
+
+    /**
+     * Judging by the expected values below, add/remove appear to take
+     * (start, length) while range() takes (start, end) -- confirm against
+     * Ranges before relying on this.
+     */
+    @Test
+    public void test() {
+
+        Ranges tracked = new Ranges();
+
+        // Example of simple range merges: three adjacent adds collapse to one.
+        tracked.add(0, 5);
+        tracked.add(15, 5);
+        tracked.add(5, 10);
+        assertMerged(tracked);
+
+        // Remove which splits an existing range into 2.
+        tracked.remove(5, 10);
+        assertSplit(tracked);
+
+        // overlapping add...
+        tracked.add(4, 12);
+        assertMerged(tracked);
+
+        // Removes are idempotent
+        tracked.remove(5, 10);
+        assertSplit(tracked);
+        tracked.remove(5, 10);
+        assertSplit(tracked);
+
+        // Adds are idempotent
+        tracked.add(5, 10);
+        assertMerged(tracked);
+        tracked.add(5, 10);
+        assertMerged(tracked);
+    }
+
+    /** Asserts that the content is the single range (0,20). */
+    private void assertMerged(Ranges actual) {
+        assertEquals(ranges(range(0, 20)), actual.toArrayList());
+    }
+
+    /** Asserts that the content is the two ranges (0,5) and (15,20). */
+    private void assertSplit(Ranges actual) {
+        assertEquals(ranges(range(0, 5), range(15, 20)), actual.toArrayList());
+    }
+
+    /** Collects the given ranges into a list, keeping argument order. */
+    ArrayList<Range> ranges(Range... args) {
+        ArrayList<Range> list = new ArrayList<Range>();
+        for (int i = 0; i < args.length; i++) {
+            list.add(args[i]);
+        }
+        return list;
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/test/resources/log4j.properties Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=WARN, console
+log4j.logger.org.apache.hawtdb=INFO
+
+# Console appender; it is attached to the root logger above.
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n

Modified: activemq/sandbox/activemq-flow/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/pom.xml?rev=825564&r1=825563&r2=825564&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/pom.xml (original)
+++ activemq/sandbox/activemq-flow/pom.xml Thu Oct 15 17:04:11 2009
@@ -127,6 +127,7 @@
     <module>activemq-store</module>
     <module>activemq-transport</module>
     <module>activemq-util</module>
+    <module>hawtdb</module>
     <module>kahadb</module>
     <module>activemq-protobuf</module>
     <module>activemq-client</module>
@@ -843,6 +844,7 @@
   <build>
     <pluginManagement>
       <plugins>
+      <!-- 
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-eclipse-plugin</artifactId>
@@ -853,6 +855,7 @@
             <outputDirectory>${basedir}/eclipse-classes</outputDirectory>
           </configuration>
         </plugin>
+       -->
         <plugin>
           <groupId>com.sun.tools.jxc.maven2</groupId>
           <artifactId>maven-jaxb-schemagen-plugin</artifactId>