You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/25 23:11:21 UTC

svn commit: r758450 [2/2] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Mar 25 22:11:15 2009
@@ -16,73 +16,36 @@
  */
 package org.apache.activemq.broker.store.kahadb;
 
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Set;
 import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.kahadb.Operation.AddOpperation;
-import org.apache.activemq.broker.store.kahadb.Operation.RemoveOpperation;
-import org.apache.activemq.broker.store.kahadb.StoredDBState.DBStateMarshaller;
-import org.apache.activemq.broker.store.kahadb.StoredDestinationState.StoredDestinationMarshaller;
-import org.apache.activemq.broker.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaDestination;
-import org.apache.activemq.broker.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocalTransactionId;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocation;
-import org.apache.activemq.broker.store.kahadb.data.KahaPrepareCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaRemoveDestinationCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaRollbackCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaTransactionInfo;
-import org.apache.activemq.broker.store.kahadb.data.KahaXATransactionId;
-import org.apache.activemq.broker.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.broker.store.kahadb.data.KahaDestination.KahaDestinationBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaEntryType.KahaEntryTypeCreatable;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocalTransactionId.KahaLocalTransactionIdBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocation.KahaLocationBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaTraceCommand.KahaTraceCommandBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaTransactionInfo.KahaTransactionInfoBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaXATransactionId.KahaXATransactionIdBean;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage;
+import org.apache.activemq.broker.store.kahadb.Data.Trace;
+import org.apache.activemq.broker.store.kahadb.Data.Type;
+import org.apache.activemq.broker.store.kahadb.Data.MessageAdd.MessageAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAdd.QueueAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage.QueueAddMessageBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemove.QueueRemoveBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage.QueueRemoveMessageBean;
+import org.apache.activemq.broker.store.kahadb.Data.Type.TypeCreatable;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.wireformat.WireFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
@@ -93,8 +56,6 @@
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.LockFile;
-import org.apache.kahadb.util.LongMarshaller;
-import org.apache.kahadb.util.StringMarshaller;
 
 public class KahaDBStore implements Store {
 
@@ -106,11 +67,9 @@
     protected PageFile pageFile;
     protected Journal journal;
     
-    protected StoredDBState dbstate = new StoredDBState(this);
-    protected DBStateMarshaller dbstateMarshaller = new DBStateMarshaller(this);
+    protected StoredDBState dbstate = new StoredDBState();
 
     protected boolean failIfDatabaseIsLocked;
-
     protected boolean deleteAllMessages;
     protected File directory;
     protected Thread checkpointThread;
@@ -124,192 +83,16 @@
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
-    WireFormat wireFormat = new OpenWireFormat();
-
-    public TransactionStore createTransactionStore() throws IOException {
-        return new KahaDBTransactionStore(this);
-    }
-
-    String subscriptionKey(String clientId, String subscriptionName){
-        return clientId+":"+subscriptionName;
-    }
-    
-    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        return new KahaDBMessageStore(this, destination);
-    }
-
-    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        return new KahaDBTopicMessageStore(this, destination);
-    }
-
-    /**
-     * Cleanup method to remove any state associated with the given destination.
-     * This method does not stop the message store (it might not be cached).
-     *
-     * @param destination Destination to forget
-     */
-    public void removeQueueMessageStore(ActiveMQQueue destination) {
-    }
-
-    /**
-     * Cleanup method to remove any state associated with the given destination
-     * This method does not stop the message store (it might not be cached).
-     *
-     * @param destination Destination to forget
-     */
-    public void removeTopicMessageStore(ActiveMQTopic destination) {
-    }
-
-    public void deleteAllMessages() throws IOException {
-        deleteAllMessages=true;
-    }
-    
-    public Set<ActiveMQDestination> getDestinations() {
-        try {
-            final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
-            synchronized(indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>(){
-                    public void execute(Transaction tx) throws IOException {
-                        for (Iterator<Entry<String, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
-                            Entry<String, StoredDestinationState> entry = iterator.next();
-                            rc.add(convert(entry.getKey()));
-                        }
-                    }
-                });
-            }
-            return rc;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    public long getLastMessageBrokerSequenceId() throws IOException {
-        return 0;
-    }
-    
-    public long size() {
-        if ( !started.get() ) {
-            return 0;
-        }
-        try {
-            return journal.getDiskSize() + pageFile.getDiskSize();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void beginTransaction() throws IOException {
-        throw new IOException("Not yet implemented.");
-    }
-    public void commitTransaction() throws IOException {
-        throw new IOException("Not yet implemented.");
-    }
-    public void rollbackTransaction() throws IOException {
-        throw new IOException("Not yet implemented.");
-    }
-    
-    public void checkpoint(boolean sync) throws IOException {
-        checkpointCleanup(false);
-    }
-    
-    ///////////////////////////////////////////////////////////////////
-    // Internal helper methods.
-    ///////////////////////////////////////////////////////////////////
+    private Location nextRecoveryPosition;
+    private Location lastRecoveryPosition;
 
-    /**
-     * @param location
-     * @return
-     * @throws IOException
-     */
-    Message loadMessage(Location location) throws IOException {
-        KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
-        Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
-        return msg;
-    }
+    protected final Object indexMutex = new Object();
+    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+    private final HashMap<AsciiBuffer, StoredDestinationState> storedDestinations = new HashMap<AsciiBuffer, StoredDestinationState>();
 
     ///////////////////////////////////////////////////////////////////
-    // Internal conversion methods.
+    // Lifecycle methods
     ///////////////////////////////////////////////////////////////////
-    
-    KahaTransactionInfo createTransactionInfo(TransactionId txid) {
-        if( txid ==null ) {
-            return null;
-        }
-        KahaTransactionInfoBean rc = new KahaTransactionInfoBean();
-        
-        // Link it up to the previous record that was part of the transaction.
-        ArrayList<Operation> tx = inflightTransactions.get(txid);
-        if( tx!=null ) {
-            rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
-        }
-        
-        if( txid.isLocalTransaction() ) {
-            LocalTransactionId t = (LocalTransactionId)txid;
-            KahaLocalTransactionIdBean kahaTxId = new KahaLocalTransactionIdBean();
-            kahaTxId.setConnectionId(t.getConnectionId().getValue());
-            kahaTxId.setTransacitonId(t.getValue());
-            rc.setLocalTransacitonId(kahaTxId);
-        } else {
-            XATransactionId t = (XATransactionId)txid;
-            KahaXATransactionIdBean kahaTxId = new KahaXATransactionIdBean();
-            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
-            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
-            kahaTxId.setFormatId(t.getFormatId());
-            rc.setXaTransacitonId(kahaTxId);
-        }
-        return rc;
-    }
-    
-    KahaLocation convert(Location location) {
-        KahaLocationBean rc = new KahaLocationBean();
-        rc.setLogId(location.getDataFileId());
-        rc.setOffset(location.getOffset());
-        return rc;
-    }
-    
-    KahaDestination convert(ActiveMQDestination dest) {
-        KahaDestinationBean rc = new KahaDestinationBean();
-        rc.setName(dest.getPhysicalName());
-        switch( dest.getDestinationType() ) {
-        case ActiveMQDestination.QUEUE_TYPE:
-            rc.setType(DestinationType.QUEUE);
-            return rc;
-        case ActiveMQDestination.TOPIC_TYPE:
-            rc.setType(DestinationType.TOPIC);
-            return rc;
-        case ActiveMQDestination.TEMP_QUEUE_TYPE:
-            rc.setType(DestinationType.TEMP_QUEUE);
-            return rc;
-        case ActiveMQDestination.TEMP_TOPIC_TYPE:
-            rc.setType(DestinationType.TEMP_TOPIC);
-            return rc;
-        default:
-            return null;
-        }
-    }
-
-    ActiveMQDestination convert(String dest) {
-        int p = dest.indexOf(":");
-        if( p<0 ) {
-            throw new IllegalArgumentException("Not in the valid destination format");
-        }
-        int type = Integer.parseInt(dest.substring(0, p));
-        String name = dest.substring(p+1);
-        
-        switch( KahaDestination.DestinationType.valueOf(type) ) {
-        case QUEUE:
-            return new ActiveMQQueue(name);
-        case TOPIC:
-            return new ActiveMQTopic(name);
-        case TEMP_QUEUE:
-            return new ActiveMQTempQueue(name);
-        case TEMP_TOPIC:
-            return new ActiveMQTempTopic(name);
-        default:    
-            throw new IllegalArgumentException("Not in the valid destination format");
-        }
-    }
-
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
         	load();
@@ -329,35 +112,24 @@
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     if (pageFile.getPageCount() == 0) {
-                        // First time this is created.. Initialize the metadata
-                        Page<StoredDBState> page = tx.allocate();
-                        assert page.getPageId() == 0;
-                        page.set(dbstate);
-                        dbstate.page = page;
-                        dbstate.state = CLOSED_STATE;
-                        dbstate.destinations = new BTreeIndex<String, StoredDestinationState>(pageFile, tx.allocate().getPageId());
-
-                        tx.store(dbstate.page, dbstateMarshaller, true);
+                        dbstate.allocate(tx);
                     } else {
-                        Page<StoredDBState> page = tx.load(0, dbstateMarshaller);
+                        Page<StoredDBState> page = tx.load(0, StoredDBState.MARSHALLER);
                         dbstate = page.get();
                         dbstate.page = page;
                     }
-                    dbstate.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
-                    dbstate.destinations.setValueMarshaller(new StoredDestinationMarshaller(KahaDBStore.this));
-                    dbstate.destinations.load(tx);
+                    dbstate.load(tx);
                 }
             });
             pageFile.flush();
