You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/15 22:33:41 UTC

svn commit: r955039 [2/2] - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ command/ store/ store/amq/ store/journal/ store/kahadaptor/ store/kahadb/ store/memory/ transaction/

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Tue Jun 15 20:33:41 2010
@@ -37,9 +37,6 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.InvalidSelectorException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -63,17 +60,13 @@ import org.apache.activemq.store.Message
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
 import org.apache.activemq.store.kahadb.data.KahaLocation;
-import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -88,26 +81,28 @@ import org.apache.commons.logging.LogFac
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
-    private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
+    static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
     protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
-    protected final Map<MessageId, StoreTopicTask> asyncTopicMap = new HashMap<MessageId, StoreTopicTask>();
+    protected final Map<AsyncJobKey, StoreTopicTask> asyncTopicMap = new HashMap<AsyncJobKey, StoreTopicTask>();
     private final WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
     private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
     private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
-    private Semaphore queueSemaphore;
-    private Semaphore topicSemaphore;
+    Semaphore globalQueueSemaphore;
+    Semaphore globalTopicSemaphore;
     private boolean concurrentStoreAndDispatchQueues = true;
     private boolean concurrentStoreAndDispatchTopics = true;
     private int maxAsyncJobs = MAX_ASYNC_JOBS;
- 
+    private final KahaDBTransactionStore transactionStore;
+
     public KahaDBStore() {
+        this.transactionStore = new KahaDBTransactionStore(this);
     }
-    
+
     public void setBrokerName(String brokerName) {
     }
 
@@ -166,8 +161,8 @@ public class KahaDBStore extends Message
     @Override
     public void doStart() throws Exception {
         super.doStart();
-        this.queueSemaphore = new Semaphore(getMaxAsyncJobs());
-        this.topicSemaphore = new Semaphore(getMaxAsyncJobs());
+        this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
+        this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
         this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
         this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
         this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
@@ -190,11 +185,23 @@ public class KahaDBStore extends Message
 
     @Override
     public void doStop(ServiceStopper stopper) throws Exception {
-        if (this.queueSemaphore != null) {
-            this.queueSemaphore.drainPermits();
+        synchronized (this.asyncQueueMap) {
+            for (StoreQueueTask task : this.asyncQueueMap.values()) {
+                task.cancel();
+            }
+            this.asyncQueueMap.clear();
+        }
+        synchronized (this.asyncTopicMap) {
+            for (StoreTopicTask task : this.asyncTopicMap.values()) {
+                task.cancel();
+            }
+            this.asyncTopicMap.clear();
+        }
+        if (this.globalQueueSemaphore != null) {
+            this.globalQueueSemaphore.drainPermits();
         }
-        if (this.topicSemaphore != null) {
-            this.topicSemaphore.drainPermits();
+        if (this.globalTopicSemaphore != null) {
+            this.globalTopicSemaphore.drainPermits();
         }
         if (this.queueExecutor != null) {
             this.queueExecutor.shutdownNow();
@@ -205,99 +212,50 @@ public class KahaDBStore extends Message
         super.doStop(stopper);
     }
 
-    protected StoreQueueTask removeQueueTask(ActiveMQDestination activeMQDestination, MessageId id) {
-        StoreQueueTask task = this.asyncQueueMap.remove(new AsyncJobKey(id, activeMQDestination));
-        if (task != null) {
-            task.getMessage().decrementReferenceCount();
-            this.queueSemaphore.release();
+    protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
+        StoreQueueTask task = null;
+        synchronized (this.asyncQueueMap) {
+            task = this.asyncQueueMap.remove(new AsyncJobKey(id, store.getDestination()));
         }
         return task;
     }
 
-    protected void addQueueTask(ActiveMQDestination activeMQDestination, StoreQueueTask task) throws IOException {
-        try {
-            this.queueSemaphore.acquire();
-
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException(e.getMessage());
+    protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
+        synchronized (this.asyncQueueMap) {
+            this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
         }
-        this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), activeMQDestination), task);
-        task.getMessage().incrementReferenceCount();
         this.queueExecutor.execute(task);
     }
 
-    protected StoreTopicTask removeTopicTask(MessageId id) {
-        StoreTopicTask task = this.asyncTopicMap.remove(id);
-        if (task != null) {
-            task.getMessage().decrementReferenceCount();
-            this.topicSemaphore.release();
+    protected StoreTopicTask removeTopicTask(KahaDBMessageStore store, MessageId id) {
+        StoreTopicTask task = null;
+        synchronized (this.asyncTopicMap) {
+            task = this.asyncTopicMap.remove(new AsyncJobKey(id, store.getDestination()));
         }
         return task;
     }
 
-    protected void addTopicTask(StoreTopicTask task) throws IOException {
-        try {
-            this.topicSemaphore.acquire();
-        } catch (InterruptedException e) {
-            throw new InterruptedIOException(e.getMessage());
+    protected void addTopicTask(KahaDBMessageStore store, StoreTopicTask task) throws IOException {
+        synchronized (this.asyncTopicMap) {
+            this.asyncTopicMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
         }
-        this.asyncTopicMap.put(task.getMessage().getMessageId(), task);
-        task.getMessage().incrementReferenceCount();
         this.topicExecutor.execute(task);
     }
 
     public TransactionStore createTransactionStore() throws IOException {
-        return new TransactionStore() {
-
-            public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
-                store(new KahaCommitCommand().setTransactionInfo(createTransactionInfo(txid)), true, done);
-            }
-            public void prepare(TransactionId txid) throws IOException {
-                store(new KahaPrepareCommand().setTransactionInfo(createTransactionInfo(txid)), true, null);
-            }
-            public void rollback(TransactionId txid) throws IOException {
-                store(new KahaRollbackCommand().setTransactionInfo(createTransactionInfo(txid)), false, null);
-            }
-            public void recover(TransactionRecoveryListener listener) throws IOException {
-                for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
-                    XATransactionId xid = (XATransactionId) entry.getKey();
-                    ArrayList<Message> messageList = new ArrayList<Message>();
-                    ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-
-                    for (Operation op : entry.getValue()) {
-                        if (op.getClass() == AddOpperation.class) {
-                            AddOpperation addOp = (AddOpperation) op;
-                            Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand()
-                                    .getMessage().newInput()));
-                            messageList.add(msg);
-                        } else {
-                            RemoveOpperation rmOp = (RemoveOpperation) op;
-                            Buffer ackb = rmOp.getCommand().getAck();
-                            MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
-                            ackList.add(ack);
-                        }
-                    }
-
-                    Message[] addedMessages = new Message[messageList.size()];
-                    MessageAck[] acks = new MessageAck[ackList.size()];
-                    messageList.toArray(addedMessages);
-                    ackList.toArray(acks);
-                    listener.recover(xid, addedMessages, acks);
-                }
-            }
-            public void start() throws Exception {
-            }
-            public void stop() throws Exception {
-            }
-        };
+        return this.transactionStore;
     }
 
     public class KahaDBMessageStore extends AbstractMessageStore {
         protected KahaDestination dest;
+        private final int maxAsyncJobs;
+        private final Semaphore localDestinationSemaphore;
 
         public KahaDBMessageStore(ActiveMQDestination destination) {
             super(destination);
             this.dest = convert(destination);
+            this.maxAsyncJobs = getMaxAsyncJobs();
+            this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs);
         }
 
         @Override
@@ -310,7 +268,8 @@ public class KahaDBStore extends Message
                 throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
                 StoreQueueTask result = new StoreQueueTask(this, context, message);
-                addQueueTask(destination, result);
+                result.aquireLocks();
+                addQueueTask(this, result);
                 return result.getFuture();
             } else {
                 return super.asyncAddQueueMessage(context, message);
@@ -320,10 +279,15 @@ public class KahaDBStore extends Message
         @Override
         public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
             if (isConcurrentStoreAndDispatchQueues()) {
-                StoreQueueTask task = removeQueueTask(destination, ack.getLastMessageId());
+                AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
+                StoreQueueTask task = null;
+                synchronized (asyncQueueMap) {
+                    task = asyncQueueMap.get(key);
+                }
                 if (task != null) {
                     if (!task.cancel()) {
                         try {
+
                             task.future.get();
                         } catch (InterruptedException e) {
                             throw new InterruptedIOException(e.toString());
@@ -331,6 +295,10 @@ public class KahaDBStore extends Message
                             LOG.debug("removeAsync: cannot cancel, waiting for add resulted in ex", ignored);
                         }
                         removeMessage(context, ack);
+                    } else {
+                        synchronized (asyncQueueMap) {
+                            asyncQueueMap.remove(key);
+                        }
                     }
                 } else {
                     removeMessage(context, ack);
@@ -348,7 +316,7 @@ public class KahaDBStore extends Message
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
             command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null);
+            store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
         }
 
         public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
@@ -359,13 +327,13 @@ public class KahaDBStore extends Message
 
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
             command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null);
+            store(command, isEnableJournalDiskSyncs() && ack.isResponseRequired(), null, null);
         }
 
         public void removeAllMessages(ConnectionContext context) throws IOException {
             KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
             command.setDestination(dest);
-            store(command, true, null);
+            store(command, true, null, null);
         }
 
         public Message getMessage(MessageId identity) throws IOException {
@@ -395,21 +363,27 @@ public class KahaDBStore extends Message
         }
 
         public int getMessageCount() throws IOException {
-            synchronized (indexMutex) {
-                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
-                    public Integer execute(Transaction tx) throws IOException {
-                        // Iterate through all index entries to get a count of
-                        // messages in the destination.
-                        StoredDestination sd = getStoredDestination(dest, tx);
-                        int rc = 0;
-                        for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
-                                .hasNext();) {
-                            iterator.next();
-                            rc++;
+            try {
+                lockAsyncJobQueue();
+                synchronized (indexMutex) {
+                    return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
+                        public Integer execute(Transaction tx) throws IOException {
+                            // Iterate through all index entries
+                            // to get a count of the messages
+                            // in the destination.
+                            StoredDestination sd = getStoredDestination(dest, tx);
+                            int rc = 0;
+                            for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
+                                    .hasNext();) {
+                                iterator.next();
+                                rc++;
+                            }
+                            return rc;
                         }
-                        return rc;
-                    }
-                });
+                    });
+                }
+            } finally {
+                unlockAsyncJobQueue();
             }
         }
 
