You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/06/15 22:33:41 UTC

svn commit: r955039 [1/2] - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/region/ command/ store/ store/amq/ store/journal/ store/kahadaptor/ store/kahadb/ store/memory/ transaction/

Author: rajdavies
Date: Tue Jun 15 20:33:41 2010
New Revision: 955039

URL: http://svn.apache.org/viewvc?rev=955039&view=rev
Log:
Transactions dispatch and commit to the store asynchronously, though the commit only returns to the producer when they both complete for KahaDB

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Synchronization.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/XATransaction.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jun 15 20:33:41 2010
@@ -93,7 +93,8 @@ public class Queue extends BaseDestinati
     protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
     protected PendingMessageCursor messages;
     private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
-    // Messages that are paged in but have not yet been targeted at a subscription
+    // Messages that are paged in but have not yet been targeted at a
+    // subscription
     private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
     private List<QueueMessageReference> redeliveredWaitingDispatch = new ArrayList<QueueMessageReference>();
     private MessageGroupMap messageGroupOwners;
@@ -101,7 +102,8 @@ public class Queue extends BaseDestinati
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private final Object sendLock = new Object();
     private ExecutorService executor;
-    protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
+    protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
+            .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
     private final Object dispatchMutex = new Object();
     private boolean useConsumerPriority = true;
     private boolean strictOrderDispatch = false;
@@ -112,7 +114,7 @@ public class Queue extends BaseDestinati
     private int consumersBeforeDispatchStarts = 0;
     private CountDownLatch consumersBeforeStartsLatch;
     private final AtomicLong pendingWakeups = new AtomicLong();
-
+    
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             asyncWakeup();
@@ -123,46 +125,47 @@ public class Queue extends BaseDestinati
             expireMessages();
         }
     };
-    
-    private final Object iteratingMutex = new Object() {};
+
+    private final Object iteratingMutex = new Object() {
+    };
     private final Scheduler scheduler;
-    
+
     class TimeoutMessage implements Delayed {
 
         Message message;
         ConnectionContext context;
         long trigger;
-        
+
         public TimeoutMessage(Message message, ConnectionContext context, long delay) {
             this.message = message;
             this.context = context;
             this.trigger = System.currentTimeMillis() + delay;
         }
-        
+
         public long getDelay(TimeUnit unit) {
             long n = trigger - System.currentTimeMillis();
             return unit.convert(n, TimeUnit.MILLISECONDS);
         }
 
         public int compareTo(Delayed delayed) {
-            long other = ((TimeoutMessage)delayed).trigger;
+            long other = ((TimeoutMessage) delayed).trigger;
             int returnValue;
             if (this.trigger < other) {
-              returnValue = -1;
+                returnValue = -1;
             } else if (this.trigger > other) {
-              returnValue = 1;
+                returnValue = 1;
             } else {
-              returnValue = 0;
+                returnValue = 0;
             }
             return returnValue;
         }
-        
+
     }
-    
+
     DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
-    
+
     class FlowControlTimeoutTask extends Thread {
-        
+
         @Override
         public void run() {
             TimeoutMessage timeout;
@@ -172,8 +175,14 @@ public class Queue extends BaseDestinati
                     if (timeout != null) {
                         synchronized (messagesWaitingForSpace) {
                             if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
-                                ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
-                                        + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
+                                ExceptionResponse response = new ExceptionResponse(
+                                        new ResourceAllocationException(
+                                                "Usage Manager Memory Limit reached. Stopping producer ("
+                                                        + timeout.message.getProducerId()
+                                                        + ") to prevent flooding "
+                                                        + getActiveMQDestination().getQualifiedName()
+                                                        + "."
+                                                        + " See http://activemq.apache.org/producer-flow-control.html for more info"));
                                 response.setCorrelationId(timeout.message.getCommandId());
                                 timeout.context.getConnection().dispatchAsync(response);
                             }
@@ -187,19 +196,19 @@ public class Queue extends BaseDestinati
             }
         }
     };
-    
+
     private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
