You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/26 17:47:35 UTC

svn commit: r758737 - in /activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store: ./ kahadb/ memory/

Author: chirino
Date: Thu Mar 26 16:47:24 2009
New Revision: 758737

URL: http://svn.apache.org/viewvc?rev=758737&view=rev
Log:
Filling out the kahadb impl.

Added:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
Removed:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu Mar 26 16:47:24 2009
@@ -462,15 +462,18 @@
                 RestoredMessageImpl rm = new RestoredMessageImpl();
                 // TODO should update jms redelivery here.
                 rm.qRecord = records.next();
-                rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
-                rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
-                if (rm.handler == null) {
-                    try {
-                        rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
-                        protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
-                    } catch (Throwable thrown) {
-                        throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+                try {
+                    rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
+                    rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
+                    if (rm.handler == null) {
+                        try {
+                            rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
+                            protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
+                        } catch (Throwable thrown) {
+                            throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+                        }
                     }
+                } catch (KeyNotFoundException shouldNotHappen) {
                 }
             }
         }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Thu Mar 26 16:47:24 2009
@@ -230,7 +230,7 @@
         
         public Long messageAdd(MessageRecord message);
         public Long messageGetKey(AsciiBuffer messageId);
-        public MessageRecord messageGetRecord(Long key);
+        public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
 
         public Long streamOpen();
         public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
@@ -249,7 +249,7 @@
         // Queue related methods.
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
         public void queueAdd(AsciiBuffer queueName);
-        public boolean queueRemove(AsciiBuffer queueName);
+        public void queueRemove(AsciiBuffer queueName);
         
 
         public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=758737&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Thu Mar 26 16:47:24 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.store.Store.QueueRecord;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+/** Per-destination store entity: owns a BTreeIndex of QueueRecord values keyed by Long queue key, plus a counter for the next key. */
+public class DestinationEntity {
+    /** Marshals an entity as just the page id of its queue index; nextQueueKey is not written. */
+    public final static  Marshaller<DestinationEntity> MARSHALLER = new Marshaller<DestinationEntity>() {
+        
+        public Class<DestinationEntity> getType() {
+            return DestinationEntity.class;
+        }
+        /** Reads one long (the index page id) and wraps it in a BTreeIndex; load(tx) attaches the page file and marshallers later. */
+        public DestinationEntity readPayload(DataInput dataIn) throws IOException {
+            DestinationEntity value = new DestinationEntity();
+            value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
+            return value;
+        }
+        /** Writes the queue index's page id as a single long. */
+        public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(value.queueIndex.getPageId());
+        }
+    };
+    // nextQueueKey is never marshalled; load(tx) re-derives it from the last entry of queueIndex.
+    private long nextQueueKey;
+    private BTreeIndex<Long, QueueRecord> queueIndex;
+
+    ///////////////////////////////////////////////////////////////////
+    // Lifecycle Methods.
+    ///////////////////////////////////////////////////////////////////
+    public void allocate(Transaction tx) throws IOException {
+        queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate()); // new index on a freshly allocated page; load(tx) still has to attach the page file and marshallers
+    }
+    /** Clears the index, frees its page in tx, and nulls the reference; queueIndex is null afterwards until allocate(tx) runs again. */
+    public void deallocate(Transaction tx) throws IOException {
+        queueIndex.clear(tx);
+        tx.free(queueIndex.getPageId());
+        queueIndex=null;
+    }
+    /** Attaches the index to tx's page file and loads it; does nothing when the index already has a page file. */
+    public void load(Transaction tx) throws IOException {
+        if( queueIndex.getPageFile()==null ) {
+            // First load only: wire up the page file and both marshallers before loading the index.
+            queueIndex.setPageFile(tx.getPageFile());
+            queueIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+            queueIndex.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
+            queueIndex.load(tx);
+    
+            // Figure out the next key using the last entry in the destination; an empty index leaves nextQueueKey unchanged.
+            Entry<Long, QueueRecord> lastEntry = queueIndex.getLast(tx);
+            if( lastEntry!=null ) {
+                nextQueueKey = lastEntry.getKey()+1;
+            }
+        }
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Message Methods.
+    ///////////////////////////////////////////////////////////////////
+    public Long nextQueueKey() {
+        return nextQueueKey++; // post-increment: returns the current value, then advances; plain long with no synchronization in this class
+    }
+    /** Builds a QueueRecord from the command's attachment, message key and queue key, and puts it in the index under that queue key. */
+    public void add(Transaction tx, QueueAddMessage command) throws IOException {
+        QueueRecord value = new QueueRecord();
+        value.setAttachment(command.getAttachment());
+        value.setMessageKey(command.getMessageKey());
+        value.setQueueKey(command.getQueueKey());
+        queueIndex.put(tx, value.getQueueKey(), value);
+    }
+    /** Removes the entry stored under queueKey from the queue index. */
+    public void remove(Transaction tx, long queueKey) throws IOException {
+        queueIndex.remove(tx, queueKey);
+    }
+    /** Collects up to max records via a GTEVisitor seeded with firstQueueKey (presumably keys >= firstQueueKey) and returns an iterator over that in-memory list. */
+    public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, final int max) throws IOException {
+        final ArrayList<QueueRecord> rc = new ArrayList<QueueRecord>(max); // NOTE(review): initial capacity is max itself; a negative max throws IllegalArgumentException and a very large max allocates up front, so confirm callers pass a bounded page size
+        queueIndex.visit(tx, new BTreeVisitor.GTEVisitor<Long, QueueRecord>(firstQueueKey) { // NOTE(review): a null firstQueueKey is not guarded here; confirm how GTEVisitor treats it
+            @Override
+            public boolean isInterestedInKeysBetween(Long first, Long second) {
+                if (rc.size() >= max)
+                    return false;
+                return super.isInterestedInKeysBetween(first, second);
+            }
+            // The limit is enforced per entry as well as per key range.
+            @Override
+            protected void matched(Long key, QueueRecord value) {
+                if (rc.size() >= max)
+                    return;
+                rc.add(value);
+            }
+        });
+        return rc.iterator();
+    }
+
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Thu Mar 26 16:47:24 2009
@@ -18,19 +18,19 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemove;
 import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage;
 import org.apache.activemq.broker.store.kahadb.Data.Trace;
 import org.apache.activemq.broker.store.kahadb.Data.Type;