-            
-            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
-            // Perhaps we should just keep an index of file
+
+            // Keep a cache of the StoredDestinations
             storedDestinations.clear();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    for (Iterator<Entry<String, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
-                        Entry<String, StoredDestinationState> entry = iterator.next();
-                        StoredDestinationState sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
+                    for (Iterator<Entry<AsciiBuffer, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
+                        Entry<AsciiBuffer, StoredDestinationState> entry = iterator.next();
+                        StoredDestinationState sd = loadStoredDestination(tx, entry.getKey());
                         storedDestinations.put(entry.getKey(), sd);
                     }
                 }
@@ -365,6 +137,20 @@
         }
 	}
 	
+	
+    private StoredDestinationState loadStoredDestination(Transaction tx, AsciiBuffer key) throws IOException {
+        // Try to load the existing indexes..
+        StoredDestinationState rc = dbstate.destinations.get(tx, key);
+        if (rc == null) {
+            // Brand new destination.. allocate indexes for it.
+            rc = new StoredDestinationState();
+            rc.allocate(tx);
+            dbstate.destinations.put(tx, key, rc);
+        }
+        rc.load(tx);
+        return rc;
+    }
+    
 	/**
 	 * @throws IOException
 	 */
@@ -434,25 +220,23 @@
 	
 	            pageFile.unload();
 	            pageFile.delete();
-	            dbstate = new StoredDBState(this);
+	            dbstate = new StoredDBState();
 	            
 	            LOG.info("Persistence store purged.");
 	            deleteAllMessages = false;
 	            
 	            loadPageFile();
 	        }
-	        store(new KahaTraceCommandBean().setMessage("LOADED " + new Date()));
-
+	        store( new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
         }
 
     }
 
-    
 	public void close() throws IOException, InterruptedException {
 		if( opened.compareAndSet(true, false)) {
 	        synchronized (indexMutex) {
 	            pageFile.unload();
-	            dbstate = new StoredDBState(this);
+	            dbstate = new StoredDBState();
 	        }
 	        journal.close();
 	        checkpointThread.join();
@@ -465,11 +249,9 @@
         synchronized (indexMutex) {
             if( pageFile.isLoaded() ) {
                 dbstate.state = CLOSED_STATE;
-                dbstate.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-    
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        tx.store(dbstate.page, dbstateMarshaller, true);
+                        tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
                     }
                 });
                 close();
@@ -477,22 +259,9 @@
         }
     }
 