-    
 
     private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
 
         public int compare(Subscription s1, Subscription s2) {
-            //We want the list sorted in descending order
+            // We want the list sorted in descending order
             return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
         }
     };
 
-    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
+    public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,
+            DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
         super(brokerService, store, destination, parentStats);
         this.taskFactory = taskFactory;
         this.dispatchSelector = new QueueDispatchSelector(destination);
@@ -212,7 +221,8 @@ public class Queue extends BaseDestinati
         }
     }
 
-    // make the queue easily visible in the debugger from its task runner threads
+    // make the queue easily visible in the debugger from its task runner
+    // threads
     final class QueueThread extends Thread {
         final Queue queue;
 
@@ -231,9 +241,12 @@ public class Queue extends BaseDestinati
                 this.messages = new StoreQueueCursor(broker, this);
             }
         }
-        // If a VMPendingMessageCursor don't use the default Producer System Usage
-        // since it turns into a shared blocking queue which can lead to a network deadlock.  
-        // If we are cursoring to disk..it's not and issue because it does not block due 
+        // If a VMPendingMessageCursor don't use the default Producer System
+        // Usage
+        // since it turns into a shared blocking queue which can lead to a
+        // network deadlock.
+        // If we are cursoring to disk..it's not an issue because it does not
+        // block due
         // to large disk sizes.
         if (messages instanceof VMPendingMessageCursor) {
             this.systemUsage = brokerService.getSystemUsage();
@@ -260,7 +273,8 @@ public class Queue extends BaseDestinati
                         if (message.isExpired()) {
                             if (broker.isExpired(message)) {
                                 messageExpired(createConnectionContext(), createMessageReference(message));
-                                // drop message will decrement so counter balance here
+                                // drop message will decrement so counter
+                                // balance here
                                 destinationStatistics.getMessages().increment();
                             }
                             return true;
@@ -300,8 +314,9 @@ public class Queue extends BaseDestinati
     }
 
     /*
-     * Holder for subscription that needs attention on next iterate
-     * browser needs access to existing messages in the queue that have already been dispatched
+     * Holder for subscription that needs attention on next iterate browser
+     * needs access to existing messages in the queue that have already been
+     * dispatched
      */
     class BrowserDispatch {
         QueueBrowserSubscription browser;
@@ -370,26 +385,30 @@ public class Queue extends BaseDestinati
                     browserDispatches.addLast(browserDispatch);
                 }
             }
-            
+
             if (!(this.optimizedDispatch || isSlave())) {
                 wakeup();
             }
         }
         if (this.optimizedDispatch || isSlave()) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
-            // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+            // iteratingMutex -> dispatchLock. - see
+            // https://issues.apache.org/activemq/browse/AMQ-1878
             wakeup();
         }
     }
