You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC

svn commit: r961062 [4/14] - in /activemq/sandbox/activemq-apollo-actor: ./ activemq-all/ activemq-all/src/test/ide-resources/ activemq-all/src/test/java/org/apache/activemq/jaxb/ activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...

Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Wed Jul  7 03:24:02 2010
@@ -27,36 +27,50 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.DuplicateKeyException;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
-import org.apache.activemq.queue.QueueDescriptor;
-import org.apache.activemq.util.marshaller.LongMarshaller;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.page.Page;
-import org.apache.kahadb.page.Transaction;
+import org.fusesource.hawtdb.api.BTreeIndexFactory;
+import org.fusesource.hawtdb.api.SortedIndex;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.util.marshaller.*;
 
 public class DestinationEntity {
 
+    private static final BTreeIndexFactory<Long, QueueRecord> queueIndexFactory = new BTreeIndexFactory<Long, QueueRecord>();
+    private static final BTreeIndexFactory<Long, Long> trackingIndexFactory = new BTreeIndexFactory<Long, Long>();
+    private static final BTreeIndexFactory<Long, Long> statsIndexFactory = new BTreeIndexFactory<Long, Long>();
+
+    static {
+        queueIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        queueIndexFactory.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
+        queueIndexFactory.setDeferredEncoding(true);
+
+        trackingIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        trackingIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        trackingIndexFactory.setDeferredEncoding(true);
+
+        statsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        statsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        statsIndexFactory.setDeferredEncoding(true);
+    }
+
     public final static Marshaller<DestinationEntity> MARSHALLER = new VariableMarshaller<DestinationEntity>() {
 
         public DestinationEntity readPayload(DataInput dataIn) throws IOException {
             DestinationEntity value = new DestinationEntity();
-            value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
-            value.trackingIndex = new BTreeIndex<Long, Long>(dataIn.readLong());
+            value.queueIndex = dataIn.readInt();
+            value.trackingIndex =  dataIn.readInt();
             value.descriptor = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
-            value.metaData = new Page<DestinationMetaData>(dataIn.readLong());
             return value;
         }
 
         public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
-            dataOut.writeLong(value.queueIndex.getPageId());
-            dataOut.writeLong(value.trackingIndex.getPageId());
+            dataOut.writeInt(value.queueIndex);
+            dataOut.writeInt(value.trackingIndex);
             Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.descriptor, dataOut);
-            dataOut.writeLong(value.metaData.getPageId());
         }
 
         public int estimatedSize(DestinationEntity object) {
@@ -65,30 +79,13 @@ public class DestinationEntity {
 
     };
 
-    public final static Marshaller<DestinationMetaData> META_DATA_MARSHALLER = new VariableMarshaller<DestinationMetaData>() {
-        public DestinationMetaData readPayload(DataInput dataIn) throws IOException {
-            DestinationMetaData value = new DestinationMetaData();
-            value.count = dataIn.readInt();
-            value.size = dataIn.readLong();
-            return value;
-        }
-
-        public void writePayload(DestinationMetaData value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.count);
-            dataOut.writeLong(value.size);
-        }
-
-        public int estimatedSize(DestinationMetaData object) {
-            throw new UnsupportedOperationException();
-        }
-    };
-
     public Class<DestinationEntity> getType() {
         return DestinationEntity.class;
     }
 
-    private BTreeIndex<Long, QueueRecord> queueIndex;
-    private BTreeIndex<Long, Long> trackingIndex;
+    private int queueIndex;
+    private int trackingIndex;
+    private int statsIndex;
 
     // Descriptor for this queue:
     private QueueDescriptor descriptor;
@@ -96,78 +93,77 @@ public class DestinationEntity {
     // Child Partitions:
     private HashSet<DestinationEntity> partitions;
 
-    // Holds volatile queue meta data
-    private Page<DestinationMetaData> metaData;
-
     // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
     // /////////////////////////////////////////////////////////////////
     public void allocate(Transaction tx) throws IOException {
-        queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate());
-        trackingIndex = new BTreeIndex<Long, Long>(tx.allocate());
-        metaData = tx.allocate();
-        metaData.set(new DestinationMetaData());
-        tx.store(metaData, META_DATA_MARSHALLER, true);
-    }
+        queueIndex = tx.alloc();
+        queueIndexFactory.create(tx, queueIndex);
 
-    public void deallocate(Transaction tx) throws IOException {
-        queueIndex.clear(tx);
-        trackingIndex.clear(tx);
-        tx.free(trackingIndex.getPageId());
-        tx.free(queueIndex.getPageId());
-        tx.free(metaData.getPageId());
-        queueIndex = null;
-        trackingIndex = null;
-        metaData = null;
-    }
+        trackingIndex = tx.alloc();
+        trackingIndexFactory.create(tx, trackingIndex);
 
-    public void load(Transaction tx) throws IOException {
-        if (queueIndex.getPageFile() == null) {
+        statsIndex = tx.alloc();
+        statsIndexFactory.create(tx, statsIndex);
+        setStats(tx, 0,0);
+    }
 
-            queueIndex.setPageFile(tx.getPageFile());
-            queueIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            queueIndex.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
-            queueIndex.load(tx);
-        }
+    public void deallocate(Transaction tx) throws IOException {
+        queueIndex(tx).clear(); // empty the index before releasing the page id it is opened from
+        tx.free(queueIndex);
 
-        if (trackingIndex.getPageFile() == null) {
+        trackingIndex(tx).clear(); // must run while trackingIndex's page is still allocated
+        tx.free(trackingIndex);
 
-            trackingIndex.setPageFile(tx.getPageFile());
-            trackingIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            trackingIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-            trackingIndex.load(tx);
-        }
+        statsIndex(tx).clear();
+        tx.free(statsIndex);
+    }
 
-        tx.load(metaData, META_DATA_MARSHALLER);
+    private SortedIndex<Long, QueueRecord> queueIndex(Transaction tx) {
+        return queueIndexFactory.open(tx, queueIndex);
+    }
+    private SortedIndex<Long, Long> trackingIndex(Transaction tx) {
+        return trackingIndexFactory.open(tx, trackingIndex);
+    }
+    private SortedIndex<Long, Long> statsIndex(Transaction tx) {
+        return statsIndexFactory.open(tx, statsIndex);
     }
 
     private static final boolean unlimited(Number val) {
         return val == null || val.longValue() < 0;
     }
 
-    private DestinationMetaData getMetaData(Transaction tx) throws IOException {
-        tx.load(metaData, META_DATA_MARSHALLER);
-        return metaData.get();
-    }
-
     // /////////////////////////////////////////////////////////////////
     // Message Methods.
     // /////////////////////////////////////////////////////////////////