-    /**
-     * @return
-     */
-    private Location getFirstInProgressTxLocation() {
-        Location l = null;
-        if (!inflightTransactions.isEmpty()) {
-            l = inflightTransactions.values().iterator().next().get(0).getLocation();
-        }
-        if (!preparedTransactions.isEmpty()) {
-            Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
-            if (l==null || t.compareTo(l) <= 0) {
-                l = t;
-            }
-        }
-        return l;
-    }
+    ///////////////////////////////////////////////////////////////////
+    // Recovery methods
+    ///////////////////////////////////////////////////////////////////
 
     /**
      * Move all the messages that were in the journal into long term storage. We
@@ -511,9 +280,16 @@
 	        if( recoveryPosition!=null ) {
 		        int redoCounter = 0;
 		        while (recoveryPosition != null) {
-		            KahaEntryTypeCreatable message = load(recoveryPosition);
+		            final TypeCreatable message = load(recoveryPosition);
+		            final Location location = lastRecoveryPosition;
 		            dbstate.lastUpdate = recoveryPosition;
-		            process(message, recoveryPosition);
+		            
+	                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+	                    public void execute(Transaction tx) throws IOException {
+	                        updateIndex(tx, message.toType(), (MessageBuffer)message, location);
+	                    }
+	                });		            
+		            
 		            redoCounter++;
 		            recoveryPosition = journal.getNextLocation(recoveryPosition);
 		        }
@@ -530,34 +306,61 @@
         }
     }
     
+    public void incrementalRecover() throws IOException {
+        synchronized (indexMutex) {
+            if( nextRecoveryPosition == null ) {
+                if( lastRecoveryPosition==null ) {
+                    nextRecoveryPosition = getRecoveryPosition();
+                } else {
+                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+                }           
+            }
+            while (nextRecoveryPosition != null) {
+                lastRecoveryPosition = nextRecoveryPosition;
+                dbstate.lastUpdate = lastRecoveryPosition;
+                final TypeCreatable message = load(lastRecoveryPosition);
+                final Location location = lastRecoveryPosition;
+                
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        updateIndex(tx, message.toType(), (MessageBuffer)message, location);
+                    }
+                });                 
+
+                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+            }
+        }
+    }
+    
 	protected void recoverIndex(Transaction tx) throws IOException {
         long start = System.currentTimeMillis();
         // It is possible index updates got applied before the journal updates.. 
         // in that case we need to removed references to messages that are not in the journal
         final Location lastAppendLocation = journal.getLastAppendLocation();
         long undoCounter=0;
-        
-        // Go through all the destinations to see if they have messages past the lastAppendLocation
-        for (StoredDestinationState sd : storedDestinations.values()) {
-        	
-            final ArrayList<Long> matches = new ArrayList<Long>();
-            // Find all the Locations that are >= than the last Append Location.
-            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
-				@Override
-				protected void matched(Location key, Long value) {
-					matches.add(value);
-				}
-            });
-            
-            
-            for (Long sequenceId : matches) {
-                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-                sd.locationIndex.remove(tx, keys.location);
-                sd.messageIdIndex.remove(tx, keys.messageId);
-                undoCounter++;
-                // TODO: do we need to modify the ack positions for the pub sub case?
-			}
-        }
+
+// TODO        
+//        // Go through all the destinations to see if they have messages past the lastAppendLocation
+//        for (StoredDestinationState sd : storedDestinations.values()) {
+//        	
+//            final ArrayList<Long> matches = new ArrayList<Long>();
+//            // Find all the Locations that are >= than the last Append Location.
+//            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+//				@Override
+//				protected void matched(Location key, Long value) {
+//					matches.add(value);
+//				}
+//            });
+//            
+//            
+//            for (Long sequenceId : matches) {
+//                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+//                sd.locationIndex.remove(tx, keys.location);
+//                sd.messageIdIndex.remove(tx, keys.messageId);
+//                undoCounter++;
+//                // TODO: do we need to modify the ack positions for the pub sub case?
+//			}
+//        }
         long end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
         	// The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
@@ -565,28 +368,6 @@
 	        LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
         }
 	}
-
-	private Location nextRecoveryPosition;
-	private Location lastRecoveryPosition;
-
-	public void incrementalRecover() throws IOException {
-        synchronized (indexMutex) {
-	        if( nextRecoveryPosition == null ) {
-	        	if( lastRecoveryPosition==null ) {
-	        		nextRecoveryPosition = getRecoveryPosition();
-	        	} else {
-	                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
-	        	}        	
-	        }
-	        while (nextRecoveryPosition != null) {
-	        	lastRecoveryPosition = nextRecoveryPosition;
-	            dbstate.lastUpdate = lastRecoveryPosition;
-	            KahaEntryTypeCreatable message = load(lastRecoveryPosition);
-	            process(message, lastRecoveryPosition);            
-	            nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
-	        }
-        }
-	}
 	
     public Location getLastUpdatePosition() throws IOException {
         return dbstate.lastUpdate;
@@ -594,12 +375,6 @@
     
 	private Location getRecoveryPosition() throws IOException {
 		
-        // If we need to recover the transactions..
-        if (dbstate.firstInProgressTransactionLocation != null) {
-            return dbstate.firstInProgressTransactionLocation;
-        }
-        
-        // Perhaps there were no transactions...
         if( dbstate.lastUpdate!=null) {
             // Start replay at the record after the last one recorded in the index file.
             return journal.getNextLocation(dbstate.lastUpdate);
@@ -642,602 +417,370 @@
             closure.execute();
         }
 	}
+    
+    /** Sets dbstate.state to OPEN_STATE, stores the DB state page through {@code tx} and flushes the page file; optionally deletes journal data files that are no longer needed.
+     * @param tx transaction used to store the DB state page and to visit dbstate.locationIndex
+     * @param cleanup if true, the journal data files that survive the candidate filtering below are removed
+     * @throws IOException declared by the method; presumably raised by the page file, index or journal calls (TODO confirm) */
+    private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
+
+        LOG.debug("Checkpoint started.");
+        
+        dbstate.state = OPEN_STATE;
+        tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
+        pageFile.flush();
+
+        if( cleanup ) {
+        	
+        	final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet()); // start with every key of the journal file map as a GC candidate
+        	
+        	// Don't GC files under replication
+        	if( journalFilesBeingReplicated!=null ) {
+        		gcCandidateSet.removeAll(journalFilesBeingReplicated);
+        	}
+        	
+        	// Don't GC the data file that holds the last index update (dbstate.lastUpdate) or any file with a higher id
+        	Location firstTxLocation = dbstate.lastUpdate;
+            
+            if( firstTxLocation!=null ) {
+            	while( !gcCandidateSet.isEmpty() ) {
+            		Integer last = gcCandidateSet.last();
+            		if( last >= firstTxLocation.getDataFileId() ) {
+            			gcCandidateSet.remove(last);
+            		} else {
+            			break;
+            		}
+            	}
+            }
+
+            // Go through all the destinations to see if any of them can remove GC candidates. NOTE(review): sd is never used in the loop body; every pass visits the same dbstate.locationIndex - confirm intent.
+            for (StoredDestinationState sd : storedDestinations.values()) {
+            	if( gcCandidateSet.isEmpty() ) {
+                	break;
+                }
+                
+                // Use a visitor to cut down the number of pages that we load
+                dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+                    int last=-1; // data file id of the previously visited key; -1 until a key has been seen
+                    public boolean isInterestedInKeysBetween(Location first, Location second) { // drops the boundary keys' file ids from the candidates (the sub-sets are views of gcCandidateSet) and asks for the range only if candidates remain in it
+                    	if( first==null ) {
+                    		SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                    			subset.remove(second.getDataFileId());
+                    		}
+							return !subset.isEmpty();
+                    	} else if( second==null ) {
+                    		SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                    			subset.remove(first.getDataFileId());
+                    		}
+							return !subset.isEmpty();
+                    	} else {
+                    		SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+                    			subset.remove(first.getDataFileId());
+                    		}
+                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+                    			subset.remove(second.getDataFileId());
+                    		}
+							return !subset.isEmpty();
+                    	}
+                    }
+    
+                    public void visit(List<Location> keys, List<Long> values) { // a data file that holds an indexed key is not a GC candidate; 'last' skips repeated removes for runs of keys in the same file
+                    	for (Location l : keys) {
+                            int fileId = l.getDataFileId();
+							if( last != fileId ) {
+                        		gcCandidateSet.remove(fileId);
+                                last = fileId;
+                            }
+						}                        
+                    }
+    
+                });
+            }
+
+            if( !gcCandidateSet.isEmpty() ) {
+	            LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+	            journal.removeDataFiles(gcCandidateSet);
+            }
+        }
+        
+        LOG.debug("Checkpoint done.");
+    }
+    
+    public HashSet<Integer> getJournalFilesBeingReplicated() { // returns the field itself (not a copy); checkpointUpdate excludes the ids in this set from journal file cleanup
+		return journalFilesBeingReplicated;
+	}
+    
+    ///////////////////////////////////////////////////////////////////
+    // Store interface
+    ///////////////////////////////////////////////////////////////////
+    long messageSequence;
 
-    // /////////////////////////////////////////////////////////////////
-    // Methods call by the broker to update and query the store.
-    // /////////////////////////////////////////////////////////////////
-    public Location store(KahaEntryTypeCreatable data) throws IOException {
+    public Location store(TypeCreatable data) throws IOException {
         return store(data, false);
     }
 
     /**
      * All updated are are funneled through this method. The updates a converted
-     * to a JournalMessage which is logged to the journal and then the data from
-     * the JournalMessage is used to update the index just like it would be done
-     * durring a recovery process.
+     * to a PBMessage which is logged to the journal and then the data from
+     * the PBMessage is used to update the index just like it would be done
+     * during a recovery process.
+     * @throws IOException 
      */
     @SuppressWarnings("unchecked")
-    public Location store(KahaEntryTypeCreatable data, boolean sync) throws IOException {
-    	MessageBuffer message = ((PBMessage) data).freeze();
+    public Location store(final TypeCreatable data, boolean sync) throws IOException {
+        final MessageBuffer message = ((PBMessage) data).freeze();
         int size = message.serializedSizeUnframed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.toKahaEntryType().getNumber());
+        os.writeByte(data.toType().getNumber());
         message.writeUnframed(os);
 
         long start = System.currentTimeMillis();
-        Location location = journal.write(os.toByteSequence(), sync);
+        final Location location = journal.write(os.toByteSequence(), sync);
         long start2 = System.currentTimeMillis();
-        process(data, location);
-    	long end = System.currentTimeMillis();
-    	if( end-start > 100 ) { 
-    		LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
-    	}
+        
+        synchronized (indexMutex) {
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    updateIndex(tx, data.toType(), message, location);
+                }
+            });
+        }
+
+        long end = System.currentTimeMillis();
+        if( end-start > 100 ) { 
+            LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+        }
 
         synchronized (indexMutex) {
-        	dbstate.lastUpdate = location;
+            dbstate.lastUpdate = location;
         }
         return location;
     }