-    
-    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
+
+    public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
+            throws Exception {
         destinationStatistics.getConsumers().decrement();
         // synchronize with dispatch method so that no new messages are sent
         // while removing up a subscription.
         synchronized (dispatchMutex) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
-                        + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount());
+                LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: "
+                        + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+                        + getDestinationStatistics().getDispatched().getCount() + ", inflight: "
+                        + getDestinationStatistics().getInflight().getCount());
             }
             synchronized (consumers) {
                 removeFromConsumerList(sub);
@@ -398,7 +417,9 @@ public class Queue extends BaseDestinati
                     if (exclusiveConsumer == sub) {
                         exclusiveConsumer = null;
                         for (Subscription s : consumers) {
-                            if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) {
+                            if (s.getConsumerInfo().isExclusive()
+                                    && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer
+                                            .getConsumerInfo().getPriority())) {
                                 exclusiveConsumer = s;
 
                             }
@@ -410,13 +431,15 @@ public class Queue extends BaseDestinati
                 getMessageGroupOwners().removeConsumer(consumerId);
 
                 // redeliver inflight messages
-                
+
                 for (MessageReference ref : sub.remove(context, this)) {
                     QueueMessageReference qmr = (QueueMessageReference) ref;
                     if (qmr.getLockOwner() == sub) {
                         qmr.unlock();
-                        // only increment redelivery if it was delivered or we have no delivery information
-                        if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
+                        // only increment redelivery if it was delivered or we
+                        // have no delivery information
+                        if (lastDeiveredSequenceId == 0
+                                || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
                             qmr.incrementRedeliveryCounter();
                         }
                     }
@@ -432,7 +455,8 @@ public class Queue extends BaseDestinati
         }
         if (this.optimizedDispatch || isSlave()) {
             // Outside of dispatchLock() to maintain the lock hierarchy of
-            // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+            // iteratingMutex -> dispatchLock. - see
+            // https://issues.apache.org/activemq/browse/AMQ-1878
             wakeup();
         }
     }
@@ -443,9 +467,10 @@ public class Queue extends BaseDestinati
         // destination.. it may have expired.
         message.setRegionDestination(this);
         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
+        final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
+                && !context.isInRecoveryMode();
         if (message.isExpired()) {
-            //message not stored - or added to stats yet - so chuck here
+            // message not stored - or added to stats yet - so chuck here
             broker.getRoot().messageExpired(context, message);
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
@@ -459,20 +484,28 @@ public class Queue extends BaseDestinati
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager Memory Limit ("+ memoryUsage.getLimit() + ") reached on " + getActiveMQDestination().getQualifiedName()
-                            + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
-                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    LOG
+                            .info("Usage Manager Memory Limit ("
+                                    + memoryUsage.getLimit()
+                                    + ") reached on "
+                                    + getActiveMQDestination().getQualifiedName()
+                                    + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+                                    + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
-                            + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+                    throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer ("
+                            + message.getProducerId() + ") to prevent flooding "
+                            + getActiveMQDestination().getQualifiedName() + "."
+                            + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
 
-                // We can avoid blocking due to low usage if the producer is sending
+                // We can avoid blocking due to low usage if the producer is
+                // sending
                 // a sync message or if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
-                    // copy the exchange state since the context will be modified while we are waiting
+                    // copy the exchange state since the context will be
+                    // modified while we are waiting
                     // for space.
                     final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
                     synchronized (messagesWaitingForSpace) {
@@ -491,7 +524,8 @@ public class Queue extends BaseDestinati
                                     }
 
                                     if (sendProducerAck) {
-                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
+                                        ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message
+                                                .getSize());
                                         context.getConnection().dispatchAsync(ack);
                                     } else {
                                         Response response = new Response();
@@ -510,9 +544,10 @@ public class Queue extends BaseDestinati
                                 }
                             }
                         });
-                        
+
                         if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
-                            flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
+                            flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage
+                                    .getSendFailIfNoSpaceAfterTimeout()));
                         }
 
                         registerCallbackForNotFullNotification();
@@ -523,8 +558,10 @@ public class Queue extends BaseDestinati
                 } else {
 
                     if (memoryUsage.isFull()) {
-                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding "
-                                + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
+                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
+                                + message.getProducerId() + ") stopped to prevent flooding "
+                                + getActiveMQDestination().getQualifiedName() + "."
+                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
                     }
 
                     // The usage manager could have delayed us by the time
@@ -555,14 +592,18 @@ public class Queue extends BaseDestinati
         }
     }
 
-    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+    void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
+            Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         Future<Object> result = null;
         synchronized (sendLock) {
             if (store != null && message.isPersistent()) {
                 if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
 
-                    String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of " + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                    String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+                            + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+                            + message.getProducerId() + ") to prevent flooding "
+                            + getActiveMQDestination().getQualifiedName() + "."
                             + " See http://activemq.apache.org/producer-flow-control.html for more info";
 
                     if (systemUsage.isSendFailIfNoSpace()) {
@@ -572,11 +613,7 @@ public class Queue extends BaseDestinati
                     waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
                 }
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-                if (context.isInTransaction()) {
-                    store.addMessage(context, message);
-                }else {
-                    result = store.asyncAddQueueMessage(context, message);
-                }
+                result = store.asyncAddQueueMessage(context, message);
             }
         }
         if (context.isInTransaction()) {
@@ -613,10 +650,10 @@ public class Queue extends BaseDestinati
         }
         if (result != null && !result.isCancelled()) {
             try {
-            result.get();
-            }catch(CancellationException e) {
-              //ignore - the task has been cancelled if the message
-              // has already been deleted
+                result.get();
+            } catch (CancellationException e) {
+                // ignore - the task has been cancelled if the message
+                // has already been deleted
             }
         }
     }
