You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/16 19:42:21 UTC

svn commit: r785317 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/ activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ ac...

Author: cmacnaug
Date: Tue Jun 16 17:42:21 2009
New Revision: 785317

URL: http://svn.apache.org/viewvc?rev=785317&view=rev
Log:
Updating KahaDBStore to apply journal and index updates on the fly instead of at commit time. 

This allows larger chunks of work to be done within a single transaction reducing the need for lots of locking unlocking. Also added locking and transaction controls to the Session interface to that callers can directly operate against a Session to avoid creating one per transaction. 

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Tue Jun 16 17:42:21 2009
@@ -36,7 +36,6 @@
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.VoidCallback;
 import org.apache.activemq.dispatch.DispatcherAware;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.AbstractLimitedFlowResource;
@@ -117,7 +116,7 @@
         }, databaseFlow, storeLimiter, opQueue);
         storeController.useOverFlowQueue(false);
         super.onFlowOpened(storeController);
-        
+
         flushDelayCallback = new Runnable() {
             public void run() {
                 flushDelayCallback();
@@ -171,11 +170,9 @@
     }
 
     public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
-        // TODO Auto-generated method stub
         return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
 
             public Iterator<QueueQueryResult> execute(Session session) throws Exception {
-                // TODO Auto-generated method stub
                 return session.queueListByType(type, null, Integer.MAX_VALUE);
             }
 
@@ -307,7 +304,7 @@
 
     private final void processOps() {
         int count = 0;
-
+        Session session = store.getSession();
         while (running.get()) {
             final OperationBase firstOp = getNextOp(true);
             if (firstOp == null) {
@@ -318,31 +315,34 @@
             // The first operation we get, triggers a store transaction.
             if (firstOp != null) {
                 final LinkedList<Operation> processedQueue = new LinkedList<Operation>();
+                boolean locked = false;
                 try {
 
                     Operation op = firstOp;
-                    // TODO the recursion here leads to a rather large stack,
-                    // refactor.
                     while (op != null) {
                         final Operation toExec = op;
                         if (toExec.beginExecute()) {
+                            if (!locked) {
+                                session.acquireLock();
+                                locked = true;
+                            }
                             count++;
-
-                            store.execute(new Store.VoidCallback<Exception>() {
-                                @Override
-                                public void run(Session session) throws Exception {
-
-                                    // Try to execute the operation against the
-                                    // session...
-                                    try {
-                                        toExec.execute(session);
-                                        processedQueue.add(toExec);
-                                    } catch (CancellationException ignore) {
-                                        // System.out.println("Cancelled" +
-                                        // toExec);
-                                    }
-                                }
-                            }, null);
+                            op.execute(session);
+                            processedQueue.add(op);
+                            /*
+                             * store.execute(new Store.VoidCallback<Exception>()
+                             * {
+                             * 
+                             * @Override public void run(Session session) throws
+                             * Exception {
+                             * 
+                             * // Try to execute the operation against the //
+                             * session... try { toExec.execute(session);
+                             * processedQueue.add(toExec); } catch
+                             * (CancellationException ignore) { //
+                             * System.out.println("Cancelled" + // toExec); } }
+                             * }, null);
+                             */
                         }
 
                         if (count < 1000) {
@@ -356,6 +356,11 @@
                     // If we procecessed some ops, flush and post process:
                     if (!processedQueue.isEmpty()) {
 
+                        if (locked) {
+                            session.commit();
+                            session.releaseLock();
+                            locked = false;
+                        }
                         if (DEBUG)
                             System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
                         // Sync the store:
@@ -393,6 +398,16 @@
                     IOException ioe = new IOException(e.getMessage());
                     ioe.initCause(e);
                     onDatabaseException(ioe);
+                } finally {
+                    if (locked) {
+                        try {
+                            session.releaseLock();
+                        } catch (Exception e) {
+                            IOException ioe = new IOException(e.getMessage());
+                            ioe.initCause(e);
+                            onDatabaseException(ioe);
+                        }
+                    }
                 }
             }
         }
@@ -1098,12 +1113,13 @@
         return store.allocateStoreTracking();
     }
 
-	public IDispatcher getDispatcher() {
-		return dispatcher;
-	}
-	public void setDispatcher(IDispatcher dispatcher) {
-		this.dispatcher = dispatcher;
-	}
+    public IDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(IDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
 
 	public Store getStore() {
 		return store;

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Tue Jun 16 17:42:21 2009
@@ -53,7 +53,7 @@
 
     protected final boolean USE_KAHA_DB = true;
     protected final boolean PURGE_STORE = true;
-    protected final boolean PERSISTENT = false;
+    protected final boolean PERSISTENT = true;
     protected final boolean DURABLE = true;
 
     // Set to put senders and consumers on separate brokers.

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Tue Jun 16 17:42:21 2009
@@ -22,7 +22,6 @@
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -67,12 +66,14 @@
     private static final int BEGIN_UNIT_OF_WORK = -1;
     private static final int END_UNIT_OF_WORK = -2;
     private static final int FLUSH = -3;
+    private static final int CANCEL_UNIT_OF_WORK = -4;
 
     private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
-    private static final ByteSequence BEGIN_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { END_UNIT_OF_WORK });
+    private static final ByteSequence BEGIN_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { BEGIN_UNIT_OF_WORK });
     private static final ByteSequence END_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { END_UNIT_OF_WORK });
+    private static final ByteSequence CANCEL_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { CANCEL_UNIT_OF_WORK });
     private static final ByteSequence FLUSH_DATA = new ByteSequence(new byte[] { FLUSH });
 
     public static final int CLOSED_STATE = 1;
@@ -116,12 +117,9 @@
     // /////////////////////////////////////////////////////////////////
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-            try
-            {
+            try {
                 load();
-            }
-            catch (Exception e)
-            {
+            } catch (Exception e) {
                 LOG.error("Error loading store", e);
             }
         }
@@ -140,6 +138,10 @@
         return trackingGen.incrementAndGet();
     }
 
+    public boolean isTransactional() {
+        return true;
+    }
+
     private void loadPageFile() throws IOException {
         indexLock.writeLock().lock();
         try {
@@ -229,7 +231,7 @@
         }
     }
 
-    public void load() throws IOException {
+    private void load() throws IOException {
         indexLock.writeLock().lock();
         try {
             open();
@@ -246,7 +248,7 @@
 
                 loadPageFile();
             }
-            store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
+            store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
         } finally {
             indexLock.writeLock().unlock();
         }
@@ -260,10 +262,11 @@
             try {
                 pageFile.unload();
                 rootEntity = new RootEntity();
+                journal.close();
             } finally {
                 indexLock.writeLock().unlock();
             }
-            journal.close();
+
             checkpointThread.join();
             lockFile.unlock();
             lockFile = null;
@@ -333,6 +336,8 @@
                             redoCounter += uow.size();
                             uow = null;
                         }
+                    } else if (data.length == 1 && data.data[0] == CANCEL_UNIT_OF_WORK) {
+                        uow = null;
                     } else if (data.length == 1 && data.data[0] == FLUSH) {
                     } else {
                         final TypeCreatable message = load(recoveryPosition);
@@ -409,32 +414,8 @@
         // in that case we need to removed references to messages that are not
         // in the journal
         final Location lastAppendLocation = journal.getLastAppendLocation();
-        long undoCounter = 0;
+        int undoCounter = rootEntity.recoverIndex(lastAppendLocation, tx);
 
-        // TODO
-        // // Go through all the destinations to see if they have messages past
-        // the lastAppendLocation
-        // for (StoredDestinationState sd : storedDestinations.values()) {
-        //        	
-        // final ArrayList<Long> matches = new ArrayList<Long>();
-        // // Find all the Locations that are >= than the last Append Location.
-        // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
-        // Long>(lastAppendLocation) {
-        // @Override
-        // protected void matched(Location key, Long value) {
-        // matches.add(value);
-        // }
-        // });
-        //            
-        //            
-        // for (Long sequenceId : matches) {
-        // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-        // sd.locationIndex.remove(tx, keys.location);
-        // sd.messageIdIndex.remove(tx, keys.messageId);
-        // undoCounter++;
-        // // TODO: do we need to modify the ack positions for the pub sub case?
-        // }
-        // }
         long end = System.currentTimeMillis();
         if (undoCounter > 0) {
             // The rolledback operations are basically in flight journal writes.
@@ -444,10 +425,6 @@
         }
     }
 
-    public Location getLastUpdatePosition() throws IOException {
-        return rootEntity.getLastUpdate();
-    }
-
     private Location getRecoveryPosition() throws IOException {
 
         if (rootEntity.getLastUpdate() != null) {
@@ -620,8 +597,8 @@
     // /////////////////////////////////////////////////////////////////
     long messageSequence;
 
-    public Location store(TypeCreatable data) throws IOException {
-        return store(data, null);
+    private Location store(TypeCreatable data, Transaction tx) throws IOException {
+        return store(data, null, tx);
     }
 
     /**
@@ -633,105 +610,48 @@
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    public Location store(final TypeCreatable data, Runnable onFlush) throws IOException {
+    private Location store(final TypeCreatable data, Runnable onFlush, Transaction tx) throws IOException {
         final MessageBuffer message = ((PBMessage) data).freeze();
         int size = message.serializedSizeUnframed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
         os.writeByte(data.toType().getNumber());
         message.writeUnframed(os);
 
-        long start = System.currentTimeMillis();
-        final Location location;
-        synchronized (journal) {
-            location = journal.write(os.toByteSequence(), onFlush);
-        }
-        long start2 = System.currentTimeMillis();
-
-        try {
+        //If we aren't in a transaction acquire the index lock
+        if (tx == null) {
             indexLock.writeLock().lock();
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    updateIndex(tx, data.toType(), message, location);
-                }
-            });
-            rootEntity.setLastUpdate(location);
-
-        } finally {
-            indexLock.writeLock().unlock();
-        }
-
-        long end = System.currentTimeMillis();
-        if (end - start > 1000) {
-            LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
         }
-        return location;
-    }
-
-    public void store(List<TypeCreatable> batch) throws IOException {
-        store(batch, null);
-    }
 
-    // ArrayList<TypeCreatable>
-    /**
-     * All updated are are funneled through this method. The updates a converted
-     * to a PBMessage which is logged to the journal and then the data from the
-     * PBMessage is used to update the index just like it would be done during a
-     * recovery process.
-     * 
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    public void store(final List<TypeCreatable> batch, Runnable onFlush) throws IOException {
-        if (batch.isEmpty()) {
-            return;
-        }
-        if (batch.size() == 1) {
-            store(batch.get(0), onFlush);
-            return;
-        }
-
-        final ArrayList<UoWOperation> uow = new ArrayList<UoWOperation>(batch.size());
-        for (TypeCreatable bean : batch) {
-            final MessageBuffer message = ((PBMessage) bean).freeze();
-            int size = message.serializedSizeUnframed();
-            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-            os.writeByte(bean.toType().getNumber());
-            message.writeUnframed(os);
-            UoWOperation op = new UoWOperation();
-            op.bean = bean;
-            op.data = os.toByteSequence();
-            uow.add(op);
-        }
+        try {
 
-        long start = System.currentTimeMillis();
-        synchronized (journal) {
-            journal.write(BEGIN_UNIT_OF_WORK_DATA, false);
-            for (UoWOperation op : uow) {
-                op.location = journal.write(op.data, false);
+            long start = System.currentTimeMillis();
+            final Location location;
+            synchronized (journal) {
+                location = journal.write(os.toByteSequence(), onFlush);
             }
-            journal.write(END_UNIT_OF_WORK_DATA, onFlush);
-        }
-        long start2 = System.currentTimeMillis();
+            long start2 = System.currentTimeMillis();
 
-        try {
-            indexLock.writeLock().lock();
-            pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    for (UoWOperation op : uow) {
-                        MessageBuffer message = ((PBMessage) op.bean).freeze();
-                        updateIndex(tx, op.bean.toType(), message, op.location);
-                        rootEntity.setLastUpdate(op.location);
+            if (tx == null) {
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        updateIndex(tx, data.toType(), message, location);
                     }
-                }
-            });
+                });
+            } else {
+                updateIndex(tx, data.toType(), message, location);
+            }
+
+            long end = System.currentTimeMillis();
+            if (end - start > 1000) {
+                LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
+            }
+            return location;
+            
         } finally {
-            indexLock.writeLock().unlock();
+            if (tx == null)
+                indexLock.writeLock().unlock();
         }
 
-        long end = System.currentTimeMillis();
-        if (end - start > 1000) {
-            LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
-        }
     }
 
     /**
@@ -741,7 +661,7 @@
      * @return
      * @throws IOException
      */
-    public TypeCreatable load(Location location) throws IOException {
+    private TypeCreatable load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
         return load(location, data);
     }
@@ -764,19 +684,19 @@
         switch (type) {
         case MESSAGE_ADD:
             messageAdd(tx, (MessageAdd) command, location);
-            return;
+            break;
         case QUEUE_ADD:
             queueAdd(tx, (QueueAdd) command, location);
-            return;
+            break;
         case QUEUE_REMOVE:
             queueRemove(tx, (QueueRemove) command, location);
-            return;
+            break;
         case QUEUE_ADD_MESSAGE:
             queueAddMessage(tx, (QueueAddMessage) command, location);
-            return;
+            break;
         case QUEUE_REMOVE_MESSAGE:
             queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
-            return;
+            break;
 
         case TRANSACTION_BEGIN:
         case TRANSACTION_ADD_MESSAGE:
@@ -793,6 +713,7 @@
         case STREAM_REMOVE:
             throw new UnsupportedOperationException();
         }
+        rootEntity.setLastUpdate(location);
     }
 
     private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
@@ -846,20 +767,25 @@
     }
 
     class KahaDBSession implements Session {
-        ArrayList<TypeCreatable> updates = new ArrayList<TypeCreatable>();
+        TypeCreatable atomicUpdate = null;
+        int updateCount = 0;
 
         private Transaction tx;
 
         private Transaction tx() {
-            if (tx == null) {
-                indexLock.readLock().lock();
-                tx = pageFile.tx();
-            }
+            acquireLock();
             return tx;
         }
 
-        public void close() {
+        public final void commit() {
+            commit(null);
+        }
+
+        public final void rollback() {
             try {
+                if (updateCount > 1) {
+                    journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
+                }
                 if (tx != null) {
                     tx.rollback();
                 }
@@ -867,30 +793,90 @@
                 throw new FatalStoreException(e);
             } finally {
                 if (tx != null) {
-                    indexLock.readLock().unlock();
                     tx = null;
+                    updateCount = 0;
+                    atomicUpdate = null;
                 }
             }
         }
 
+        /**
+         * Indicates callers intent to start a transaction.
+         */
+        public final void acquireLock() {
+            if (tx == null) {
+                indexLock.writeLock().lock();
+                tx = pageFile.tx();
+            }
+        }
+
+        public final void releaseLock() {
+            try {
+                if (tx != null) {
+                    rollback();
+                }
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
         public void commit(Runnable onFlush) {
             try {
+
+                boolean flush = false;
+                if (atomicUpdate != null) {
+                    store(atomicUpdate, onFlush, tx);
+                } else if (updateCount > 1) {
+                    journal.write(END_UNIT_OF_WORK_DATA, onFlush);
+                } else {
+                    flush = onFlush != null;
+                }
+
                 if (tx != null) {
                     tx.commit();
                 }
+
+                if (flush) {
+                    onFlush.run();
+                }
+
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             } finally {
-                if (tx != null) {
-                    indexLock.readLock().unlock();
-                    tx = null;
+                tx = null;
+                updateCount = 0;
+                atomicUpdate = null;
+            }
+        }
+
+        private void storeAtomic() {
+            if (atomicUpdate != null) {
+                try {
+                    journal.write(BEGIN_UNIT_OF_WORK_DATA, false);
+                    store(atomicUpdate, null, tx);
+                    atomicUpdate = null;
+                } catch (IOException ioe) {
+                    throw new FatalStoreException(ioe);
                 }
             }
+        }
 
+        private void addUpdate(TypeCreatable bean) {
             try {
-                store(updates, onFlush);
-            } catch (IOException e) {
-                throw new FatalStoreException(e);
+                //As soon as we do more than one update we'll wrap in a unit of 
+                //work:
+                if (updateCount == 0) {
+                    atomicUpdate = bean;
+                    updateCount++;
+                    return;
+                }
+                storeAtomic();
+
+                updateCount++;
+                store(bean, null, tx);
+
+            } catch (IOException ioe) {
+                throw new FatalStoreException(ioe);
             }
         }
 
@@ -915,10 +901,12 @@
             if (streamKey != null) {
                 bean.setStreamKey(streamKey);
             }
-            updates.add(bean);
+
+            addUpdate(bean);
         }
 
         public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
+            storeAtomic();
             Location location = rootEntity.messageGetLocation(tx(), key);
             if (location == null) {
                 throw new KeyNotFoundException("message key: " + key);
@@ -955,14 +943,15 @@
                 update.setParentName(parent);
                 update.setPartitionId(descriptor.getPartitionKey());
             }
-            updates.add(update);
+            addUpdate(update);
         }
 
         public void queueRemove(QueueDescriptor descriptor) {
-            updates.add(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
+            addUpdate(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
         }
 
         public Iterator<QueueQueryResult> queueListByType(short type, QueueDescriptor firstQueue, int max) {
+            storeAtomic();
             try {
                 return rootEntity.queueList(tx(), type, firstQueue, max);
             } catch (IOException e) {
@@ -971,6 +960,7 @@
         }
 
         public Iterator<QueueQueryResult> queueList(QueueDescriptor firstQueue, int max) {
+            storeAtomic();
             try {
                 return rootEntity.queueList(tx(), (short) -1, firstQueue, max);
             } catch (IOException e) {
@@ -987,17 +977,18 @@
             if (record.getAttachment() != null) {
                 bean.setAttachment(record.getAttachment());
             }
-            updates.add(bean);
+            addUpdate(bean);
         }
 
         public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
             QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
             bean.setMessageKey(messageKey);
             bean.setQueueName(queue.getQueueName());
-            updates.add(bean);
+            addUpdate(bean);
         }
 
         public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+            storeAtomic();
             DestinationEntity destination = rootEntity.getDestination(queue);
             if (destination == null) {
                 throw new KeyNotFoundException("queue key: " + queue);
@@ -1084,14 +1075,23 @@
         }
     }
 
+    public Session getSession() {
+        return new KahaDBSession();
+    }
+
+    /**
+     * Convenienct method for executing a batch of work within a store
+     * transaction.
+     */
     public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
         KahaDBSession session = new KahaDBSession();
+        session.acquireLock();
         try {
             R rc = callback.execute(session);
             session.commit(onFlush);
             return rc;
         } finally {
-            session.close();
+            session.releaseLock();
         }
     }
 
@@ -1136,13 +1136,28 @@
         return manager;
     }
 
+    private PageFile getPageFile() {
+        if (pageFile == null) {
+            pageFile = createPageFile();
+        }
+        return pageFile;
+    }
+
+    private Journal getJournal() {
+        if (journal == null) {
+            journal = createJournal();
+        }
+        return journal;
+    }
+
     public File getDirectory() {
         return directory;
     }
 
-	public File getStoreDirectory() {
-		return directory;
-	}
+    public File getStoreDirectory() {
+        return directory;
+    }
+
     public void setStoreDirectory(File directory) {
         this.directory = directory;
     }
@@ -1203,20 +1218,6 @@
         return journalMaxFileLength;
     }
 
-    public PageFile getPageFile() {
-        if (pageFile == null) {
-            pageFile = createPageFile();
-        }
-        return pageFile;
-    }
-
-    public Journal getJournal() {
-        if (journal == null) {
-            journal = createJournal();
-        }
-        return journal;
-    }
-
     public boolean isFailIfDatabaseIsLocked() {
         return failIfDatabaseIsLocked;
     }
@@ -1225,5 +1226,4 @@
         this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
     }
 
-
 }

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Tue Jun 16 17:42:21 2009
@@ -425,4 +425,45 @@
             return lastSequence;
         }
     }
