You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/06 19:12:18 UTC

svn commit: r692685 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb: index/BTreeIndex.java index/BTreeNode.java journal/Journal.java page/Page.java store/KahaDBPersistenceAdaptor.java store/MessageDatabase.java

Author: chirino
Date: Sat Sep  6 10:12:18 2008
New Revision: 692685

URL: http://svn.apache.org/viewvc?rev=692685&view=rev
Log:
- You can now selectively visit BTree nodes using the BTreeVisitor
- The message store now checkpoints and cleans up unused journal files periodically.
- Fixed: removing the last item in a BTree node did not result in a page write, so on restart the last item would still be there.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java Sat Sep  6 10:12:18 2008
@@ -225,6 +225,10 @@
     synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key initialKey) throws IOException {
         return root.iterator(tx, initialKey);
     }
+    
+    synchronized public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException {
+        root.visit(tx, visitor);
+    }
 
     synchronized Value getFirst(Transaction tx) throws IOException {
         return root.getFirst(tx);

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Sat Sep  6 10:12:18 2008
@@ -31,7 +31,6 @@
 import org.apache.kahadb.index.BTreeIndex.Prefixer;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.page.Transaction.PageOverflowIOException;
 
 
 /**
@@ -39,7 +38,6 @@
  * one Page of a PageFile.
  */
 public final class BTreeNode<Key,Value> {
-    private static final Log LOG = LogFactory.getLog(BTreeNode.class);
 
     // The index that this node is part of.
     private final BTreeIndex<Key,Value> index;
@@ -285,13 +283,10 @@
                 Value oldValue = values[idx];
                 setLeafData(arrayDelete(keys, idx), arrayDelete(values, idx));
                 
-                if( keys.length!=0 ) {
-                    index.storeNode(tx, this, true);
+                if( keys.length==0 && parent!=null) {
+                    tx.free(getPage());
                 } else {
-                    // If this leaf is empty and is not the root node..
-                    if( parent!=null ) {
-                        tx.free(getPage());
-                    }
+                    index.storeNode(tx, this, true);
                 }
                 
                 return oldValue;
@@ -505,6 +500,30 @@
         }
     }
     
+    public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException {
+        if (visitor == null) {
+            throw new IllegalArgumentException("Visitor cannot be null");
+        }
+        if( isBranch() ) {
+            for(int i=0; i < this.children.length; i++) {
+                Key key1 = null;
+                if( i!=0 ) {
+                    key1 = keys[i-1];
+                }
+                Key key2 = null;
+                if( i!=this.children.length-1 ) {
+                    key1 = keys[i];
+                }
+                if( visitor.isInterestedInKeysBetween(key1, key2) ) {
+                    BTreeNode<Key, Value> child = getChild(tx, i);
+                    child.visit(tx, visitor);
+                }
+            }
+        } else {
+            visitor.visit(keys, values);
+        }
+    }
+    
     public Value getFirst(Transaction tx) throws IOException {
         BTreeNode<Key, Value> node = this;
         while( node .isBranch() ) {
@@ -716,6 +735,11 @@
     public void setNext(long next) {
         this.next = next;
     }
+    
+    @Override
+    public String toString() {
+        return "[BTreeNode "+(isBranch()?"branch":"leaf")+": "+Arrays.asList(keys)+"]";
+    }
 
 }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Sat Sep  6 10:12:18 2008
@@ -403,36 +403,21 @@
         }
     }
 
-    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer> inProgress) throws IOException {
-        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
-        unUsed.removeAll(inUse);
-        unUsed.removeAll(inProgress);
-
-        List<DataFile> purgeList = new ArrayList<DataFile>();
-        for (Integer key : unUsed) {
-            DataFile dataFile = fileMap.get(key);
-            purgeList.add(dataFile);
-        }
-        for (DataFile dataFile : purgeList) {
-            if (dataFile != dataFiles.getTail()) {
-                forceRemoveDataFile(dataFile);
-            }
-        }
-    }
-
     public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
         Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
         unUsed.removeAll(inUse);
 
-        List<DataFile> purgeList = new ArrayList<DataFile>();
         for (Integer key : unUsed) {
-            // Only add files less than the lastFile..
-            if (key.intValue() < lastFile.intValue()) {
-                DataFile dataFile = fileMap.get(key);
-                purgeList.add(dataFile);
+            // Don't remove files that come after the lastFile
+            if (lastFile !=null && key >= lastFile ) {
+                continue;
+            }
+            DataFile dataFile = fileMap.get(key);
+            
+            // Can't remove the last file either.
+            if( dataFile == dataFiles.getTail() ) {
+                continue;
             }
-        }
-        for (DataFile dataFile : purgeList) {
             forceRemoveDataFile(dataFile);
         }
     }
@@ -489,7 +474,7 @@
     }
 
     public String toString() {
-        return "DataManager:(" + filePrefix + ")";
+        return directory.toString();
     }
 
     public synchronized Location getMark() throws IllegalStateException {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Sat Sep  6 10:12:18 2008
@@ -102,10 +102,6 @@
         next = is.readLong();
     }
 
-    public String toString() {
-        return "Page:" + getPageId();
-    }
-
     public long getPageId() {
         return pageId;
     }
@@ -130,5 +126,9 @@
         return next;
     }
 
+    public String toString() {
+        return "[Page:" + getPageId()+", type: "+type+"]";
+    }
+
 
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Sat Sep  6 10:12:18 2008
@@ -79,24 +79,12 @@
 public class KahaDBPersistenceAdaptor extends MessageDatabase implements PersistenceAdapter {
 
     private WireFormat wireFormat = new OpenWireFormat();
-    private AtomicBoolean started = new AtomicBoolean();
 
     public void setBrokerName(String brokerName) {
     }
     public void setUsageManager(SystemUsage usageManager) {
     }
 
-    public void start() throws Exception {
-        if ( started.compareAndSet(false,true) ) {
-            load();
-        }
-    }
-    public void stop() throws Exception {
-        if ( started.compareAndSet(true,false) ) {
-            unload();
-        }
-    }
-
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore(){
             
@@ -164,6 +152,7 @@
             command.setMessage(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
 
             store(command, message.isResponseRequired());
+            
         }
         
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Sat Sep  6 10:12:18 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.kahadb.store;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.File;
@@ -29,6 +31,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.TreeMap;
 import java.util.Map.Entry;
 
@@ -42,11 +45,14 @@
 import org.apache.kahadb.Marshaller;
 import org.apache.kahadb.StringMarshaller;
 import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.BTreeNode;
+import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.page.Transaction.PageOverflowIOException;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -63,35 +69,56 @@
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
 
 public class MessageDatabase {
 
-
     private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
     private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
-    
+
     protected class Metadata {
         protected Page<Metadata> page;
         protected int state;
         protected BTreeIndex<String, StoredDestination> destinations;
         protected Location lastUpdate;
         protected Location firstInProgressTransactionLocation;
-        
+
         public void read(DataInput is) throws IOException {
             state = is.readInt();
             destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
-            lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
-            firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
+            if (is.readBoolean()) {
+                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+            } else {
+                lastUpdate = null;
+            }
+            if (is.readBoolean()) {
+                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
+            } else {
+                firstInProgressTransactionLocation = null;
+            }
         }
 
         public void write(DataOutput os) throws IOException {
             os.writeInt(state);
             os.writeLong(destinations.getPageId());
-            LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
-            LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
+
+            if (lastUpdate != null) {
+                os.writeBoolean(true);
+                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
+            } else {
+                os.writeBoolean(false);
+            }
+
+            if (firstInProgressTransactionLocation != null) {
+                os.writeBoolean(true);
+                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
+            } else {
+                os.writeBoolean(false);
+            }
         }
     }
 
@@ -111,25 +138,38 @@
         }
     }
 
-    
     protected PageFile pageFile;
-    protected  Journal asyncDataManager;
-    protected  Metadata metadata = new Metadata();
+    protected Journal asyncDataManager;
+    protected Metadata metadata = new Metadata();
 
-    protected  MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
+    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
 
-    protected  boolean failIfJournalIsLocked;
+    protected boolean failIfJournalIsLocked;
 
-    protected  boolean deleteAllMessages;
-    protected  File directory;
+    protected boolean deleteAllMessages;
+    protected File directory;
     protected boolean recovering;
-    protected  Thread checkpointThread;
+    protected Thread checkpointThread;
+    protected AtomicBoolean started = new AtomicBoolean();
 
     public MessageDatabase() {
     }
 
+    public void start() throws Exception {
+        if (started.compareAndSet(false, true)) {
+            load();
+        }
+    }
+
+    public void stop() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            unload();
+        }
+    }
+
     public void load() throws IOException {
 
+        recovering=true;
         if (asyncDataManager == null) {
             asyncDataManager = createAsyncDataManager();
         }
@@ -160,61 +200,115 @@
 
             store(new KahaTraceCommand().setMessage("DELETED " + new Date()));
 
-            LOG.info("Journal deleted: ");
+            LOG.info("Persistence store purged.");
             deleteAllMessages = false;
         }
 
-        pageFile.load();
-        pageFile.tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                if (tx.getPageCount() == 0) {
-                    // First time this is created.. Initialize the metadata
-                    Page<Metadata> page = tx.allocate();
-                    assert page.getPageId() == 0;
-                    page.set(metadata);
-                    metadata.page = page;
-                    metadata.state = CLOSED_STATE;
-                    metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
-                    metadata.lastUpdate = new Location(0,0);
-                    metadata.firstInProgressTransactionLocation = new Location(0,0);
-                    
-                    tx.store(metadata.page, metadataMarshaller, true);
-                } else {
-                    Page<Metadata> page = tx.load(0, metadataMarshaller);
-                    metadata = page.get();
-                    metadata.page = page;
-                }
-                metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
-                metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
-                metadata.destinations.load();
-            }
-        });
+        synchronized (indexMutex) {
+            pageFile.load();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    if (tx.getPageCount() == 0) {
+                        // First time this is created.. Initialize the metadata
+                        Page<Metadata> page = tx.allocate();
+                        assert page.getPageId() == 0;
+                        page.set(metadata);
+                        metadata.page = page;
+                        metadata.state = CLOSED_STATE;
+                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
+
+                        tx.store(metadata.page, metadataMarshaller, true);
+                        
+                        store(new KahaTraceCommand().setMessage("CREATED " + new Date()));
+                    } else {
+                        Page<Metadata> page = tx.load(0, metadataMarshaller);
+                        metadata = page.get();
+                        metadata.page = page;
+                    }
+                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
+                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
+                    metadata.destinations.load();
+                }
+            });
+            
+            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
+            // Perhaps we should just keep an index of file 
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+                        Entry<String, StoredDestination> entry = iterator.next();
+                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
+                        storedDestinations.put(entry.getKey(), sd);
+                    }
+                }
+            });
+
+            // Replay the the journal to get the indexes up to date with the
+            // latest
+            // updates.
+            recover();
+        }
+        recovering=false;
 
-        // Replay the the journal to get the indexes up to date with the latest 
-        // updates.
-        recover();
-        
         checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
             public void run() {
-                doCheckpoint();
+                try {
+                    long start = System.currentTimeMillis();
+                    while (started.get()) {
+                        Thread.sleep(500);
+                        long now = System.currentTimeMillis();
+                        if( now - start >= 1000*1000 ) {
+                            checkpoint();
+                            start = now;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Looks like someone really wants us to exit this thread...
+                }
             }
         };
+        checkpointThread.start();
 
     }
-    
-    public void unload() throws IOException {
-        metadata.destinations.unload();
-        metadata.state = CLOSED_STATE;
-        pageFile.tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                tx.store(metadata.page, metadataMarshaller, true);
-            }
-        });
-        metadata = new Metadata();
-        pageFile.unload();
+
+    public void unload() throws IOException, InterruptedException {
+        checkpointThread.join();
+
+        synchronized (indexMutex) {
+            
+            metadata.state = CLOSED_STATE;
+            metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    tx.store(metadata.page, metadataMarshaller, true);
+                }
+            });
+
+            metadata.destinations.unload();
+            pageFile.unload();
+            metadata = new Metadata();
+        }
+        store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
         asyncDataManager.close();
     }
 
+    /**
+     * @return
+     */
+    private Location getFirstInProgressTxLocation() {
+        Location l = null;
+        if (!inflightTransactions.isEmpty()) {
+            l = inflightTransactions.values().iterator().next().get(0).getLocation();
+        }
+        if (!preparedTransactions.isEmpty()) {
+            Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
+            if (l==null || t.compareTo(l) <= 0) {
+                l = t;
+            }
+        }
+        return l;
+    }
 
     /**
      * Move all the messages that were in the journal into long term storage. We
@@ -226,37 +320,73 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
+
+        long start = System.currentTimeMillis();
         Location pos = null;
+        
+        // We need to recover the transactions..
+        if (metadata.firstInProgressTransactionLocation != null) {
+            pos = metadata.firstInProgressTransactionLocation;
+        }
+        
+        // Perhaps there were no transactions...
+        if( pos==null && metadata.lastUpdate!=null) {
+            // Start replay at the record after the last one recorded in the index file.
+            pos = asyncDataManager.getNextLocation(metadata.lastUpdate);
+            // No journal records need to be recovered.
+            if( pos == null ) {
+                return;
+            }
+        }
+        
+        // Do we need to start from the begining?
+        if (pos == null) {
+            // This loads the first position.
+            pos = asyncDataManager.getNextLocation(null);
+        }
+
         int redoCounter = 0;
-        LOG.info("Journal Recovery Started from: " + asyncDataManager);
-        long start = System.currentTimeMillis();
-        // While we have records in the journal.
-        while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+        LOG.info("Journal Recovery Started from: " + asyncDataManager + " at " + pos.getDataFileId() + ":" + pos.getOffset());
+
+        while (pos != null) {
             JournalCommand message = load(pos);
             process(message, pos);
             redoCounter++;
+            pos = asyncDataManager.getNextLocation(pos);
         }
+
         Location location = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true);
-        asyncDataManager.setMark(location, true);
         long end = System.currentTimeMillis();
         LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
     }
 
-    private void doCheckpoint() {
+    private void checkpoint() {
+        try {
+            synchronized (indexMutex) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        checkpointUpdate(tx);
+                    }
+                });
+                pageFile.checkpoint();
+            }
+            store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
+        } catch (IOException e) {
+        }
     }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Methods call by the broker to update and query the store.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     public Location store(JournalCommand data) throws IOException {
         return store(data, false);
     }
-    
+
     /**
-     * All updated are are funneled through this method.  The updates a converted to 
-     * a JournalMessage which is logged to the journal and then the data from 
-     * the JournalMessage is used to update the index just like it would be done durring 
-     * a recovery process.
+     * All updated are are funneled through this method. The updates a converted
+     * to a JournalMessage which is logged to the journal and then the data from
+     * the JournalMessage is used to update the index just like it would be done
+     * durring a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
         int size = data.serializedSize();
@@ -265,11 +395,12 @@
         data.writeTo(os);
         Location location = asyncDataManager.write(os.getByteSequence(), sync);
         process(data, location);
-        metadata.lastUpdate = location;
+        if( !recovering ) {
+            metadata.lastUpdate = location;
+        }
         return location;
     }
 
-
     /**
      * Loads a previously stored JournalMessage
      * 
@@ -285,13 +416,13 @@
         message.mergeFrom(is);
         return message;
     }
-    
-    ///////////////////////////////////////////////////////////////////
+
+    // /////////////////////////////////////////////////////////////////
     // Journaled record processing methods. Once the record is journaled,
     // these methods handle applying the index updates. These may be called
     // from the recovery method too so they need to be idempotent
-    ///////////////////////////////////////////////////////////////////
-    
+    // /////////////////////////////////////////////////////////////////
+
     private void process(JournalCommand data, final Location location) throws IOException {
         data.visit(new Visitor() {
             @Override
@@ -318,12 +449,12 @@
             public void visit(KahaRollbackCommand command) throws IOException {
                 process(command, location);
             }
-            
+
             @Override
             public void visit(KahaRemoveDestinationCommand command) throws IOException {
                 process(command, location);
             }
-            
+
             @Override
             public void visit(KahaSubscriptionCommand command) throws IOException {
                 process(command, location);
@@ -333,10 +464,12 @@
 
     private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-            inflightTx.add(new AddOpperation(command, location));
+            synchronized (indexMutex) {
+                ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+                inflightTx.add(new AddOpperation(command, location));
+            }
         } else {
-            synchronized(indexMutex) {
+            synchronized (indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         upadateIndex(tx, command, location);
@@ -348,10 +481,12 @@
 
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
-            inflightTx.add(new RemoveOpperation(command, location));
+            synchronized (indexMutex) {
+                ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+                inflightTx.add(new RemoveOpperation(command, location));
+            }
         } else {
-            synchronized(indexMutex) {
+            synchronized (indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         updateIndex(tx, command, location);
@@ -361,9 +496,9 @@
         }
 
     }
-    
+
     protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
-        synchronized(indexMutex) {
+        synchronized (indexMutex) {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, command, location);
@@ -371,9 +506,9 @@
             });
         }
     }
-    
+
     protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
-        synchronized(indexMutex) {
+        synchronized (indexMutex) {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, command, location);
@@ -381,19 +516,19 @@
             });
         }
     }
-    
+
     protected void process(KahaCommitCommand command, Location location) throws IOException {
         TransactionId key = key(command.getTransactionInfo());
-        ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
-        if (inflightTx == null) {
-            inflightTx = preparedTransactions.remove(key);
-        }
-        if( inflightTx == null ) {
-            return;
-        }
-        
-        final ArrayList<Operation> messagingTx = inflightTx;
-        synchronized(indexMutex) {
+        synchronized (indexMutex) {
+            ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
+            if (inflightTx == null) {
+                inflightTx = preparedTransactions.remove(key);
+            }
+            if (inflightTx == null) {
+                return;
+            }
+
+            final ArrayList<Operation> messagingTx = inflightTx;
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     for (Operation op : messagingTx) {
@@ -403,37 +538,42 @@
             });
         }
     }
-    
+
     protected void process(KahaPrepareCommand command, Location location) {
-        TransactionId key = key(command.getTransactionInfo());
-        ArrayList<Operation> tx = inflightTransactions.remove(key);
-        if (tx != null) {
-            preparedTransactions.put(key, tx);
+        synchronized (indexMutex) { // same monitor the other process(...) overloads hold
+            TransactionId key = key(command.getTransactionInfo());
+            ArrayList<Operation> tx = inflightTransactions.remove(key); // take the operations out of the in-flight map
+            if (tx != null) { // a transaction that is not in flight is left alone
+                preparedTransactions.put(key, tx); // park them in the prepared map under the same id
+            }
         }
     }
 
     protected void process(KahaRollbackCommand command, Location location) {
-        TransactionId key = key(command.getTransactionInfo());
-        ArrayList<Operation> tx = inflightTransactions.remove(key);
-        if (tx == null) {
-            preparedTransactions.remove(key);
+        synchronized (indexMutex) { // same monitor the other process(...) overloads hold
+            TransactionId key = key(command.getTransactionInfo());
+            ArrayList<Operation> tx = inflightTransactions.remove(key); // the removed operations are dropped without being executed
+            if (tx == null) { // not in flight, so it can only be in the prepared map
+                preparedTransactions.remove(key); // no effect when the id is not there either
+            }
         }
     }
-    
-    ///////////////////////////////////////////////////////////////////
+
+    // /////////////////////////////////////////////////////////////////
     // These methods do the actual index updates.
-    ///////////////////////////////////////////////////////////////////
-    
+    // /////////////////////////////////////////////////////////////////
+
     protected final Object indexMutex = new Object();
-    
+
     private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
 
-        // Skip adding the message to the index if this is a topic and there are no subscriptions.
-        if( sd.subscriptions!=null && sd.ackLocations.isEmpty() ) {
+        // Skip adding the message to the index if this is a topic and there are
+        // no subscriptions.
+        if (sd.subscriptions != null && sd.ackLocations.isEmpty()) {
             return;
         }
-        
+
         // Add the message.
         sd.orderIndex.put(tx, location, command.getMessageId());
         sd.messageIdIndex.put(tx, command.getMessageId(), location);
@@ -441,32 +581,32 @@
 
     private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-        if( !command.hasSubscriptionKey() ) {
+        if (!command.hasSubscriptionKey()) { // no subscription key on the command: queue-style removal
             // In the queue case we just remove the message from the index..
             Location messageLocation = sd.messageIdIndex.remove(tx, command.getMessageId());
-            if( messageLocation!=null ) {
+            if (messageLocation != null) { // null: the id was not in messageIdIndex, nothing more to remove
                 sd.orderIndex.remove(tx, messageLocation);
             }
         } else {
-            // In the topic case we need remove the message once it's been acked by all the subs
+            // In the topic case we need to remove the message once it's been
+            // acked by all the subs
             Location messageLocation = sd.messageIdIndex.get(tx, command.getMessageId());
-            
+
             // Make sure it's a valid message id...
-            if( messageLocation!=null ) {
+            if (messageLocation != null) { // null: the id was not in messageIdIndex, the ack is ignored
                 String subscriptionKey = command.getSubscriptionKey();
                 Location prev = sd.subscriptionAcks.put(tx, subscriptionKey, messageLocation);
-                
+
                 // The following method handles deleting un-referenced messages.
                 removeAckLocation(tx, sd, subscriptionKey, prev);
-                
+
                 // Add it to the new location set.
                 addAckLocation(sd, messageLocation, subscriptionKey);
             }
-            
+
         }
-        metadata.lastUpdate = ackLocation;
     }
-    
+
     private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         sd.orderIndex.clear(tx);
@@ -476,7 +616,7 @@
         tx.free(sd.orderIndex.getPageId());
         tx.free(sd.messageIdIndex.getPageId());
 
-        if( sd.subscriptions!=null ) {
+        if (sd.subscriptions != null) {
             sd.subscriptions.clear(tx);
             sd.subscriptionAcks.clear(tx);
             sd.subscriptions.unload();
@@ -484,26 +624,26 @@
             tx.free(sd.subscriptions.getPageId());
             tx.free(sd.subscriptionAcks.getPageId());
         }
-        
+
         String key = key(command.getDestination());
         storedDestinations.remove(key);
         metadata.destinations.remove(tx, key);
     }
-    
+
     private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-        
+
         // If set then we are creating it.. otherwise we are destroying the sub
-        if( command.hasSubscriptionInfo() ) {
+        if (command.hasSubscriptionInfo()) {
             String subscriptionKey = command.getSubscriptionKey();
             sd.subscriptions.put(tx, subscriptionKey, command);
             Location ackLocation;
-            if( command.getRetroactive() ) {
-                ackLocation = new Location(0,0);
+            if (command.getRetroactive()) {
+                ackLocation = new Location(0, 0);
             } else {
                 ackLocation = location;
             }
-            
+
             sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
             addAckLocation(sd, ackLocation, subscriptionKey);
         } else {
@@ -515,12 +655,60 @@
         }
 
     }
-
     
+    /** Checkpoints the index: records the journal files the order indexes still reference, stores the metadata page, then has the journal consolidate the remaining files.
+     * @param tx transaction used to visit the order indexes and to store the metadata page
+     * @throws IOException propagated from the index, page-file or journal calls made here
+     */
+    private void checkpointUpdate(Transaction tx) throws IOException {
+
+        // Collect the ids of the journal files that still hold indexed messages.
+        final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+
+        for (StoredDestination sd : storedDestinations.values()) {
+            // Use a visitor to cut down the number of pages that we load
+            sd.orderIndex.visit(tx, new BTreeVisitor<Location, String>() {
+                int last = -1; // id of the data file most recently added to inUseFiles
+                public boolean isInterestedInKeysBetween(Location first, Location second) {
+                    if (second != null) {
+                        if (last + 1 == second.getDataFileId()) {
+                            last++;
+                            inUseFiles.add(last);
+                        }
+                        if (last == second.getDataFileId()) {
+                            return false; // range ends in a file that is already recorded
+                        }
+                    }
+                    return true;
+                }
+
+                public void visit(Location[] keys, String[] values) {
+                    for (int i = 0; i < keys.length; i++) {
+                        int fileId = keys[i].getDataFileId();
+                        if (last != fileId) { // record a file the first time one of its keys is seen
+                            inUseFiles.add(fileId);
+                            last = fileId;
+                        }
+                    }
+                }
+            });
+        }
+
+        metadata.state = OPEN_STATE;
+        metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+        tx.store(metadata.page, metadataMarshaller, true);
+
+        Location l = metadata.lastUpdate;
+        if (metadata.firstInProgressTransactionLocation != null) {
+            l = metadata.firstInProgressTransactionLocation;
+        }
+        asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l == null ? null : l.getDataFileId());
+    }
+
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // StoredDestination related implementation methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
 
     private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
 
@@ -541,7 +729,7 @@
         HashMap<String, Location> subscriptionCursors;
         TreeMap<Location, HashSet<String>> ackLocations;
     }
-    
+
     protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
         public Class<StoredDestination> getType() {
             return StoredDestination.class;
@@ -551,8 +739,8 @@
             StoredDestination value = new StoredDestination();
             value.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
             value.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
-            
-            if( dataIn.readBoolean() ) {
+
+            if (dataIn.readBoolean()) {
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                 value.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
             }
@@ -562,7 +750,7 @@
         public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
             dataOut.writeLong(value.orderIndex.getPageId());
             dataOut.writeLong(value.messageIdIndex.getPageId());
-            if( value.subscriptions !=null ) {
+            if (value.subscriptions != null) {
                 dataOut.writeBoolean(true);
                 dataOut.writeLong(value.subscriptions.getPageId());
                 dataOut.writeLong(value.subscriptionAcks.getPageId());
@@ -571,10 +759,10 @@
             }
         }
     }
-    
+
     static class LocationMarshaller implements Marshaller<Location> {
-        final static LocationMarshaller INSTANCE = new  LocationMarshaller();
-        
+        final static LocationMarshaller INSTANCE = new LocationMarshaller();
+
         public Class<Location> getType() {
             return Location.class;
         }
@@ -591,22 +779,26 @@
             dataOut.writeInt(object.getOffset());
         }
     }
-    
+
     static class KahaSubscriptionCommandMarshaller implements Marshaller<KahaSubscriptionCommand> {
-        final static KahaSubscriptionCommandMarshaller INSTANCE = new  KahaSubscriptionCommandMarshaller();
-        
+        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
+
         public Class<KahaSubscriptionCommand> getType() {
             return KahaSubscriptionCommand.class;
         }
 
         public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
             KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
-            rc.mergeFrom((InputStream)dataIn);
+            byte[] ba = new byte[dataIn.readUnsignedShort()]; // unsigned: a signed read turns lengths above 32767 into a negative array size
+            dataIn.readFully(ba);
+            rc.mergeFrom(ba);
             return rc;
         }
 
         public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
-            object.writeTo((OutputStream)dataOut);
+            byte[] ba = object.toByteArray();
+            dataOut.writeShort(ba.length); // 16-bit length prefix, so the payload must not exceed 65535 bytes
+            dataOut.write(ba);
         }
     }
 
@@ -614,57 +806,71 @@
         String key = key(destination);
         StoredDestination rc = storedDestinations.get(key);
         if (rc == null) {
-            // Try to load the existing indexes..
-            rc = metadata.destinations.get(tx, key);
-            if( rc ==null ) {
-                // Brand new destination.. allocate indexes for it.
-                rc = new StoredDestination();
-                rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
-                rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
-                
-                if( destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC ) {
-                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
-                    rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, tx.allocate());
-                }
+            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
+            rc = loadStoredDestination(tx, key, topic);
+            // Cache it. We may want to remove/unload destinations from the
+            // cache that are not used for a while
+            // to reduce memory usage.
+            storedDestinations.put(key, rc);
+        }
+        return rc;
+    }
+
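+    /**
+     * @param tx transaction used to look up the destination and to allocate index pages
+     * @param key key under which the destination is stored in metadata.destinations
+     * @param topic true to also allocate and load the subscription indexes
+     * @return the stored destination, read from metadata.destinations or newly created
+     * @throws IOException propagated from the metadata, page allocation or index calls made here
+     */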
+    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
+        // Try to load the existing indexes..
+        StoredDestination rc = metadata.destinations.get(tx, key);
+        if (rc == null) {
+            // Brand new destination.. allocate indexes for it.
+            rc = new StoredDestination();
+            rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
+            rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+
+            if (topic) {
+                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
+                rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, tx.allocate());
             }
-            
-            // Configure the marshalers and load.
-            rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
-            rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
-            rc.orderIndex.load();
-            
-            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
-            rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
-            rc.messageIdIndex.load();
-            
-            // If it was a topic...
-            if( rc.subscriptions!=null ) {
-                
-                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
-                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
-                rc.subscriptions.load();
-                
-                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
-                rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
-                rc.subscriptionAcks.load();
-                
-                rc.ackLocations = new TreeMap<Location, HashSet<String>>();
-                rc.subscriptionCursors = new HashMap<String, Location>();
-                
-                for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
-                    Entry<String,Location> entry = iterator.next();
-                    addAckLocation(rc, entry.getValue(), entry.getKey());
-                }
+            metadata.destinations.put(tx, key, rc);
+        }
+
+        // Configure the marshalers and load.
+        rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+        rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
+        rc.orderIndex.load();
+
+        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+        rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
+        rc.messageIdIndex.load();
+
+        // If it was a topic...
+        if (topic) {
+
+            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
+            rc.subscriptions.load();
+
+            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
+            rc.subscriptionAcks.load();
 
+            rc.ackLocations = new TreeMap<Location, HashSet<String>>();
+            rc.subscriptionCursors = new HashMap<String, Location>();
+
+            for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+                Entry<String, Location> entry = iterator.next();
+                addAckLocation(rc, entry.getValue(), entry.getKey());
             }
-            
-            // Cache it.  We may want to remove/unload destinations from the cache that are not used for a while
-            // to reduce memory usage.
-            storedDestinations.put(key, rc);
+
         }
         return rc;
     }
-    
+
     /**
      * @param sd
      * @param messageLocation
@@ -672,14 +878,13 @@
      */
     private void addAckLocation(StoredDestination sd, Location messageLocation, String subscriptionKey) {
         HashSet<String> hs = sd.ackLocations.get(messageLocation);
-        if( hs == null ) {
+        if (hs == null) { // no subscription recorded at this location yet: start a new set
             hs = new HashSet<String>();
             sd.ackLocations.put(messageLocation, hs);
         }
         hs.add(subscriptionKey);
     }
-    
-    
+
     /**
      * @param tx
      * @param sd
@@ -689,37 +894,37 @@
      */
     private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Location location) throws IOException {
         // Remove the sub from the previous location set..
-        if( location!=null ) {
+        if (location != null) {
             HashSet<String> hs = sd.ackLocations.get(location);
-            if(hs!=null) {
+            if (hs != null) {
                 hs.remove(subscriptionKey);
-                if( hs.isEmpty() ) {
+                if (hs.isEmpty()) {
                     HashSet<String> firstSet = sd.ackLocations.values().iterator().next();
                     sd.ackLocations.remove(location);
-                    
-                    // Did we just empty out the first set in the 
-                    // ordered list of ack locations?  Then it's time to 
+
+                    // Did we just empty out the first set in the
+                    // ordered list of ack locations? Then it's time to
                     // delete some messages.
-                    if( hs==firstSet ) {
+                    if (hs == firstSet) {
 
-                        
                         // Find all the entries that need to get deleted.
                         ArrayList<Entry<Location, String>> deletes = new ArrayList<Entry<Location, String>>();
                         for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
                             Entry<Location, String> entry = iterator.next();
-                            while( entry.getKey().compareTo(location) <= 0 ) {
-                                // We don't do the actually delete while we are iterating the BTree since 
+                            if (entry.getKey().compareTo(location) <= 0) { // test each entry once; a loop here would never end
+                                // We don't do the actual delete while we are
+                                // iterating the BTree since
                                 // iterating would fail.
                                 deletes.add(entry);
                             }
                         }
-                        
+
                         // Do the actual deletes.
                         for (Entry<Location, String> entry : deletes) {
                             sd.messageIdIndex.remove(tx, entry.getValue());
                             sd.orderIndex.remove(tx, entry.getKey());
                         }
-                        
+
                     }
                 }
             }
@@ -727,15 +932,14 @@
     }
 
     private String key(KahaDestination destination) {
-        return destination.getType().getNumber()+":"+destination.getName();
+        return destination.getType().getNumber() + ":" + destination.getName(); // "<type number>:<name>"
     }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Transaction related implementation methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
     protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-       
 
     private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
         TransactionId key = key(info);
@@ -766,15 +970,21 @@
 
     abstract class Operation {
         final Location location;
+
         public Operation(Location location) {
             this.location = location;
         }
+        /** Returns the location this operation was constructed with. */
+        public Location getLocation() {
+            return location;
+        }
+        /** Runs this operation against the given transaction; implemented by the concrete operation subclasses. */
         abstract public void execute(Transaction tx) throws IOException;
     }
-    
+
     class AddOpperation extends Operation {
         final KahaAddMessageCommand command;
-        
+
         public AddOpperation(KahaAddMessageCommand command, Location location) {
             super(location);
             this.command = command;
@@ -783,15 +993,15 @@
         public void execute(Transaction tx) throws IOException {
             upadateIndex(tx, command, location);
         }
-        
+
         public KahaAddMessageCommand getCommand() {
             return command;
         }
     }
-    
+
     class RemoveOpperation extends Operation {
         final KahaRemoveMessageCommand command;
-        
+
         public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
             super(location);
             this.command = command;
@@ -805,13 +1015,11 @@
             return command;
         }
     }
-    
-    
-    ///////////////////////////////////////////////////////////////////
+
+    // /////////////////////////////////////////////////////////////////
     // Initialization related implementation methods.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
 
-    
     private PageFile createPageFile() {
         PageFile pf = new PageFile(directory, "database");
         return pf;
@@ -841,5 +1049,4 @@
         this.deleteAllMessages = deleteAllMessages;
     }
 
-
 }