@@ -452,11 +426,12 @@ public class KahaDBStore extends Message
                         Entry<Long, MessageKeys> entry = null;
                         int counter = 0;
                         for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
-                                .hasNext();) {
+                                .hasNext()
+                                && listener.hasSpace();) {
                             entry = iterator.next();
                             listener.recoverMessage(loadMessage(entry.getValue().location));
                             counter++;
-                            if (counter >= maxReturned) {
+                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                 break;
                             }
                         }
@@ -474,22 +449,27 @@ public class KahaDBStore extends Message
 
         @Override
         public void setBatch(MessageId identity) throws IOException {
-            final String key = identity.toString();
+            try {
+                final String key = identity.toString();
+                lockAsyncJobQueue();
 
-            // Hopefully one day the page file supports concurrent read
-            // operations... but for now we must
-            // externally synchronize...
-            Long location;
-            synchronized (indexMutex) {
-                location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
-                    public Long execute(Transaction tx) throws IOException {
-                        StoredDestination sd = getStoredDestination(dest, tx);
-                        return sd.messageIdIndex.get(tx, key);
-                    }
-                });
-            }
-            if (location != null) {
-                cursorPos = location + 1;
+                // Hopefully one day the page file supports concurrent read
+                // operations... but for now we must
+                // externally synchronize...
+                Long location;
+                synchronized (indexMutex) {
+                    location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
+                        public Long execute(Transaction tx) throws IOException {
+                            StoredDestination sd = getStoredDestination(dest, tx);
+                            return sd.messageIdIndex.get(tx, key);
+                        }
+                    });
+                }
+                if (location != null) {
+                    cursorPos = location + 1;
+                }
+            } finally {
+                unlockAsyncJobQueue();
             }
 
         }
@@ -506,6 +486,63 @@ public class KahaDBStore extends Message
             super.stop();
         }
 