-
+    
     /**
-     * Loads a previously stored JournalMessage
+     * Loads a previously stored PBMessage
      * 
      * @param location
      * @return
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    public KahaEntryTypeCreatable load(Location location) throws IOException {
+    public TypeCreatable load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
-        KahaEntryType type = KahaEntryType.valueOf(readByte);
+        Type type = Type.valueOf(readByte);
         if( type == null ) {
             throw new IOException("Could not load journal record. Invalid location: "+location);
         }
         MessageBuffer message = type.parseUnframed(new Buffer(data.data, data.offset+1, data.length-1));
-        return (KahaEntryTypeCreatable)message;
+        return (TypeCreatable)message;
     }
 
-    // /////////////////////////////////////////////////////////////////
-    // Journaled record processing methods. Once the record is journaled,
-    // these methods handle applying the index updates. These may be called
-    // from the recovery method too so they need to be idempotent
-    // /////////////////////////////////////////////////////////////////
-
-    private void process(KahaEntryTypeCreatable data, final Location location) throws IOException {
-        switch(data.toKahaEntryType()) {
-        case KAHA_ADD_MESSAGE_COMMAND:
-            process((KahaAddMessageCommand)data, location);
-            return;
-        case KAHA_COMMIT_COMMAND:
-            process((KahaCommitCommand)data, location);
-            return;
-        case KAHA_PREPARE_COMMAND:
-            process((KahaPrepareCommand)data, location);
-            return;
-        case KAHA_REMOVE_DESTINATION_COMMAND:
-            process((KahaRemoveDestinationCommand)data, location);
-            return;
-        case KAHA_REMOVE_MESSAGE_COMMAND:
-            process((KahaRemoveMessageCommand)data, location);
+    @SuppressWarnings("unchecked")
+    public void updateIndex(Transaction tx, Type type, MessageBuffer message, Location location) {
+        switch (type) {
+        case MESSAGE_ADD:
+            messageAdd(tx, (MessageAdd)message, location);
             return;
-        case KAHA_ROLLBACK_COMMAND:
-            process((KahaRollbackCommand)data, location);
+        case QUEUE_ADD:
+            queueAdd(tx, (QueueAdd)message, location);
             return;
-        case KAHA_SUBSCRIPTION_COMMAND:
-            process((KahaSubscriptionCommand)data, location);
+        case QUEUE_ADD_MESSAGE:
+            queueAddMessage(tx, (QueueAdd)message, location);
             return;
-        case KAHA_TRACE_COMMAND:
-            process((KahaTraceCommand)data, location);
+        case QUEUE_REMOVE_MESSAGE:
+            queueRemoveMessage(tx, (QueueRemoveMessage)message, location);
             return;
+        case TRANSACTION_BEGIN:
+        case TRANSACTION_ADD_MESSAGE:
+        case TRANSACTION_REMOVE_MESSAGE:
+        case TRANSACTION_COMMIT:
+        case TRANSACTION_ROLLBACK:
+        case MAP_ADD:
+        case MAP_REMOVE:
+        case MAP_ENTRY_PUT:
+        case MAP_ENTRY_REMOVE:
+        case STREAM_OPEN:
+        case STREAM_WRITE:
+        case STREAM_CLOSE:
+        case STREAM_REMOVE:
+            throw new UnsupportedOperationException();
         }
     }
 
-    private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
-        if (command.hasTransactionInfo()) {
-            synchronized (indexMutex) {
-                ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-                inflightTx.add(new AddOpperation(this, command, location));
-            }
-        } else {
-            synchronized (indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        upadateIndex(tx, command, location);
-                    }
-                });
-            }
-        }
+    private void messageAdd(Transaction tx, MessageAdd message, Location location) { // empty body: no index update is performed for MESSAGE_ADD records
+    }
+    private void queueAdd(Transaction tx, QueueAdd message, Location location) { // empty body: no index update is performed for QUEUE_ADD records
+    }
+    private void queueAddMessage(Transaction tx, QueueAdd message, Location location) { // empty body. NOTE(review): takes QueueAdd, the same type as queueAdd, and updateIndex casts QUEUE_ADD_MESSAGE records to QueueAdd - confirm this is the intended message type
+    }
+    private void queueRemoveMessage(Transaction tx, QueueRemoveMessage message, Location location) { // empty body: no index update is performed for QUEUE_REMOVE_MESSAGE records
     }
 
-    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
-        if (command.hasTransactionInfo()) {
-            synchronized (indexMutex) {
-                ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-                inflightTx.add(new RemoveOpperation(this, command, location));
-            }
-        } else {
-            synchronized (indexMutex) {
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        updateIndex(tx, command, location);
-                    }
-                });
+    class KahaDBSession implements Session {
+        
+        ///////////////////////////////////////////////////////////////
+        // Message related methods.
+        ///////////////////////////////////////////////////////////////
+        public Long messageAdd(MessageRecord message) {
+            try {
+                Long id = dbstate.nextMessageId++;
+                MessageAddBean bean = new MessageAddBean();
+                bean.setBuffer(message.getBuffer());
+                bean.setEncoding(message.getEncoding());
+                bean.setMessageId(message.getMessageId());
+                bean.setMessageKey(id); 
+                bean.setStreamKey(message.getStreamKey());
+                store(bean);
+                return id;
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
             }
         }
-
-    }
-
-    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
-        synchronized (indexMutex) {
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    updateIndex(tx, command, location);
-                }
-            });
+        public Long messageGetKey(AsciiBuffer messageId) {
+            return null;
         }
-    }
-
-    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
-        synchronized (indexMutex) {
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    updateIndex(tx, command, location);
-                }
-            });
+        public MessageRecord messageGetRecord(Long key) {
+            return null;
         }
-    }
 
-    protected void process(KahaCommitCommand command, Location location) throws IOException {
-        TransactionId key = key(command.getTransactionInfo());
-        synchronized (indexMutex) {
-            ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
-            if (inflightTx == null) {
-                inflightTx = preparedTransactions.remove(key);
-            }
-            if (inflightTx == null) {
-                return;
+        ///////////////////////////////////////////////////////////////
+        // Queue related methods.
+        ///////////////////////////////////////////////////////////////
+        public void queueAdd(AsciiBuffer queueName) {
+            try {
+                store(new QueueAddBean().setQueueName(queueName));
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
             }
-
-            final ArrayList<Operation> messagingTx = inflightTx;
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    for (Operation op : messagingTx) {
-                        op.execute(tx);
-                    }
-                }
-            });
         }
-    }
-
-    protected void process(KahaPrepareCommand command, Location location) {
-        synchronized (indexMutex) {
-            TransactionId key = key(command.getTransactionInfo());
-            ArrayList<Operation> tx = inflightTransactions.remove(key);
-            if (tx != null) {
-                preparedTransactions.put(key, tx);
+        public boolean queueRemove(AsciiBuffer queueName) {
+            try {
+                store(new QueueRemoveBean().setQueueName(queueName));
+                return false;
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
             }
         }
-    }
-
-    protected void process(KahaRollbackCommand command, Location location) {
-        synchronized (indexMutex) {
-            TransactionId key = key(command.getTransactionInfo());
-            ArrayList<Operation> tx = inflightTransactions.remove(key);
-            if (tx == null) {
-                preparedTransactions.remove(key);
+        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
+            return null;
+        }
+        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
+            try {
+                Long queueKey = 1L;
+                QueueAddMessageBean bean = new QueueAddMessageBean();
+                bean.setQueueName(queueName);
+                bean.setAttachment(record.getAttachment());
+                bean.setMessageKey(record.getMessageKey());
+                bean.setQueueKey(queueKey);
+                store(bean);
+                return queueKey;
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
             }
         }
-    }
-
-    // /////////////////////////////////////////////////////////////////
-    // These methods do the actual index updates.
-    // /////////////////////////////////////////////////////////////////
-
-    protected final Object indexMutex = new Object();
-	private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
-
-    void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
-        StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
+        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+            try {
+                QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
+                bean.setQueueKey(queueKey);
+                bean.setQueueName(queueName);
+                store(bean);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
 
-        // Skip adding the message to the index if this is a topic and there are
-        // no subscriptions.
-        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
-            return;
         }
-
-        // Add the message.
-        long id = sd.nextMessageId++;
-        Long previous = sd.locationIndex.put(tx, location, id);
-        if( previous == null ) {
-            sd.messageIdIndex.put(tx, command.getMessageId(), id);
-            sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
-        } else {
-            // restore the previous value.. Looks like this was a redo of a previously
-            // added message.  We don't want to assing it a new id as the other indexes would 
-            // be wrong..
-            sd.locationIndex.put(tx, location, previous);
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+            return null;
         }
         
-    }
-
-    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
-        StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
-        if (!command.hasSubscriptionKey()) {
-            
-            // In the queue case we just remove the message from the index..
-            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
-            if (sequenceId != null) {
-                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-                sd.locationIndex.remove(tx, keys.location);
-            }
-        } else {
-            // In the topic case we need remove the message once it's been acked
-            // by all the subs
-            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
-
-            // Make sure it's a valid message id...
-            if (sequence != null) {
-                String subscriptionKey = command.getSubscriptionKey();
-                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
-
-                // The following method handles deleting un-referenced messages.
-                removeAckLocation(tx, sd, subscriptionKey, prev);
-
-                // Add it to the new location set.
-                addAckLocation(sd, sequence, subscriptionKey);
-            }
-
-        }
-    }
-
-    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
-        StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
-        sd.orderIndex.clear(tx);
-        sd.orderIndex.unload(tx);
-        tx.free(sd.orderIndex.getPageId());
         
-        sd.locationIndex.clear(tx);
-        sd.locationIndex.unload(tx);
-        tx.free(sd.locationIndex.getPageId());
-
-        sd.messageIdIndex.clear(tx);
-        sd.messageIdIndex.unload(tx);
-        tx.free(sd.messageIdIndex.getPageId());
-
-        if (sd.subscriptions != null) {
-            sd.subscriptions.clear(tx);
-            sd.subscriptions.unload(tx);
-            tx.free(sd.subscriptions.getPageId());
-
-            sd.subscriptionAcks.clear(tx);
-            sd.subscriptionAcks.unload(tx);
-            tx.free(sd.subscriptionAcks.getPageId());
-        }
-
-        String key = key(command.getDestination());
-        storedDestinations.remove(key);
-        dbstate.destinations.remove(tx, key);
-    }
-
-    private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
-        StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
-
-        // If set then we are creating it.. otherwise we are destroying the sub
-        if (command.hasSubscriptionInfo()) {
-            String subscriptionKey = command.getSubscriptionKey();
-            sd.subscriptions.put(tx, subscriptionKey, command);
-            long ackLocation=-1;
-            if (!command.getRetroactive()) {
-                ackLocation = sd.nextMessageId-1;
-            }
-
-            sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
-            addAckLocation(sd, ackLocation, subscriptionKey);
-        } else {
-            // delete the sub...
-            String subscriptionKey = command.getSubscriptionKey();
-            sd.subscriptions.remove(tx, subscriptionKey);
-            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
-            if( prev!=null ) {
-                removeAckLocation(tx, sd, subscriptionKey, prev);
-            }
+        ///////////////////////////////////////////////////////////////
+        // Map related methods.
+        ///////////////////////////////////////////////////////////////
+        public boolean mapAdd(AsciiBuffer map) {
+            return false;
         }
-
-    }
-    
-    /**
-     * @param tx
-     * @throws IOException
-     */
-    private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
-
-        LOG.debug("Checkpoint started.");
-        
-        dbstate.state = OPEN_STATE;
-        dbstate.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-        tx.store(dbstate.page, dbstateMarshaller, true);
-        pageFile.flush();
-
-        if( cleanup ) {
-        	
-        	final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
-        	
-        	// Don't GC files under replication
-        	if( journalFilesBeingReplicated!=null ) {
-        		gcCandidateSet.removeAll(journalFilesBeingReplicated);
-        	}
-        	
-        	// Don't GC files after the first in progress tx
-        	Location firstTxLocation = dbstate.lastUpdate;
-            if( dbstate.firstInProgressTransactionLocation!=null ) {
-                firstTxLocation = dbstate.firstInProgressTransactionLocation;
-            }
-            
-            if( firstTxLocation!=null ) {
-            	while( !gcCandidateSet.isEmpty() ) {
-            		Integer last = gcCandidateSet.last();
-            		if( last >= firstTxLocation.getDataFileId() ) {
-            			gcCandidateSet.remove(last);
-            		} else {
-            			break;
-            		}
-            	}
-            }
-
-            // Go through all the destinations to see if any of them can remove GC candidates.
-            for (StoredDestinationState sd : storedDestinations.values()) {
-            	if( gcCandidateSet.isEmpty() ) {
-                	break;
-                }
-                
-                // Use a visitor to cut down the number of pages that we load
-                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                    int last=-1;
-                    public boolean isInterestedInKeysBetween(Location first, Location second) {
-                    	if( first==null ) {
-                    		SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
-                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-                    			subset.remove(second.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	} else if( second==null ) {
-                    		SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
-                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-                    			subset.remove(first.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	} else {
-                    		SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
-                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-                    			subset.remove(first.getDataFileId());
-                    		}
-                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-                    			subset.remove(second.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	}
-                    }
-    
-                    public void visit(List<Location> keys, List<Long> values) {
-                    	for (Location l : keys) {
-                            int fileId = l.getDataFileId();
-							if( last != fileId ) {
-                        		gcCandidateSet.remove(fileId);
-                                last = fileId;
-                            }
-						}                        
-                    }
-    
-                });
-            }
-
-            if( !gcCandidateSet.isEmpty() ) {
-	            LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
-	            journal.removeDataFiles(gcCandidateSet);
-            }
+        public boolean mapRemove(AsciiBuffer map) {
+            return false;
         }
-        
-        LOG.debug("Checkpoint done.");
-    }
-    
-    public HashSet<Integer> getJournalFilesBeingReplicated() {
-		return journalFilesBeingReplicated;
-	}
-
-    // /////////////////////////////////////////////////////////////////
-    // StoredDestination related implementation methods.
-    // /////////////////////////////////////////////////////////////////
-
-
-	private final HashMap<String, StoredDestinationState> storedDestinations = new HashMap<String, StoredDestinationState>();
-
-    protected StoredDestinationState getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
-        String key = key(destination);
-        StoredDestinationState rc = storedDestinations.get(key);
-        if (rc == null) {
-            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
-            rc = loadStoredDestination(tx, key, topic);
-            // Cache it. We may want to remove/unload destinations from the
-            // cache that are not used for a while
-            // to reduce memory usage.
-            storedDestinations.put(key, rc);
+        public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+            return null;
         }