@@ -652,10 +689,12 @@ public class Queue extends BaseDestinati
     public void gc() {
     }
 
-    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
+    public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
+            throws IOException {
         messageConsumed(context, node);
         if (store != null && node.isPersistent()) {
-            // the original ack may be a ranged ack, but we are trying to delete a specific
+            // the original ack may be a ranged ack, but we are trying to delete
+            // a specific
             // message store here so we need to convert to a non ranged ack.
             if (ack.getMessageCount() > 0) {
                 // Dup the ack
@@ -692,7 +731,8 @@ public class Queue extends BaseDestinati
         synchronized (messages) {
             size = messages.size();
         }
-        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
+                + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
                 + messageGroupOwners;
     }
 
@@ -705,15 +745,15 @@ public class Queue extends BaseDestinati
         if (getExpireMessagesPeriod() > 0) {
             scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
         }
-        
+
         flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
-        
+
         // Start flow control timeout task
         // Prevent trying to start it multiple times
         if (!flowControlTimeoutTask.isAlive()) {
             flowControlTimeoutTask.start();
         }
-        
+
         doPageIn(false);
     }
 
@@ -726,7 +766,7 @@ public class Queue extends BaseDestinati
         }
 
         scheduler.cancel(expireMessagesTask);
-        
+
         if (flowControlTimeoutTask.isAlive()) {
             flowControlTimeoutTask.interrupt();
         }
@@ -897,7 +937,8 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
+    private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize,
+            List<MessageReference> toExpire) throws Exception {
         for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
             QueueMessageReference ref = i.next();
             if (ref.isExpired()) {
@@ -962,10 +1003,13 @@ public class Queue extends BaseDestinati
                 } catch (IOException e) {
                 }
             }
-            // don't spin/hang if stats are out and there is nothing left in the store
+            // don't spin/hang if stats are out and there is nothing left in the
+            // store
         } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
         if (this.destinationStatistics.getMessages().getCount() > 0) {
-            LOG.warn(getActiveMQDestination().getQualifiedName() + " after purge complete, message count stats report: " +  this.destinationStatistics.getMessages().getCount());
+            LOG.warn(getActiveMQDestination().getQualifiedName()
+                    + " after purge complete, message count stats report: "
+                    + this.destinationStatistics.getMessages().getCount());
         }
         gc();
         this.destinationStatistics.getMessages().setCount(0);
@@ -1032,7 +1076,8 @@ public class Queue extends BaseDestinati
     /**
      * Copies the message matching the given messageId
      */
-    public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
+    public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
+            throws Exception {
         return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
     }
 
@@ -1041,7 +1086,8 @@ public class Queue extends BaseDestinati
      * 
      * @return the number of messages copied
      */
-    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
+    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
+            throws Exception {
         return copyMatchingMessagesTo(context, selector, dest, -1);
     }
 
@@ -1051,7 +1097,8 @@ public class Queue extends BaseDestinati
      * 
      * @return the number of messages copied
      */
-    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
+    public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
+            int maximumMessages) throws Exception {
         return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
     }
 
@@ -1061,7 +1108,8 @@ public class Queue extends BaseDestinati
      * 
      * @return the number of messages copied
      */