+        /**
+         * True only while this store holds every permit of
+         * localDestinationSemaphore, i.e. after a successful
+         * lockAsyncJobQueue(). Lets unlockAsyncJobQueue() skip the release
+         * when the lock attempt timed out or failed.
+         */
+        private final AtomicBoolean asyncJobQueueLocked = new AtomicBoolean();
+
+        /**
+         * Tries to take all maxAsyncJobs permits of the per-destination
+         * semaphore, waiting at most 60 seconds. On timeout or failure the
+         * problem is logged and the caller continues without the lock; no
+         * exception is propagated.
+         */
+        protected void lockAsyncJobQueue() {
+            try {
+                if (this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) {
+                    this.asyncJobQueueLocked.set(true);
+                } else {
+                    LOG.warn("Timed out waiting to lock async jobs for " + this.destination);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to lock async jobs for " + this.destination, e);
+            }
+        }
+
+        /**
+         * Returns the permits taken by lockAsyncJobQueue(). Does nothing if
+         * the lock is not currently held, so the semaphore never ends up with
+         * more permits than it was created with.
+         */
+        protected void unlockAsyncJobQueue() {
+            if (this.asyncJobQueueLocked.compareAndSet(true, false)) {
+                this.localDestinationSemaphore.release(this.maxAsyncJobs);
+            }
+        }
+
+        /**
+         * Takes one permit of the per-destination semaphore, blocking until
+         * one is free. If the wait is interrupted the error is logged and the
+         * method returns without holding a permit.
+         */
+        protected void acquireLocalAsyncLock() {
+            try {
+                this.localDestinationSemaphore.acquire();
+            } catch (InterruptedException e) {
+                LOG.error("Failed to aquire async lock for " + this.destination, e);
+            }
+        }
+
+        /**
+         * Gives back one permit of the per-destination semaphore.
+         */
+        protected void releaseLocalAsyncLock() {
+            this.localDestinationSemaphore.release();
+        }
+
     }
 
     class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
@@ -520,7 +524,8 @@ public class KahaDBStore extends Message
                 throws IOException {
             if (isConcurrentStoreAndDispatchTopics()) {
                 StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
-                addTopicTask(result);
+                result.aquireLocks();
+                addTopicTask(this, result);
                 return result.getFuture();
             } else {
                 return super.asyncAddTopicMessage(context, message);
@@ -531,11 +536,19 @@ public class KahaDBStore extends Message
                 throws IOException {
             String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             if (isConcurrentStoreAndDispatchTopics()) {
-                StoreTopicTask task = asyncTopicMap.get(messageId);
+                AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
+                StoreTopicTask task = null;
+                synchronized (asyncTopicMap) {
+                    task = asyncTopicMap.get(key);
+                }
                 if (task != null) {
                     if (task.addSubscriptionKey(subscriptionKey)) {
-                        removeTopicTask(messageId);
-                        task.cancel();
+                        removeTopicTask(this, messageId);
+                        if (task.cancel()) {
+                            synchronized (asyncTopicMap) {
+                                asyncTopicMap.remove(key);
+                            }
+                        }
                     }
                 } else {
                     doAcknowledge(context, subscriptionKey, messageId);
@@ -551,7 +564,7 @@ public class KahaDBStore extends Message
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey);
             command.setMessageId(messageId.toString());
-            store(command, false, null);
+            store(command, false, null, null);
         }
 
         public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
@@ -563,7 +576,7 @@ public class KahaDBStore extends Message
             command.setRetroactive(retroactive);
             org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo);
             command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
-            store(command, isEnableJournalDiskSyncs() && true, null);
+            store(command, isEnableJournalDiskSyncs() && true, null, null);
             this.subscriptionCount.incrementAndGet();
         }
 
@@ -571,7 +584,7 @@ public class KahaDBStore extends Message
             KahaSubscriptionCommand command = new KahaSubscriptionCommand();
             command.setDestination(dest);
             command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName));
-            store(command, isEnableJournalDiskSyncs() && true, null);
+            store(command, isEnableJournalDiskSyncs() && true, null, null);
             this.subscriptionCount.decrementAndGet();
         }
 
@@ -698,7 +711,7 @@ public class KahaDBStore extends Message
                             entry = iterator.next();
                             listener.recoverMessage(loadMessage(entry.getValue().location));
                             counter++;
-                            if (counter >= maxReturned) {
+                            if (counter >= maxReturned || listener.hasSpace() == false) {
                                 break;
                             }
                         }
