Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/05 00:46:44 UTC

svn commit: r692288 [2/3] - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/ main/java/org/apache/kahadb/impl/ main/java/org/apache/kahadb/impl/async/ main/java/org/apache/kahadb/impl/container/ main/java/org/apache/kahadb/impl/data/ main/...

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
+import org.apache.kahadb.journal.DataFileAppender.WriteKey;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in
+ * conjunction with the DataFileAccessorPool for concurrent use; a usage sketch
+ * follows this file.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataFileAccessor {
+
+    private final DataFile dataFile;
+    private final Map<WriteKey, WriteCommand> inflightWrites;
+    private final RandomAccessFile file;
+    private boolean disposed;
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param dataManager the Journal that owns the in-flight write cache
+     * @param dataFile the data file to read
+     * @throws IOException
+     */
+    public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {
+        this.dataFile = dataFile;
+        this.inflightWrites = dataManager.getInflightWrites();
+        this.file = dataFile.openRandomAccessFile(false);
+    }
+
+    public DataFile getDataFile() {
+        return dataFile;
+    }
+
+    public void dispose() {
+        if (disposed) {
+            return;
+        }
+        disposed = true;
+        try {
+            dataFile.closeRandomAccessFile(file);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public ByteSequence readRecord(Location location) throws IOException {
+
+        if (!location.isValid()) {
+            throw new IOException("Invalid location: " + location);
+        }
+
+        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            return asyncWrite.data;
+        }
+
+        try {
+
+            if (location.getSize() == Location.NOT_SET) {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+            } else {
+                file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+            }
+
+            byte[] data = new byte[location.getSize() - Journal.ITEM_HEAD_FOOT_SPACE];
+            file.readFully(data);
+            return new ByteSequence(data, 0, data.length);
+
+        } catch (RuntimeException e) {
+            throw new IOException("Invalid location: " + location + ", cause: " + e);
+        }
+    }
+
+    public void readLocationDetails(Location location) throws IOException {
+        WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            location.setSize(asyncWrite.location.getSize());
+            location.setType(asyncWrite.location.getType());
+        } else {
+            file.seek(location.getOffset());
+            location.setSize(file.readInt());
+            location.setType(file.readByte());
+        }
+    }
+
+    public boolean readLocationDetailsAndValidate(Location location) {
+        try {
+            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+            if (asyncWrite != null) {
+                location.setSize(asyncWrite.location.getSize());
+                location.setType(asyncWrite.location.getType());
+            } else {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                location.setType(file.readByte());
+
+                byte data[] = new byte[3];
+                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
+                file.readFully(data);
+                if (data[0] != Journal.ITEM_HEAD_SOR[0]
+                    || data[1] != Journal.ITEM_HEAD_SOR[1]
+                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
+                    return false;
+                }
+                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
+                file.readFully(data);
+                if (data[0] != Journal.ITEM_HEAD_EOR[0]
+                    || data[1] != Journal.ITEM_HEAD_EOR[1]
+                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
+                    return false;
+                }
+            }
+        } catch (IOException e) {
+            return false;
+        }
+        return true;
+    }
+
+    public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+        file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+        int size = Math.min(data.getLength(), location.getSize());
+        file.write(data.getData(), data.getOffset(), size);
+        if (sync) {
+            file.getFD().sync();
+        }
+
+    }
+
+}
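
A minimal usage sketch (not part of the commit) of the accessor/pool pairing described above, mirroring what Journal.read() does later in this change set. It assumes code living in the org.apache.kahadb.journal package, since DataFileAccessor and the pool's open/close methods are package-private; the readOnce name is hypothetical.

    ByteSequence readOnce(Journal journal, DataFileAccessorPool pool, Location location) throws IOException {
        // Resolve which data file the location points into.
        DataFile dataFile = journal.getDataFile(location);
        DataFileAccessor reader = pool.openDataFileAccessor(dataFile);
        try {
            // Records still in the in-flight write cache are served straight from memory.
            return reader.readRecord(location);
        } finally {
            // Always hand the accessor back so it can be pooled or disposed.
            pool.closeDataFileAccessor(reader);
        }
    }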

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to pool DataFileAccessors; an idle-reaping sketch follows this file.
+ * 
+ * @author chirino
+ */
+public class DataFileAccessorPool {
+
+    private final Journal dataManager;
+    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
+    private boolean closed;
+    private int maxOpenReadersPerFile = 5;
+
+    class Pool {
+
+        private final DataFile file;
+        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
+        private boolean used;
+        private int openCounter;
+        private boolean disposed;
+
+        public Pool(DataFile file) {
+            this.file = file;
+        }
+
+        public DataFileAccessor openDataFileReader() throws IOException {
+            DataFileAccessor rc = null;
+            if (pool.isEmpty()) {
+                rc = new DataFileAccessor(dataManager, file);
+            } else {
+                rc = (DataFileAccessor)pool.remove(pool.size() - 1);
+            }
+            used = true;
+            openCounter++;
+            return rc;
+        }
+
+        public synchronized void closeDataFileReader(DataFileAccessor reader) {
+            openCounter--;
+            if (pool.size() >= maxOpenReadersPerFile || disposed) {
+                reader.dispose();
+            } else {
+                pool.add(reader);
+            }
+        }
+
+        public synchronized void clearUsedMark() {
+            used = false;
+        }
+
+        public synchronized boolean isUsed() {
+            return used;
+        }
+
+        public synchronized void dispose() {
+            for (DataFileAccessor reader : pool) {
+                reader.dispose();
+            }
+            pool.clear();
+            disposed = true;
+        }
+
+        public synchronized int getOpenCounter() {
+            return openCounter;
+        }
+
+    }
+
+    public DataFileAccessorPool(Journal dataManager) {
+        this.dataManager = dataManager;
+    }
+
+    synchronized void clearUsedMark() {
+        for (Pool pool : pools.values()) {
+            pool.clearUsedMark();
+        }
+    }
+
+    synchronized void disposeUnused() {
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            if (!pool.isUsed()) {
+                pool.dispose();
+                iter.remove();
+            }
+        }
+    }
+
+    synchronized void disposeDataFileAccessors(DataFile dataFile) {
+        if (closed) {
+            throw new IllegalStateException("Closed.");
+        }
+        Pool pool = pools.get(dataFile.getDataFileId());
+        if (pool != null) {
+            if (pool.getOpenCounter() == 0) {
+                pool.dispose();
+                pools.remove(dataFile.getDataFileId());
+            } else {
+                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
+            }
+        }
+    }
+
+    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
+        if (closed) {
+            throw new IOException("Closed.");
+        }
+
+        Pool pool = pools.get(dataFile.getDataFileId());
+        if (pool == null) {
+            pool = new Pool(dataFile);
+            pools.put(dataFile.getDataFileId(), pool);
+        }
+        return pool.openDataFileReader();
+    }
+
+    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
+        Pool pool = pools.get(reader.getDataFile().getDataFileId());
+        if (pool == null || closed) {
+            reader.dispose();
+        } else {
+            pool.closeDataFileReader(reader);
+        }
+    }
+
+    public synchronized void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            pool.dispose();
+        }
+        pools.clear();
+    }
+
+}
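
A sketch (not part of the commit) of the idle-reaping cycle the pool supports: mark every per-file pool as unused, wait an observation window, then dispose of the pools nobody borrowed from in the meantime. Journal.cleanup() below only calls disposeUnused() on a 30 second schedule; the reapIdleAccessors name and the explicit clearUsedMark() call are an assumption about the intended usage.

    void reapIdleAccessors(DataFileAccessorPool accessors) throws InterruptedException {
        accessors.clearUsedMark();   // reset the "used" flag on every per-file pool
        Thread.sleep(30 * 1000);     // observation window, matching DEFAULT_CLEANUP_INTERVAL
        accessors.disposeUnused();   // close accessors for files that were not opened in the window
    }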

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * An optimized writer that batches appends to a data file. This object is
+ * thread safe and gains throughput as the number of concurrent writers
+ * increases; a usage sketch follows this file.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFileAppender {
+
+    protected static final byte[] RESERVED_SPACE = new byte[Journal.ITEM_HEAD_RESERVED_SPACE];
+    protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
+
+    protected final Journal dataManager;
+    protected final Map<WriteKey, WriteCommand> inflightWrites;
+    protected final Object enqueueMutex = new Object(){};
+    protected WriteBatch nextWriteBatch;
+
+    protected boolean shutdown;
+    protected IOException firstAsyncException;
+    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+    protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
+
+    private boolean running;
+    private Thread thread;
+
+    public static class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        public WriteKey(Location item) {
+            file = item.getDataFileId();
+            offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            hash = (int)(file ^ offset);
+        }
+
+        public int hashCode() {
+            return hash;
+        }
+
+        public boolean equals(Object obj) {
+            if (obj instanceof WriteKey) {
+                WriteKey di = (WriteKey)obj;
+                return di.file == file && di.offset == offset;
+            }
+            return false;
+        }
+    }
+
+    public class WriteBatch {
+
+        public final DataFile dataFile;
+        public final WriteCommand first;
+        public final CountDownLatch latch = new CountDownLatch(1);
+        public int size;
+
+        public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+            this.dataFile = dataFile;
+            this.first = write;
+            size += write.location.getSize();
+        }
+
+        public boolean canAppend(DataFile dataFile, WriteCommand write) {
+            if (dataFile != this.dataFile) {
+                return false;
+            }
+            if (size + write.location.getSize() >= maxWriteBatchSize) {
+                return false;
+            }
+            return true;
+        }
+
+        public void append(WriteCommand write) throws IOException {
+            this.first.getTailNode().linkAfter(write);
+            size += write.location.getSize();
+        }
+    }
+
+    public static class WriteCommand extends LinkedNode {
+        public final Location location;
+        public final ByteSequence data;
+        final boolean sync;
+        public final Runnable onComplete;
+
+        public WriteCommand(Location location, ByteSequence data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.sync = sync;
+            this.onComplete=null;
+        }
+
+        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
+            this.location = location;
+            this.data = data;
+            this.onComplete = onComplete;
+            this.sync = false;
+        }
+    }
+
+
+    /**
+     * Construct a Store writer
+     * 
+     * @param dataManager the Journal this appender writes for
+     */
+    public DataFileAppender(Journal dataManager) {
+        this.dataManager = dataManager;
+        this.inflightWrites = this.dataManager.getInflightWrites();
+    }
+
+    /**
+     * @param data the payload to append
+     * @param type the record type
+     * @param sync true to block until the record has been forced to disk
+     * @return the Location the record was stored at
+     * @throws IOException
+     */
+    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+
+        // Compute the full record size: payload plus header/footer framing.
+        int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, sync);
+
+        // Locate the data file and enqueue the write in a synchronized block so
+        // that writes are batched in the same order that their locations were
+        // assigned by the data manager (which is basically just appending).
+
+        synchronized (this) {
+            // Find the position where this item will land at.
+            DataFile dataFile = dataManager.allocateLocation(location);
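+            // Only async writes go into the in-flight map: a sync caller blocks on
+            // the batch latch below, so the record is on disk before its Location
+            // is returned to anyone who could try to read it.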
+            if( !sync ) {
+                inflightWrites.put(new WriteKey(location), write);
+            }
+            batch = enqueue(dataFile, write);
+        }
+        location.setLatch(batch.latch);
+        if (sync) {
+            try {
+                batch.latch.await();
+            } catch (InterruptedException e) {
+                throw new InterruptedIOException();
+            }
+        }
+
+        return location;
+    }
+    
+    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+        // Compute the full record size: payload plus header/footer framing.
+        int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+
+        final Location location = new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, onComplete);
+
+        // Locate the data file and enqueue the write in a synchronized block so
+        // that writes are batched in the same order that their locations were
+        // assigned by the data manager (which is basically just appending).
+
+        synchronized (this) {
+            // Find the position where this item will land at.
+            DataFile dataFile = dataManager.allocateLocation(location);
+            inflightWrites.put(new WriteKey(location), write);
+            batch = enqueue(dataFile, write);
+        }
+        location.setLatch(batch.latch);
+
+        return location;
+    }
+
+    private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
+        synchronized (enqueueMutex) {
+            WriteBatch rc = null;
+            if (shutdown) {
+                throw new IOException("Async Writer Thread Shutdown");
+            }
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
+            }
+
+            if (!running) {
+                running = true;
+                thread = new Thread() {
+                    public void run() {
+                        processQueue();
+                    }
+                };
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                thread.setName("ActiveMQ Data File Writer");
+                thread.start();
+            }
+
+            if (nextWriteBatch == null) {
+                nextWriteBatch = new WriteBatch(dataFile, write);
+                rc = nextWriteBatch;
+                enqueueMutex.notify();
+            } else {
+                // Append to current batch if possible..
+                if (nextWriteBatch.canAppend(dataFile, write)) {
+                    nextWriteBatch.append(write);
+                    rc = nextWriteBatch;
+                } else {
+                    // Otherwise wait until the writer thread takes the current batch.
+                    try {
+                        while (nextWriteBatch != null) {
+                            enqueueMutex.wait();
+                        }
+                    } catch (InterruptedException e) {
+                        throw new InterruptedIOException();
+                    }
+                    if (shutdown) {
+                        throw new IOException("Async Writer Thread Shutdown");
+                    }
+
+                    // Start a new batch.
+                    nextWriteBatch = new WriteBatch(dataFile, write);
+                    rc = nextWriteBatch;
+                    enqueueMutex.notify();
+                }
+            }
+            return rc;
+        }
+    }
+
+    public void close() throws IOException {
+        synchronized (enqueueMutex) {
+            if (!shutdown) {
+                shutdown = true;
+                if (running) {
+                    enqueueMutex.notifyAll();
+                } else {
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    /**
+     * The async processing loop that writes to the data files and does the
+     * force calls.
+     * 
+     * Since the file sync() call is the slowest of all the operations, this
+     * algorithm tries to 'batch' or group together several file sync() requests
+ * into a single file sync() call. The batching is accomplished by attaching
+     * the same CountDownLatch instance to every force request in a group.
+     * 
+     */
+    protected void processQueue() {
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        try {
+
+            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        if (shutdown) {
+                            return;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+                
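+                // Force the batch to disk only if at least one write in it asked for
+                // a sync or supplied an onComplete callback.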
+                boolean forceToDisk=false;
+                
+                // 
+                // is it just 1 big write?
+                if (wb.size == write.location.getSize()) {
+                    forceToDisk = write.sync | write.onComplete!=null;
+                    
+                    // Just write it directly..
+                    file.writeInt(write.location.getSize());
+                    file.writeByte(write.location.getType());
+                    file.write(RESERVED_SPACE);
+                    file.write(Journal.ITEM_HEAD_SOR);
+                    file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                    file.write(Journal.ITEM_HEAD_EOR);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while (write != null) {
+                        forceToDisk |= write.sync | write.onComplete!=null;
+
+                        buff.writeInt(write.location.getSize());
+                        buff.writeByte(write.location.getType());
+                        buff.write(RESERVED_SPACE);
+                        buff.write(Journal.ITEM_HEAD_SOR);
+                        buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                        buff.write(Journal.ITEM_HEAD_EOR);
+
+                        write = (WriteCommand)write.getNext();
+                    }
+
+                    // Now do the 1 big write.
+                    ByteSequence sequence = buff.toByteSequence();
+                    file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                    buff.reset();
+                }
+
+                if( forceToDisk ) {
+                    file.getFD().sync();
+                }
+                
+                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                dataManager.setLastAppendLocation(lastWrite.location);
+
+                // Now that the data is on disk, remove the writes from the
+                // in-flight cache.
+                write = wb.first;
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    if (write.onComplete != null) {
+                        try {
+                            write.onComplete.run();
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    write = (WriteCommand)write.getNext();
+                }
+                
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
+            }
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (Throwable ignore) {
+            }
+            shutdownDone.countDown();
+        }
+    }
+
+
+}
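
A sketch (not part of the commit) contrasting the two append modes the appender exposes. It assumes a started Journal, package-private access to DataFileAppender, and a caller-supplied payload; Location.USER_TYPE comes from Location.java below and the appendBoth name is hypothetical.

    void appendBoth(DataFileAppender appender, ByteSequence payload) throws IOException {
        // Synchronous append: blocks on the batch latch until the batch has been forced to disk.
        Location synced = appender.storeItem(payload, Location.USER_TYPE, true);

        // Asynchronous append: returns immediately; onComplete runs after the force().
        Location queued = appender.storeItem(payload, Location.USER_TYPE, new Runnable() {
            public void run() {
                System.out.println("record is on disk");
            }
        });
    }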

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,753 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
+import org.apache.kahadb.journal.DataFileAppender.WriteKey;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.Scheduler;
+
+
+
+/**
+ * Manages the journal's DataFiles; a usage sketch follows this file.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class Journal {
+
+    public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
+    public static final int ITEM_HEAD_RESERVED_SPACE = 21;
+    // ITEM_HEAD_SPACE = length + type + reserved space + SOR
+    public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
+    public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
+    public static final int ITEM_FOOT_SPACE = 3; // EOR
+
+    public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
+
+    public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'};
+    public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'};
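+
+    // On-disk record layout (the length field and Location.size cover the whole record):
+    //   [length:4][type:1][reserved:21][SOR:3][payload][EOR:3]
+    // ITEM_HEAD_SPACE = 4 + 1 + 21 + 3 = 29 and ITEM_FOOT_SPACE = 3, so the payload
+    // length is Location.size - ITEM_HEAD_FOOT_SPACE (32 bytes of framing).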
+
+    public static final byte DATA_ITEM_TYPE = 1;
+    public static final byte REDO_ITEM_TYPE = 2;
+    public static final String DEFAULT_DIRECTORY = "data";
+    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
+    public static final String DEFAULT_FILE_PREFIX = "data-";
+    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
+    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
+    public static final int PREFERED_DIFF = 1024 * 512;
+
+    private static final Log LOG = LogFactory.getLog(Journal.class);
+
+    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+    protected File directory = new File(DEFAULT_DIRECTORY);
+    protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
+    protected String filePrefix = DEFAULT_FILE_PREFIX;
+    protected ControlFile controlFile;
+    protected boolean started;
+    protected boolean useNio = true;
+
+    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
+
+    protected DataFileAppender appender;
+    protected DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
+    protected DataFile currentWriteFile;
+
+    protected Location mark;
+    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+    protected Runnable cleanupTask;
+    protected final AtomicLong storeSize;
+    protected boolean archiveDataLogs;
+    
+    public Journal(AtomicLong storeSize) {
+        this.storeSize=storeSize;
+    }
+    
+    public Journal() {
+        this(new AtomicLong());
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        started = true;
+        preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
+        lock();
+
+        ByteSequence sequence = controlFile.load();
+        if (sequence != null && sequence.getLength() > 0) {
+            unmarshallState(sequence);
+        }
+        if (useNio) {
+            appender = new NIODataFileAppender(this);
+        } else {
+            appender = new DataFileAppender(this);
+        }
+
+        File[] files = directory.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String n) {
+                return dir.equals(directory) && n.startsWith(filePrefix);
+            }
+        });
+       
+        if (files != null) {
+            for (int i = 0; i < files.length; i++) {
+                try {
+                    File file = files[i];
+                    String n = file.getName();
+                    String numStr = n.substring(filePrefix.length(), n.length());
+                    int num = Integer.parseInt(numStr);
+                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
+                    fileMap.put(dataFile.getDataFileId(), dataFile);
+                    storeSize.addAndGet(dataFile.getLength());
+                } catch (NumberFormatException e) {
+                    // Ignore files that do not match the pattern.
+                }
+            }
+
+            // Sort the list so that we can link the DataFiles together in the
+            // right order.
+            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+            Collections.sort(l);
+            currentWriteFile = null;
+            for (DataFile df : l) {
+                if (currentWriteFile != null) {
+                    currentWriteFile.linkAfter(df);
+                }
+                currentWriteFile = df;
+                fileByFileMap.put(df.getFile(), df);
+            }
+        }
+
+        // Need to check the current Write File to see if there was a partial
+        // write to it.
+        if (currentWriteFile != null) {
+
+            // See if the lastSyncedLocation is valid..
+            Location l = lastAppendLocation.get();
+            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+                l = null;
+            }
+
+            // If we know the last location that was ok.. then we can skip lots
+            // of checking
+            try {
+                l = recoveryCheck(currentWriteFile, l);
+                lastAppendLocation.set(l);
+            } catch (IOException e) {
+                LOG.warn("recovery check failed", e);
+            }
+        }
+
+        storeState(false);
+
+        cleanupTask = new Runnable() {
+            public void run() {
+                cleanup();
+            }
+        };
+        Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
+    }
+
+    public void lock() throws IOException {
+        synchronized (this) {
+            if (controlFile == null) {
+                IOHelper.mkdirs(directory);
+                controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
+            }
+            controlFile.lock();
+        }
+    }
+
+    protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+        if (location == null) {
+            location = new Location();
+            location.setDataFileId(dataFile.getDataFileId());
+            location.setOffset(0);
+        }
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            reader.readLocationDetails(location);
+            while (reader.readLocationDetailsAndValidate(location)) {
+                location.setOffset(location.getOffset() + location.getSize());
+            }
+        } finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+        dataFile.setLength(location.getOffset());
+        return location;
+    }
+
+    protected void unmarshallState(ByteSequence sequence) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
+        DataInputStream dis = new DataInputStream(bais);
+        if (dis.readBoolean()) {
+            mark = new Location();
+            mark.readExternal(dis);
+        } else {
+            mark = null;
+        }
+        if (dis.readBoolean()) {
+            Location l = new Location();
+            l.readExternal(dis);
+            lastAppendLocation.set(l);
+        } else {
+            lastAppendLocation.set(null);
+        }
+    }
+
+    private synchronized ByteSequence marshallState() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+
+        if (mark != null) {
+            dos.writeBoolean(true);
+            mark.writeExternal(dos);
+        } else {
+            dos.writeBoolean(false);
+        }
+        Location l = lastAppendLocation.get();
+        if (l != null) {
+            dos.writeBoolean(true);
+            l.writeExternal(dos);
+        } else {
+            dos.writeBoolean(false);
+        }
+
+        byte[] bs = baos.toByteArray();
+        return new ByteSequence(bs, 0, bs.length);
+    }
+
+    synchronized DataFile allocateLocation(Location location) throws IOException {
+        if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
+            int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
+
+            String fileName = filePrefix + nextNum;
+            File file = new File(directory, fileName);
+            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+            //actually allocate the disk space
+            nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
+            fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+            fileByFileMap.put(file, nextWriteFile);
+            if (currentWriteFile != null) {
+                currentWriteFile.linkAfter(nextWriteFile);
+                if (currentWriteFile.isUnused()) {
+                    removeDataFile(currentWriteFile);
+                }
+            }
+            currentWriteFile = nextWriteFile;
+
+        }
+        location.setOffset(currentWriteFile.getLength());
+        location.setDataFileId(currentWriteFile.getDataFileId().intValue());
+        int size = location.getSize();
+        currentWriteFile.incrementLength(size);
+        currentWriteFile.increment();
+        storeSize.addAndGet(size);
+        return currentWriteFile;
+    }
+    
+    public synchronized void removeLocation(Location location) throws IOException{
+       
+        DataFile dataFile = getDataFile(location);
+        dataFile.decrement();
+    }
+
+    synchronized DataFile getDataFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+        }
+        return dataFile;
+    }
+    
+    synchronized File getFile(Location item) throws IOException {
+        Integer key = Integer.valueOf(item.getDataFileId());
+        DataFile dataFile = fileMap.get(key);
+        if (dataFile == null) {
+            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
+        }
+        return dataFile.getFile();
+    }
+
+    private DataFile getNextDataFile(DataFile dataFile) {
+        return (DataFile)dataFile.getNext();
+    }
+
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        Scheduler.cancel(cleanupTask);
+        accessorPool.close();
+        storeState(false);
+        appender.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        controlFile.unlock();
+        controlFile.dispose();
+        started = false;
+    }
+
+    synchronized void cleanup() {
+        if (accessorPool != null) {
+            accessorPool.disposeUnused();
+        }
+    }
+
+    public synchronized boolean delete() throws IOException {
+
+        // Close all open file handles...
+        appender.close();
+        accessorPool.close();
+
+        boolean result = true;
+        for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = (DataFile)i.next();
+            storeSize.addAndGet(-dataFile.getLength());
+            result &= dataFile.delete();
+        }
+        fileMap.clear();
+        fileByFileMap.clear();
+        lastAppendLocation.set(null);
+        mark = null;
+        currentWriteFile = null;
+
+        // reopen open file handles...
+        accessorPool = new DataFileAccessorPool(this);
+        if (useNio) {
+            appender = new NIODataFileAppender(this);
+        } else {
+            appender = new DataFileAppender(this);
+        }
+        return result;
+    }
+
+    public synchronized void addInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = (DataFile)fileMap.get(key);
+            if (dataFile == null) {
+                throw new IOException("That data file does not exist");
+            }
+            addInterestInFile(dataFile);
+        }
+    }
+
+    synchronized void addInterestInFile(DataFile dataFile) {
+        if (dataFile != null) {
+            dataFile.increment();
+        }
+    }
+
+    public synchronized void removeInterestInFile(int file) throws IOException {
+        if (file >= 0) {
+            Integer key = Integer.valueOf(file);
+            DataFile dataFile = (DataFile)fileMap.get(key);
+            removeInterestInFile(dataFile);
+        }
+       
+    }
+
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
+        if (dataFile != null) {
+            if (dataFile.decrement() <= 0) {
+                removeDataFile(dataFile);
+            }
+        }
+    }
+
+    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
+        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+        unUsed.removeAll(inUse);
+        unUsed.removeAll(inProgress);
+                
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (Integer key : unUsed) {
+            DataFile dataFile = (DataFile)fileMap.get(key);
+            purgeList.add(dataFile);
+        }
+        for (DataFile dataFile : purgeList) {
+            if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
+                forceRemoveDataFile(dataFile);
+            }
+        }
+    }
+
+    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
+        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
+        unUsed.removeAll(inUse);
+                
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (Integer key : unUsed) {
+            // Only add files less than the lastFile..
+            if (key.intValue() < lastFile.intValue()) {
+                DataFile dataFile = (DataFile)fileMap.get(key);
+                purgeList.add(dataFile);
+            }
+        }
+        for (DataFile dataFile : purgeList) {
+            forceRemoveDataFile(dataFile);
+        }
+    }
+
+    public synchronized void consolidateDataFiles() throws IOException {
+        List<DataFile> purgeList = new ArrayList<DataFile>();
+        for (DataFile dataFile : fileMap.values()) {
+            if (dataFile.isUnused()) {
+                purgeList.add(dataFile);
+            }
+        }
+        for (DataFile dataFile : purgeList) {
+            removeDataFile(dataFile);
+        }
+    }
+
+    private synchronized void removeDataFile(DataFile dataFile) throws IOException {
+
+        // Make sure we don't delete too much data.
+        if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
+            LOG.debug("Won't remove DataFile " + dataFile);
+            return;
+        }
+        forceRemoveDataFile(dataFile);
+    }
+    
+    private synchronized void forceRemoveDataFile(DataFile dataFile)
+            throws IOException {
+        accessorPool.disposeDataFileAccessors(dataFile);
+        fileByFileMap.remove(dataFile.getFile());
+        DataFile removed = fileMap.remove(dataFile.getDataFileId());
+        storeSize.addAndGet(-dataFile.getLength());
+        dataFile.unlink();
+        if (archiveDataLogs) {
+            dataFile.move(getDirectoryArchive());
+            LOG.info("moved data file " + dataFile + " to "
+                    + getDirectoryArchive());
+        } else {
+            boolean result = dataFile.delete();
+            LOG.info("discarding data file " + dataFile
+                    + (result ? ": successful" : ": failed"));
+        }
+    }
+
+    /**
+     * @return the maxFileLength
+     */
+    public int getMaxFileLength() {
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the maxFileLength to set
+     */
+    public void setMaxFileLength(int maxFileLength) {
+        this.maxFileLength = maxFileLength;
+    }
+
+    public String toString() {
+        return "DataManager:(" + filePrefix + ")";
+    }
+
+    public synchronized Location getMark() throws IllegalStateException {
+        return mark;
+    }
+
+    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (location == null) {
+                    DataFile head = (DataFile)currentWriteFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    if (location.getSize() == -1) {
+                        cur = new Location(location);
+                    } else {
+                        cur = new Location(location);
+                        cur.setOffset(location.getOffset() + location.getSize());
+                    }
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            DataFile dataFile = getDataFile(cur);
+
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                dataFile = getNextDataFile(dataFile);
+                if (dataFile == null) {
+                    return null;
+                } else {
+                    cur.setDataFileId(dataFile.getDataFileId().intValue());
+                    cur.setOffset(0);
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
+    
+    public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+        DataFile df = fileByFileMap.get(file);
+        return getNextLocation(df, lastLocation,thisFileOnly);
+    }
+    
+    public synchronized Location getNextLocation(DataFile dataFile,
+            Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+        Location cur = null;
+        while (true) {
+            if (cur == null) {
+                if (lastLocation == null) {
+                    DataFile head = (DataFile)dataFile.getHeadNode();
+                    cur = new Location();
+                    cur.setDataFileId(head.getDataFileId());
+                    cur.setOffset(0);
+                } else {
+                    // Set to the next offset..
+                    cur = new Location(lastLocation);
+                    cur.setOffset(cur.getOffset() + cur.getSize());
+                }
+            } else {
+                cur.setOffset(cur.getOffset() + cur.getSize());
+            }
+
+            
+            // Did it go into the next file??
+            if (dataFile.getLength() <= cur.getOffset()) {
+                if (thisFileOnly) {
+                    return null;
+                } else {
+                    dataFile = getNextDataFile(dataFile);
+                    if (dataFile == null) {
+                        return null;
+                    } else {
+                        cur.setDataFileId(dataFile.getDataFileId().intValue());
+                        cur.setOffset(0);
+                    }
+                }
+            }
+
+            // Load in location size and type.
+            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+            try {
+                reader.readLocationDetails(cur);
+            } finally {
+                accessorPool.closeDataFileAccessor(reader);
+            }
+
+            if (cur.getType() == 0) {
+                return null;
+            } else if (cur.getType() > 0) {
+                // Only return user records.
+                return cur;
+            }
+        }
+    }
+
+    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
+        DataFile dataFile = getDataFile(location);
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        ByteSequence rc = null;
+        try {
+            rc = reader.readRecord(location);
+        } finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+        return rc;
+    }
+
+    public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
+        synchronized (this) {
+            mark = location;
+        }
+        storeState(sync);
+    }
+
+    protected synchronized void storeState(boolean sync) throws IOException {
+        ByteSequence state = marshallState();
+        appender.storeItem(state, Location.MARK_TYPE, sync);
+        controlFile.store(state, sync);
+    }
+
+    public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
+        return loc;
+    }
+    
+    public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
+        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
+        return loc;
+    }
+
+    public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+        return appender.storeItem(data, type, sync);
+    }
+
+    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
+        DataFile dataFile = getDataFile(location);
+        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            updater.updateRecord(location, data, sync);
+        } finally {
+            accessorPool.closeDataFileAccessor(updater);
+        }
+    }
+
+    public File getDirectory() {
+        return directory;
+    }
+
+    public void setDirectory(File directory) {
+        this.directory = directory;
+    }
+
+    public String getFilePrefix() {
+        return filePrefix;
+    }
+
+    public void setFilePrefix(String filePrefix) {
+        this.filePrefix = filePrefix;
+    }
+
+    public Map<WriteKey, WriteCommand> getInflightWrites() {
+        return inflightWrites;
+    }
+
+    public Location getLastAppendLocation() {
+        return lastAppendLocation.get();
+    }
+
+    public void setLastAppendLocation(Location lastSyncedLocation) {
+        this.lastAppendLocation.set(lastSyncedLocation);
+    }
+
+    public boolean isUseNio() {
+        return useNio;
+    }
+
+    public void setUseNio(boolean useNio) {
+        this.useNio = useNio;
+    }
+
+    public File getDirectoryArchive() {
+        return directoryArchive;
+    }
+
+    public void setDirectoryArchive(File directoryArchive) {
+        this.directoryArchive = directoryArchive;
+    }
+    
+    public boolean isArchiveDataLogs() {
+        return archiveDataLogs;
+    }
+
+    public void setArchiveDataLogs(boolean archiveDataLogs) {
+        this.archiveDataLogs = archiveDataLogs;
+    }
+
+    public synchronized Integer getCurrentDataFileId() {
+        if (currentWriteFile == null) {
+            return null;
+        }
+        return currentWriteFile.getDataFileId();
+    }
+    
+    /**
+     * Get a set of files - only valid after start()
+     * @return files currently being used
+     */
+    public Set<File> getFiles(){
+        return fileByFileMap.keySet();
+    }
+
+    public synchronized long getDiskSize() {
+        long rc = 0;
+        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+        while (cur != null) {
+            rc += cur.getLength();
+            cur = (DataFile)cur.getNext();
+        }
+        return rc;
+    }
+
+    public synchronized long getDiskSizeUntil(Location startPosition) {
+        long rc = 0;
+        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
+        while (cur != null) {
+            if (cur.getDataFileId().intValue() >= startPosition.getDataFileId()) {
+                return rc + startPosition.getOffset();
+            }
+            rc += cur.getLength();
+            cur = (DataFile)cur.getNext();
+        }
+        return rc;
+    }
+
+}
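
A minimal end-to-end sketch (not part of the commit) of the Journal API added above: open a journal, append a record synchronously, read it back, replay all user records, and shut down. The directory name is hypothetical and the usual java.io and org.apache.kahadb.util imports are assumed.

    public static void main(String[] args) throws IOException {
        Journal journal = new Journal();
        journal.setDirectory(new File("journal-demo"));
        journal.start();
        try {
            byte[] payload = "hello".getBytes();
            // A sync write returns only after the record has been forced to disk.
            Location loc = journal.write(new ByteSequence(payload, 0, payload.length), true);
            System.out.println("stored at " + loc + ", payload bytes read back: " + journal.read(loc).getLength());

            // Replay every user record in order; getNextLocation(null) starts at the head data file.
            for (Location cur = journal.getNextLocation(null); cur != null; cur = journal.getNextLocation(cur)) {
                System.out.println(cur + " -> " + journal.read(cur).getLength() + " bytes");
            }
        } finally {
            journal.close();
        }
    }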

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Used as a location in the data store.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public final class Location implements Comparable<Location> {
+
+    public static final byte MARK_TYPE = -1;
+    public static final byte USER_TYPE = 1;
+    public static final byte NOT_SET_TYPE = 0;
+    public static final int NOT_SET = -1;
+
+    private int dataFileId = NOT_SET;
+    private int offset = NOT_SET;
+    private int size = NOT_SET;
+    private byte type = NOT_SET_TYPE;
+    private CountDownLatch latch;
+
+    public Location() {
+    }
+
+    Location(Location item) {
+        this.dataFileId = item.dataFileId;
+        this.offset = item.offset;
+        this.size = item.size;
+        this.type = item.type;
+    }
+
+    boolean isValid() {
+        return dataFileId != NOT_SET;
+    }
+
+    /**
+     * @return the size of the data record including the header.
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @param size the size of the data record including the header.
+     */
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    /**
+     * @return the size of the payload of the record.
+     */
+    public int getPaylodSize() {
+        return size - Journal.ITEM_HEAD_FOOT_SPACE;
+    }
+
+    public int getOffset() {
+        return offset;
+    }
+
+    public void setOffset(int offset) {
+        this.offset = offset;
+    }
+
+    public int getDataFileId() {
+        return dataFileId;
+    }
+
+    public void setDataFileId(int file) {
+        this.dataFileId = file;
+    }
+
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    public String toString() {
+        String result = "offset = " + offset + ", file = " + dataFileId + ", size = " + size + ", type = "
+                        + type;
+        return result;
+    }
+
+    public void writeExternal(DataOutput dos) throws IOException {
+        dos.writeInt(dataFileId);
+        dos.writeInt(offset);
+        dos.writeInt(size);
+        dos.writeByte(type);
+    }
+
+    public void readExternal(DataInput dis) throws IOException {
+        dataFileId = dis.readInt();
+        offset = dis.readInt();
+        size = dis.readInt();
+        type = dis.readByte();
+    }
+
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public int compareTo(Location o) {
+        if (dataFileId == o.dataFileId) {
+            return offset - o.offset;
+        }
+        return dataFileId - o.dataFileId;
+    }
+
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof Location) {
+            result = compareTo((Location)o) == 0;
+        }
+        return result;
+    }
+
+    public int hashCode() {
+        return dataFileId ^ offset;
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
------------------------------------------------------------------------------
    svn:executable = *
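
For illustration, a minimal round trip through the writeExternal/readExternal methods above (not part of the commit); wrapping byte-array streams is just one way to supply the DataOutput/DataInput arguments:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.kahadb.journal.Location;

    public class LocationCodecSketch {
        public static void main(String[] args) throws IOException {
            Location out = new Location();
            out.setDataFileId(3);
            out.setOffset(8192);
            out.setSize(1024);
            out.setType(Location.USER_TYPE);

            // Encode: 4 + 4 + 4 + 1 = 13 bytes per location.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            out.writeExternal(new DataOutputStream(bytes));

            // Decode into a fresh instance; equality is by data file id and offset.
            Location in = new Location();
            in.readExternal(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(in + " equals original: " + in.equals(out));
        }
    }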

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A DataFileAppender that uses NIO ByteBuffers and file channels to copy data
+ * to files more efficiently.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class NIODataFileAppender extends DataFileAppender {
+
+    public NIODataFileAppender(Journal fileManager) {
+        super(fileManager);
+    }
+
+    /**
+     * The async processing loop that writes to the data files and issues the
+     * force calls.
+     * 
+     * Since the file sync() call is the slowest of all the operations, this
+     * algorithm tries to 'batch', or group together, several file sync() requests
+     * into a single file sync() call. The batching is accomplished by attaching
+     * the same CountDownLatch instance to every force request in a group.
+     */
+    protected void processQueue() {
+        DataFile dataFile = null;
+        RandomAccessFile file = null;
+        FileChannel channel = null;
+
+        try {
+
+            ByteBuffer header = ByteBuffer.allocateDirect(Journal.ITEM_HEAD_SPACE);
+            ByteBuffer footer = ByteBuffer.allocateDirect(Journal.ITEM_FOOT_SPACE);
+            ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize);
+
+            // Populate the static parts of the headers and footers..
+            header.putInt(0); // size
+            header.put((byte)0); // type
+            header.put(RESERVED_SPACE); // reserved
+            header.put(Journal.ITEM_HEAD_SOR);
+            footer.put(Journal.ITEM_HEAD_EOR);
+
+            while (true) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized (enqueueMutex) {
+                    while (true) {
+                        if (nextWriteBatch != null) {
+                            o = nextWriteBatch;
+                            nextWriteBatch = null;
+                            break;
+                        }
+                        if (shutdown) {
+                            return;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+                WriteBatch wb = (WriteBatch)o;
+                if (dataFile != wb.dataFile) {
+                    if (file != null) {
+                        dataFile.closeRandomAccessFile(file);
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                    channel = file.getChannel();
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+
+                boolean forceToDisk = false;
+
+                // Is it just one big write?
+                if (wb.size == write.location.getSize()) {
+                    forceToDisk = write.sync | write.onComplete!=null;
+                    
+                    header.clear();
+                    header.putInt(write.location.getSize());
+                    header.put(write.location.getType());
+                    header.clear();
+                    transfer(header, channel);
+                    ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+                                                        write.data.getLength());
+                    transfer(source, channel);
+                    footer.clear();
+                    transfer(footer, channel);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while (write != null) {
+                        forceToDisk |= write.sync | write.onComplete!=null;
+                        
+                        header.clear();
+                        header.putInt(write.location.getSize());
+                        header.put(write.location.getType());
+                        header.clear();
+                        copy(header, buffer);
+                        assert !header.hasRemaining();
+
+                        ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
+                                                            write.data.getLength());
+                        copy(source, buffer);
+                        assert !source.hasRemaining();
+
+                        footer.clear();
+                        copy(footer, buffer);
+                        assert !footer.hasRemaining();
+
+                        write = (WriteCommand)write.getNext();
+                    }
+
+                    // Fully write out the buffer..
+                    buffer.flip();
+                    transfer(buffer, channel);
+                    buffer.clear();
+                }
+
+                if( forceToDisk ) {
+                    file.getChannel().force(false);
+                }
+
+                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                dataManager.setLastAppendLocation(lastWrite.location);
+
+                // Now that the data is on disk, remove the writes from the
+                // in-flight cache.
+                write = wb.first;
+                while (write != null) {
+                    if (!write.sync) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    if (write.onComplete != null) {
+                        try {
+                            write.onComplete.run();
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        }
+                    }
+                    write = (WriteCommand)write.getNext();
+                }
+                
+                // Signal any waiting threads that the write is on disk.
+                wb.latch.countDown();
+            }
+
+        } catch (IOException e) {
+            synchronized (enqueueMutex) {
+                firstAsyncException = e;
+            }
+        } catch (InterruptedException e) {
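+            // Interrupted while waiting for work: treat it as a shutdown signal
+            // and fall through to the cleanup in the finally block.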
+        } finally {
+            try {
+                if (file != null) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (IOException e) {
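+                // Ignore errors closing the data file during shutdown.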
+            }
+            shutdownDone.countDown();
+        }
+    }
+
+    /**
+     * Fully writes the remaining bytes of the given buffer to the channel.
+     * 
+     * @param src - source of the data
+     * @param channel - destination where the data will be written.
+     * @throws IOException
+     */
+    private void transfer(ByteBuffer src, FileChannel channel) throws IOException {
+        while (src.hasRemaining()) {
+            channel.write(src);
+        }
+    }
+
+    private int copy(ByteBuffer src, ByteBuffer dest) {
+        int rc = Math.min(dest.remaining(), src.remaining());
+        if (rc > 0) {
+            // Adjust our limit so that we don't overflow the dest buffer.
+            int limit = src.limit();
+            src.limit(src.position() + rc);
+            dest.put(src);
+            // restore the limit.
+            src.limit(limit);
+        }
+        return rc;
+    }
+
+}
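
The copy() helper above bounds each transfer by temporarily shrinking the source buffer's limit so the destination batch buffer can never overflow. A standalone sketch of the same idiom (buffer sizes are arbitrary):

    import java.nio.ByteBuffer;

    public class BufferCopySketch {
        // Same idea as NIODataFileAppender.copy(): move as many bytes as fit,
        // clamping the source limit so dest.put() cannot throw BufferOverflowException.
        static int copy(ByteBuffer src, ByteBuffer dest) {
            int rc = Math.min(dest.remaining(), src.remaining());
            if (rc > 0) {
                int limit = src.limit();
                src.limit(src.position() + rc);
                dest.put(src);
                src.limit(limit); // restore the original limit
            }
            return rc;
        }

        public static void main(String[] args) {
            ByteBuffer src = ByteBuffer.wrap(new byte[100]); // 100 bytes to move
            ByteBuffer dest = ByteBuffer.allocate(64);       // smaller batch buffer
            System.out.println("first pass copied:  " + copy(src, dest)); // 64
            dest.clear();
            System.out.println("second pass copied: " + copy(src, dest)); // 36
        }
    }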

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Allows you to open a data file in read only mode.  Useful when working with 
+ * archived data files.
+ */
+public class ReadOnlyDataFile extends DataFile {
+
+    ReadOnlyDataFile(File file, int number, int preferedSize) {
+        super(file, number, preferedSize);
+    }
+    
+    
+    public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        // This file is opened read-only, so it cannot be pre-sized for appending
+        // the way a writable DataFile is.
+        if (appender) {
+            throw new IOException("Cannot open a read only data file in append mode.");
+        }
+        return new RandomAccessFile(file, "r");
+    }
+
+    public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+        file.close();
+    }
+
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+    
+    public synchronized void move(File targetDirectory) throws IOException{
+        throw new RuntimeException("Not valid on a read only file.");
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.Scheduler;
+
+/**
+ * A Journal that works in read-only mode against multiple data directories.
+ * Useful for reading back archived data files.
+ */
+public class ReadOnlyJournal extends Journal {
+    
+    private static final Log LOG = LogFactory.getLog(ReadOnlyJournal.class);
+    private final ArrayList<File> dirs;
+
+    public ReadOnlyJournal(final ArrayList<File> dirs) {
+        this.dirs = dirs;
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized void start() throws IOException {
+        if (started) {
+            return;
+        }
+
+        started = true;
+                
+        ArrayList<File> files = new ArrayList<File>();
+        for (File directory : dirs) {
+            final File d = directory;
+            File[] f = d.listFiles(new FilenameFilter() {
+                public boolean accept(File dir, String n) {
+                    return dir.equals(d) && n.startsWith(filePrefix);
+                }
+            });
+            if (f != null) { // listFiles() returns null if the directory does not exist
+                for (int i = 0; i < f.length; i++) {
+                    files.add(f[i]);
+                }
+            }
+        }
+       
+        for (File file : files) {
+            try {
+                String n = file.getName();
+                String numStr = n.substring(filePrefix.length(), n.length());
+                int num = Integer.parseInt(numStr);
+                DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+                fileMap.put(dataFile.getDataFileId(), dataFile);
+                storeSize.addAndGet(dataFile.getLength());
+            } catch (NumberFormatException e) {
+                // Ignore files that do not match the naming pattern.
+            }
+        }
+
+        // Sort the list so that we can link the DataFiles together in the
+        // right order.
+        List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
+        Collections.sort(dataFiles);
+        currentWriteFile = null;
+        for (DataFile df : dataFiles) {
+            if (currentWriteFile != null) {
+                currentWriteFile.linkAfter(df);
+            }
+            currentWriteFile = df;
+            fileByFileMap.put(df.getFile(), df);
+        }
+        
+        // Need to check the current Write File to see if there was a partial
+        // write to it.
+        if (currentWriteFile != null) {
+
+            // See if the lastSyncedLocation is valid..
+            Location l = lastAppendLocation.get();
+            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+                l = null;
+            }
+        }
+    }
+    
+    public synchronized void close() throws IOException {
+        if (!started) {
+            return;
+        }
+        accessorPool.close();
+        fileMap.clear();
+        fileByFileMap.clear();
+        started = false;
+    }
+
+    
+    public Location getFirstLocation() throws IllegalStateException, IOException {
+        if( currentWriteFile == null ) {
+            return null;
+        }
+        
+        DataFile first = (DataFile)currentWriteFile.getHeadNode();
+        Location cur = new Location();
+        cur.setDataFileId(first.getDataFileId());
+        cur.setOffset(0);
+        cur.setSize(0);
+        return getNextLocation(cur);
+    }
+    
+    @Override
+    public synchronized boolean delete() throws IOException {
+        throw new RuntimeException("Cannot delete a ReadOnlyAsyncDataManager");
+    }    
+}
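
A hedged sketch of scanning archived journal directories with the new ReadOnlyJournal. The directory names are placeholders, and it assumes the inherited getNextLocation() (which getFirstLocation() above delegates to) returns null once the end of the journal is reached:

    import java.io.File;
    import java.util.ArrayList;
    import org.apache.kahadb.journal.Location;
    import org.apache.kahadb.journal.ReadOnlyJournal;

    public class ArchiveScanSketch {
        public static void main(String[] args) throws Exception {
            ArrayList<File> dirs = new ArrayList<File>();
            dirs.add(new File("target/journal"));         // placeholder directories
            dirs.add(new File("target/journal-archive"));

            ReadOnlyJournal journal = new ReadOnlyJournal(dirs);
            journal.start();
            try {
                int records = 0;
                // Walk every record location across the archived data files.
                for (Location l = journal.getFirstLocation(); l != null; l = journal.getNextLocation(l)) {
                    records++;
                }
                System.out.println("records found: " + records);
            } finally {
                journal.close();
            }
        }
    }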

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html Thu Sep  4 15:46:42 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+ Journal-based data storage - scalable and fast
+
+</body>
+</html>

Propchange: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692288&r1=692287&r2=692288&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Thu Sep  4 15:46:42 2008
@@ -52,7 +52,7 @@
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.kahadb.impl.async.Location;
+import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.store.MessageDatabase.StoredDestination;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692288&r1=692287&r2=692288&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Thu Sep  4 15:46:42 2008
@@ -33,9 +33,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.Marshaller;
 import org.apache.kahadb.StringMarshaller;
-import org.apache.kahadb.impl.async.AsyncDataManager;
-import org.apache.kahadb.impl.async.Location;
-import org.apache.kahadb.page.BTreeIndex;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Journal;
+import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
@@ -98,7 +98,7 @@
 
     
     protected PageFile pageFile;
-    protected  AsyncDataManager asyncDataManager;
+    protected  Journal asyncDataManager;
     protected  Metadata metadata = new Metadata();
 
     protected  MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
@@ -577,8 +577,8 @@
         return pf;
     }
 
-    private AsyncDataManager createAsyncDataManager() {
-        AsyncDataManager manager = new AsyncDataManager();
+    private Journal createAsyncDataManager() {
+        Journal manager = new Journal();
         manager.setDirectory(new File(directory, "journal"));
         manager.setMaxFileLength(1024 * 1024 * 20);
         manager.setUseNio(false);

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.page.Transaction;
+
+public class BTreeIndexBenchMark extends IndexBenchmark {
+
+    private NumberFormat nf;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(10);
+        nf.setGroupingUsed(false);
+    }
+    
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+    
+    @Override
+    protected void dumpIndex(Index<String, Long> index) throws IOException {
+        Transaction tx = pf.tx();
+        ((BTreeIndex)index).printStructure(tx, System.out);
+    }
+
+    /**
+     * Overriding so that this generates worst-case keys for the BTree: keys that
+     * always insert at the end.
+     */
+    @Override
+    protected String key(long i) {
+        return "a-long-message-id-like-key:"+nf.format(i);
+    }
+
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.PrintWriter;
+import java.text.NumberFormat;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.Index;
+
+public class BTreeIndexTest extends IndexTestSupport {
+
+    private NumberFormat nf;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        nf = NumberFormat.getIntegerInstance();
+        nf.setMinimumIntegerDigits(6);
+        nf.setGroupingUsed(false);
+    }
+    
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String,Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+    /**
+     * The current implementation does NOT try to keep the tree balanced. This
+     * (currently disabled) test case demonstrates that it can get out of balance.
+     * 
+     * @throws Exception
+     */
+    public void disabled_testTreeBalancing() throws Exception {
+        createPageFileAndIndex(100);
+
+        BTreeIndex index = ((BTreeIndex)this.index);
+        this.index.load();
+        
+        doInsert(50);
+        
+        int minLeafDepth = index.getMinLeafDepth(tx);
+        int maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+        // Remove some of the data
+        doRemove(16);
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+
+        System.out.println( "min:"+minLeafDepth );
+        System.out.println( "max:"+maxLeafDepth );
+        index.printStructure(tx, new PrintWriter(System.out));
+
+        assertTrue("Tree is balanced", maxLeafDepth-minLeafDepth <= 1);
+
+        this.index.unload();
+    }
+    
+    public void testPruning() throws Exception {
+        createPageFileAndIndex(100);
+
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+
+        this.index.load();
+     
+        int minLeafDepth = index.getMinLeafDepth(tx);
+        int maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertEquals(1, minLeafDepth);
+        assertEquals(1, maxLeafDepth);
+        
+        doInsert(1000);
+        
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+        assertTrue("Depth of tree grew", minLeafDepth > 1);
+        assertTrue("Depth of tree grew", maxLeafDepth > 1);
+
+        // Remove the data.
+        doRemove(1000);
+        minLeafDepth = index.getMinLeafDepth(tx);
+        maxLeafDepth = index.getMaxLeafDepth(tx);
+
+        assertEquals(1, minLeafDepth);
+        assertEquals(1, maxLeafDepth);
+
+        this.index.unload();
+    }
+
+    public void testIteration() throws Exception {
+        createPageFileAndIndex(100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load();
+          
+        // Insert in reverse order..
+        doInsertReverse(1000);
+        
+        this.index.unload();
+        this.index.load();
+
+        // BTree should iterate it in sorted order.
+        int counter=0;
+        for (Iterator<Map.Entry<String,Long>> i = index.iterator(tx); i.hasNext();) {
+            Map.Entry<String,Long> entry = i.next();
+            assertEquals(key(counter),entry.getKey());
+            assertEquals(counter,(long)entry.getValue());
+            counter++;
+        }
+
+        this.index.unload();
+    }
+    
+    void doInsertReverse(int count) throws Exception {
+        for (int i = count-1; i >= 0; i--) {
+            index.put(tx, key(i), (long)i);
+            tx.commit();
+        }
+    }
+    /**
+     * Overriding so that this generates worst-case keys for the BTree: keys that
+     * always insert at the end.
+     */
+    @Override
+    protected String key(int i) {
+        return "key:"+nf.format(i);
+    }
+}
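
To tie the test pieces together, a minimal usage sketch of the BTreeIndex API exercised above. It assumes a PageFile that has already been set up and started (that setup lives in IndexTestSupport and is outside this hunk, so the sketch takes the PageFile as a parameter) and reuses a transaction across commits the way the tests do:

    import java.util.Iterator;
    import java.util.Map;
    import org.apache.kahadb.LongMarshaller;
    import org.apache.kahadb.StringMarshaller;
    import org.apache.kahadb.index.BTreeIndex;
    import org.apache.kahadb.page.PageFile;
    import org.apache.kahadb.page.Transaction;

    public class BTreeIndexSketch {
        static void demo(PageFile pf) throws Exception {
            Transaction tx = pf.tx();
            long rootPageId = tx.allocate().getPageId(); // reserve a page for the root node
            tx.commit();

            BTreeIndex<String, Long> index = new BTreeIndex<String, Long>(pf, rootPageId);
            index.setKeyMarshaller(StringMarshaller.INSTANCE);
            index.setValueMarshaller(LongMarshaller.INSTANCE);
            index.load();

            for (long i = 0; i < 10; i++) {
                index.put(tx, "key:" + i, i); // same call shape as doInsertReverse() above
                tx.commit();
            }

            // Entries come back in key order, as testIteration() asserts.
            for (Iterator<Map.Entry<String, Long>> i = index.iterator(tx); i.hasNext();) {
                Map.Entry<String, Long> entry = i.next();
                System.out.println(entry.getKey() + " = " + entry.getValue());
            }
            index.unload();
        }
    }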