-    public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
+    public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest,
+            int maximumMessages) throws Exception {
         int movedCounter = 0;
         int count = 0;
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
@@ -1098,9 +1146,12 @@ public class Queue extends BaseDestinati
     /**
      * Move a message
      * 
-     * @param context connection context
-     * @param m message
-     * @param dest ActiveMQDestination
+     * @param context
+     *            connection context
+     * @param m
+     *            message
+     * @param dest
+     *            ActiveMQDestination
      * @throws Exception
      */
     public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
@@ -1116,7 +1167,8 @@ public class Queue extends BaseDestinati
     /**
      * Moves the message matching the given messageId
      */
-    public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
+    public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest)
+            throws Exception {
         return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
     }
 
@@ -1125,7 +1177,8 @@ public class Queue extends BaseDestinati
      * 
      * @return the number of messages removed
      */
-    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
+    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest)
+            throws Exception {
         return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
     }
 
@@ -1133,7 +1186,8 @@ public class Queue extends BaseDestinati
      * Moves the messages matching the given selector up to the maximum number
      * of matched messages
      */
-    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
+    public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest,
+            int maximumMessages) throws Exception {
         return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
     }
 
@@ -1141,7 +1195,8 @@ public class Queue extends BaseDestinati
      * Moves the messages matching the given filter up to the maximum number of
      * matched messages
      */
-    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
+    public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter,
+            ActiveMQDestination dest, int maximumMessages) throws Exception {
         int movedCounter = 0;
         Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
         do {
@@ -1180,7 +1235,7 @@ public class Queue extends BaseDestinati
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        boolean pageInMoreMessages = false;       
+        boolean pageInMoreMessages = false;
         synchronized (iteratingMutex) {
 
             // do early to allow dispatch of these waiting messages
@@ -1202,7 +1257,8 @@ public class Queue extends BaseDestinati
                 firstConsumer = false;
                 try {
                     if (consumersBeforeDispatchStarts > 0) {
-                        int timeout = 1000; // wait one second by default if consumer count isn't reached  
+                        int timeout = 1000; // wait one second by default if
+                                            // consumer count isn't reached
                         if (timeBeforeDispatchStarts > 0) {
                             timeout = timeBeforeDispatchStarts;
                         }
@@ -1212,7 +1268,8 @@ public class Queue extends BaseDestinati
                             }
                         } else {
                             if (LOG.isDebugEnabled()) {
-                                LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
+                                LOG.debug(timeout + " ms elapsed and " + consumers.size()
+                                        + " consumers subscribed. Starting dispatch.");
                             }
                         }
                     }
@@ -1226,21 +1283,24 @@ public class Queue extends BaseDestinati
                     LOG.error(e);
                 }
             }
-            
+
             BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
 
             synchronized (messages) {
                 pageInMoreMessages |= !messages.isEmpty();
             }
 
-            // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
-            // pagedInPendingDispatch variable. 	        
+            // Kinda ugly.. but I think dispatchLock is the only mutex
+            // protecting the
+            // pagedInPendingDispatch variable.
             synchronized (dispatchMutex) {
                 pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
             }
 
-            // Perhaps we should page always into the pagedInPendingDispatch list if 
-            // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+            // Perhaps we should page always into the pagedInPendingDispatch
+            // list if
+            // !messages.isEmpty(), and then if
+            // !pagedInPendingDispatch.isEmpty()
             // then we do a dispatch.
             if (pageInMoreMessages || pendingBrowserDispatch != null || !redeliveredWaitingDispatch.isEmpty()) {
                 try {
@@ -1250,7 +1310,7 @@ public class Queue extends BaseDestinati
                     LOG.error("Failed to page in more queue messages ", e);
                 }
             }