@@ -732,11 +745,11 @@ public class KahaDBStore extends Message
     }
 
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
-        return new KahaDBMessageStore(destination);
+        return this.transactionStore.proxy(new KahaDBMessageStore(destination));
     }
 
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
-        return new KahaDBTopicMessageStore(destination);
+        return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
     }
 
     /**
@@ -928,20 +941,19 @@ public class KahaDBStore extends Message
     static class AsyncJobKey {
         MessageId id;
         ActiveMQDestination destination;
-        
+
         AsyncJobKey(MessageId id, ActiveMQDestination destination) {
             this.id = id;
             this.destination = destination;
         }
-        
+
         @Override
         public boolean equals(Object obj) {
             if (obj == this) {
                 return true;
             }
-            return obj instanceof AsyncJobKey &&
-                id.equals(((AsyncJobKey)obj).id) &&
-                destination.equals(((AsyncJobKey)obj).destination);
+            return obj instanceof AsyncJobKey && id.equals(((AsyncJobKey) obj).id)
+                    && destination.equals(((AsyncJobKey) obj).destination);
         }
 
         @Override
@@ -949,24 +961,25 @@ public class KahaDBStore extends Message
             return id.hashCode() + destination.hashCode();
         }
 
+        @Override
         public String toString() {
-            return destination.getPhysicalName() + "-" + id; 
+            return destination.getPhysicalName() + "-" + id;
         }
     }
-    
+
     class StoreQueueTask implements Runnable {
         protected final Message message;
         protected final ConnectionContext context;
-        protected final MessageStore store;
+        protected final KahaDBMessageStore store;
         protected final InnerFutureTask future;
         protected final AtomicBoolean done = new AtomicBoolean();
+        protected final AtomicBoolean locked = new AtomicBoolean();
 
-        public StoreQueueTask(MessageStore store, ConnectionContext context, Message message) {
+        public StoreQueueTask(KahaDBMessageStore store, ConnectionContext context, Message message) {
             this.store = store;
             this.context = context;
             this.message = message;
             this.future = new InnerFutureTask(this);
-
         }
 
         public Future<Object> getFuture() {
@@ -974,21 +987,58 @@ public class KahaDBStore extends Message
         }
 
         public boolean cancel() {
+            releaseLocks();
             if (this.done.compareAndSet(false, true)) {
                 return this.future.cancel(false);
             }
             return false;
         }
 
+        /**
+         * Takes the global queue permit and the destination's local permit
+         * and adds a reference to the message, unless this task is already
+         * marked as locked. If the thread is interrupted while waiting for
+         * the global permit nothing has been taken yet, so the locked flag is
+         * cleared again and the interrupt status is restored.
+         */
+        void aquireLocks() {
+            if (this.locked.compareAndSet(false, true)) {
+                try {
+                    globalQueueSemaphore.acquire();
+                    store.acquireLocalAsyncLock();
+                    message.incrementReferenceCount();
+                } catch (InterruptedException e) {
+                    this.locked.set(false);
+                    Thread.currentThread().interrupt();
+                    LOG.warn("Failed to aquire lock", e);
+                }
+            }
+        }
+
+        /**
+         * Undoes aquireLocks(): returns the local and global permits and
+         * drops the message reference. Guarded by the locked flag, so the
+         * calls made from both cancel() and run() release only once.
+         */
+        void releaseLocks() {
+            if (this.locked.compareAndSet(true, false)) {
+                store.releaseLocalAsyncLock();
+                globalQueueSemaphore.release();
+                message.decrementReferenceCount();
+            }
+        }
+
         public void run() {
             try {
                 if (this.done.compareAndSet(false, true)) {
                     this.store.addMessage(context, message);
-                    removeQueueTask(this.store.getDestination(), this.message.getMessageId());
+                    removeQueueTask(this.store, this.message.getMessageId());
                     this.future.complete();
                 }
             } catch (Exception e) {
                 this.future.setException(e);
+            } finally {
+                releaseLocks();
             }
         }
 
@@ -1025,6 +1062,41 @@ public class KahaDBStore extends Message
 
         }
 