-    
+
+    public static final Long SIZE_STAT = 0L;  // statsIndex key under which the size total is stored
+    public static final Long COUNT_STAT = 1L; // statsIndex key for the count; must differ from SIZE_STAT
+
     public long getSize(Transaction tx) throws IOException {
-        return getMetaData(tx).size;
+        return statsIndex(tx).get(SIZE_STAT);
     }
 
     public int getCount(Transaction tx) throws IOException {
-        return getMetaData(tx).count;
+        return (int)(long)statsIndex(tx).get(COUNT_STAT);
     }
 
     public long getFirstSequence(Transaction tx) throws IOException {
-        return getMetaData(tx).count == 0 ? 0 : queueIndex.getFirst(tx).getValue().getQueueKey();
+        Entry<Long, QueueRecord> entry = queueIndex(tx).getFirst();
+        if( entry!=null ) {
+            return entry.getValue().getQueueKey();
+        } else {
+            return 0;
+        }
     }
 
     public long getLastSequence(Transaction tx) throws IOException {
-        return getMetaData(tx).count == 0 ? 0 : queueIndex.getLast(tx).getValue().getQueueKey();
+        Entry<Long, QueueRecord> entry = queueIndex(tx).getLast();
+        if( entry!=null ) {
+            return entry.getValue().getQueueKey();
+        } else {
+            return 0;
+        }
     }
 
     public void setQueueDescriptor(QueueDescriptor queue) {
@@ -207,7 +203,7 @@ public class DestinationEntity {
 
     public void add(Transaction tx, QueueAddMessage command) throws IOException, DuplicateKeyException {
 
-        Long existing = trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
+        Long existing = trackingIndex(tx).put(command.getMessageKey(), command.getQueueKey());
         if (existing == null) {
             QueueRecord value = new QueueRecord();
             value.setAttachment(command.getAttachment());
@@ -215,7 +211,7 @@ public class DestinationEntity {
             value.setQueueKey(command.getQueueKey());
             value.setSize(command.getMessageSize());
 
-            QueueRecord rc = queueIndex.put(tx, value.getQueueKey(), value);
+            QueueRecord rc = queueIndex(tx).put(value.getQueueKey(), value);
             if (rc == null) {
                 // TODO It seems a little inefficient to continually serialize
                 // the queue size. It might be better to update this only at
@@ -225,8 +221,7 @@ public class DestinationEntity {
                 // It is also possible that we might want to remove this update
                 // altogether in favor of scanning the whole queue at recovery
                 // time (at the cost of startup time)
-                getMetaData(tx).update(1, command.getMessageSize());
-                tx.store(metaData, META_DATA_MARSHALLER, true);
+                addStats(tx, 1, command.getMessageSize());
             } else {
                 throw new Store.FatalStoreException(new Store.DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + descriptor.getQueueName()));
             }
@@ -235,6 +230,18 @@ public class DestinationEntity {
         }
     }
 
+    private void addStats(Transaction tx, int count, int size) {
+        SortedIndex<Long, Long> index = statsIndex(tx);
+        index.put(COUNT_STAT, index.get(COUNT_STAT)+count);
+        index.put(SIZE_STAT, index.get(SIZE_STAT)+size);
+    }
+    
+    private void setStats(Transaction tx, int count, int size) {
+        SortedIndex<Long, Long> index = statsIndex(tx);
+        index.put(COUNT_STAT, new Long(count));
+        index.put(SIZE_STAT, new Long(size));
+    }
+
     /**
      * Removes a queue record returning the corresponding element tracking number.
      * @param tx The transaction under which to do the removal
@@ -243,12 +250,11 @@ public class DestinationEntity {
      * @throws IOException
      */
     public long remove(Transaction tx, long queueKey) throws IOException {
-        QueueRecord qr = queueIndex.remove(tx, queueKey);
+        QueueRecord qr = queueIndex(tx).remove(queueKey);
         if(qr != null)
         {
-            trackingIndex.remove(tx, qr.getMessageKey());
-            getMetaData(tx).update(-1, -qr.getSize());
-            tx.store(metaData, META_DATA_MARSHALLER, true);
+            trackingIndex(tx).remove(qr.getMessageKey());
+            addStats(tx, -1, -qr.getSize());
             return qr.getMessageKey();
         }
         return -1;
@@ -264,10 +270,10 @@ public class DestinationEntity {
         
         Iterator<Entry<Long, QueueRecord>> iterator;
         if (unlimited(firstQueueKey)) {
-            iterator = queueIndex.iterator(tx);
+            iterator = queueIndex(tx).iterator();
 
         } else {
-            iterator = queueIndex.iterator(tx, firstQueueKey);
+            iterator = queueIndex(tx).iterator(firstQueueKey);
         }
         boolean sequenceLimited = !unlimited(maxQueueKey);
         boolean countLimited = !unlimited(max);
@@ -286,21 +292,8 @@ public class DestinationEntity {
     }
 
     public Iterator<Entry<Long, Long>> listTrackingNums(Transaction tx) throws IOException {
-        return trackingIndex.iterator(tx);
+        return trackingIndex(tx).iterator();
     }
 
-    public static class DestinationMetaData {
-        int count;
-        long size;
 
-        public void update(int count, long size) {
-            this.count += count;
-            this.size += size;
-        }
-
-        public void set(int count, long size) {
-            this.count = count;
-            this.size = size;
-        }
-    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Jul  7 03:24:02 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.kahadb.Data.MapAdd;
 import org.apache.activemq.broker.store.kahadb.Data.MapEntryPut;
@@ -57,7 +58,6 @@ import org.apache.activemq.broker.store.
 import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.util.LockFile;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
@@ -65,11 +65,11 @@ import org.apache.activemq.util.buffer.D
 import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.journal.Journal;
-import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Page;
-import org.apache.kahadb.page.PageFile;
-import org.apache.kahadb.page.Transaction;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.api.TxPageFile;
+import org.fusesource.hawtdb.api.TxPageFileFactory;
+import org.fusesource.hawtdb.internal.journal.Journal;
+import org.fusesource.hawtdb.internal.journal.Location;
 
 public class KahaDBStore implements Store {
 
@@ -81,15 +81,16 @@ public class KahaDBStore implements Stor
     private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
-    private static final Buffer BEGIN_UNIT_OF_WORK_DATA = new Buffer(new byte[] { BEGIN_UNIT_OF_WORK });
-    private static final Buffer END_UNIT_OF_WORK_DATA = new Buffer(new byte[] { END_UNIT_OF_WORK });
-    private static final Buffer CANCEL_UNIT_OF_WORK_DATA = new Buffer(new byte[] { CANCEL_UNIT_OF_WORK });
-    private static final Buffer FLUSH_DATA = new Buffer(new byte[] { FLUSH });
+    private static final org.fusesource.hawtdb.util.buffer.Buffer BEGIN_UNIT_OF_WORK_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { BEGIN_UNIT_OF_WORK });
+    private static final org.fusesource.hawtdb.util.buffer.Buffer END_UNIT_OF_WORK_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { END_UNIT_OF_WORK });
+    private static final org.fusesource.hawtdb.util.buffer.Buffer CANCEL_UNIT_OF_WORK_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { CANCEL_UNIT_OF_WORK });
+    private static final org.fusesource.hawtdb.util.buffer.Buffer FLUSH_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { FLUSH });
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
 
-    protected PageFile pageFile;
+    protected TxPageFileFactory pageFileFactory = new TxPageFileFactory();
+    protected TxPageFile pageFile;
     protected Journal journal;
 
     protected RootEntity rootEntity = new RootEntity();
@@ -98,12 +99,9 @@ public class KahaDBStore implements Stor
     protected boolean deleteAllMessages;
     protected File directory;
     protected Thread checkpointThread;
-    protected boolean enableJournalDiskSyncs = true;
+
     long checkpointInterval = 5 * 1000;
     long cleanupInterval = 30 * 1000;
-    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
-    boolean enableIndexWriteAsync = false;
-    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
 
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
@@ -115,6 +113,7 @@ public class KahaDBStore implements Stor
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
     private boolean recovering;
+    private int journalMaxFileLength = 1024*1024*20;
 
     private static class UoWOperation {
         public TypeCreatable bean;
@@ -156,16 +155,11 @@ public class KahaDBStore implements Stor
     private void loadPageFile() throws IOException {
         indexLock.writeLock().lock();
         try {
-            final PageFile pageFile = getPageFile();
-            pageFile.load();
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            final TxPageFile pageFile = getPageFile();
+            execute(new Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    if (pageFile.getPageCount() == 0) {
+                    if ( !tx.allocator().isAllocated(0) ) {
                         rootEntity.allocate(tx);
-                    } else {
-                        Page<RootEntity> page = tx.load(0, RootEntity.MARSHALLER);
-                        rootEntity = page.get();
-                        rootEntity.setPageId(0);
                     }
                     rootEntity.load(tx);
                 }
@@ -177,6 +171,24 @@ public class KahaDBStore implements Stor
         }
     }
 
+    interface Closure<T extends Exception> {
+        public void execute(Transaction tx) throws T;
+    }
+
+    private <T extends Exception> void execute(Closure<T> closure) throws T {
+        Transaction tx = pageFile.tx();
+        boolean committed=false;
+        try {
+            closure.execute(tx);
+            tx.commit();
+            committed=true;
+        } finally {
+            if( !committed ) {
+                tx.rollback();
+            }
+        }
+    }
+
     /**
      * @throws IOException
      */
@@ -210,7 +222,7 @@ public class KahaDBStore implements Stor
                 journal.delete();
                 journal.close();
                 journal = null;
-                getPageFile().delete();
+                pageFileFactory.getFile().delete();
                 rootEntity = new RootEntity();
                 LOG.info("Persistence store purged.");
                 deleteAllMessages = false;
@@ -270,7 +282,8 @@ public class KahaDBStore implements Stor
 
             indexLock.writeLock().lock();
             try {
-                pageFile.unload();
+                pageFileFactory.close();
+                pageFile = null;
                 rootEntity = new RootEntity();
                 journal.close();
             } finally {
@@ -284,11 +297,11 @@ public class KahaDBStore implements Stor
     }
 
     public void unload() throws IOException, InterruptedException {
-        if (pageFile.isLoaded()) {
+        if (pageFile !=null) {
             indexLock.writeLock().lock();
             try {
                 rootEntity.setState(CLOSED_STATE);
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                execute(new Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         // Set the last update to the next update (otherwise
                         // we'll replay the last update
@@ -315,7 +328,6 @@ public class KahaDBStore implements Stor
      * 
      * @throws IOException
      * @throws IOException
-     * @throws InvalidLocationException
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
@@ -330,7 +342,7 @@ public class KahaDBStore implements Stor
                 int uowCounter = 0;
                 while (recoveryPosition != null) {
 
-                    Buffer data = journal.read(recoveryPosition);
+                    Buffer data = convert(journal.read(recoveryPosition));
                     if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
                         uow = pageFile.tx();
                     } else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
@@ -352,7 +364,7 @@ public class KahaDBStore implements Stor
                             updateIndex(uow, message.toType(), (MessageBuffer) message, location);
                             uowCounter++;
                         } else {
-                            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                            execute(new Closure<IOException>() {
                                 public void execute(Transaction tx) throws IOException {
                                     updateIndex(tx, message.toType(), (MessageBuffer) message, location);
                                     rootEntity.setLastUpdate(location);
@@ -369,7 +381,7 @@ public class KahaDBStore implements Stor
             }
 
             // We may have to undo some index updates.
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            execute(new Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     recoverIndex(tx);
                 }
@@ -380,6 +392,10 @@ public class KahaDBStore implements Stor
         }
     }
 
+    private Buffer convert(org.fusesource.hawtdb.util.buffer.Buffer buffer) {
+        return new Buffer(buffer.data, buffer.offset, buffer.length);
+    }
+
     public void incrementalRecover() throws IOException {
         indexLock.writeLock().lock();
         try {
@@ -397,7 +413,7 @@ public class KahaDBStore implements Stor
                 final TypeCreatable message = load(lastRecoveryPosition);
                 final Location location = lastRecoveryPosition;
 
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                execute(new Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         updateIndex(tx, message.toType(), (MessageBuffer) message, location);
                     }
@@ -449,7 +465,7 @@ public class KahaDBStore implements Stor
                 if (!opened.get()) {
                     return;
                 }
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                execute(new Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         checkpointUpdate(tx, cleanup);
                     }
@@ -473,7 +489,7 @@ public class KahaDBStore implements Stor
     public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
         indexLock.writeLock().lock();
         try {
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+            execute(new Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     checkpointUpdate(tx, false);
                 }
@@ -565,12 +581,12 @@ public class KahaDBStore implements Stor
             long start = System.currentTimeMillis();
             final Location location;
             synchronized (journal) {
-                location = journal.write(os.toBuffer(), onFlush);
+                location = journal.write(convert(os.toBuffer()), onFlush);
             }
             long start2 = System.currentTimeMillis();
 
             if (tx == null) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                execute(new Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         updateIndex(tx, data.toType(), message, location);
                     }
@@ -592,6 +608,10 @@ public class KahaDBStore implements Stor
 
     }
 
+    private org.fusesource.hawtdb.util.buffer.Buffer convert(Buffer buffer) {
+        return new org.fusesource.hawtdb.util.buffer.Buffer(buffer.data, buffer.offset, buffer.length);
+    }
+
     /**
      * Loads a previously stored PBMessage
      * 
@@ -600,7 +620,7 @@ public class KahaDBStore implements Stor
      * @throws IOException
      */
     private TypeCreatable load(Location location) throws IOException {
-        Buffer data = journal.read(location);
+        Buffer data = convert(journal.read(location));
         return load(location, data);
     }
 
@@ -636,10 +656,10 @@ public class KahaDBStore implements Stor
             queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
             break;
         case SUBSCRIPTION_ADD:
-            rootEntity.addSubscription(tx, (SubscriptionAdd) command);
+            rootEntity.addSubscription((SubscriptionAdd) command);
             break;
         case SUBSCRIPTION_REMOVE:
-            rootEntity.removeSubscription(tx, ((SubscriptionRemove) command).getName());
+            rootEntity.removeSubscription(((SubscriptionRemove) command).getName());
             break;
         case TRANSACTION_BEGIN:
         case TRANSACTION_ADD_MESSAGE:
@@ -977,7 +997,7 @@ public class KahaDBStore implements Stor
             storeAtomic();
             SubscriptionRecord old;
             try {
-                old = rootEntity.getSubscription(tx, record.getName());
+                old = rootEntity.getSubscription(record.getName());
                 if (old != null && !old.equals(record)) {
                     throw new DuplicateKeyException("Subscription already exists: " + record.getName());
                 } else {
@@ -1175,30 +1195,23 @@ public class KahaDBStore implements Stor
     // IoC Properties.
     // /////////////////////////////////////////////////////////////////
 
-    protected PageFile createPageFile() {
-        PageFile index = new PageFile(directory, "db");
-        index.setEnableWriteThread(isEnableIndexWriteAsync());
-        index.setWriteBatchSize(getIndexWriteBatchSize());
-        return index;
-    }
-
-    protected Journal createJournal() {
-        Journal manager = new Journal();
-        manager.setDirectory(directory);
-        manager.setMaxFileLength(getJournalMaxFileLength());
-        return manager;
-    }
-
-    private PageFile getPageFile() {
+    private TxPageFile getPageFile() {
         if (pageFile == null) {
-            pageFile = createPageFile();
+            pageFileFactory.setFile(new File(directory, "db"));
+            pageFileFactory.setDrainOnClose(false);
+            pageFileFactory.setSync(true);
+            pageFileFactory.setUseWorkerThread(true);
+            pageFileFactory.open();
+            pageFile = pageFileFactory.getTxPageFile();
         }
         return pageFile;
     }
 
     private Journal getJournal() {
         if (journal == null) {
-            journal = createJournal();
+            journal = new Journal();
+            journal.setDirectory(directory);
+            journal.setMaxFileLength(getJournalMaxFileLength());
         }
         return journal;
     }
@@ -1223,30 +1236,6 @@ public class KahaDBStore implements Stor
         this.deleteAllMessages = deleteAllMessages;
     }
 
-    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
-        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
-    }
-
-    public int getIndexWriteBatchSize() {
-        return setIndexWriteBatchSize;
-    }
-
-    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
-        this.enableIndexWriteAsync = enableIndexWriteAsync;
-    }
-
-    boolean isEnableIndexWriteAsync() {
-        return enableIndexWriteAsync;
-    }
-
-    public boolean isEnableJournalDiskSyncs() {
-        return enableJournalDiskSyncs;
-    }
-
-    public void setEnableJournalDiskSyncs(boolean syncWrites) {
-        this.enableJournalDiskSyncs = syncWrites;
-    }
-
     public long getCheckpointInterval() {
         return checkpointInterval;
     }
@@ -1263,20 +1252,39 @@ public class KahaDBStore implements Stor
         this.cleanupInterval = cleanupInterval;
     }
 
-    public void setJournalMaxFileLength(int journalMaxFileLength) {
-        this.journalMaxFileLength = journalMaxFileLength;
+    public boolean isFailIfDatabaseIsLocked() {
+        return failIfDatabaseIsLocked;
+    }
+
+    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
     }
 
     public int getJournalMaxFileLength() {
         return journalMaxFileLength;
     }
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        this.journalMaxFileLength = journalMaxFileLength;
+    }
 
-    public boolean isFailIfDatabaseIsLocked() {
-        return failIfDatabaseIsLocked;
+    public int getIndexMaxPages() {
+        return pageFileFactory.getMaxPages();
+    }
+    public void setIndexMaxPages(int maxPages) {
+        pageFileFactory.setMaxPages(maxPages);
     }
 
-    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
-        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+    public short getIndexPageSize() {
+        return pageFileFactory.getPageSize();
+    }
+    public void setIndexPageSize(short pageSize) {
+        pageFileFactory.setPageSize(pageSize);
     }
 
+    public int getIndexMappingSegementSize() {
+        return pageFileFactory.getMappingSegementSize();
+    }
+    public void setIndexMappingSegementSize(int mappingSegementSize) {
+        pageFileFactory.setMappingSegementSize(mappingSegementSize);
+    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Wed Jul  7 03:24:02 2010
@@ -20,13 +20,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.journal.Location;
+import org.fusesource.hawtdb.util.marshaller.Marshaller;
+import org.fusesource.hawtdb.util.marshaller.VariableMarshaller;
 
 public class Marshallers {
 
@@ -71,74 +70,6 @@ public class Marshallers {
         }
     };
 
-    public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
-
-        public Location readPayload(DataInput dataIn) throws IOException {
-            Location rc = new Location();
-            rc.setDataFileId(dataIn.readInt());
-            rc.setOffset(dataIn.readInt());
-            return rc;
-        }
-
-        public void writePayload(Location object, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(object.getDataFileId());
-            dataOut.writeInt(object.getOffset());
-        }
-
-        public boolean isDeepCopySupported() {
-            return true;
-        }
-
-        public Location deepCopy(Location source) {
-            return new Location(source);
-        }
-
-        public int getFixedSize() {
-            return 8;
-        }
-
-        public int estimatedSize(Location object) {
-            throw new UnsupportedOperationException();
-        }
-
-    };
-
-    public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new VariableMarshaller<AsciiBuffer>() {
-
-        public AsciiBuffer readPayload(DataInput dataIn) throws IOException {
-            byte data[] = new byte[dataIn.readShort()];
-            dataIn.readFully(data);
-            return new AsciiBuffer(data);
-        }
-
-        public void writePayload(AsciiBuffer object, DataOutput dataOut) throws IOException {
-            dataOut.writeShort(object.length);
-            dataOut.write(object.data, object.offset, object.length);
-        }
-
-        public int estimatedSize(AsciiBuffer object) {
-            throw new UnsupportedOperationException();
-        }
-    };
-
-    public final static Marshaller<Buffer> BUFFER_MARSHALLER = new VariableMarshaller<Buffer>() {
-
-        public Buffer readPayload(DataInput dataIn) throws IOException {
-            byte data[] = new byte[dataIn.readShort()];
-            dataIn.readFully(data);
-            return new Buffer(data);
-        }
-
-        public void writePayload(Buffer object, DataOutput dataOut) throws IOException {
-            dataOut.writeShort(object.length);
-            dataOut.write(object.data, object.offset, object.length);
-        }
-
-        public int estimatedSize(Buffer object) {
-            throw new UnsupportedOperationException();
-        }
-    };
-
     public final static Marshaller<QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueDescriptor>() {
 
         public QueueDescriptor readPayload(DataInput dataIn) throws IOException {
@@ -169,4 +100,52 @@ public class Marshallers {
             throw new UnsupportedOperationException();
         }
     };
+
+
+
+    static abstract public class AbstractBufferMarshaller<T extends Buffer> extends org.fusesource.hawtdb.util.marshaller.VariableMarshaller<T> {
+
+        public void writePayload(T value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.length);
+            dataOut.write(value.data, value.offset, value.length);
+        }
+
+        public T readPayload(DataInput dataIn) throws IOException {
+            int size = dataIn.readInt();
+            byte[] data = new byte[size];
+            dataIn.readFully(data);
+            return createBuffer(data);
+        }
+
+        abstract protected T createBuffer(byte [] data);
+
+        public T deepCopy(T source) {
+            return createBuffer(source.deepCopy().data);
+        }
+
+        public boolean isDeepCopySupported() {
+            return true;
+        }
+
+        public int estimatedSize(T object) {
+            return object.length+4;
+        }
+
+    }
+
+    public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new AbstractBufferMarshaller<AsciiBuffer>() {
+        @Override
+        protected AsciiBuffer createBuffer(byte[] data) {
+            return new AsciiBuffer(data);
+        }
+
+    };
+
+    public final static Marshaller<Buffer> BUFFER_MARSHALLER = new AbstractBufferMarshaller<Buffer>() {
+        @Override
+        protected Buffer createBuffer(byte[] data) {
+            return new Buffer(data);
+        }
+    };
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java Wed Jul  7 03:24:02 2010
@@ -23,7 +23,8 @@ import java.io.IOException;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.marshaller.Marshaller;
 import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.journal.Location;
+import org.fusesource.hawtdb.internal.journal.Location;
+import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
 
 public class MessageKeys {
 
@@ -42,14 +43,14 @@ public class MessageKeys {
     
     public static final Marshaller<MessageKeys> MARSHALLER = new VariableMarshaller<MessageKeys>() {
         public MessageKeys readPayload(DataInput dataIn) throws IOException {
-            Location location = Marshallers.LOCATION_MARSHALLER.readPayload(dataIn);
+            Location location = LocationMarshaller.INSTANCE.readPayload(dataIn);
             byte data[] = new byte[dataIn.readShort()];
             dataIn.readFully(data);
             return new MessageKeys(new AsciiBuffer(data), location);
         }
 
         public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
-            Marshallers.LOCATION_MARSHALLER.writePayload(object.location, dataOut);
+            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
             dataOut.writeShort(object.messageId.length);
             dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length);
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Wed Jul  7 03:24:02 2010
@@ -16,18 +16,11 @@
  */
 package org.apache.activemq.broker.store.kahadb;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.io.*;
+import java.util.*;
 import java.util.Map.Entry;
 
+import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.KeyNotFoundException;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
@@ -36,97 +29,137 @@ import org.apache.activemq.broker.store.
 import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd;
 import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd.SubscriptionAddBuffer;
 import org.apache.activemq.protobuf.InvalidProtocolBufferException;
-import org.apache.activemq.queue.QueueDescriptor;
-import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.marshaller.IntegerMarshaller;
-import org.apache.activemq.util.marshaller.LongMarshaller;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.index.BTreeVisitor;
-import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Page;
-import org.apache.kahadb.page.Transaction;
+import org.fusesource.hawtdb.api.*;
+import org.fusesource.hawtdb.internal.journal.Location;
+import org.apache.activemq.util.buffer.*;
+import org.fusesource.hawtdb.util.marshaller.LongMarshaller;
+import org.fusesource.hawtdb.util.marshaller.IntegerMarshaller;
+import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
 
 public class RootEntity {
 
-    //TODO remove this one performance testing is complete. 
+    //TODO remove this once performance testing is complete. 
     private static final boolean USE_LOC_INDEX = true;
-
     private static final int VERSION = 0;
 
-    public final static Marshaller<RootEntity> MARSHALLER = new VariableMarshaller<RootEntity>() {
-        public RootEntity readPayload(DataInput is) throws IOException {
-            RootEntity rc = new RootEntity();
-            rc.state = is.readInt();
-            is.readInt(); //VERSION 
-            rc.maxMessageKey = is.readLong();
-            rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
+
+    private static final BTreeIndexFactory<Long, Location> messageKeyIndexFactory = new BTreeIndexFactory<Long, Location>();
+    private static final BTreeIndexFactory<Integer, Long> locationIndexFactory = new BTreeIndexFactory<Integer, Long>();
+    private static final BTreeIndexFactory<Long, Long> messageRefsIndexFactory = new BTreeIndexFactory<Long, Long>();
+    private static final BTreeIndexFactory<AsciiBuffer, DestinationEntity> destinationIndexFactory = new BTreeIndexFactory<AsciiBuffer, DestinationEntity>();
+    private static final BTreeIndexFactory<AsciiBuffer, Buffer> subscriptionIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
+    private static final BTreeIndexFactory<AsciiBuffer, Integer> mapIndexFactory = new BTreeIndexFactory<AsciiBuffer, Integer>();
+    private static final BTreeIndexFactory<AsciiBuffer, Buffer> mapInstanceIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
+
+    static {
+        messageKeyIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        messageKeyIndexFactory.setValueMarshaller(LocationMarshaller.INSTANCE);
+        messageKeyIndexFactory.setDeferredEncoding(true);
+
+        locationIndexFactory.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+        locationIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        locationIndexFactory.setDeferredEncoding(true);
+
+        messageRefsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+        messageRefsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        messageRefsIndexFactory.setDeferredEncoding(true);
+
+        destinationIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        destinationIndexFactory.setValueMarshaller(DestinationEntity.MARSHALLER);
+        destinationIndexFactory.setDeferredEncoding(true);
+
+        subscriptionIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        subscriptionIndexFactory.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+        subscriptionIndexFactory.setDeferredEncoding(true);
+
+        mapIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        mapIndexFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
+        mapIndexFactory.setDeferredEncoding(true);
+    }
+
+    // The root page that this object's state is stored on.
+    // private Page<StoredDBState> page;
+
+    // State information about the index
+    Data data;
+    private long maxMessageKey;
+
+    static class Data {
+        private int state;
+        // Message Indexes
+        private long maxMessageKey;
+        private Location lastUpdate;
+        
+        private SortedIndex<Long, Location> messageKeyIndex;
+        private SortedIndex<Integer, Long> locationIndex;
+        private SortedIndex<Long, Long> messageRefsIndex; // Maps message key to ref
+        // count:
+
+        // The destinations
+        private SortedIndex<AsciiBuffer, DestinationEntity> destinationIndex;
+
+        // Subscriptions
+        private SortedIndex<AsciiBuffer, Buffer> subscriptionIndex;
+
+        // Maps:
+        private SortedIndex<AsciiBuffer, Integer> mapIndex;
+
+        public void create(Transaction tx) {
+            state = KahaDBStore.CLOSED_STATE;
+            messageKeyIndex = messageKeyIndexFactory.create(tx, tx.alloc());
             if (USE_LOC_INDEX)
-                rc.locationIndex = new BTreeIndex<Integer, Long>(is.readLong());
-            rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
-            rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
-            rc.subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(is.readLong());
-            rc.mapIndex = new BTreeIndex<AsciiBuffer, Long>(is.readLong());
-            if (is.readBoolean()) {
-                rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
-            } else {
-                rc.lastUpdate = null;
-            }
-            return rc;
+                locationIndex = locationIndexFactory.create(tx, tx.alloc());
+            destinationIndex = destinationIndexFactory.create(tx, tx.alloc());
+            messageRefsIndex = messageRefsIndexFactory.create(tx, tx.alloc());
+            subscriptionIndex = subscriptionIndexFactory.create(tx, tx.alloc());
+            mapIndex = mapIndexFactory.create(tx, tx.alloc());
+
         }
+    }
 
-        public void writePayload(RootEntity object, DataOutput os) throws IOException {
+    EncoderDecoder<Data>  DATA_ENCODER_DECODER = new AbstractStreamEncoderDecoder<Data>() {
+        @Override
+        protected void encode(Paged paged, DataOutputStream os, Data object) throws IOException {
             os.writeInt(object.state);
             os.writeInt(VERSION);
             os.writeLong(object.maxMessageKey);
-            os.writeLong(object.messageKeyIndex.getPageId());
+            os.writeInt(object.messageKeyIndex.getPage());
             if (USE_LOC_INDEX)
-                os.writeLong(object.locationIndex.getPageId());
-            os.writeLong(object.destinationIndex.getPageId());
-            os.writeLong(object.messageRefsIndex.getPageId());
-            os.writeLong(object.subscriptionIndex.getPageId());
-            os.writeLong(object.mapIndex.getPageId());
+                os.writeInt(object.locationIndex.getPage());
+            os.writeInt(object.destinationIndex.getPage());
+            os.writeInt(object.messageRefsIndex.getPage());
+            os.writeInt(object.subscriptionIndex.getPage());
+            os.writeInt(object.mapIndex.getPage());
             if (object.lastUpdate != null) {
                 os.writeBoolean(true);
-                Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
+                LocationMarshaller.INSTANCE.writePayload(object.lastUpdate, os);
             } else {
                 os.writeBoolean(false);
             }
         }
 
-        public int estimatedSize(RootEntity object) {
-            throw new UnsupportedOperationException();
+        @Override
+        protected RootEntity.Data decode(Paged paged, DataInputStream is) throws IOException {
+            Data rc = new Data();
+            rc.state = is.readInt();
+            is.readInt(); //VERSION
+            rc.maxMessageKey = is.readLong();
+            rc.messageKeyIndex = messageKeyIndexFactory.open(paged, is.readInt());
+            if (USE_LOC_INDEX)
+                rc.locationIndex = locationIndexFactory.open(paged, is.readInt());
+            rc.destinationIndex = destinationIndexFactory.open(paged, is.readInt());
+            rc.messageRefsIndex = messageRefsIndexFactory.open(paged, is.readInt());
+            rc.subscriptionIndex = subscriptionIndexFactory.open(paged, is.readInt());
+            rc.mapIndex = mapIndexFactory.open(paged, is.readInt());
+            if (is.readBoolean()) {
+                rc.lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+            } else {
+                rc.lastUpdate = null;
+            }
+            return rc;
         }
     };
 
-    // The root page the this object's state is stored on.
-    // private Page<StoredDBState> page;
-
-    // State information about the index
-    private long pageId;
-    private int state;
-    private Location lastUpdate;
-    private boolean loaded;
-
-    // Message Indexes
-    private long maxMessageKey;
-    private BTreeIndex<Long, Location> messageKeyIndex;
-    private BTreeIndex<Integer, Long> locationIndex;
-    private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref
-    // count:
-
-    // The destinations
-    private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
-    private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
-
-    // Subscriptions
-    private BTreeIndex<AsciiBuffer, Buffer> subscriptionIndex;
-
-    // Maps:
-    private BTreeIndex<AsciiBuffer, Long> mapIndex;
-    private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache = new TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer,Buffer>>();
 
     // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
@@ -134,71 +167,25 @@ public class RootEntity {
 
     public void allocate(Transaction tx) throws IOException {
         // First time this is created.. Initialize a new pagefile.
-        Page<RootEntity> page = tx.allocate();
-        pageId = page.getPageId();
+        int pageId = tx.alloc();
         assert pageId == 0;
-
-        state = KahaDBStore.CLOSED_STATE;
-
-        messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
-        if (USE_LOC_INDEX)
-            locationIndex = new BTreeIndex<Integer, Long>(tx.getPageFile(), tx.allocate().getPageId());
-        destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
-        messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
-        subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), tx.allocate().getPageId());
-        mapIndex = new BTreeIndex<AsciiBuffer, Long>(tx.getPageFile(), tx.allocate().getPageId());
-
-        page.set(this);
-        tx.store(page, MARSHALLER, true);
+        data = new Data();
+        data.create(tx);
+        tx.put(DATA_ENCODER_DECODER, pageId, data);
     }
 
     public void load(Transaction tx) throws IOException {
-        messageKeyIndex.setPageFile(tx.getPageFile());
-        messageKeyIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-        messageKeyIndex.setValueMarshaller(Marshallers.LOCATION_MARSHALLER);
-        messageKeyIndex.load(tx);
+        data = tx.get(DATA_ENCODER_DECODER, 0);
+
         // Update max message key:
-        Entry<Long, Location> last = messageKeyIndex.getLast(tx);
+        maxMessageKey = data.maxMessageKey;
+        Entry<Long, Location> last = data.messageKeyIndex.getLast();
         if (last != null) {
             if (last.getKey() > maxMessageKey) {
                 maxMessageKey = last.getKey();
             }
         }
 
-        if (USE_LOC_INDEX) {
-            locationIndex.setPageFile(tx.getPageFile());
-            locationIndex.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-            locationIndex.load(tx);
-        }
-
-        subscriptionIndex.setPageFile(tx.getPageFile());
-        subscriptionIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-        subscriptionIndex.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
-        subscriptionIndex.load(tx);
-
-        destinationIndex.setPageFile(tx.getPageFile());
-        destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-        destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
-        destinationIndex.load(tx);
-
-        messageRefsIndex.setPageFile(tx.getPageFile());
-        messageRefsIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-        messageRefsIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        messageRefsIndex.load(tx);
-
-        // Keep the StoredDestinations loaded
-        destinations.clear();
-        for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
-            Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
-            entry.getValue().load(tx);
-            try {
-                addToDestinationCache(entry.getValue());
-            } catch (KeyNotFoundException e) {
-                //
-            }
-        }
-
         // Build up the queue partition hierarchy:
         try {
             constructQueueHierarchy();
@@ -207,61 +194,6 @@ public class RootEntity {
             ioe.initCause(e);
             throw ioe;
         }
-
-        //Load Maps:
-        mapIndex.setPageFile(tx.getPageFile());
-        mapIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-        mapIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        mapIndex.load(tx);
-
-        //Load all of the maps and cache them:
-        for (Iterator<Entry<AsciiBuffer, Long>> iterator = mapIndex.iterator(tx); iterator.hasNext();) {
-            Entry<AsciiBuffer, Long> entry = iterator.next();
-            BTreeIndex<AsciiBuffer, Buffer> map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), entry.getValue());
-            map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-            map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
-            map.load(tx);
-            mapCache.put(entry.getKey(), map);
-        }
-
-    }
-
-    /**
-     * Adds the destination to the destination cache
-     * 
-     * @param entity
-     *            The destination to cache.
-     * @throws KeyNotFoundException
-     *             If the parent queue could not be found.
-     */
-    private void addToDestinationCache(DestinationEntity entity) throws KeyNotFoundException {
-        QueueDescriptor queue = entity.getDescriptor();
-
-        // If loaded add a reference to us from the parent:
-        if (loaded) {
-            if (queue.getParent() != null) {
-                DestinationEntity parent = destinations.get(queue.getParent());
-                if (parent == null) {
-                    throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
-                }
-                parent.addPartition(entity);
-            }
-        }
-
-        destinations.put(queue.getQueueName(), entity);
-    }
-
-    private void removeFromDestinationCache(DestinationEntity entity) {
-        QueueDescriptor queue = entity.getDescriptor();
-
-        // If the queue is loaded remove the parent reference:
-        if (loaded) {
-            if (queue.getParent() != null) {
-                DestinationEntity parent = destinations.get(queue.getParent());
-                parent.removePartition(entity);
-            }
-        }
-        destinations.remove(queue.getQueueName());
     }
 
     /**
@@ -270,10 +202,11 @@ public class RootEntity {
      * @throws KeyNotFoundException
      */
     private void constructQueueHierarchy() throws KeyNotFoundException {
-        for (DestinationEntity destination : destinations.values()) {
+        for (Entry<AsciiBuffer, DestinationEntity> entry : data.destinationIndex) {
+            DestinationEntity destination = entry.getValue();
             QueueDescriptor queue = destination.getDescriptor();
             if (queue.getParent() != null) {
-                DestinationEntity parent = destinations.get(queue.getParent());
+                DestinationEntity parent = data.destinationIndex.get(queue.getParent());
                 if (parent == null) {
                     throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
                 } else {
@@ -283,10 +216,10 @@ public class RootEntity {
         }
     }
 
+    @Deprecated // TODO: keep data immutable
     public void store(Transaction tx) throws IOException {
-        Page<RootEntity> page = tx.load(pageId, null);
-        page.set(this);
-        tx.store(page, RootEntity.MARSHALLER, true);
+        // TODO: need to make Data immutable.
+        tx.put(DATA_ENCODER_DECODER, 0, data);
     }
 
     // /////////////////////////////////////////////////////////////////
@@ -301,55 +234,51 @@ public class RootEntity {
         if (id > maxMessageKey) {
             maxMessageKey = id;
         }
-        Location previous = messageKeyIndex.put(tx, id, location);
+        Location previous = data.messageKeyIndex.put(id, location);
         if (previous != null) {
             // Message existed.. undo the index update we just did. Chances
             // are it's a transaction replay.
-            messageKeyIndex.put(tx, id, previous);
+            data.messageKeyIndex.put(id, previous);
         } else {
             if (USE_LOC_INDEX) {
-                Long refs = locationIndex.get(tx, location.getDataFileId());
+                Long refs = data.locationIndex.get(location.getDataFileId());
                 if (refs == null) {
-                    locationIndex.put(tx, location.getDataFileId(), new Long(1));
+                    data.locationIndex.put(location.getDataFileId(), new Long(1));
                 } else {
-                    locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue() + 1));
+                    data.locationIndex.put(location.getDataFileId(), new Long(refs.longValue() + 1));
                 }
             }
         }
     }
 
-    public void messageRemove(Transaction tx, Long messageKey) throws IOException {
+    public void messageRemove(Long messageKey) {
         // Location location = messageKeyIndex.remove(tx, messageKey);
-        Location location = messageKeyIndex.remove(tx, messageKey);
+        Location location = data.messageKeyIndex.remove(messageKey);
         if (USE_LOC_INDEX && location != null) {
-            Long refs = locationIndex.get(tx, location.getDataFileId());
+            Long refs = data.locationIndex.get(location.getDataFileId());
             if (refs != null) {
                 if (refs.longValue() <= 1) {
-                    locationIndex.remove(tx, location.getDataFileId());
+                    data.locationIndex.remove(location.getDataFileId());
                 } else {
-                    locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue() - 1));
+                    data.locationIndex.put(location.getDataFileId(), new Long(refs.longValue() - 1));
                 }
             }
         }
     }
 
     public Location messageGetLocation(Transaction tx, Long messageKey) {
-        try {
-            return messageKeyIndex.get(tx, messageKey);
-        } catch (IOException e) {
-            throw new Store.FatalStoreException(e);
-        }
+        return data.messageKeyIndex.get(messageKey);
     }
 
     public void addMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
         try {
-            Long refs = messageRefsIndex.get(tx, messageKey);
+            Long refs = data.messageRefsIndex.get(messageKey);
             if (refs == null) {
-                messageRefsIndex.put(tx, messageKey, new Long(1));
+                data.messageRefsIndex.put(messageKey, new Long(1));
             } else {
-                messageRefsIndex.put(tx, messageKey, new Long(1 + refs.longValue()));
+                data.messageRefsIndex.put(messageKey, new Long(1 + refs.longValue()));
             }
-        } catch (IOException e) {
+        } catch (RuntimeException e) {
             throw new Store.FatalStoreException(e);
         }
 
@@ -357,17 +286,17 @@ public class RootEntity {
 
     public void removeMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
         try {
-            Long refs = messageRefsIndex.get(tx, messageKey);
+            Long refs = data.messageRefsIndex.get(messageKey);
             if (refs != null) {
                 if (refs.longValue() <= 1) {
-                    messageRefsIndex.remove(tx, messageKey);
+                    data.messageRefsIndex.remove(messageKey);
                     // If this is the last record remove, the message
-                    messageRemove(tx, messageKey);
+                    messageRemove(messageKey);
                 } else {
-                    messageRefsIndex.put(tx, messageKey, new Long(refs.longValue() - 1));
+                    data.messageRefsIndex.put(messageKey, new Long(refs.longValue() - 1));
                 }
             }
-        } catch (IOException e) {
+        } catch (RuntimeException e) {
             throw new Store.FatalStoreException(e);
         }
     }
@@ -388,7 +317,7 @@ public class RootEntity {
 
         final LinkedList<SubscriptionRecord> rc = new LinkedList<SubscriptionRecord>();
 
-        subscriptionIndex.visit(tx, new BTreeVisitor<AsciiBuffer, Buffer>() {
+        data.subscriptionIndex.visit(new IndexVisitor<AsciiBuffer, Buffer>() {
             public boolean isInterestedInKeysBetween(AsciiBuffer first, AsciiBuffer second) {
                 return true;
             }
@@ -412,21 +341,18 @@ public class RootEntity {
     }
 
     /**
-     * @param tx
      * @param name
      * @throws IOException
      */
-    public void removeSubscription(Transaction tx, AsciiBuffer name) throws IOException {
-        subscriptionIndex.remove(tx, name);
+    public void removeSubscription(AsciiBuffer name) throws IOException {
+        data.subscriptionIndex.remove(name);
     }
 
     /**
-     * @param tx
-     * @param name
      * @throws IOException
      */
-    public void addSubscription(Transaction tx, SubscriptionAdd subscription) throws IOException {
-        subscriptionIndex.put(tx, subscription.getName(), subscription.freeze().toFramedBuffer());
+    public void addSubscription(SubscriptionAdd subscription) throws IOException {
+        data.subscriptionIndex.put(subscription.getName(), subscription.freeze().toFramedBuffer());
     }
 
     /**
@@ -434,8 +360,8 @@ public class RootEntity {
      * @return
      * @throws IOException
      */
-    public SubscriptionRecord getSubscription(Transaction tx, AsciiBuffer name) throws IOException {
-        return toSubscriptionRecord(subscriptionIndex.get(tx, name));
+    public SubscriptionRecord getSubscription(AsciiBuffer name) throws IOException {
+        return toSubscriptionRecord(data.subscriptionIndex.get(name));
     }
 
     /**
@@ -475,22 +401,16 @@ public class RootEntity {
     // Queue Methods.
     // /////////////////////////////////////////////////////////////////
     public void queueAdd(Transaction tx, QueueDescriptor queue) throws IOException {
-        if (destinationIndex.get(tx, queue.getQueueName()) == null) {
+        if (data.destinationIndex.get(queue.getQueueName()) == null) {
             DestinationEntity rc = new DestinationEntity();
             rc.setQueueDescriptor(queue);
             rc.allocate(tx);
-            destinationIndex.put(tx, queue.getQueueName(), rc);
-            rc.load(tx);
-            try {
-                addToDestinationCache(rc);
-            } catch (KeyNotFoundException e) {
-                throw new Store.FatalStoreException("Inconsistent QueueStore: " + e.getMessage(), e);
-            }
+            data.destinationIndex.put(queue.getQueueName(), rc);
         }
     }
 
     public void queueRemove(Transaction tx, QueueDescriptor queue) throws IOException {
-        DestinationEntity destination = destinations.get(queue.getQueueName());
+        DestinationEntity destination = data.destinationIndex.get(queue.getQueueName());
         if (destination != null) {
             // Remove the message references.
             // TODO this should probably be optimized.
@@ -499,21 +419,23 @@ public class RootEntity {
                 Long messageKey = messages.next().getKey();
                 removeMessageRef(tx, queue.getQueueName(), messageKey);
             }
-            destinationIndex.remove(tx, queue.getQueueName());
-            removeFromDestinationCache(destination);
+            data.destinationIndex.remove(queue.getQueueName());
             destination.deallocate(tx);
         }
     }
 
     public DestinationEntity getDestination(QueueDescriptor queue) {
-        return destinations.get(queue.getQueueName());
+        return data.destinationIndex.get(queue.getQueueName());
     }
 
     public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueDescriptor firstQueue, int max) throws IOException {
         LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
-        Collection<DestinationEntity> values = (firstQueue == null ? destinations.values() : destinations.tailMap(firstQueue.getQueueName()).values());
 
-        for (DestinationEntity de : values) {
+        final Iterator<Entry<AsciiBuffer, DestinationEntity>> i;
+        i = data.destinationIndex.iterator(firstQueue==null? null : firstQueue.getQueueName());
+        while (i.hasNext()) {
+            Entry<AsciiBuffer, DestinationEntity> entry = i.next();
+            DestinationEntity de = entry.getValue();
             if (results.size() >= max) {
                 break;
             }
@@ -537,7 +459,7 @@ public class RootEntity {
         if (partitions != null && partitions.hasNext()) {
             result.partitions = new LinkedList<QueueQueryResult>();
             while (partitions.hasNext()) {
-                result.partitions.add(queryQueue(tx, destinations.get(partitions.next().getDescriptor().getQueueName())));
+                result.partitions.add(queryQueue(tx, getDestination(partitions.next().getDescriptor()) ));
             }
         }
 
@@ -548,84 +470,82 @@ public class RootEntity {
     // Map Methods.
     // /////////////////////////////////////////////////////////////////
     public final void mapAdd(AsciiBuffer key, Transaction tx) throws IOException {
-        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(key);
-
-        if (map == null) {
-            long pageId = tx.allocate().getPageId();
-            map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), pageId);
-            map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-            map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
-            map.load(tx);
-            mapIndex.put(tx, key, pageId);
-            mapCache.put(key, map);
+        final Integer page = data.mapIndex.get(key); // here "key" is the map's name
+        if (page == null) { // nothing to do when a map is already registered under this name
+            int pageId = tx.alloc(); // page that will hold the new map's index
+            mapInstanceIndexFactory.create(tx, pageId); // returned index handle is not needed here
+            data.mapIndex.put(key, pageId); // register map name -> page
         }
     }
 
     public final void mapRemove(AsciiBuffer key, Transaction tx) throws IOException {
-        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.remove(key);
-        if (map != null) {
-            map.clear(tx);
-            map.unload(tx);
-            mapIndex.remove(tx, key);
+        final Integer pageId = data.mapIndex.remove(key);
+        if (pageId != null) {
+            SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+            map.clear();
+            tx.free(pageId);
         }
     }
 
     public final void mapAddEntry(AsciiBuffer name, AsciiBuffer key, Buffer value, Transaction tx) throws IOException {
-        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
-        if (map == null) {
-            mapAdd(name, tx);
-            map = mapCache.get(name);
+        Integer pageId = data.mapIndex.get(name); // look the map up by its name
+        if (pageId == null) { // map does not exist yet: create it on a fresh page
+            pageId = tx.alloc();
+            mapInstanceIndexFactory.create(tx, pageId); // handle is re-opened below
+            data.mapIndex.put(name, pageId); // register under the map name, not the entry key
         }
-
-        map.put(tx, key, value);
-
+        SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+        map.put(key, value); // store the entry inside the named map
     }
 
     public final void mapRemoveEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
-        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
-        if (map == null) {
+        Integer pageId = data.mapIndex.get(name);
+        if (pageId == null) {
             throw new KeyNotFoundException(name.toString());
         }
-        map.remove(tx, key);
+        SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+        map.remove(key);
     }
 
     public final Buffer mapGetEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
-        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
-        if (map == null) {
+        Integer pageId = data.mapIndex.get(name);
+        if (pageId == null) {
             throw new KeyNotFoundException(name.toString());
         }
-        return map.get(tx, key);
+        SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+        return map.get(key);
     }
 
     public final Iterator<AsciiBuffer> mapList(AsciiBuffer first, int count, Transaction tx) {
         LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
 
-        Collection<AsciiBuffer> values = (first == null ? mapCache.keySet() : mapCache.tailMap(first).keySet());
-        for (AsciiBuffer key : values) {
-            results.add(key);
+        final Iterator<Entry<AsciiBuffer, Integer>> i = data.mapIndex.iterator(first); // map names starting at "first"
+        while (i.hasNext() && (count <= 0 || results.size() < count)) { // stop at count; NOTE(review): count <= 0 kept unbounded - confirm contract
+            final Entry<AsciiBuffer, Integer> entry = i.next();
+            results.add(entry.getKey()); // only the map name is reported, not its page id
         }
 
         return results.iterator();
     }
 
     public final Iterator<AsciiBuffer> mapListKeys(AsciiBuffer name, AsciiBuffer first, int count, Transaction tx) throws IOException, KeyNotFoundException {
-        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
-        if (map == null) {
+        Integer pageId = data.mapIndex.get(name);
+        if (pageId == null) {
             throw new KeyNotFoundException(name.toString());
         }
 
+        SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
         final LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
 
         if (first != null && count > 0) {
-            map.visit(tx, new BTreeVisitor.GTEVisitor<AsciiBuffer, Buffer>(first, count) {
-
+            map.visit(new IndexVisitor.PredicateVisitor<AsciiBuffer, Buffer>(IndexVisitor.PredicateVisitor.gte(first), count){
                 @Override
                 protected void matched(AsciiBuffer key, Buffer value) {
                     results.add(key);
                 }
             });
         } else {
-            Iterator<Entry<AsciiBuffer, Buffer>> iterator = map.iterator(tx);
+            Iterator<Entry<AsciiBuffer, Buffer>> iterator = map.iterator();
             while (iterator.hasNext()) {
                 Entry<AsciiBuffer, Buffer> e = iterator.next();
                 results.add(e.getKey());
@@ -638,29 +558,22 @@ public class RootEntity {
     // /////////////////////////////////////////////////////////////////
     // Map Methods.
     // /////////////////////////////////////////////////////////////////
-
-    public long getPageId() {
-        return pageId;
-    }
-
-    public void setPageId(long pageId) {
-        this.pageId = pageId;
-    }
-
     public int getState() {
-        return state;
+        return data.state;
     }
 
+    @Deprecated // TODO: keep data immutable
     public void setState(int state) {
-        this.state = state;
+        this.data.state = state;
     }
 
     public Location getLastUpdate() {
-        return lastUpdate;
+        return data.lastUpdate;
     }
 
+    @Deprecated // TODO: keep data immutable
     public void setLastUpdate(Location lastUpdate) {
-        this.lastUpdate = lastUpdate;
+        this.data.lastUpdate = lastUpdate;
     }
 
     private static class QueueQueryResultImpl implements QueueQueryResult {
@@ -713,7 +626,7 @@ public class RootEntity {
         //so that we can be sure that all journal entries are on disk prior to 
         //index update. 
 
-        //Scan MessageKey Index to find message keys past the last append 
+        //Scan the message key index to find message keys past the last append 
         //location:
         //        final ArrayList<Long> matches = new ArrayList<Long>();
         //        messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
@@ -776,7 +689,7 @@ public class RootEntity {
     final void removeGCCandidates(final TreeSet<Integer> gcCandidateSet, Transaction tx) throws IOException {
 
         // Don't GC files after the first in progress tx
-        Location firstTxLocation = lastUpdate;
+        Location firstTxLocation = data.lastUpdate;
 
         if (firstTxLocation != null) {
             while (!gcCandidateSet.isEmpty()) {
@@ -799,7 +712,7 @@ public class RootEntity {
 
         // Go through the location index to see if we can remove gc candidates:
         // Use a visitor to cut down the number of pages that we load
-        locationIndex.visit(tx, new BTreeVisitor<Integer, Long>() {
+        data.locationIndex.visit(new IndexVisitor<Integer, Long>() {
             int last = -1;
 
             public boolean isInterestedInKeysBetween(Integer first, Integer second) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Jul  7 03:24:02 2010
@@ -94,6 +94,7 @@ import org.apache.activemq.state.Command
 import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.Dispatch;
 
 public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener {
 
@@ -634,7 +635,7 @@ public class OpenwireProtocolHandler imp
                 };
             } else {
 
-                limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+                limiter = new SizeLimiter<OpenWireMessageDelivery>(1024*64, 1024*32);
             }
 
             controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
@@ -694,7 +695,7 @@ public class OpenwireProtocolHandler imp
             }
             controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
             controller.useOverFlowQueue(false);
-            controller.setExecutor(connection.getDispatcher().getGlobalQueue(DispatchPriority.HIGH));
+            controller.setExecutor(Dispatch.getGlobalQueue());
             super.onFlowOpened(controller);
         }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Wed Jul  7 03:24:02 2010
@@ -16,21 +16,9 @@
  */
 package org.apache.activemq.openwire;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.BufferEditor;
 import org.apache.activemq.util.buffer.DataByteArrayInputStream;
@@ -38,6 +26,15 @@ import org.apache.activemq.util.buffer.D
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.wireformat.WireFormatFactory;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * 
  */
@@ -284,6 +281,10 @@ public final class OpenWireFormat implem
         return doUnmarshal(dataIn);
     }
 
+    public Object unmarshal(ReadableByteChannel channel) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Used by NIO or AIO transports
      */
@@ -586,9 +587,6 @@ public final class OpenWireFormat implem
         return preferedWireFormatInfo;
     }
 
-    public boolean inReceive() {
-        return receivingMessage.get();
-    }
 
     public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
 
@@ -645,16 +643,6 @@ public final class OpenWireFormat implem
         return version2;
     }
 
-    public Transport createTransportFilters(Transport transport, Map options) {
-
-        if (transport.isUseInactivityMonitor()) {
-            transport = new InactivityMonitor(transport, this);
-        }
-
-        transport = new WireFormatNegotiator(transport, this, 1);
-        transport = new ResponseCorrelator(transport);
-        return transport;
-    }
 
 	public WireFormatFactory getWireFormatFactory() {
 		return new OpenWireFormatFactory();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Jul  7 03:24:02 2010
@@ -117,7 +117,7 @@ public class ConnectionStateTracker exte
         // Restore the connections.
         for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
             ConnectionState connectionState = iter.next();
-            transport.oneway(connectionState.getInfo());
+            transport.oneway(connectionState.getInfo(), null);
             restoreTempDestinations(transport, connectionState);
 
             if (restoreSessions) {
@@ -130,7 +130,7 @@ public class ConnectionStateTracker exte
         }
         //now flush messages
         for (Message msg:messageCache.values()) {
-            transport.oneway(msg);
+            transport.oneway(msg, null);
         }
     }
 
@@ -145,7 +145,7 @@ public class ConnectionStateTracker exte
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("tx replay: " + command);
                 }
-                transport.oneway(command);
+                transport.oneway(command, null);
             }
         }
     }
@@ -159,7 +159,7 @@ public class ConnectionStateTracker exte
         // Restore the connection's sessions
         for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
             SessionState sessionState = (SessionState)iter2.next();
-            transport.oneway(sessionState.getInfo());
+            transport.oneway(sessionState.getInfo(), null);
 
             if (restoreProducers) {
                 restoreProducers(transport, sessionState);
@@ -180,7 +180,7 @@ public class ConnectionStateTracker exte
         // Restore the session's consumers
         for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
             ConsumerState consumerState = (ConsumerState)iter3.next();
-            transport.oneway(consumerState.getInfo());
+            transport.oneway(consumerState.getInfo(), null);
         }
     }
 
@@ -193,7 +193,7 @@ public class ConnectionStateTracker exte
         // Restore the session's producers
         for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
             ProducerState producerState = (ProducerState)iter3.next();
-            transport.oneway(producerState.getInfo());
+            transport.oneway(producerState.getInfo(), null);
         }
     }
 
@@ -206,7 +206,7 @@ public class ConnectionStateTracker exte
         throws IOException {
         // Restore the connection's temp destinations.
         for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) {
-            transport.oneway((DestinationInfo)iter2.next());
+            transport.oneway((DestinationInfo)iter2.next(), null);
         }
     }