@@ -46,7 +46,6 @@
 import org.apache.activemq.protobuf.PBMessage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Page;
@@ -67,7 +66,7 @@
     protected PageFile pageFile;
     protected Journal journal;
     
-    protected StoredDBState dbstate = new StoredDBState();
+    protected RootEntity rootEntity = new RootEntity();
 
     protected boolean failIfDatabaseIsLocked;
     protected boolean deleteAllMessages;
@@ -86,9 +85,8 @@
     private Location nextRecoveryPosition;
     private Location lastRecoveryPosition;
 
-    protected final Object indexMutex = new Object();
+    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
-    private final HashMap<AsciiBuffer, StoredDestinationState> storedDestinations = new HashMap<AsciiBuffer, StoredDestinationState>();
 
     ///////////////////////////////////////////////////////////////////
     // Lifecylce methods
@@ -106,50 +104,30 @@
     }
 
 	private void loadPageFile() throws IOException {
-		synchronized (indexMutex) {
+	    indexLock.writeLock().lock();
+		try {
 		    final PageFile pageFile = getPageFile();
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     if (pageFile.getPageCount() == 0) {
-                        dbstate.allocate(tx);
+                        rootEntity.allocate(tx);
                     } else {
-                        Page<StoredDBState> page = tx.load(0, StoredDBState.MARSHALLER);
-                        dbstate = page.get();
-                        dbstate.page = page;
+                        Page<RootEntity> page = tx.load(0, RootEntity.MARSHALLER);
+                        rootEntity = page.get();
+                        rootEntity.setPageId(0);
                     }
-                    dbstate.load(tx);
+                    rootEntity.load(tx);
                 }
             });
             pageFile.flush();
 
-            // Keep a cache of the StoredDestinations
-            storedDestinations.clear();
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    for (Iterator<Entry<AsciiBuffer, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
-                        Entry<AsciiBuffer, StoredDestinationState> entry = iterator.next();
-                        StoredDestinationState sd = loadStoredDestination(tx, entry.getKey());
-                        storedDestinations.put(entry.getKey(), sd);
-                    }
-                }
-            });
+        } finally {
+            indexLock.writeLock().unlock();
         }
 	}
 	
 	