+        /**
+         * Topic variant of the lock acquisition: takes the global topic
+         * permit and the destination's local permit and adds a reference to
+         * the message, unless this task is already marked as locked. If the
+         * thread is interrupted while waiting for the global permit nothing
+         * has been taken yet, so the locked flag is cleared again.
+         */
+        @Override
+        void aquireLocks() {
+            if (this.locked.compareAndSet(false, true)) {
+                try {
+                    globalTopicSemaphore.acquire();
+                    store.acquireLocalAsyncLock();
+                    message.incrementReferenceCount();
+                } catch (InterruptedException e) {
+                    this.locked.set(false);
+                    Thread.currentThread().interrupt();
+                    LOG.warn("Failed to aquire lock", e);
+                }
+            }
+        }
+
+        /**
+         * Drops the message reference and returns the local and global topic
+         * permits, but only if the locked flag shows they are held.
+         */
+        @Override
+        void releaseLocks() {
+            if (this.locked.compareAndSet(true, false)) {
+                message.decrementReferenceCount();
+                store.releaseLocalAsyncLock();
+                globalTopicSemaphore.release();
+            }
+        }
+
         /**
          * add a key
          * 
@@ -1047,13 +1107,16 @@ public class KahaDBStore extends Message
                     synchronized (this.subscriptionKeys) {
                         for (String key : this.subscriptionKeys) {
                             this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
+                            
                         }
                     }
-                    removeTopicTask(this.message.getMessageId());
+                    removeTopicTask(this.topicStore, this.message.getMessageId());
                     this.future.complete();
                 }
             } catch (Exception e) {
                 this.future.setException(e);
+            } finally {
+                releaseLocks();
             }
         }
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=955039&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import javax.transaction.xa.XAException;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
+import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
+import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
+import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
+import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
+import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
+import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Provides a TransactionStore implementation that can create transaction aware
+ * MessageStore objects from non transaction aware MessageStore objects.
+ * 
+ * @version $Revision: 1.4 $
+ */
+public class KahaDBTransactionStore implements TransactionStore {
+
+    ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
+    private final WireFormat wireFormat = new OpenWireFormat();
+    private final KahaDBStore theStore;
+
+    public KahaDBTransactionStore(KahaDBStore theStore) {
+        this.theStore = theStore;
+    }
+
+    /**
+     * Collects the message adds and acknowledgements of one in-flight local
+     * transaction so that they can be applied to the store together on commit.
+     */
+    public class Tx {
+        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+
+        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+
+        /**
+         * Queues a message add to be executed when the transaction commits.
+         */
+        public void add(AddMessageCommand msg) {
+            messages.add(msg);
+        }
+
+        /**
+         * Queues an acknowledgement to be executed when the transaction commits.
+         */
+        public void add(RemoveMessageCommand ack) {
+            acks.add(ack);
+        }
+
+        /**
+         * @return the messages of all queued adds, in the order they were queued
+         */
+        public Message[] getMessages() {
+            Message rc[] = new Message[messages.size()];
+            int count = 0;
+            for (AddMessageCommand cmd : messages) {
+                rc[count++] = cmd.getMessage();
+            }
+            return rc;
+        }
+
+        /**
+         * @return the acks of all queued removes, in the order they were queued
+         */
+        public MessageAck[] getAcks() {
+            MessageAck rc[] = new MessageAck[acks.size()];
+            int count = 0;
+            for (RemoveMessageCommand cmd : acks) {
+                rc[count++] = cmd.getMessageAck();
+            }
+            return rc;
+        }
+
+        /**
+         * Runs every queued command: first all adds, then all removes.
+         * 
+         * @return one future per executed command, adds first; empty if
+         *         nothing was queued
+         * @throws IOException if a command fails while being run
+         */
+        public List<Future<Object>> commit() throws IOException {
+            List<Future<Object>> results = new ArrayList<Future<Object>>();
+            // Do all the message adds.
+            for (AddMessageCommand cmd : messages) {
+                results.add(cmd.run());
+            }
+            // And removes.. each one is run exactly once; running it a second
+            // time would apply the same acknowledgement to the store twice.
+            for (RemoveMessageCommand cmd : acks) {
+                results.add(cmd.run());
+            }
+
+            return results;
+        }
+    }
+
+    /**
+     * A deferred "add message" operation. It remembers the connection context
+     * it was created with and replays the add against that context when run.
+     */
+    public abstract class AddMessageCommand {
+        private final ConnectionContext ctx;
+
+        AddMessageCommand(ConnectionContext ctx) {
+            this.ctx = ctx;
+        }
+
+        /** @return the message this command will add */
+        abstract Message getMessage();
+
+        /** Performs the add using the supplied context. */
+        abstract Future<Object> run(ConnectionContext ctx) throws IOException;
+
+        /** Performs the add using the context captured at construction time. */
+        Future<Object> run() throws IOException {
+            final ConnectionContext captured = this.ctx;
+            return run(captured);
+        }
+    }
+
+    /**
+     * A deferred "remove message" (acknowledge) operation. It remembers the
+     * connection context it was created with and replays the removal against
+     * that context when run.
+     */
+    public abstract class RemoveMessageCommand {
+
+        private final ConnectionContext ctx;
+
+        RemoveMessageCommand(ConnectionContext ctx) {
+            this.ctx = ctx;
+        }
+
+        /** @return the acknowledgement this command will apply */
+        abstract MessageAck getMessageAck();
+
+        /** Performs the removal using the supplied context. */
+        abstract Future<Object> run(ConnectionContext context) throws IOException;
+
+        /** Performs the removal using the context captured at construction time. */
+        Future<Object> run() throws IOException {
+            final ConnectionContext captured = this.ctx;
+            return run(captured);
+        }
+    }
+
+    /**
+     * Wraps a message store so that adds and removes are routed through this
+     * transaction store's own add/remove methods instead of going straight to
+     * the delegate.
+     * 
+     * @param messageStore the store to wrap
+     * @return a proxy around {@code messageStore}
+     */
+    public MessageStore proxy(MessageStore messageStore) {
+        final KahaDBTransactionStore txStore = KahaDBTransactionStore.this;
+        return new ProxyMessageStore(messageStore) {
+            @Override
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                txStore.addMessage(context, getDelegate(), send);
+            }
+
+            @Override
+            public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
+                final Future<Object> pending = txStore.asyncAddQueueMessage(context, getDelegate(), message);
+                return pending;
+            }
+
+            @Override
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                txStore.removeMessage(context, getDelegate(), ack);
+            }
+
+            @Override
+            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+                txStore.removeAsyncMessage(context, getDelegate(), ack);
+            }
+        };
+    }
+
+    /**
+     * Wraps a topic message store so that adds and removes are routed through
+     * this transaction store's own add/remove methods instead of going straight
+     * to the delegate.
+     * 
+     * @param messageStore the topic store to wrap
+     * @return a proxy around {@code messageStore}
+     */
+    public TopicMessageStore proxy(TopicMessageStore messageStore) {
+        return new ProxyTopicMessageStore(messageStore) {
+            @Override
+            public void addMessage(ConnectionContext context, final Message send) throws IOException {
+                KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
+            }
+
+            @Override
+            public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
+                // Hand back the future produced by the store (as the queue proxy
+                // does) rather than a pre-completed placeholder, so the caller can
+                // observe completion or cancellation of the real store task.
+                return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
+            }
+
+            @Override
+            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+                KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
+            }
+
+            @Override
+            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
+                KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
+            }
+        };
+    }
+
+    /**
+     * Drops any locally buffered transaction for {@code txid} and writes a
+     * prepare command for it to the store, synchronously.
+     * 
+     * @param txid the transaction being prepared
+     * @throws IOException if the prepare command cannot be stored
+     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
+     */
+    public void prepare(TransactionId txid) throws IOException {
+        inflightTransactions.remove(txid);
+        final KahaTransactionInfo txInfo = getTransactionInfo(txid);
+        final KahaPrepareCommand prepareCommand = new KahaPrepareCommand().setTransactionInfo(txInfo);
+        theStore.store(prepareCommand, true, null, null);
+    }
+
+    /**
+     * Returns the buffered transaction registered for {@code txid}, creating
+     * and registering a new one if none exists yet.
+     * 
+     * @param txid the transaction identifier used as the map key
+     * @return the single {@link Tx} registered for {@code txid}
+     */
+    public Tx getTx(Object txid) {
+        Tx tx = inflightTransactions.get(txid);
+        if (tx == null) {
+            Tx created = new Tx();
+            // putIfAbsent is atomic: if another thread registered a Tx for this
+            // id in the meantime, use theirs so no queued command is lost.
+            Tx existing = inflightTransactions.putIfAbsent(txid, created);
+            tx = (existing != null) ? existing : created;
+        }
+        return tx;
+    }
+
+    /**
+     * Commits a transaction. Nothing happens for a null {@code txid}.
+     * <p>
+     * XA transactions are committed by storing a commit command, with both
+     * callbacks handed to the store. For local transactions {@code preCommit}
+     * is run, the buffered commands (if any) are executed and waited for,
+     * {@code postCommit} is run, and a commit command is stored only if at
+     * least one command's future was not cancelled.
+     * 
+     * @param txid the transaction to commit, may be null
+     * @param wasPrepared not used by this implementation
+     * @param preCommit callback to run before committing, may be null
+     * @param postCommit callback to run after committing, may be null
+     * @throws IOException if running a buffered command or storing the commit fails
+     */
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
+            throws IOException {
+        if (txid == null) {
+            return;
+        }
+        if (txid.isXATransaction()) {
+            KahaTransactionInfo xaInfo = getTransactionInfo(txid);
+            theStore.store(new KahaCommitCommand().setTransactionInfo(xaInfo), true, preCommit, postCommit);
+            return;
+        }
+
+        if (preCommit != null) {
+            preCommit.run();
+        }
+
+        boolean storeCommit = false;
+        // The Tx will be null for failed over clients - their post commit still runs below
+        Tx tx = inflightTransactions.remove(txid);
+        if (tx != null) {
+            for (Future<Object> pending : tx.commit()) {
+                try {
+                    pending.get();
+                } catch (InterruptedException e) {
+                    theStore.brokerService.handleIOException(new IOException(e));
+                } catch (ExecutionException e) {
+                    theStore.brokerService.handleIOException(new IOException(e));
+                } catch (CancellationException e) {
+                    // a cancelled store task is detected via isCancelled() below
+                }
+                if (!pending.isCancelled()) {
+                    storeCommit = true;
+                }
+            }
+        }
+
+        if (postCommit != null) {
+            postCommit.run();
+        }
+        if (storeCommit) {
+            KahaTransactionInfo localInfo = getTransactionInfo(txid);
+            theStore.store(new KahaCommitCommand().setTransactionInfo(localInfo), true, null, null);
+        }
+    }
+
+    /**
+     * Rolls back a transaction. For an XA transaction a rollback command is
+     * written to the store; for a local transaction the buffered commands are
+     * simply discarded.
+     * 
+     * @param txid the transaction to roll back
+     * @throws IOException if the rollback command cannot be stored
+     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
+     */
+    public void rollback(TransactionId txid) throws IOException {
+        if (txid.isXATransaction()) {
+            KahaTransactionInfo info = getTransactionInfo(txid);
+            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
+        } else {
+            // Nothing was written for a local transaction, dropping the buffer is enough.
+            inflightTransactions.remove(txid);
+        }
+    }
+
+    /** Intentionally a no-op. */
+    public void start() throws Exception {
+        // nothing to do
+    }
+
+    /** Intentionally a no-op. */
+    public void stop() throws Exception {
+        // nothing to do
+    }
+
+    /**
+     * Replays every prepared transaction held by the store to the listener.
+     * For each transaction the journaled operations are unmarshalled back into
+     * the messages that were added and the acks that were recorded.
+     * 
+     * @param listener receives one callback per prepared transaction
+     * @throws IOException if an operation cannot be unmarshalled
+     */
+    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
+        // All the inflight transactions get rolled back..
+        for (Map.Entry<TransactionId, ArrayList<Operation>> prepared : theStore.preparedTransactions.entrySet()) {
+            XATransactionId xid = (XATransactionId) prepared.getKey();
+            ArrayList<Message> recoveredMessages = new ArrayList<Message>();
+            ArrayList<MessageAck> recoveredAcks = new ArrayList<MessageAck>();
+
+            for (Operation op : prepared.getValue()) {
+                if (op.getClass() != AddOpperation.class) {
+                    // anything that is not an add is treated as a remove
+                    RemoveOpperation rmOp = (RemoveOpperation) op;
+                    Buffer ackBuffer = rmOp.getCommand().getAck();
+                    DataInputStream ackIn = new DataInputStream(ackBuffer.newInput());
+                    recoveredAcks.add((MessageAck) wireFormat.unmarshal(ackIn));
+                } else {
+                    AddOpperation addOp = (AddOpperation) op;
+                    DataInputStream msgIn = new DataInputStream(addOp.getCommand().getMessage().newInput());
+                    recoveredMessages.add((Message) wireFormat.unmarshal(msgIn));
+                }
+            }
+
+            Message[] addedMessages = recoveredMessages.toArray(new Message[recoveredMessages.size()]);
+            MessageAck[] acks = recoveredAcks.toArray(new MessageAck[recoveredAcks.size()]);
+            listener.recover(xid, addedMessages, acks);
+        }
+    }
+
+    /**
+     * Adds a message to {@code destination}. Messages outside a transaction or
+     * inside an XA transaction are passed straight to the destination; messages
+     * in a local transaction are buffered and only added when it commits.
+     * 
+     * @param context the connection context of the sender
+     * @param destination the store the message is ultimately added to
+     * @param message the message to add
+     * @throws IOException if the destination fails to add the message
+     */
+    void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        final TransactionId txid = message.getTransactionId();
+        if (txid == null || txid.isXATransaction()) {
+            destination.addMessage(context, message);
+            return;
+        }
+
+        getTx(txid).add(new AddMessageCommand(context) {
+            @Override
+            public Message getMessage() {
+                return message;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                destination.addMessage(ctx, message);
+                return AbstractMessageStore.FUTURE;
+            }
+        });
+    }
+
+    /**
+     * Asynchronously adds a queue message. Messages outside a transaction or
+     * inside an XA transaction are passed straight to the destination and its
+     * future is returned; messages in a local transaction are buffered until
+     * commit and {@code AbstractMessageStore.FUTURE} is returned.
+     * 
+     * @param context the connection context of the sender
+     * @param destination the store the message is ultimately added to
+     * @param message the message to add
+     * @return the destination's future, or {@code AbstractMessageStore.FUTURE} when buffered
+     * @throws IOException if the destination fails to add the message
+     */
+    Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        final TransactionId txid = message.getTransactionId();
+        if (txid == null || txid.isXATransaction()) {
+            return destination.asyncAddQueueMessage(context, message);
+        }
+
+        getTx(txid).add(new AddMessageCommand(context) {
+            @Override
+            public Message getMessage() {
+                return message;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                return destination.asyncAddQueueMessage(ctx, message);
+            }
+        });
+        return AbstractMessageStore.FUTURE;
+    }
+
+    /**
+     * Asynchronously adds a topic message. Messages outside a transaction or
+     * inside an XA transaction are passed straight to the destination and its
+     * future is returned; messages in a local transaction are buffered until
+     * commit and {@code AbstractMessageStore.FUTURE} is returned.
+     * 
+     * @param context the connection context of the sender
+     * @param destination the store the message is ultimately added to
+     * @param message the message to add
+     * @return the destination's future, or {@code AbstractMessageStore.FUTURE} when buffered
+     * @throws IOException if the destination fails to add the message
+     */
+    Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
+            throws IOException {
+
+        if (message.getTransactionId() != null) {
+            if (message.getTransactionId().isXATransaction()) {
+                return destination.asyncAddTopicMessage(context, message);
+            } else {
+                Tx tx = getTx(message.getTransactionId());
+                tx.add(new AddMessageCommand(context) {
+                    @Override
+                    public Message getMessage() {
+                        return message;
+                    }
+                    // Parameterized to match the abstract declaration (was a raw Future).
+                    @Override
+                    public Future<Object> run(ConnectionContext ctx) throws IOException {
+                        return destination.asyncAddTopicMessage(ctx, message);
+                    }
+
+                });
+                return AbstractMessageStore.FUTURE;
+            }
+        } else {
+            return destination.asyncAddTopicMessage(context, message);
+        }
+    }
+
+    /**
+     * Removes (acknowledges) a message. Acks outside a transaction or inside an
+     * XA transaction are passed straight to the destination; acks in a local
+     * transaction are buffered and only applied when it commits.
+     * 
+     * @param context the connection context of the consumer
+     * @param destination the store the ack is ultimately applied to
+     * @param ack the acknowledgement to apply
+     * @throws IOException if the destination fails to remove the message
+     */
+    final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
+            throws IOException {
+
+        if (!ack.isInTransaction() || ack.getTransactionId().isXATransaction()) {
+            destination.removeMessage(context, ack);
+            return;
+        }
+
+        final Tx localTx = getTx(ack.getTransactionId());
+        localTx.add(new RemoveMessageCommand(context) {
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                destination.removeMessage(ctx, ack);
+                return AbstractMessageStore.FUTURE;
+            }
+        });
+    }
+
+    /**
+     * Asynchronously removes (acknowledges) a message. Acks outside a
+     * transaction or inside an XA transaction are passed to the destination's
+     * async removal; acks in a local transaction are buffered and applied with
+     * the destination's {@code removeMessage} when the transaction commits.
+     * 
+     * @param context the connection context of the consumer
+     * @param destination the store the ack is ultimately applied to
+     * @param ack the acknowledgement to apply
+     * @throws IOException if the destination fails to remove the message
+     */
+    final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
+            throws IOException {
+
+        if (!ack.isInTransaction() || ack.getTransactionId().isXATransaction()) {
+            destination.removeAsyncMessage(context, ack);
+            return;
+        }
+
+        final Tx localTx = getTx(ack.getTransactionId());
+        localTx.add(new RemoveMessageCommand(context) {
+            @Override
+            public MessageAck getMessageAck() {
+                return ack;
+            }
+
+            @Override
+            public Future<Object> run(ConnectionContext ctx) throws IOException {
+                destination.removeMessage(ctx, ack);
+                return AbstractMessageStore.FUTURE;
+            }
+        });
+    }
+
+    private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
+        return theStore.createTransactionInfo(txid);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Tue Jun 15 20:33:41 2010
@@ -631,7 +631,7 @@ public class MessageDatabase extends Ser
     // Methods call by the broker to update and query the store.
     // /////////////////////////////////////////////////////////////////
     public Location store(JournalCommand data) throws IOException {
-        return store(data, false, null);
+        return store(data, false, null,null);
     }
 
     /**
@@ -641,8 +641,11 @@ public class MessageDatabase extends Ser
      * during a recovery process.
      * @param done 
      */