-            
+
             if (pendingBrowserDispatch != null) {
                 ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
                 synchronized (pagedInMessages) {
@@ -1264,7 +1324,7 @@ public class Queue extends BaseDestinati
                     try {
                         MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                         msgContext.setDestination(destination);
-                        
+
                         QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
                         for (QueueMessageReference node : alreadyDispatchedMessages) {
                             if (!node.isAcked()) {
@@ -1278,10 +1338,10 @@ public class Queue extends BaseDestinati
                     } catch (Exception e) {
                         LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
                     }
-                
+
                 } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
             }
-            
+
             if (pendingWakeups.get() > 0) {
                 pendingWakeups.decrementAndGet();
             }
@@ -1336,7 +1396,8 @@ public class Queue extends BaseDestinati
         removeMessage(c, subs, r, ack);
     }
 
-    protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException {
+    protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference,
+            MessageAck ack) throws IOException {
         reference.setAcked(true);
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
@@ -1408,7 +1469,8 @@ public class Queue extends BaseDestinati
     final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
         if (!msg.isPersistent() && messages.getSystemUsage() != null) {
             if (systemUsage.getTempUsage().isFull()) {
-                final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+                final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId()
+                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                         + " See http://activemq.apache.org/producer-flow-control.html for more info";
                 if (systemUsage.isSendFailIfNoSpace()) {
                     throw new ResourceAllocationException(logMessage);
@@ -1460,12 +1522,14 @@ public class Queue extends BaseDestinati
         synchronized (dispatchMutex) {
             int toPageIn = Math.min(getMaxPageSize(), messages.size());
             if (LOG.isDebugEnabled()) {
-                LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+                LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
+                        + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
                         + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
             }
 
             if (isLazyDispatch() && !force) {
-                // Only page in the minimum number of messages which can be dispatched immediately.
+                // Only page in the minimum number of messages which can be
+                // dispatched immediately.
                 toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
             }
             if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
@@ -1478,6 +1542,7 @@ public class Queue extends BaseDestinati
                         while (messages.hasNext() && count < toPageIn) {
                             MessageReference node = messages.next();
                             messages.remove();
+                            
                             QueueMessageReference ref = createMessageReference(node.getMessage());
                             if (ref.isExpired()) {
                                 if (broker.isExpired(ref)) {
@@ -1494,7 +1559,8 @@ public class Queue extends BaseDestinati
                         messages.release();
                     }
                 }
-                // Only add new messages, not already pagedIn to avoid multiple dispatch attempts
+                // Only add new messages, not already pagedIn to avoid multiple
+                // dispatch attempts
                 synchronized (pagedInMessages) {
                     resultList = new ArrayList<QueueMessageReference>(result.size());
                     for (QueueMessageReference ref : result) {
@@ -1520,7 +1586,8 @@ public class Queue extends BaseDestinati
 
             synchronized (pagedInPendingDispatch) {
                 if (!redeliveredWaitingDispatch.isEmpty()) {
-                    // Try first to dispatch redelivered messages to keep an proper order
+                    // Try first to dispatch redelivered messages to keep an
+                    // proper order
                     redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch);
                 }
                 if (!pagedInPendingDispatch.isEmpty()) {
@@ -1528,7 +1595,8 @@ public class Queue extends BaseDestinati
                     // dispatched before.
                     pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
                 }
-                // and now see if we can dispatch the new stuff.. and append to the pending
+                // and now see if we can dispatch the new stuff.. and append to
+                // the pending
                 // list anything that does not actually get dispatched.
                 if (list != null && !list.isEmpty()) {
                     if (pagedInPendingDispatch.isEmpty()) {
@@ -1581,29 +1649,34 @@ public class Queue extends BaseDestinati
                         if (!s.isFull()) {
                             // Dispatch it.
                             s.add(node);
-                            target = s;
+                            target = s;                            
                             break;
                         } else {
-                            // no further dispatch of list to a full consumer to avoid out of order message receipt 
+                            // no further dispatch of list to a full consumer to
+                            // avoid out of order message receipt
                             fullConsumers.add(s);
                         }
                     }
                     interestCount++;
                 } else {
                     // makes sure it gets dispatched again
-                    if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
+                    if (!node.isDropped() && !((QueueMessageReference) node).isAcked()
+                            && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
                         interestCount++;
                     }
                 }
             }
 
             if ((target == null && interestCount > 0) || consumers.size() == 0) {
-                // This means all subs were full or that there are no consumers...
+                // This means all subs were full or that there are no
+                // consumers...
                 rc.add((QueueMessageReference) node);
             }
 
-            // If it got dispatched, rotate the consumer list to get round robin distribution. 
-            if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) {
+            // If it got dispatched, rotate the consumer list to get round robin
+            // distribution.
+            if (target != null && !strictOrderDispatch && consumers.size() > 1
+                    && !dispatchSelector.isExclusiveConsumer(target)) {
                 synchronized (this.consumers) {
                     if (removeFromConsumerList(target)) {
                         addToConsumerList(target);
@@ -1654,7 +1727,6 @@ public class Queue extends BaseDestinati
      * dispatch process is non deterministic between master and slave. On a
      * notification, the actual dispatch to the subscription (as chosen by the
      * master) is completed. (non-Javadoc)
-     * 
      * @see
      * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
      * (org.apache.activemq.command.MessageDispatchNotification)
@@ -1670,7 +1742,8 @@ public class Queue extends BaseDestinati
         }
     }
 
-    private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
+    private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification)
+            throws Exception {
         QueueMessageReference message = null;
         MessageId messageId = messageDispatchNotification.getMessageId();
 
@@ -1719,8 +1792,11 @@ public class Queue extends BaseDestinati
 
         }
         if (message == null) {
-            throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination()
-                    + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId());
+            throw new JMSException("Slave broker out of sync with master - Message: "
+                    + messageDispatchNotification.getMessageId() + " on "
+                    + messageDispatchNotification.getDestination() + " does not exist among pending("
+                    + pagedInPendingDispatch.size() + ") for subscription: "
+                    + messageDispatchNotification.getConsumerId());
         }
         return message;
     }
@@ -1732,7 +1808,8 @@ public class Queue extends BaseDestinati
      * @return sub or null if the subscription has been removed before dispatch
      * @throws JMSException
      */
-    private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) throws JMSException {
+    private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification)
+            throws JMSException {
         Subscription sub = null;
         synchronized (consumers) {
             for (Subscription s : consumers) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jun 15 20:33:41 2010
@@ -163,10 +163,10 @@ public class Topic extends BaseDestinati
             DurableTopicSubscription removed = durableSubcribers.remove(key);
             if (removed != null) {
                 destinationStatistics.getConsumers().decrement();
-            }
-            // deactivate and remove
-            removed.deactivate(false);
-            consumers.remove(removed);
+                // deactivate and remove
+                removed.deactivate(false);
+                consumers.remove(removed);
+            }         
         }
     }
 
@@ -418,12 +418,8 @@ public class Topic extends BaseDestinati
                 }
 
                 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
-            }
-            if (context.isInTransaction()) {
-                topicStore.addMessage(context, message);
-            }else {
-                result = topicStore.asyncAddTopicMessage(context, message);
             }      
+            topicStore.asyncAddTopicMessage(context, message);      
         }
 
         message.incrementReferenceCount();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Tue Jun 15 20:33:41 2010
@@ -602,6 +602,8 @@ public abstract class Message extends Ba
 
         if (rc == 1 && getMemoryUsage() != null) {
             getMemoryUsage().increaseUsage(size);
+            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
+           
         }
 
         //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
@@ -618,7 +620,10 @@ public abstract class Message extends Ba
 
         if (rc == 0 && getMemoryUsage() != null) {
             getMemoryUsage().decreaseUsage(size);
+            //Thread.dumpStack();
+            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
         }
+       
         //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
 
         return rc;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TransactionStore.java Tue Jun 15 20:33:41 2010
@@ -17,8 +17,6 @@
 package org.apache.activemq.store;
 
 import java.io.IOException;
-import java.util.concurrent.FutureTask;
-
 import org.apache.activemq.Service;
 import org.apache.activemq.command.TransactionId;
 
@@ -32,10 +30,9 @@ public interface TransactionStore extend
 
     void prepare(TransactionId txid) throws IOException;
 
-    void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException;
+    void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException;
 
     void rollback(TransactionId txid) throws IOException;
 
     void recover(TransactionRecoveryListener listener) throws IOException;
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -21,9 +21,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
@@ -99,7 +97,10 @@ public class AMQTransactionStore impleme
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
+        if (preCommit != null) {
+            preCommit.run();
+        }
         AMQTx tx;
         if (wasPrepared) {
             synchronized (preparedTransactions) {
@@ -111,7 +112,9 @@ public class AMQTransactionStore impleme
             }
         }
         if (tx == null) {
-            done.run();
+            if (postCommit != null) {
+                postCommit.run();
+            }
             return;
         }
         if (txid.isXATransaction()) {
@@ -119,7 +122,9 @@ public class AMQTransactionStore impleme
         } else {
             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
         }
-        done.run();
+        if (postCommit != null) {
+            postCommit.run();
+        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -22,9 +22,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activeio.journal.RecordLocation;
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
@@ -40,8 +38,8 @@ import org.apache.activemq.store.Transac
 public class JournalTransactionStore implements TransactionStore {
 
     private final JournalPersistenceAdapter peristenceAdapter;
-    private Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
-    private Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
+    private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
+    private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
     private boolean doingRecover;
 
     public static class TxOperation {
@@ -70,7 +68,7 @@ public class JournalTransactionStore imp
     public static class Tx {
 
         private final RecordLocation location;
-        private ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
+        private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
 
         public Tx(RecordLocation location) {
             this.location = location;
@@ -176,8 +174,11 @@ public class JournalTransactionStore imp
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
         Tx tx;
+        if (preCommit != null) {
+            preCommit.run();
+        }
         if (wasPrepared) {
             synchronized (preparedTransactions) {
                 tx = preparedTransactions.remove(txid);
@@ -188,7 +189,9 @@ public class JournalTransactionStore imp
             }
         }
         if (tx == null) {
-            done.run();
+            if (postCommit != null) {
+                postCommit.run();
+            }
             return;
         }
         if (txid.isXATransaction()) {
@@ -198,7 +201,9 @@ public class JournalTransactionStore imp
             peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
                                                                   wasPrepared), true);
         }
-        done.run();
+        if (postCommit != null) {
+            postCommit.run();
+        }
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=955039&r1=955038&r2=955039&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Tue Jun 15 20:33:41 2010
@@ -21,9 +21,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-
 import javax.transaction.xa.XAException;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
@@ -38,7 +36,6 @@ import org.apache.activemq.store.ProxyTo
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.journal.JournalPersistenceAdapter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -51,9 +48,9 @@ import org.apache.commons.logging.LogFac
 public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {	
     private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
 	
-    private Map transactions = new ConcurrentHashMap();
-    private Map prepared;
-    private KahaPersistenceAdapter adaptor;
+    private final Map transactions = new ConcurrentHashMap();
+    private final Map prepared;
+    private final KahaPersistenceAdapter adaptor;
     
     private BrokerService brokerService;
 
@@ -64,10 +61,12 @@ public class KahaTransactionStore implem
 
     public MessageStore proxy(MessageStore messageStore) {
         return new ProxyMessageStore(messageStore) {
+            @Override
             public void addMessage(ConnectionContext context, final Message send) throws IOException {
                 KahaTransactionStore.this.addMessage(getDelegate(), send);
             }
 
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
             }
@@ -76,10 +75,12 @@ public class KahaTransactionStore implem
 
     public TopicMessageStore proxy(TopicMessageStore messageStore) {
         return new ProxyTopicMessageStore(messageStore) {
+            @Override
             public void addMessage(ConnectionContext context, final Message send) throws IOException {
                 KahaTransactionStore.this.addMessage(getDelegate(), send);
             }
 
+            @Override
             public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                 KahaTransactionStore.this.removeMessage(getDelegate(), ack);
             }
@@ -101,13 +102,18 @@ public class KahaTransactionStore implem
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid, boolean wasPrepared, Runnable done) throws IOException {
+    public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
+        if(before != null) {
+            before.run();
+        }
         KahaTransaction tx = getTx(txid);
         if (tx != null) {
             tx.commit(this);
             removeTx(txid);
         }
-        done.run();
+        if (after != null) {
+            after.run();
+        }
     }
 
     /**