+
+    /**
+     * @param lastAppendLocation
+     * @param tx
+     * @return the number of undone index entries
+     */
+    public int recoverIndex(Location lastAppendLocation, Transaction tx) {
+
+        //TODO check that none of the locations specified by the indexes
+        //are past the last update location in the journal. This can happen
+        //if the index is flushed before the journal. 
+        //
+        //Collection<DestinationEntity> values = destinations.values();
+        //for (DestinationEntity de : values) {
+        //    count += 
+        //}
+        // Go through all the destinations to see if they have messages past
+        // the lastAppendLocation
+        //for (StoredDestinationState sd : 
+        //          
+        // final ArrayList<Long> matches = new ArrayList<Long>();
+        // // Find all the Locations that are >= than the last Append Location.
+        // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
+        // Long>(lastAppendLocation) {
+        // @Override
+        // protected void matched(Location key, Long value) {
+        // matches.add(value);
+        // }
+        // });
+        //            
+        //            
+        // for (Long sequenceId : matches) {
+        // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+        // sd.locationIndex.remove(tx, keys.location);
+        // sd.messageIdIndex.remove(tx, keys.messageId);
+        // undoCounter++;
+        // // TODO: do we need to modify the ack positions for the pub sub case?
+        // }
+        // }
+        return 0;
+    }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Tue Jun 16 17:42:21 2009
@@ -118,6 +118,11 @@
     public long allocateStoreTracking();
 
     /**
+     * @return A new store Session.
+     */
+    public Session getSession();
+    
+    /**
      * This interface is used to execute transacted code.
      * 
      * It is used by the {@link Store#execute(Callback)} method, often as
@@ -354,6 +359,12 @@
      */
     public void flush();
 