-        return rc;
-    }
-
-    /**
-     * @param tx
-     * @param key
-     * @param topic
-     * @return
-     * @throws IOException
-     */
-    private StoredDestinationState loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
-        // Try to load the existing indexes..
-        StoredDestinationState rc = dbstate.destinations.get(tx, key);
-        if (rc == null) {
-            // Brand new destination.. allocate indexes for it.
-            rc = new StoredDestinationState();
-            rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
-            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
-
-            if (topic) {
-                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
-                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
-            }
-            dbstate.destinations.put(tx, key, rc);
+        public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+            return null;
         }
-
-        // Configure the marshalers and load.
-        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-        rc.orderIndex.setValueMarshaller(MessageKeys.MARSHALLER);
-        rc.orderIndex.load(tx);
-
-        // Figure out the next key using the last entry in the destination.
-        Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
-        if( lastEntry!=null ) {
-            rc.nextMessageId = lastEntry.getKey()+1;
-        }
-
-        rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
-        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        rc.locationIndex.load(tx);
-
-        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
-        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
-        rc.messageIdIndex.load(tx);
-        
-        // If it was a topic...
-        if (topic) {
-
-            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
-            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
-            rc.subscriptions.load(tx);
-
-            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
-            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
-            rc.subscriptionAcks.load(tx);
-
-            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
-            rc.subscriptionCursors = new HashMap<String, Long>();
-
-            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
-                Entry<String, Long> entry = iterator.next();
-                addAckLocation(rc, entry.getValue(), entry.getKey());
-            }
-
+        public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+            return null;
         }