-    public Location store(JournalCommand data, boolean sync, Runnable done) throws IOException {
-    	try {
+    public Location store(JournalCommand data, boolean sync, Runnable before,Runnable after) throws IOException {
+    	if (before != null) {
+    	    before.run();
+    	}
+        try {
             int size = data.serializedSizeFramed();
             DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
             os.writeByte(data.type().getNumber());
@@ -664,8 +667,8 @@ public class MessageDatabase extends Ser
                 LOG.info("KahaDB: Recovering checkpoint thread after exception");
                 startCheckpoint();
             }
-            if (done != null) {
-                done.run();
+            if (after != null) {
+                after.run();
             }
             return location;
     	} catch (IOException ioe) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java Tue Jun 15 20:33:41 2010
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -62,7 +61,7 @@ import org.apache.kahadb.page.Transactio
 
 public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter {
 
-    private WireFormat wireFormat = new OpenWireFormat();
+    private final WireFormat wireFormat = new OpenWireFormat();
 
     public void setBrokerName(String brokerName) {
     }
@@ -72,9 +71,14 @@ public class TempKahaDBStore extends Tem
     public TransactionStore createTransactionStore() throws IOException {
         return new TransactionStore(){
             
-            public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+            public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+                if (preCommit != null) {
+                    preCommit.run();
+                }
                 processCommit(txid);
-                done.run();
+                if (postCommit != null) {
+                    postCommit.run();
+                }
             }
             public void prepare(TransactionId txid) throws IOException {
             	processPrepare(txid);
@@ -122,6 +126,7 @@ public class TempKahaDBStore extends Tem
             this.dest = convert( destination );
         }
 
+        @Override
         public ActiveMQDestination getDestination() {
             return destination;
         }
@@ -254,10 +259,13 @@ public class TempKahaDBStore extends Tem
             
         }
 
+        @Override
         public void setMemoryUsage(MemoryUsage memoeyUSage) {
         }
+        @Override
         public void start() throws Exception {
         }
+        @Override
         public void stop() throws Exception {
         }
         

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -21,9 +21,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -53,9 +51,9 @@ public class MemoryTransactionStore impl
     private boolean doingRecover;
 
     public class Tx {
-        private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
+        private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
 
-        private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
+        private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
 
         public void add(AddMessageCommand msg) {
             messages.add(msg);
@@ -130,19 +128,23 @@ public class MemoryTransactionStore impl
 
     public MessageStore proxy(MessageStore messageStore) {
         return new ProxyMessageStore(messageStore) {
+            @Override
             public void addMessage(ConnectionContext context, final Message send) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
             }
 
+            @Override
             public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
                 return AbstractMessageStore.FUTURE;
              }
              
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
              
+            @Override
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
             }
@@ -151,19 +153,23 @@ public class MemoryTransactionStore impl
 
     public TopicMessageStore proxy(TopicMessageStore messageStore) {
         return new ProxyTopicMessageStore(messageStore) {
+            @Override
             public void addMessage(ConnectionContext context, final Message send) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), send);
             }
 
+            @Override
             public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                 MemoryTransactionStore.this.addMessage(getDelegate(), message);
                 return AbstractMessageStore.FUTURE;
              }
 
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
             }
             
+            @Override
             public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                 MemoryTransactionStore.this.removeMessage(getDelegate(), ack);       
             }
@@ -194,8 +200,10 @@ public class MemoryTransactionStore impl
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
-
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+        if (preCommit != null) {
+            preCommit.run();
+        }
         Tx tx;
         if (wasPrepared) {
             tx = preparedTransactions.remove(txid);
@@ -204,11 +212,15 @@ public class MemoryTransactionStore impl
         }
 
         if (tx == null) {
-            done.run();
+            if (postCommit != null) {
+                postCommit.run();
+            }
             return;
         }
         tx.commit();
-        done.run();
+        if (postCommit != null) {
+            postCommit.run();
+        }
 
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java Tue Jun 15 20:33:41 2010
@@ -17,9 +17,7 @@
 package org.apache.activemq.transaction;
 
 import java.io.IOException;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.TransactionId;
@@ -44,6 +42,7 @@ public class LocalTransaction extends Tr
         this.context = context;
     }
 