+    
+    /**
+     * @return true if the store is transactional. 
+     */
+    public boolean isTransactional();
+    
     /**
      * This interface allows you to query and update the Store.
      * 
@@ -362,6 +373,31 @@
      * 
      */
     public interface Session {
+        
+        /**
+         * Commits work done on the Session
+         */
+        public void commit();
+
+        /**
+         * Rolls back work done on the Session
+         * since the last call to {@link #acquireLock()}
+         * 
+         * @throw {@link UnsupportedOperationException} if the store is not transactional
+         */
+        public void rollback();
+
+        /**
+         * Indicates callers intent to start a transaction. 
+         */
+        public void acquireLock();
+
+        /**
+         * Indicates caller is done with the transaction, if 
+         * not committed then the transaction will be rolled back (providing
+         * the store is transactional.
+         */
+        public void releaseLock();
 
         public void messageAdd(MessageRecord message);
 
@@ -397,7 +433,7 @@
          * 
          * @param firstQueueName
          *            If null starts the query at the first queue.
-         * @param max
+        * @param max
          *            The maximum number of queues to return
          * @return The list of queues.
          */

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Jun 16 17:42:21 2009
@@ -25,8 +25,10 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.Session;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueDescriptor;
@@ -43,6 +45,7 @@
 
     private MemorySession session = new MemorySession();
     private AtomicLong trackingGen = new AtomicLong(0);
+    private ReentrantLock updateLock = new ReentrantLock();
 
     /**
      * @return a unique sequential store tracking number.
@@ -51,6 +54,10 @@
         return trackingGen.incrementAndGet();
     }
 
+    public boolean isTransactional() {
+        return false;
+    }
+
     static private class Stream {
 
         private ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -126,7 +133,7 @@
             } else {
                 list = new ArrayList<QueueRecord>(max);
             }
-            
+
             for (Long key : records.tailMap(firstQueueKey).keySet()) {
                 if ((max >= 0 && list.size() >= max) || (maxSequence >= 0 && key > maxSequence)) {
                     break;
@@ -198,7 +205,7 @@
             result.count = count;
             result.size = size;
             result.firstSequence = records.isEmpty() ? 0 : records.get(records.firstKey()).getQueueKey();
-            result.lastSequence = records.isEmpty() ? 0 :  records.get(records.lastKey()).getQueueKey();
+            result.lastSequence = records.isEmpty() ? 0 : records.get(records.lastKey()).getQueueKey();
             result.desc = descriptor.copy();
             if (this.partitions != null) {
                 ArrayList<QueueQueryResult> childResults = new ArrayList<QueueQueryResult>(partitions.size());
@@ -212,6 +219,14 @@
         }
 
     }
+    
+    /**
+     * @return A new store Session.
+     */
+    public Session getSession()
+    {
+        return session;
+    }
 
     static private class RemoveOp {
         QueueDescriptor queue;
@@ -283,12 +298,10 @@
         }
 
         public long getFirstSequence() {
-            // TODO Auto-generated method stub
             return firstSequence;
         }
 
         public long getLastSequence() {
-            // TODO Auto-generated method stub
             return lastSequence;
         }
     }