-        return rc;
-    }
-
-    /**
-     * @param sd
-     * @param messageSequence
-     * @param subscriptionKey
-     */
-    private void addAckLocation(StoredDestinationState sd, Long messageSequence, String subscriptionKey) {
-        HashSet<String> hs = sd.ackPositions.get(messageSequence);
-        if (hs == null) {
-            hs = new HashSet<String>();
-            sd.ackPositions.put(messageSequence, hs);
+        public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+            return null;
+        }
+        public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
+            return null;
         }
-        hs.add(subscriptionKey);
-    }
 
-    /**
-     * @param tx
-     * @param sd
-     * @param subscriptionKey
-     * @param sequenceId
-     * @throws IOException
-     */
-    private void removeAckLocation(Transaction tx, StoredDestinationState sd, String subscriptionKey, Long sequenceId) throws IOException {
-        // Remove the sub from the previous location set..
-        if (sequenceId != null) {
-            HashSet<String> hs = sd.ackPositions.get(sequenceId);
-            if (hs != null) {
-                hs.remove(subscriptionKey);
-                if (hs.isEmpty()) {
-                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
-                    sd.ackPositions.remove(sequenceId);
-
-                    // Did we just empty out the first set in the
-                    // ordered list of ack locations? Then it's time to
-                    // delete some messages.
-                    if (hs == firstSet) {
-
-                        // Find all the entries that need to get deleted.
-                        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
-                        for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
-                            Entry<Long, MessageKeys> entry = iterator.next();
-                            if (entry.getKey().compareTo(sequenceId) <= 0) {
-                                // We don't do the actually delete while we are
-                                // iterating the BTree since
-                                // iterating would fail.
-                                deletes.add(entry);
-                            }
-                        }
+        ///////////////////////////////////////////////////////////////
+        // Stream related methods.
+        ///////////////////////////////////////////////////////////////
+        public Long streamOpen() {
+            return null;
+        }
+        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
+        }
+        public void streamClose(Long streamKey) throws KeyNotFoundException {
+        }
+        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+            return null;
+        }
+        public boolean streamRemove(Long streamKey) {
+            return false;
+        }
 
-                        // Do the actual deletes.
-                        for (Entry<Long, MessageKeys> entry : deletes) {
-                            sd.locationIndex.remove(tx, entry.getValue().location);
-                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
-                            sd.orderIndex.remove(tx,entry.getKey());
-                        }
-                    }
-                }
-            }
+        ///////////////////////////////////////////////////////////////
+        // Transaction related methods.
+        ///////////////////////////////////////////////////////////////
+        public void transactionAdd(Buffer txid) {
+        }
+        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+        }
+        public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+        }
+        public Iterator<Buffer> transactionList(Buffer first, int max) {
+            return null;
+        }
+        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
         }
+        public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+        }
+        
     }
-
-    private String key(KahaDestination destination) {
-        return destination.getType().getNumber() + ":" + destination.getName();
+    
+    public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
+        KahaDBSession session = new KahaDBSession();
+        R rc = callback.execute(session);
+        return rc;
     }
 