+    @Override
     public void commit(boolean onePhase) throws XAException, IOException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("commit: "  + xid
@@ -69,10 +68,11 @@ public class LocalTransaction extends Tr
         context.getTransactions().remove(xid);
         // Sync on transaction store to avoid out of order messages in the cursor
         // https://issues.apache.org/activemq/browse/AMQ-2594
-        transactionStore.commit(getTransactionId(), false, postCommitTask);
+        transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
         this.waitPostCommitDone(postCommitTask);
     }
 
+    @Override
     public void rollback() throws XAException, IOException {
 
         if (LOG.isDebugEnabled()) {
@@ -98,12 +98,14 @@ public class LocalTransaction extends Tr
         }
     }
 
+    @Override
     public int prepare() throws XAException {
         XAException xae = new XAException("Prepare not implemented on Local Transactions.");
         xae.errorCode = XAException.XAER_RMERR;
         throw xae;
     }
 
+    @Override
     public TransactionId getTransactionId() {
         return xid;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java Tue Jun 15 20:33:41 2010
@@ -23,6 +23,9 @@ public class Synchronization {
 
     public void beforeEnd() throws Exception {
     }
+    
+    public void beforeCommit() throws Exception {
+    }
 
     public void afterCommit() throws Exception {
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Tue Jun 15 20:33:41 2010
@@ -24,9 +24,7 @@ import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activemq.command.TransactionId;
 import org.apache.commons.logging.Log;
 
@@ -36,16 +34,27 @@ import org.apache.commons.logging.Log;
  * 
  * @version $Revision: 1.5 $
  */
-public abstract class Transaction implements Callable {
+public abstract class Transaction {
 
     public static final byte START_STATE = 0; // can go to: 1,2,3
     public static final byte IN_USE_STATE = 1; // can go to: 2,3
     public static final byte PREPARED_STATE = 2; // can go to: 3
     public static final byte FINISHED_STATE = 3;
 
-    private ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
+    private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
     private byte state = START_STATE;
-    protected FutureTask<?> postCommitTask = new FutureTask(this);
+    protected FutureTask<?> preCommitTask = new FutureTask<Object>(new Callable<Object>() {
+        public Object call() throws Exception {
+            doPreCommit();
+            return null;
+        }   
+    });
+    protected FutureTask<?> postCommitTask = new FutureTask<Object>(new Callable<Object>() {
+        public Object call() throws Exception {
+            doPostCommit();
+            return null;
+        }   
+    });
     
     public byte getState() {
         return state;
@@ -86,6 +95,13 @@ public abstract class Transaction implem
         // r.execute();
         // }
     }
+    
+    protected void fireBeforeCommit() throws Exception {
+        for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
+            Synchronization s = iter.next();
+            s.beforeCommit();
+        }
+    }
 
     protected void fireAfterCommit() throws Exception {
         for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
@@ -102,6 +118,7 @@ public abstract class Transaction implem
         }
     }
 
+    @Override
     public String toString() {
         return super.toString() + "[synchronizations=" + synchronizations + "]";
     }
@@ -140,6 +157,20 @@ public abstract class Transaction implem
             }
         }    
     }