@@ -304,6 +317,44 @@
         private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
         private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
 
+        /**
+         * Commits work done on the Session, if {@link Store#isTransactional()} is true.
+         */
+        public void commit()
+        {
+            //NOOP
+        }
+
+        /**
+         * Rolls back work done on the Session
+         * since the last call to {@link #acquireLock()}
+         */
+        public void rollback()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Indicates callers intent to start a transaction. If the store
+         * is transaction, the caller must call {@link #commit()} when the 
+         * done operating on the Session prior to a mandatory call to 
+         * {@link #releaseLock()}
+         */
+        public void acquireLock()
+        {
+            updateLock.lock();
+        }
+
+        /**
+         * Indicates caller is done with the transaction, if 
+         * not committed then the transaction will be rolled back (providing
+         * the store is transactional.
+         */
+        public void releaseLock()
+        {
+            updateLock.unlock();
+        }
+
         // //////////////////////////////////////////////////////////////////////////////
         // Message related methods.
         // ///////////////////////////////////////////////////////////////////////////////
@@ -404,7 +455,7 @@
             }
 
             for (StoredQueue sq : tailResults) {
-                if (max >=0 && results.size() >= max) {
+                if (max >= 0 && results.size() >= max) {
                     break;
                 }
                 if (type != -1 && sq.descriptor.getApplicationType() != type) {
@@ -598,8 +649,7 @@
 	}
 
 	public void setDeleteAllMessages(boolean val) {
-        // TODO Auto-generated method stub
-
+        // NOOP
     }
 
 

Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Tue Jun 16 17:42:21 2009
@@ -18,7 +18,11 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.TestCase;
 
@@ -35,12 +39,13 @@
 
 public abstract class StorePerformanceBase extends TestCase {
 
-    private static int PERFORMANCE_SAMPLES = 3;
+    private static int PERFORMANCE_SAMPLES = 5;
     private static boolean SYNC_TO_DISK = true;
-    
-    
+    private static final boolean USE_SHARED_WRITER = true;
+
     private Store store;
     private QueueDescriptor queueId;
+    private AtomicLong queueKey = new AtomicLong(0);
 
     protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
     protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -50,11 +55,21 @@
 
     abstract protected Store createStore();
 
+    private SharedWriter writer = null;
+    private Semaphore writePermits = null;
+
     @Override
     protected void setUp() throws Exception {
         store = createStore();
         store.start();
-        
+
+        if (USE_SHARED_WRITER) {
+            writer = new SharedWriter();
+            writer.start();
+        }
+
+        writePermits = new Semaphore(1000);
+
         queueId = new QueueDescriptor();
         queueId.setQueueName(new AsciiBuffer("test"));
         store.execute(new VoidCallback<Exception>() {
@@ -75,14 +90,96 @@
             p.stop();
         }
         producers.clear();
-        
+
+        if (writer != null) {
+            writer.stop();
+        }
+
         if (store != null) {
             store.stop();
         }
     }
 
-    private final Object wakeupMutex = new Object(); 
-    
+    private final Object wakeupMutex = new Object();
+
+    class SharedWriter implements Runnable {
+        LinkedBlockingQueue<SharedQueueOp> queue = new LinkedBlockingQueue<SharedQueueOp>(1000);
+        private Thread thread;
+        private AtomicBoolean stopped = new AtomicBoolean();
+
+        public void start() {
+            thread = new Thread(this, "Writer");
+            thread.start();
+        }
+
+        public void stop() throws InterruptedException {
+            stopped.set(true);
+
+            //Add an op to trigger shutdown:
+            SharedQueueOp op = new SharedQueueOp() {
+                public void run() {
+                }
+            };
+            op.op = new Store.VoidCallback<Exception>() {
+
+                @Override
+                public void run(Session session) throws Exception {
+                    // TODO Auto-generated method stub
+                }
+            };
+
+            queue.put(op);
+            thread.join();
+        }
+
+        public void run() {
+            Session session = store.getSession();
+            try {
+                LinkedList<Runnable> processed = new LinkedList<Runnable>();
+                while (!stopped.get()) {
+                    SharedQueueOp op = queue.take();
+                    session.acquireLock();
+                    int ops = 0;
+                    while (op != null && ops < 1000) {
+                        op.op.execute(session);
+                        processed.add(op);
+                        op = queue.poll();
+                        ops++;
+                    }
+
+                    session.commit();
+                    session.releaseLock();
+
+                    if (SYNC_TO_DISK) {
+                        store.flush();
+                    }
+
+                    for (Runnable r : processed) {
+                        r.run();
+                    }
+                    processed.clear();
+                }
+
+            } catch (InterruptedException e) {
+                if (!stopped.get()) {
+                    e.printStackTrace();
+                }
+                return;
+            } catch (Exception e) {
+                e.printStackTrace();
+                return;
+            }
+        }
+
+        public void addOp(SharedQueueOp op) throws InterruptedException {
+            queue.put(op);
+        }
+    }
+
+    abstract class SharedQueueOp implements Runnable {
+        VoidCallback<Exception> op;
+    }
+
     class Producer implements Runnable {
         private Thread thread;
         private AtomicBoolean stopped = new AtomicBoolean();
@@ -91,104 +188,122 @@
         private long sleep;
 
         public Producer(String name) {
-            this.name=name;
+            this.name = name;
         }
+
         public void start() {
             rate.name("Producer " + name + " Rate");
             totalProducerRate.add(rate);
-            thread = new Thread(this, "Producer"+ name);
+            thread = new Thread(this, "Producer" + name);
             thread.start();
         }
+
         public void stop() throws InterruptedException {
             stopped.set(true);
+            while (writePermits.hasQueuedThreads()) {
+                writePermits.release();
+            }
             thread.join();
         }
+
         public void run() {
             try {
                 Buffer buffer = new Buffer(new byte[1024]);
-                for( long i=0; !stopped.get(); i++ ) {
-                    
+                for (long i = 0; !stopped.get(); i++) {
+
+                    writePermits.acquireUninterruptibly();
+
                     final MessageRecord messageRecord = new MessageRecord();
                     messageRecord.setKey(store.allocateStoreTracking());
-                    messageRecord.setMessageId(new AsciiBuffer(""+i));
+                    messageRecord.setMessageId(new AsciiBuffer("" + i));
                     messageRecord.setEncoding(new AsciiBuffer("encoding"));
                     messageRecord.setBuffer(buffer);
                     messageRecord.setSize(buffer.getLength());
 
-                    Runnable onFlush = new Runnable(){
+                    SharedQueueOp op = new SharedQueueOp() {
                         public void run() {
                             rate.increment();
-                            synchronized(wakeupMutex){
+                            writePermits.release();
+                            synchronized (wakeupMutex) {
                                 wakeupMutex.notify();
                             }
                         }
                     };
-                    final long queueKey = i + 1;
-                    store.execute(new VoidCallback<Exception>() {
+
+                    op.op = new VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
                             session.messageAdd(messageRecord);
                             QueueRecord queueRecord = new Store.QueueRecord();
                             queueRecord.setMessageKey(messageRecord.getKey());
-                            queueRecord.setQueueKey(queueKey);
+                            queueRecord.setQueueKey(queueKey.incrementAndGet());
                             queueRecord.setSize(messageRecord.getSize());
                             session.queueAddMessage(queueId, queueRecord);
                         }
-                    }, onFlush);
-                    
-                    if( SYNC_TO_DISK ) {
-                        store.flush();
+                    };
+
+                    if (!USE_SHARED_WRITER) {
+                        store.execute(op.op, op);
+
+                        if (SYNC_TO_DISK) {
+                            store.flush();
+                        }
+
+                    } else {
+                        writer.addOp(op);
                     }
 
-                    
-                    if( sleep>0 ) {
+                    if (sleep > 0) {
                         Thread.sleep(sleep);
                     }
                 }
+            } catch (InterruptedException e) {
+                if (!stopped.get()) {
+                    e.printStackTrace();
+                }
+                return;
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
-    
+
     class Consumer implements Runnable {
         private Thread thread;
         private AtomicBoolean stopped = new AtomicBoolean();
         protected final MetricCounter rate = new MetricCounter();
         private String name;
+        private final Semaphore queryWait = new Semaphore(0);
 
         public Consumer(String name) {
-            this.name=name;
+            this.name = name;
         }
+
         public void start() {
             rate.name("Consumer " + name + " Rate");
             totalConsumerRate.add(rate);
-            thread = new Thread(this, "Consumer " + name );
+            thread = new Thread(this, "Consumer " + name);
             thread.start();
         }
+
         public void stop() throws InterruptedException {
             stopped.set(true);
+            queryWait.release();
             thread.join();
         }
-        
+
         public void run() {
             try {
-                while( !stopped.get() ) {
-                    final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);;
-                    Runnable onFlush = new Runnable(){
+                while (!stopped.get()) {
+                    final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);
+                    SharedQueueOp op = new SharedQueueOp() {
                         public void run() {
                             rate.increment(records.size());
-                            if( records.isEmpty() ) {
-                                synchronized(wakeupMutex){
-                                    try {
-                                        wakeupMutex.wait(500);
-                                    } catch (InterruptedException e) {
-                                    }
-                                }
-                            }
+                            queryWait.release();
                         }
                     };
-                    store.execute(new VoidCallback<Exception>() {
+
+                    op.op = new VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
                             Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
@@ -198,23 +313,45 @@
                                 session.queueRemoveMessage(queueId, r.messageKey);
                             }
                         }
-                    }, onFlush);
-                    if( SYNC_TO_DISK ) {
-                        store.flush();
+                    };
+
+                    if (!USE_SHARED_WRITER) {
+                        store.execute(op.op, op);
+                        if (SYNC_TO_DISK) {
+                            store.flush();
+                        }
+                    } else {
+                        writer.addOp(op);
                     }
+
+                    //queryWait.acquireUninterruptibly();
+                    if (records.isEmpty()) {
+                        //                        synchronized (wakeupMutex) {
+                        //                            try {
+                        //                                wakeupMutex.wait(500);
+                        //                            } catch (InterruptedException e) {
+                        //                            }
+                        //                        }
+                    }
+                    records.clear();
+                }
+            } catch (InterruptedException e) {
+                if (!stopped.get()) {
+                    e.printStackTrace();
                 }
+                return;
             } catch (Exception e) {
                 e.printStackTrace();
             }
         }
     }
-    
+
     public void test1_1_1() throws Exception {
         startProducers(1);
         startConsumers(1);
         reportRates();
     }
-    
+
     public void test10_1_1() throws Exception {
         startProducers(10);
         startConsumers(1);
@@ -223,20 +360,20 @@
 
     private void startProducers(int count) {
         for (int i = 0; i < count; i++) {
-            Producer p = new  Producer(""+(i+1));
+            Producer p = new Producer("" + (i + 1));
             producers.add(p);
             p.start();
         }
     }
-    
+
     private void startConsumers(int count) {
         for (int i = 0; i < count; i++) {
-            Consumer c = new  Consumer(""+(i+1));
+            Consumer c = new Consumer("" + (i + 1));
             consumers.add(c);
             c.start();
         }
     }
-    
+
     private void reportRates() throws InterruptedException {
         System.out.println("Checking rates for test: " + getName());
         for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
@@ -248,5 +385,5 @@
             totalConsumerRate.reset();
         }
     }
-    
+
 }