-    private StoredDestinationState loadStoredDestination(Transaction tx, AsciiBuffer key) throws IOException {
-        // Try to load the existing indexes..
-        StoredDestinationState rc = dbstate.destinations.get(tx, key);
-        if (rc == null) {
-            // Brand new destination.. allocate indexes for it.
-            rc = new StoredDestinationState();
-            rc.allocate(tx);
-            dbstate.destinations.put(tx, key, rc);
-        }
-        rc.load(tx);
-        return rc;
-    }
     
 	/**
 	 * @throws IOException
@@ -211,8 +189,8 @@
 	}
 	
     public void load() throws IOException {
-    	
-        synchronized (indexMutex) {
+    	indexLock.writeLock().lock();
+        try {
 	    	open();
 	    	
 	        if (deleteAllMessages) {
@@ -220,7 +198,7 @@
 	
 	            pageFile.unload();
 	            pageFile.delete();
-	            dbstate = new StoredDBState();
+	            rootEntity = new RootEntity();
 	            
 	            LOG.info("Persistence store purged.");
 	            deleteAllMessages = false;
@@ -228,15 +206,21 @@
 	            loadPageFile();
 	        }
 	        store( new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
+        } finally {
+            indexLock.writeLock().unlock();
         }
 
     }
 
 	public void close() throws IOException, InterruptedException {
 		if( opened.compareAndSet(true, false)) {
-	        synchronized (indexMutex) {
+		    
+	        indexLock.writeLock().lock();
+	        try {
 	            pageFile.unload();
-	            dbstate = new StoredDBState();
+	            rootEntity = new RootEntity();
+	        } finally {
+	            indexLock.writeLock().unlock();
 	        }
 	        journal.close();
 	        checkpointThread.join();
@@ -246,16 +230,19 @@
 	}
 	
     public void unload() throws IOException, InterruptedException {
-        synchronized (indexMutex) {
+        indexLock.writeLock().lock();
+        try {
             if( pageFile.isLoaded() ) {
-                dbstate.state = CLOSED_STATE;
+                rootEntity.setState(CLOSED_STATE);
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
+                        rootEntity.store(tx);
                     }
                 });
                 close();
             }
+        } finally {
+            indexLock.writeLock().unlock();
         }
     }
 
@@ -273,7 +260,8 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
-        synchronized (indexMutex) {
+        indexLock.writeLock().lock();
+        try {
 	        long start = System.currentTimeMillis();
 	        
 	        Location recoveryPosition = getRecoveryPosition();
@@ -282,7 +270,7 @@
 		        while (recoveryPosition != null) {
 		            final TypeCreatable message = load(recoveryPosition);
 		            final Location location = lastRecoveryPosition;
-		            dbstate.lastUpdate = recoveryPosition;
+		            rootEntity.setLastUpdate(recoveryPosition);
 		            
 	                pageFile.tx().execute(new Transaction.Closure<IOException>() {
 	                    public void execute(Transaction tx) throws IOException {
@@ -303,11 +291,14 @@
                     recoverIndex(tx);
                 }
             });
+        } finally {
+            indexLock.writeLock().unlock();
         }
     }
     
     public void incrementalRecover() throws IOException {
-        synchronized (indexMutex) {
+        indexLock.writeLock().lock();
+        try {
             if( nextRecoveryPosition == null ) {
                 if( lastRecoveryPosition==null ) {
                     nextRecoveryPosition = getRecoveryPosition();
@@ -317,7 +308,7 @@
             }
             while (nextRecoveryPosition != null) {
                 lastRecoveryPosition = nextRecoveryPosition;
-                dbstate.lastUpdate = lastRecoveryPosition;
+                rootEntity.setLastUpdate(lastRecoveryPosition);
                 final TypeCreatable message = load(lastRecoveryPosition);
                 final Location location = lastRecoveryPosition;
                 
@@ -329,6 +320,8 @@
 
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
+        } finally {
+            indexLock.writeLock().unlock();
         }
     }
     
@@ -370,14 +363,14 @@
 	}
 	
     public Location getLastUpdatePosition() throws IOException {
-        return dbstate.lastUpdate;
+        return rootEntity.getLastUpdate();
     }
     
 	private Location getRecoveryPosition() throws IOException {
 		
-        if( dbstate.lastUpdate!=null) {
+        if( rootEntity.getLastUpdate()!=null) {
             // Start replay at the record after the last one recorded in the index file.
-            return journal.getNextLocation(dbstate.lastUpdate);
+            return journal.getNextLocation(rootEntity.getLastUpdate());
         }
         
         // This loads the first position.
@@ -387,7 +380,8 @@
     protected void checkpointCleanup(final boolean cleanup) {
         try {
         	long start = System.currentTimeMillis();
-            synchronized (indexMutex) {
+            indexLock.writeLock().lock();
+            try {
             	if( !opened.get() ) {
             		return;
             	}
@@ -396,6 +390,8 @@
                         checkpointUpdate(tx, cleanup);
                     }
                 });
+            } finally {
+                indexLock.writeLock().unlock();
             }
         	long end = System.currentTimeMillis();
         	if( end-start > 100 ) { 
@@ -408,13 +404,16 @@
 
     
 	public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
-        synchronized (indexMutex) {
+        indexLock.writeLock().lock();
+        try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     checkpointUpdate(tx, false);
                 }
             });
             closure.execute();
+        } finally {
+            indexLock.writeLock().unlock();
         }
 	}
     
@@ -426,8 +425,8 @@
 
         LOG.debug("Checkpoint started.");
         
-        dbstate.state = OPEN_STATE;
-        tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
+        rootEntity.setState(OPEN_STATE);
+        rootEntity.store(tx);
         pageFile.flush();
 
         if( cleanup ) {
@@ -440,7 +439,7 @@
         	}
         	
         	// Don't GC files after the first in progress tx
-        	Location firstTxLocation = dbstate.lastUpdate;
+        	Location firstTxLocation = rootEntity.getLastUpdate();
             
             if( firstTxLocation!=null ) {
             	while( !gcCandidateSet.isEmpty() ) {
@@ -453,52 +452,52 @@
             	}
             }
 
-            // Go through all the destinations to see if any of them can remove GC candidates.
-            for (StoredDestinationState sd : storedDestinations.values()) {
-            	if( gcCandidateSet.isEmpty() ) {
-                	break;
-                }
-                
-                // Use a visitor to cut down the number of pages that we load
-                dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                    int last=-1;
-                    public boolean isInterestedInKeysBetween(Location first, Location second) {
-                    	if( first==null ) {
-                    		SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
-                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-                    			subset.remove(second.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	} else if( second==null ) {
-                    		SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
-                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-                    			subset.remove(first.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	} else {
-                    		SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
-                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-                    			subset.remove(first.getDataFileId());
-                    		}
-                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-                    			subset.remove(second.getDataFileId());
-                    		}
-							return !subset.isEmpty();
-                    	}
-                    }
-    
-                    public void visit(List<Location> keys, List<Long> values) {
-                    	for (Location l : keys) {
-                            int fileId = l.getDataFileId();
-							if( last != fileId ) {
-                        		gcCandidateSet.remove(fileId);
-                                last = fileId;
-                            }
-						}                        
-                    }
-    
-                });
-            }
+//            // Go through all the destinations to see if any of them can remove GC candidates.
+//            for (StoredDestinationState sd : storedDestinations.values()) {
+//            	if( gcCandidateSet.isEmpty() ) {
+//                	break;
+//                }
+//                
+//                // Use a visitor to cut down the number of pages that we load
+//                dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+//                    int last=-1;
+//                    public boolean isInterestedInKeysBetween(Location first, Location second) {
+//                    	if( first==null ) {
+//                    		SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+//                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+//                    			subset.remove(second.getDataFileId());
+//                    		}
+//							return !subset.isEmpty();
+//                    	} else if( second==null ) {
+//                    		SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+//                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+//                    			subset.remove(first.getDataFileId());
+//                    		}
+//							return !subset.isEmpty();
+//                    	} else {
+//                    		SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+//                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+//                    			subset.remove(first.getDataFileId());
+//                    		}
+//                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+//                    			subset.remove(second.getDataFileId());
+//                    		}
+//							return !subset.isEmpty();
+//                    	}
+//                    }
+//    
+//                    public void visit(List<Location> keys, List<Long> values) {
+//                    	for (Location l : keys) {
+//                            int fileId = l.getDataFileId();
+//							if( last != fileId ) {
+//                        		gcCandidateSet.remove(fileId);
+//                                last = fileId;
+//                            }
+//						}                        
+//                    }
+//    
+//                });
+//            }
 
             if( !gcCandidateSet.isEmpty() ) {
 	            LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
@@ -541,22 +540,23 @@
         final Location location = journal.write(os.toByteSequence(), sync);
         long start2 = System.currentTimeMillis();
         
-        synchronized (indexMutex) {
+        
+        try {
+            indexLock.writeLock().lock();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     updateIndex(tx, data.toType(), message, location);
                 }
             });
+            rootEntity.setLastUpdate(location);
+        } finally {
+            indexLock.writeLock().unlock();
         }
 
         long end = System.currentTimeMillis();
         if( end-start > 100 ) { 
             LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
         }
-
-        synchronized (indexMutex) {
-            dbstate.lastUpdate = location;
-        }
         return location;
     }
     
@@ -581,20 +581,24 @@
     }
 
     @SuppressWarnings("unchecked")
-    public void updateIndex(Transaction tx, Type type, MessageBuffer message, Location location) {
+    public void updateIndex(Transaction tx, Type type, MessageBuffer command, Location location) throws IOException {
         switch (type) {
         case MESSAGE_ADD:
-            messageAdd(tx, (MessageAdd)message, location);
+            messageAdd(tx, (MessageAdd)command, location);
             return;
         case QUEUE_ADD:
-            queueAdd(tx, (QueueAdd)message, location);
+            queueAdd(tx, (QueueAdd)command, location);
+            return;
+        case QUEUE_REMOVE:
+            queueRemove(tx, (QueueRemove)command, location);
             return;
         case QUEUE_ADD_MESSAGE:
-            queueAddMessage(tx, (QueueAdd)message, location);
+            queueAddMessage(tx, (QueueAddMessage)command, location);
             return;
         case QUEUE_REMOVE_MESSAGE:
-            queueRemoveMessage(tx, (QueueRemoveMessage)message, location);
+            queueRemoveMessage(tx, (QueueRemoveMessage)command, location);
             return;
+            
         case TRANSACTION_BEGIN:
         case TRANSACTION_ADD_MESSAGE:
         case TRANSACTION_REMOVE_MESSAGE:
@@ -612,90 +616,116 @@
         }
     }
 
-    private void messageAdd(Transaction tx, MessageAdd message, Location location) {
+    private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+        rootEntity.messageAdd(tx, command, location);
+    }
+    
+    private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
+        rootEntity.queueAdd(tx, command.getQueueName());
     }
-    private void queueAdd(Transaction tx, QueueAdd message, Location location) {
+    
+    private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
+        rootEntity.queueRemove(tx, command.getQueueName());
     }
-    private void queueAddMessage(Transaction tx, QueueAdd message, Location location) {
+    
+    private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
+        DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+        if( destination!=null ) {
+            destination.add(tx, command);
+        }
     }
-    private void queueRemoveMessage(Transaction tx, QueueRemoveMessage message, Location location) {
+    private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
+        DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+        if( destination!=null ) {
+            destination.remove(tx, command.getQueueKey());
+        }
     }
 
     class KahaDBSession implements Session {
+        ArrayList<TypeCreatable> updates = new ArrayList<TypeCreatable>();
+
+        private Transaction tx; 
+        private Transaction tx() { // returns the session's page file transaction, opening it on first use
+            if( tx ==null ) {
+                indexLock.readLock().lock(); // taken once, together with the transaction; NOTE(review): the matching unlock is not in this method - confirm it is released when the session ends
+                tx = pageFile.tx();
+            }
+            return tx;
+        }
         
         ///////////////////////////////////////////////////////////////
         // Message related methods.
         ///////////////////////////////////////////////////////////////
         public Long messageAdd(MessageRecord message) {
+            Long id = rootEntity.nextMessageKey(); // the key is assigned up front so it can be returned to the caller
+            MessageAddBean bean = new MessageAddBean();
+            bean.setBuffer(message.getBuffer());
+            bean.setEncoding(message.getEncoding());
+            bean.setMessageId(message.getMessageId());
+            bean.setMessageKey(id); 
+            bean.setStreamKey(message.getStreamKey());
+            updates.add(bean); // only queued in the session's update list; nothing in this method writes it out
+            return id;
+        }
+        
+        public Long messageGetKey(AsciiBuffer messageId) {
+            return rootEntity.messageGetKey(tx(), messageId);
+        }
+        
+        public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
+            Location location = rootEntity.messageGetLocation(tx(), key);
+            if( location ==null ) {
+                throw new KeyNotFoundException("message key: "+key);
+            }
             try {
-                Long id = dbstate.nextMessageId++;
-                MessageAddBean bean = new MessageAddBean();
-                bean.setBuffer(message.getBuffer());
-                bean.setEncoding(message.getEncoding());
-                bean.setMessageId(message.getMessageId());
-                bean.setMessageKey(id); 
-                bean.setStreamKey(message.getStreamKey());
-                store(bean);
-                return id;
+                return (MessageRecord) load(location);
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             }
         }
-        public Long messageGetKey(AsciiBuffer messageId) {
-            return null;
-        }
-        public MessageRecord messageGetRecord(Long key) {
-            return null;
-        }
 
         ///////////////////////////////////////////////////////////////
         // Queue related methods.
         ///////////////////////////////////////////////////////////////
         public void queueAdd(AsciiBuffer queueName) {
-            try {
-                store(new QueueAddBean().setQueueName(queueName));
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
+            updates.add(new QueueAddBean().setQueueName(queueName));
         }
-        public boolean queueRemove(AsciiBuffer queueName) {
-            try {
-                store(new QueueRemoveBean().setQueueName(queueName));
-                return false;
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
+        public void queueRemove(AsciiBuffer queueName) {
+            updates.add(new QueueRemoveBean().setQueueName(queueName));
         }
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
-            return null;
+            return rootEntity.queueList(tx(), firstQueueName, max);
         }
         public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
-            try {
-                Long queueKey = 1L;
-                QueueAddMessageBean bean = new QueueAddMessageBean();
-                bean.setQueueName(queueName);
-                bean.setAttachment(record.getAttachment());
-                bean.setMessageKey(record.getMessageKey());
-                bean.setQueueKey(queueKey);
-                store(bean);
-                return queueKey;
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
-            }
+            DestinationEntity destination = rootEntity.getDestination(queueName);
+            if( destination ==null ) {
+                throw new KeyNotFoundException("queue key: "+queueName);
+            }
+            Long queueKey = destination.nextQueueKey();
+            QueueAddMessageBean bean = new QueueAddMessageBean();
+            bean.setQueueName(queueName);
+            bean.setAttachment(record.getAttachment());
+            bean.setMessageKey(record.getMessageKey());
+            bean.setQueueKey(queueKey);
+            updates.add(bean);
+            return queueKey;
         }
         public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+            QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
+            bean.setQueueKey(queueKey);
+            bean.setQueueName(queueName);
+            updates.add(bean);
+        }
+        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+            DestinationEntity destination = rootEntity.getDestination(queueName);
+            if( destination ==null ) {
+                throw new KeyNotFoundException("queue key: "+queueName);
+            }
             try {
-                QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
-                bean.setQueueKey(queueKey);
-                bean.setQueueName(queueName);
-                store(bean);
+                return destination.listMessages(tx(), firstQueueKey, max); // tx() opens the transaction on first use; the bare tx field is still null if no earlier read happened
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             }
-
-        }
-        public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
-            return null;
         }
         
         

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Thu Mar 26 16:47:24 2009
@@ -20,6 +20,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.kahadb.journal.Location;
@@ -27,6 +28,34 @@
 
 public class Marshallers {
     
+    public final static Marshaller<QueueRecord> QUEUE_RECORD_MARSHALLER = new Marshaller<QueueRecord>() { // layout: queue key (long), message key (long), attachment flag (boolean), then the attachment if the flag is true
+        
+        public Class<QueueRecord> getType() {
+            return QueueRecord.class;
+        }
+    
+        public QueueRecord readPayload(DataInput dataIn) throws IOException {
+            QueueRecord rc = new QueueRecord();
+            rc.setQueueKey(dataIn.readLong());
+            rc.setMessageKey(dataIn.readLong());
+            if( dataIn.readBoolean() ) { // the flag says whether an attachment follows; without one the record's attachment is left unset
+                rc.setAttachment(BUFFER_MARSHALLER.readPayload(dataIn));
+            }
+            return rc;
+        }
+    
+        public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
+            dataOut.writeLong(object.getQueueKey());
+            dataOut.writeLong(object.getMessageKey());
+            if( object.getAttachment()!=null ) {
+                dataOut.writeBoolean(true); // flag first, so readPayload knows an attachment follows
+                BUFFER_MARSHALLER.writePayload(object.getAttachment(), dataOut);
+            } else {
+                dataOut.writeBoolean(false);
+            }
+        }
+    };
+
     public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
     
         public Class<Location> getType() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java Thu Mar 26 16:47:24 2009
@@ -25,7 +25,6 @@
 import org.apache.kahadb.util.Marshaller;
 
 public class MessageKeys {
-    public static final MessageKeysMarshaller MARSHALLER = new MessageKeysMarshaller();
 
     final AsciiBuffer messageId;
     final Location location;
@@ -40,8 +39,7 @@
         return "["+messageId+","+location+"]";
     }
     
-    public static class MessageKeysMarshaller implements Marshaller<MessageKeys> {
-        
+    public static final Marshaller<MessageKeys> MARSHALLER = new Marshaller<MessageKeys>() {
         public Class<MessageKeys> getType() {
             return MessageKeys.class;
         }
@@ -56,5 +54,5 @@
             dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length);
             Marshallers.LOCATION_MARSHALLER.writePayload(object.location, dataOut);
         }
-    }
+    };
 }
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=758737&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Thu Mar 26 16:47:24 2009
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.Marshaller;
+
+/**
+ * Root of the KahaDB store index. It holds the index state flag, the journal
+ * location of the last applied update, the message indexes and the set of
+ * destinations, which is mirrored in memory once {@link #load(Transaction)} ran.
+ * <p>
+ * NOTE(review): messageKeyIndex, locationIndex and messageIdIndex are never
+ * assigned in this class and are not covered by {@link #MARSHALLER}, so the
+ * message methods cannot work yet - TODO allocate, persist and load them.
+ */
+public class RootEntity {
+
+    /** Marshals the persistent fields: state, destination index page id and last update. */
+    public final static Marshaller<RootEntity> MARSHALLER = new Marshaller<RootEntity>() {
+        public Class<RootEntity> getType() {
+            return RootEntity.class;
+        }
+
+        public RootEntity readPayload(DataInput is) throws IOException {
+            RootEntity rc = new RootEntity();
+            rc.state = is.readInt();
+            rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
+            // A boolean flag tells whether a last update location follows.
+            if (is.readBoolean()) {
+                rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
+            } else {
+                rc.lastUpdate = null;
+            }
+            return rc;
+        }
+
+        public void writePayload(RootEntity object, DataOutput os) throws IOException {
+            os.writeInt(object.state);
+            os.writeLong(object.destinationIndex.getPageId());
+            if (object.lastUpdate != null) {
+                os.writeBoolean(true);
+                Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
+            } else {
+                os.writeBoolean(false);
+            }
+        }
+    };
+
+    // Id of the root page that this object's state is stored on.
+    private long pageId;
+
+    // State information about the index
+    private int state;
+    private Location lastUpdate;
+
+    // Message Indexes
+    private long nextMessageKey;
+    private BTreeIndex<Long, MessageKeys> messageKeyIndex;
+    private BTreeIndex<Location, Long> locationIndex;
+    private BTreeIndex<AsciiBuffer, Long> messageIdIndex;
+
+    // The destinations
+    private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
+    private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
+
+    ///////////////////////////////////////////////////////////////////
+    // Lifecycle Methods.
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * Allocates the root page and the destination index page of a new page
+     * file and stores this entity, in the closed state, on the root page.
+     *
+     * @param tx the page file transaction to allocate and store with
+     * @throws IOException if the page file cannot be updated
+     */
+    public void allocate(Transaction tx) throws IOException {
+        // First time this is created.. Initialize a new pagefile.
+        Page<RootEntity> page = tx.allocate();
+        pageId = page.getPageId();
+        assert pageId == 0;
+
+        state = KahaDBStore.CLOSED_STATE;
+        destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
+
+        page.set(this);
+        tx.store(page, MARSHALLER, true);
+    }
+
+    /**
+     * Loads the destination index and rebuilds the in-memory destination map
+     * from it.
+     *
+     * @param tx the page file transaction to read with
+     * @throws IOException if the index cannot be read
+     */
+    public void load(Transaction tx) throws IOException {
+        destinationIndex.setPageFile(tx.getPageFile());
+        destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
+        destinationIndex.load(tx);
+
+        // Keep the DestinationEntity objects loaded in memory
+        destinations.clear();
+        for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
+            Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
+            entry.getValue().load(tx);
+            destinations.put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Writes the current state of this entity back to its root page.
+     *
+     * @param tx the page file transaction to store with
+     * @throws IOException if the page cannot be loaded or stored
+     */
+    public void store(Transaction tx) throws IOException {
+        Page<RootEntity> page = tx.load(pageId, null);
+        page.set(this);
+        tx.store(page, RootEntity.MARSHALLER, true);
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Message Methods.
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * @return the next unused message key; the internal counter is advanced
+     */
+    public Long nextMessageKey() {
+        return nextMessageKey++;
+    }
+
+    /**
+     * Indexes a message that was written to the journal at the given location.
+     *
+     * @param tx the page file transaction to update the indexes with
+     * @param command the journaled command; supplies the message key and id
+     * @param location the journal location of the command
+     * @throws IOException if an index cannot be updated
+     */
+    public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+        // Index under the key carried by the command: that is the key the
+        // producer of the command was given.  Drawing a fresh key here would
+        // leave the key it holds pointing at nothing.
+        long id = command.getMessageKey();
+        if( id >= nextMessageKey ) {
+            // Keep the generator ahead of every key seen in the journal.
+            nextMessageKey = id + 1;
+        }
+        Long previous = locationIndex.put(tx, location, id);
+        if( previous == null ) {
+            messageIdIndex.put(tx, command.getMessageId(), id);
+            messageKeyIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+        } else {
+            // Message existed.. undo the index update we just did.  Chances
+            // are it's a transaction replay.
+            locationIndex.put(tx, location, previous);
+        }
+    }
+
+    /**
+     * @param tx the page file transaction to read with
+     * @param messageId the message id to look up
+     * @return the value stored for the id in the message id index
+     * @throws Store.FatalStoreException if the index cannot be read
+     */
+    public Long messageGetKey(Transaction tx, AsciiBuffer messageId) {
+        try {
+            return messageIdIndex.get(tx, messageId);
+        } catch (IOException e) {
+            throw new Store.FatalStoreException(e);
+        }
+    }
+
+    /**
+     * @param tx the page file transaction to read with
+     * @param messageKey the message key to look up
+     * @return the journal location recorded for the key, or null if the key
+     *         is not in the message key index
+     * @throws Store.FatalStoreException if the index cannot be read
+     */
+    public Location messageGetLocation(Transaction tx, Long messageKey) {
+        try {
+            MessageKeys t = messageKeyIndex.get(tx, messageKey);
+            if( t==null ) {
+                return null;
+            }
+            return t.location;
+        } catch (IOException e) {
+            throw new Store.FatalStoreException(e);
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Queue Methods.
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * Creates the queue if the destination index has no entry for it yet.
+     *
+     * @param tx the page file transaction to update the index with
+     * @param queueName the name of the queue to create
+     * @throws IOException if the index cannot be updated
+     */
+    public void queueAdd(Transaction tx, AsciiBuffer queueName) throws IOException {
+        if( destinationIndex.get(tx, queueName)==null ) {
+            DestinationEntity rc = new DestinationEntity();
+            rc.allocate(tx);
+            destinationIndex.put(tx, queueName, rc);
+            rc.load(tx);
+            destinations.put(queueName, rc);
+        }
+    }
+
+    /**
+     * Removes the queue from the index and from memory and deallocates it;
+     * does nothing if the queue is not loaded.
+     *
+     * @param tx the page file transaction to update the index with
+     * @param queueName the name of the queue to remove
+     * @throws IOException if the index cannot be updated
+     */
+    public void queueRemove(Transaction tx, AsciiBuffer queueName) throws IOException {
+        DestinationEntity destination = destinations.get(queueName);
+        if( destination!=null ) {
+            destinationIndex.remove(tx, queueName);
+            destinations.remove(queueName);
+            destination.deallocate(tx);
+        }
+    }
+
+    /**
+     * @param queueName the name of the queue
+     * @return the loaded destination, or null if there is none by that name
+     */
+    public DestinationEntity getDestination(AsciiBuffer queueName) {
+        return destinations.get(queueName);
+    }
+
+    /**
+     * Lists queue names in sorted order from the in-memory destination map.
+     *
+     * @param tx not used by this implementation
+     * @param firstQueueName the name to start at (inclusive), or null to start at the first queue
+     * @param max the maximum number of names to return
+     * @return an iterator over at most max queue names
+     */
+    public Iterator<AsciiBuffer> queueList(Transaction tx, AsciiBuffer firstQueueName, int max) {
+        return list(destinations, firstQueueName, max);
+    }
+
+    /**
+     * Copies up to max keys of the map, starting at first (inclusive) or at
+     * the lowest key when first is null.  A max of zero or less yields an
+     * empty iterator.
+     */
+    private static <Key,Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
+        // Not presized with max: the value comes straight from the caller,
+        // and a negative one would throw while a huge one would allocate
+        // a huge backing array for what may be a handful of keys.
+        ArrayList<Key> rc = new ArrayList<Key>();
+        Set<Key> keys = (first==null ? map : map.tailMap(first)).keySet();
+        for (Key buffer : keys) {
+            if( rc.size() >= max ) {
+                break;
+            }
+            rc.add(buffer);
+        }
+        return rc.iterator();
+    }
+
+    public long getPageId() {
+        return pageId;
+    }
+
+    public void setPageId(long pageId) {
+        this.pageId = pageId;
+    }
+
+    public int getState() {
+        return state;
+    }
+
+    public void setState(int state) {
+        this.state = state;
+    }
+
+    public Location getLastUpdate() {
+        return lastUpdate;
+    }
+
+    public void setLastUpdate(Location lastUpdate) {
+        this.lastUpdate = lastUpdate;
+    }
+
+
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Thu Mar 26 16:47:24 2009
@@ -175,13 +175,11 @@
                 queues.put(queueName, queue);
             }
         }
-        public boolean queueRemove(AsciiBuffer queueName) {
+        public void queueRemove(AsciiBuffer queueName) {
             StoredQueue queue = queues.get(queueName);
             if (queue != null) {
                 queues.remove(queueName);
-                return true;
             }
-            return false;
         }
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
             return list(queues, firstQueueName, max);