+    
+    protected void doPreCommit() throws XAException {
+        try {
+            fireBeforeCommit();
+        } catch (Throwable e) {
+            // This can happen: a beforeCommit() synchronization threw, so
+            // the pre commit task failed to execute properly.
+            getLog().warn("PRE COMMIT FAILED: ", e);
+            XAException xae = new XAException("PRE COMMIT FAILED");
+            xae.errorCode = XAException.XAER_RMERR;
+            xae.initCause(e);
+            throw xae;
+        }
+    }
 
     protected void doPostCommit() throws XAException {
         try {
@@ -154,10 +185,4 @@ public abstract class Transaction implem
             throw xae;
         }
     }
-    
-    public Object call() throws Exception {
-        doPostCommit();
-        return null;
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java Tue Jun 15 20:33:41 2010
@@ -17,10 +17,8 @@
 package org.apache.activemq.transaction;
 
 import java.io.IOException;
-
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
-
 import org.apache.activemq.broker.TransactionBroker;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
@@ -48,6 +46,7 @@ public class XATransaction extends Trans
         }
     }
 
+    @Override
     public void commit(boolean onePhase) throws XAException, IOException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("XA Transaction commit: " + xid);
@@ -64,14 +63,14 @@ public class XATransaction extends Trans
             checkForPreparedState(onePhase);
             doPrePrepare();
             setStateFinished();
-            transactionStore.commit(getTransactionId(), false, postCommitTask);
+            transactionStore.commit(getTransactionId(), false, preCommitTask,postCommitTask);
             waitPostCommitDone(postCommitTask);
             break;
         case PREPARED_STATE:
             // 2 phase commit, work done.
             // We would record commit here.
             setStateFinished();
-            transactionStore.commit(getTransactionId(), true, postCommitTask);
+            transactionStore.commit(getTransactionId(), true, preCommitTask,postCommitTask);
             waitPostCommitDone(postCommitTask);
             break;
         default:
@@ -108,6 +107,7 @@ public class XATransaction extends Trans
         }
     }
 
+    @Override
     public void rollback() throws XAException, IOException {
 
         if (LOG.isDebugEnabled()) {
@@ -151,6 +151,7 @@ public class XATransaction extends Trans
         }
     }
 
+    @Override
     public int prepare() throws XAException, IOException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("XA Transaction prepare: " + xid);
@@ -178,6 +179,7 @@ public class XATransaction extends Trans
         broker.removeTransaction(xid);
     }
 
+    @Override
     public TransactionId getTransactionId() {
         return xid;
     }