-    // /////////////////////////////////////////////////////////////////
-    // Transaction related implementation methods.
-    // /////////////////////////////////////////////////////////////////
-    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
- 
-    private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
-        TransactionId key = key(info);
-        ArrayList<Operation> tx = inflightTransactions.get(key);
-        if (tx == null) {
-            tx = new ArrayList<Operation>();
-            inflightTransactions.put(key, tx);
-        }
-        return tx;
-    }
-
-    private TransactionId key(KahaTransactionInfo transactionInfo) {
-        if (transactionInfo.hasLocalTransacitonId()) {
-            KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
-            LocalTransactionId rc = new LocalTransactionId();
-            rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
-            rc.setValue(tx.getTransacitonId());
-            return rc;
-        } else {
-            KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
-            XATransactionId rc = new XATransactionId();
-            rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
-            rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
-            rc.setFormatId(tx.getFormatId());
-            return rc;
-        }
+    public void flush() {
     }
+    
+    ///////////////////////////////////////////////////////////////////
+    // IoC Properties.
+    ///////////////////////////////////////////////////////////////////
 
-    private PageFile createPageFile() {
+    protected PageFile createPageFile() {
         PageFile index = new PageFile(directory, "db");
         index.setEnableWriteThread(isEnableIndexWriteAsync());
         index.setWriteBatchSize(getIndexWriteBatchSize());
         return index;
     }
 
-    private Journal createJournal() {
+    protected Journal createJournal() {
         Journal manager = new Journal();
         manager.setDirectory(directory);
         manager.setMaxFileLength(getJournalMaxFileLength());
@@ -1312,15 +855,15 @@
         if (pageFile == null) {
             pageFile = createPageFile();
         }
-		return pageFile;
-	}
+        return pageFile;
+    }
 
-	public Journal getJournal() {
+    public Journal getJournal() {
         if (journal == null) {
             journal = createJournal();
         }
-		return journal;
-	}
+        return journal;
+    }
 
     public boolean isFailIfDatabaseIsLocked() {
         return failIfDatabaseIsLocked;
@@ -1330,118 +873,4 @@
         this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
     }
 
-    ///////////////////////////////////////////////////////////////////
-    // Store interface
-    ///////////////////////////////////////////////////////////////////
-    class KahaDBSession implements Session {
-
-        public void commit() {
-        }
-        
-        ///////////////////////////////////////////////////////////////
-        // Message related methods.
-        ///////////////////////////////////////////////////////////////
-        public Long messageAdd(MessageRecord message) {
-            return null;
-        }
-        public Long messageGetKey(AsciiBuffer messageId) {
-            return null;
-        }
-        public MessageRecord messageGetRecord(Long key) {
-            return null;
-        }
-
-        ///////////////////////////////////////////////////////////////
-        // Queue related methods.
-        ///////////////////////////////////////////////////////////////
-        public void queueAdd(AsciiBuffer queueName) {
-        }
-        public boolean queueRemove(AsciiBuffer queueName) {
-            return false;
-        }
-        public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
-            return null;
-        }
-        public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
-            return null;
-        }
-        public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
-        }
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
-            return null;
-        }
-        
-        
-        ///////////////////////////////////////////////////////////////
-        // Map related methods.
-        ///////////////////////////////////////////////////////////////
-        public boolean mapAdd(AsciiBuffer map) {
-            return false;
-        }
-        public boolean mapRemove(AsciiBuffer map) {
-            return false;
-        }
-        public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
-            return null;
-        }
-        public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
-            return null;
-        }
-        public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-            return null;
-        }
-        public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-            return null;
-        }
-        public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
-            return null;
-        }
-
-        ///////////////////////////////////////////////////////////////
-        // Stream related methods.
-        ///////////////////////////////////////////////////////////////
-        public Long streamOpen() {
-            return null;
-        }
-        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
-        }
-        public void streamClose(Long streamKey) throws KeyNotFoundException {
-        }
-        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
-            return null;
-        }
-        public boolean streamRemove(Long streamKey) {
-            return false;
-        }
-
-        ///////////////////////////////////////////////////////////////
-        // Transaction related methods.
-        ///////////////////////////////////////////////////////////////
-        public void transactionAdd(Buffer txid) {
-        }
-        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
-        }
-        public void transactionCommit(Buffer txid) throws KeyNotFoundException {
-        }
-        public Iterator<Buffer> transactionList(Buffer first, int max) {
-            return null;
-        }
-        public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
-        }
-        public void transactionRollback(Buffer txid) throws KeyNotFoundException {
-        }
-        
-    }
-    public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable onFlush) throws T {
-        KahaDBSession session = new KahaDBSession();
-        R rc = callback.execute(session);
-        session.commit();
-        if( onFlush!=null ) {
-            onFlush.run();
-        }
-        return rc;
-    }
-
-    public void flush() {
-    }
 }

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=758450&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Wed Mar 25 22:11:15 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.util.Marshaller;
+
+/**
+ * Shared {@link Marshaller} instances for the KahaDB store.
+ * <p>
+ * Buffers are encoded as an unsigned 16-bit length followed by the raw bytes, so
+ * a single buffer may hold at most 65535 bytes.
+ */
+public class Marshallers {
+
+    /** Largest length that fits in the unsigned 16-bit length prefix. */
+    private static final int MAX_BUFFER_LENGTH = 0xFFFF;
+
+    /** Encodes a {@link Location} as two ints: the data file id, then the offset. */
+    public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
+
+        public Class<Location> getType() {
+            return Location.class;
+        }
+
+        public Location readPayload(DataInput dataIn) throws IOException {
+            Location rc = new Location();
+            rc.setDataFileId(dataIn.readInt());
+            rc.setOffset(dataIn.readInt());
+            return rc;
+        }
+
+        public void writePayload(Location object, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(object.getDataFileId());
+            dataOut.writeInt(object.getOffset());
+        }
+    };
+
+    /** Encodes an {@link AsciiBuffer} as a length prefix followed by its bytes. */
+    public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new Marshaller<AsciiBuffer>() {
+
+        public Class<AsciiBuffer> getType() {
+            return AsciiBuffer.class;
+        }
+
+        public AsciiBuffer readPayload(DataInput dataIn) throws IOException {
+            return new AsciiBuffer(Marshallers.readBytes(dataIn));
+        }
+
+        public void writePayload(AsciiBuffer object, DataOutput dataOut) throws IOException {
+            Marshallers.writeLength(object.length, dataOut);
+            dataOut.write(object.data, object.offset, object.length);
+        }
+    };
+
+    /** Encodes a {@link Buffer} as a length prefix followed by its bytes. */
+    public final static Marshaller<Buffer> BUFFER_MARSHALLER = new Marshaller<Buffer>() {
+
+        public Class<Buffer> getType() {
+            return Buffer.class;
+        }
+
+        public Buffer readPayload(DataInput dataIn) throws IOException {
+            return new Buffer(Marshallers.readBytes(dataIn));
+        }
+
+        public void writePayload(Buffer object, DataOutput dataOut) throws IOException {
+            Marshallers.writeLength(object.length, dataOut);
+            dataOut.write(object.data, object.offset, object.length);
+        }
+    };
+
+    /**
+     * Reads a length-prefixed byte array. The prefix is read unsigned because
+     * writeShort stores only the low 16 bits; a signed read would turn lengths
+     * above 32767 into a negative array size.
+     */
+    private static byte[] readBytes(DataInput dataIn) throws IOException {
+        byte data[] = new byte[dataIn.readUnsignedShort()];
+        dataIn.readFully(data);
+        return data;
+    }
+
+    /**
+     * Writes the length prefix, failing instead of silently truncating a length
+     * that does not fit in 16 bits.
+     */
+    private static void writeLength(int length, DataOutput dataOut) throws IOException {
+        if (length > MAX_BUFFER_LENGTH) {
+            throw new IOException("Buffer too large to marshal: " + length + " bytes (max " + MAX_BUFFER_LENGTH + ")");
+        }
+        dataOut.writeShort(length);
+    }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java Wed Mar 25 22:11:15 2009
@@ -20,16 +20,17 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.util.Marshaller;
 
 public class MessageKeys {
     public static final MessageKeysMarshaller MARSHALLER = new MessageKeysMarshaller();
 
-    final String messageId;
+    final AsciiBuffer messageId;
     final Location location;
     
-    public MessageKeys(String messageId, Location location) {
+    public MessageKeys(AsciiBuffer messageId, Location location) {
         this.messageId=messageId;
         this.location=location;
     }
@@ -46,12 +47,14 @@
         }
 
         public MessageKeys readPayload(DataInput dataIn) throws IOException {
-            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
+            AsciiBuffer messageId = Marshallers.ASCII_BUFFER_MARSHALLER.readPayload(dataIn); // reads the length prefix AND the id bytes
+            return new MessageKeys(messageId, Marshallers.LOCATION_MARSHALLER.readPayload(dataIn));
         }
 
         public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
-            dataOut.writeUTF(object.messageId);
-            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+            // Length-prefixed message id bytes first, then the journal location.
+            Marshallers.ASCII_BUFFER_MARSHALLER.writePayload(object.messageId, dataOut);
+            Marshallers.LOCATION_MARSHALLER.writePayload(object.location, dataOut);
         }
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java Wed Mar 25 22:11:15 2009
@@ -20,78 +20,76 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.Marshaller;
 
 public class StoredDBState {
     
-    protected final KahaDBStore store;
     protected Page<StoredDBState> page;
     protected int state;
-    protected BTreeIndex<String, StoredDestinationState> destinations;
+    protected BTreeIndex<AsciiBuffer, StoredDestinationState> destinations;
     protected Location lastUpdate;
-    protected Location firstInProgressTransactionLocation;
-
-    public StoredDBState(KahaDBStore store) {
-        this.store = store;
-    }
 
+    // We index the messages 3 ways: by sequence id, by journal location, and by message id.
+    long nextMessageId;
+    protected BTreeIndex<Long, MessageKeys> orderIndex;
+    protected BTreeIndex<Location, Long> locationIndex;
+    protected BTreeIndex<AsciiBuffer, Long> messageIdIndex;
 
-    public void read(DataInput is) throws IOException {
-        state = is.readInt();
-        destinations = new BTreeIndex<String, StoredDestinationState>(store.pageFile, is.readLong());
-        if (is.readBoolean()) {
-            lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
-        } else {
-            lastUpdate = null;
-        }
-        if (is.readBoolean()) {
-            firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
-        } else {
-            firstInProgressTransactionLocation = null;
-        }
-    }
 
     public void write(DataOutput os) throws IOException {
-        os.writeInt(state);
-        os.writeLong(destinations.getPageId());
-
-        if (lastUpdate != null) {
-            os.writeBoolean(true);
-            LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
-        } else {
-            os.writeBoolean(false);
-        }
 
-        if (firstInProgressTransactionLocation != null) {
-            os.writeBoolean(true);
-            LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
-        } else {
-            os.writeBoolean(false);
-        }
     }
     
+    public final static DBStateMarshaller MARSHALLER = new DBStateMarshaller();
     static public class DBStateMarshaller implements Marshaller<StoredDBState> {
-        private final KahaDBStore store;
-
-        public DBStateMarshaller(KahaDBStore store) {
-            this.store = store;
-        }
-
         public Class<StoredDBState> getType() {
             return StoredDBState.class;
         }
 
-        public StoredDBState readPayload(DataInput dataIn) throws IOException {
-            StoredDBState rc = new StoredDBState(this.store);
-            rc.read(dataIn);
+        public StoredDBState readPayload(DataInput is) throws IOException {
+            StoredDBState rc = new StoredDBState();
+            rc.state = is.readInt();
+            rc.destinations = new BTreeIndex<AsciiBuffer, StoredDestinationState>(is.readLong());
+            if (is.readBoolean()) {
+                rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
+            } else {
+                rc.lastUpdate = null;
+            }
             return rc;
         }
 
-        public void writePayload(StoredDBState object, DataOutput dataOut) throws IOException {
-            object.write(dataOut);
+        public void writePayload(StoredDBState object, DataOutput os) throws IOException {
+            os.writeInt(object.state);
+            os.writeLong(object.destinations.getPageId());
+            if (object.lastUpdate != null) {
+                os.writeBoolean(true);
+                Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
+            } else {
+                os.writeBoolean(false);
+            }
         }
     }
+
+    public void allocate(Transaction tx) throws IOException {
+        // First time this is created.. Initialize a new pagefile.
+        page = tx.allocate();
+        assert page.getPageId() == 0;
+        page.set(this);
+        
+        state = KahaDBStore.CLOSED_STATE;
+        destinations = new BTreeIndex<AsciiBuffer, StoredDestinationState>(tx.getPageFile(), tx.allocate().getPageId());
+        tx.store(page, MARSHALLER, true);
+    }
+    
+    public void load(Transaction tx) throws IOException {
+        destinations.setPageFile(tx.getPageFile());
+        destinations.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        destinations.setValueMarshaller(StoredDestinationState.MARSHALLER);
+        destinations.load(tx);
+    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java Wed Mar 25 22:11:15 2009
@@ -19,62 +19,51 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.TreeMap;
+import java.util.Map.Entry;
 
-import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
 
 public class StoredDestinationState {
-    long nextMessageId;
-    BTreeIndex<Long, MessageKeys> orderIndex;
-    BTreeIndex<Location, Long> locationIndex;
-    BTreeIndex<String, Long> messageIdIndex;
-
-    // These bits are only set for Topics
-    BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
-    BTreeIndex<String, Long> subscriptionAcks;
-    HashMap<String, Long> subscriptionCursors;
-    TreeMap<Long, HashSet<String>> ackPositions;
     
-    public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> {
-        private final KahaDBStore store;
+    long nextMessageId;
+    BTreeIndex<Long, Long> orderIndex;
 
-        public StoredDestinationMarshaller(KahaDBStore store) {
-            this.store = store;
+    public void allocate(Transaction tx) throws IOException {
+        orderIndex = new BTreeIndex<Long, Long>(tx.allocate());
+    }
+    
+    public void load(Transaction tx) throws IOException {
+        orderIndex.setPageFile(tx.getPageFile());
+        orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+        orderIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        orderIndex.load(tx);
+
+        // Figure out the next key using the last entry in the destination.
+        Entry<Long, Long> lastEntry = orderIndex.getLast(tx);
+        if( lastEntry!=null ) {
+            nextMessageId = lastEntry.getKey()+1;
         }
+    }
 
+    public final static StoredDestinationMarshaller MARSHALLER = new StoredDestinationMarshaller();
+    public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> {
+        
         public Class<StoredDestinationState> getType() {
             return StoredDestinationState.class;
         }
 
         public StoredDestinationState readPayload(DataInput dataIn) throws IOException {
             StoredDestinationState value = new StoredDestinationState();
-            value.orderIndex = new BTreeIndex<Long, MessageKeys>(store.pageFile, dataIn.readLong());
-            value.locationIndex = new BTreeIndex<Location, Long>(store.pageFile, dataIn.readLong());
-            value.messageIdIndex = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
-
-            if (dataIn.readBoolean()) {
-                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(store.pageFile, dataIn.readLong());
-                value.subscriptionAcks = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
-            }
+            value.orderIndex = new BTreeIndex<Long, Long>(dataIn.readLong());
             return value;
         }
 
         public void writePayload(StoredDestinationState value, DataOutput dataOut) throws IOException {
             dataOut.writeLong(value.orderIndex.getPageId());
-            dataOut.writeLong(value.locationIndex.getPageId());
-            dataOut.writeLong(value.messageIdIndex.getPageId());
-            if (value.subscriptions != null) {
-                dataOut.writeBoolean(true);
-                dataOut.writeLong(value.subscriptions.getPageId());
-                dataOut.writeLong(value.subscriptionAcks.getPageId());
-            } else {
-                dataOut.writeBoolean(false);
-            }
         }
     }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Wed Mar 25 22:11:15 2009
@@ -23,8 +23,6 @@
 import java.util.TreeMap;
 
 import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.Session.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.util.ByteArrayOutputStream;

Modified: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Wed Mar 25 22:11:15 2009
@@ -21,25 +21,32 @@
 
 enum Type {
   //| option java_create_message="true";
-  MESSAGE_ADD = 1;
-  QUEUE_ADD = 2;
-  QUEUE_ADD_MESSAGE = 3;
-  QUEUE_REMOVE_MESSAGE = 4;
-  TRANSACTION_BEGIN = 5;
-  TRANSACTION_ADD_MESSAGE = 6;
-  TRANSACTION_REMOVE_MESSAGE = 7;
-  TRANSACTION_COMMIT = 8;
-  TRANSACTION_ROLLBACK = 9;
-  MAP_ADD = 10;
-  MAP_REMOVE = 11;
-  MAP_ENTRY_PUT = 12;
-  MAP_ENTRY_REMOVE = 13;
-  STREAM_OPEN = 14;
-  STREAM_WRITE = 15;
-  STREAM_CLOSE = 16;
-  STREAM_REMOVE = 17;
+  MESSAGE_ADD = 0;
+  QUEUE_ADD = 10;
+  QUEUE_REMOVE = 11;
+  QUEUE_ADD_MESSAGE = 12;
+  QUEUE_REMOVE_MESSAGE = 13;
+  TRANSACTION_BEGIN = 20;
+  TRANSACTION_ADD_MESSAGE = 21;
+  TRANSACTION_REMOVE_MESSAGE = 22;
+  TRANSACTION_COMMIT = 23;
+  TRANSACTION_ROLLBACK = 24;
+  MAP_ADD = 30;
+  MAP_REMOVE = 31;
+  MAP_ENTRY_PUT = 32;
+  MAP_ENTRY_REMOVE = 33;
+  STREAM_OPEN = 40;
+  STREAM_WRITE = 41;
+  STREAM_CLOSE = 42;
+  STREAM_REMOVE = 43;
+  
+  TRACE = 100;
 }
 
+message Trace {
+  optional bytes message = 2 [java_override_type = "AsciiBuffer"];
+}  
+
 ///////////////////////////////////////////////////////////////
 // Message related operations.
 ///////////////////////////////////////////////////////////////

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Wed Mar 25 22:11:15 2009
@@ -21,9 +21,9 @@
 
 import junit.framework.TestCase;
 
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.broker.store.Store.VoidCallback;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 
 public abstract class StoreTestBase extends TestCase {