You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/30 19:11:17 UTC

svn commit: r789831 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-kaha/src/main/proto/ activemq-openwire/src/main/jav...

Author: cmacnaug
Date: Tue Jun 30 17:11:16 2009
New Revision: 789831

URL: http://svn.apache.org/viewvc?rev=789831&view=rev
Log:
Refactoring in support of Transactions.

-Changed Store interface delete to remove by queue key as opposed to message key.
-Updates to Transaction/TransactionMgr

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Tue Jun 30 17:11:16 2009
@@ -560,8 +560,8 @@
      *            The queue.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext deleteQueueElement(long storeTracking, QueueDescriptor queue) {
-        return add(new DeleteOperation(storeTracking, queue), null, false);
+    public OperationContext deleteQueueElement(SaveableQueueElement<?> queueElement) {
+        return add(new DeleteOperation(queueElement.getSequenceNumber(), queueElement.getQueueDescriptor()), null, false);
     }
 
     /**
@@ -854,11 +854,11 @@
     }
 
     private class DeleteOperation extends OperationBase {
-        private final long storeTracking;
+        private final long queueKey;
         private QueueDescriptor queue;
 
-        public DeleteOperation(long tracking, QueueDescriptor queue) {
-            this.storeTracking = tracking;
+        public DeleteOperation(long queueKey, QueueDescriptor queue) {
+            this.queueKey = queueKey;
             this.queue = queue;
         }
 
@@ -870,7 +870,7 @@
         @Override
         protected void doExcecute(Session session) {
             try {
-                session.queueRemoveMessage(queue, storeTracking);
+                session.queueRemoveMessage(queue, queueKey);
             } catch (KeyNotFoundException e) {
                 // TODO Probably doesn't always mean an error, it is possible
                 // that
@@ -886,7 +886,7 @@
         }
 
         public String toString() {
-            return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + storeTracking + " " + super.toString();
+            return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + queueKey + " " + super.toString();
         }
     }
 
@@ -1223,17 +1223,17 @@
      * @param delayable
      */
     public <T> OperationContext saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
-        return add(new AddElementOpOperation<T>(sqe, delayable, marshaller), source, !delayable);
+        return add(new AddElementOperation<T>(sqe, delayable, marshaller), source, !delayable);
     }
 
-    private class AddElementOpOperation<T> extends OperationBase {
+    private class AddElementOperation<T> extends OperationBase {
 
         private final SaveableQueueElement<T> op;
         private MessageRecord record;
         private boolean delayable;
         private final MessageRecordMarshaller<T> marshaller;
 
-        public AddElementOpOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+        public AddElementOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
             this.op = op;
             this.delayable = delayable;
             if (!delayable) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -73,7 +73,7 @@
         return fromStore;
     }
 
-    public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+    public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
         synchronized (this) {
             // Can flush of this message to the store be delayed?
             if (enableFlushDelay && !delayable) {
@@ -83,13 +83,13 @@
             // list of queues for which to save the message when dispatch is
             // finished:
             if (dispatching) {
-                addPersistentTarget(elem);
+                addPersistentTarget(sqe);
                 return;
             }
             // Otherwise, if it is still in the saver queue, we can add this
             // queue to the queue list:
             else if (pendingSave != null) {
-                addPersistentTarget(elem);
+                addPersistentTarget(sqe);
                 if (!delayable) {
                     pendingSave.requestFlush();
                 }
@@ -97,10 +97,10 @@
             }
         }
 
-        store.saveMessage(elem, controller, delayable);
+        store.saveMessage(sqe, controller, delayable);
     }
 
-    public final void acknowledge(QueueDescriptor queue) {
+    public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
         boolean firePersistListener = false;
         boolean deleted = false;
         synchronized (this) {
@@ -110,7 +110,7 @@
 
                 deleted = true;
 
-                removePersistentTarget(queue);
+                removePersistentTarget(sqe.getQueueDescriptor());
                 // We get a save context when we place the message in the
                 // database queue. If it has been added to the queue,
                 // and we've removed the last queue, see if we can cancel
@@ -127,7 +127,7 @@
         }
 
         if (!deleted) {
-            store.deleteQueueElement(getStoreTracking(), queue);
+            store.deleteQueueElement(sqe);
         }
 
         if (firePersistListener) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Tue Jun 30 17:11:16 2009
@@ -106,7 +106,7 @@
         public Integer map(MessageDelivery element) {
             return element.getFlowLimiterSize();
         }
-    };    
+    };
 
     public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
         public Integer map(MessageDelivery element) {
@@ -127,7 +127,7 @@
             return (int) (element.getProducerId().hashCode() % 10);
         }
     };
-    
+
     public static final short SUBPARTITION_TYPE = 0;
     public static final short SHARED_QUEUE_TYPE = 1;
     public static final short DURABLE_QUEUE_TYPE = 2;
@@ -251,7 +251,7 @@
 
         }
     }
-    
+
     private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
 
         IQueue<Long, MessageDelivery> queue;
@@ -452,8 +452,16 @@
         return ret;
     }
 
-    public final void deleteQueueElement(QueueDescriptor descriptor, MessageDelivery elem) {
-        elem.acknowledge(descriptor);
+    public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
+        MessageDelivery md = sqe.getElement();
+        //If the message delivery isn't null, funnel through it 
+        //since the message may not yet be in the store:
+        if (md != null) {
+            md.acknowledge(sqe);
+        } else {
+            database.deleteQueueElement(sqe);
+        }
+
     }
 
     public final boolean isFromStore(MessageDelivery elem) {
@@ -464,8 +472,7 @@
         elem.getElement().persist(elem, controller, delayable);
     }
 
-    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
-            RestoreListener<MessageDelivery> listener) {
+    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
         database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Tue Jun 30 17:11:16 2009
@@ -43,7 +43,7 @@
      * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
      */
     @Override
-    public void commit(boolean onePhase) throws XAException, IOException {
+    public void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException {
         // TODO Auto-generated method stub
 
     }
@@ -54,7 +54,7 @@
      * @see org.apache.activemq.apollo.broker.Transaction#prepare()
      */
     @Override
-    public int prepare() throws XAException, IOException {
+    public int prepare(TransactionListener listener) throws XAException, IOException {
         // TODO Auto-generated method stub
         return 0;
     }
@@ -65,7 +65,7 @@
      * @see org.apache.activemq.apollo.broker.Transaction#rollback()
      */
     @Override
-    public void rollback() throws XAException, IOException {
+    public void rollback(TransactionListener listener) throws XAException, IOException {
         // TODO Auto-generated method stub
 
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -22,7 +22,6 @@
 import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
 
 public interface MessageDelivery {
 
@@ -76,9 +75,11 @@
 
     /**
      * @return if the message is part of a transaction this returns the
-     *         transaction id.
+     *         transaction id returned by {@link Transaction#getTid()} otherwise
+     *         a value of -1 indicates that this delivery is not part of a
+     *         transaction
      */
-    public Buffer getTransactionId();
+    public long getTransactionId();
 
     /**
      * Called by a queue to request that the element be persisted. The save is
@@ -92,7 +93,7 @@
      * {@link SaveableQueueElement#requestSaveNotify()} method before attempting
      * to acces the store directly.
      * 
-     * @param elem
+     * @param sqe
      *            The element to save
      * @param controller
      *            A flow controller to use in the event that there isn't room in
@@ -100,16 +101,16 @@
      * @param delayable
      *            Whether or not the save operation can be delayed.
      */
-    public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable);
+    public void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable);
 
     /**
      * Acknowledges the message for a particular queue. This will cause it to be
      * deleted from the message store.
      * 
-     * @param queue
-     *            The queue for which to acknowledge the message.
+     * @param sqe
+     *            The queue element to delete
      */
-    public void acknowledge(QueueDescriptor queue);
+    public void acknowledge(SaveableQueueElement<MessageDelivery> sqe);
 
     /**
      * Gets the tracking number used to identify this message in the message
@@ -118,9 +119,10 @@
      * @return The store tracking or -1 if not set.
      */
     public long getStoreTracking();
-    
+
     /**
      * Used to apply selectors against the message.
+     * 
      * @return
      */
     public MessageEvaluationContext createMessageEvaluationContext();

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Tue Jun 30 17:11:16 2009
@@ -22,7 +22,6 @@
 import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
 
 /**
  * @author cmacnaug
@@ -32,14 +31,9 @@
 
     private final MessageDelivery delegate;
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
-    public void acknowledge(QueueDescriptor queue) {
-        delegate.acknowledge(queue);
+   
+    public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+        delegate.acknowledge(sqe);
     }
 
     /**
@@ -128,7 +122,7 @@
      * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
      *      org.apache.activemq.flow.ISourceController, boolean)
      */
-    public Buffer getTransactionId() {
+    public long getTransactionId() {
         return delegate.getTransactionId();
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Tue Jun 30 17:11:16 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.broker;
 
 import java.io.IOException;
+import java.util.HashSet;
 
 import javax.transaction.xa.XAException;
 
@@ -32,8 +33,6 @@
 /**
  * Keeps track of all the actions the need to be done when a transaction does a
  * commit or rollback.
- * 
- * @version $Revision: 1.5 $
  */
 public abstract class Transaction {
 
@@ -49,6 +48,7 @@
     private final TransactionManager manager;
     private final long tid;
     private final IQueue<Long, TxOp> opQueue;
+    private HashSet<TransactionListener> listeners;
 
     Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
         this.manager = manager;
@@ -134,22 +134,26 @@
 
     protected void fireAfterCommit() throws Exception {
 
-        //TODO
+        synchronized (this) {
+
+        }
     }
 
     public void fireAfterRollback() throws Exception {
-        //TODO
+        synchronized (this) {
+
+        }
     }
 
     public String toString() {
         return super.toString() + "[queue=" + opQueue + "]";
     }
 
-    public abstract void commit(boolean onePhase) throws XAException, IOException;
+    public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
 
-    public abstract void rollback() throws XAException, IOException;
+    public abstract void rollback(TransactionListener listener) throws XAException, IOException;
 
-    public abstract int prepare() throws XAException, IOException;
+    public abstract int prepare(TransactionListener listener) throws XAException, IOException;
 
     public boolean isPrepared() {
         return getState() == PREPARED_STATE;
@@ -159,6 +163,20 @@
         return opQueue.getEnqueuedCount();
     }
 
+    public static abstract class TransactionListener {
+        public void onRollback(Transaction t) {
+
+        }
+
+        public void onCommit(Transaction t) {
+
+        }
+
+        public void onPrepared(Transaction t) {
+
+        }
+    }
+
     interface TxOp {
         public static final short TYPE_MESSAGE = 0;
         public static final short TYPE_ACK = 1;
@@ -266,9 +284,9 @@
         private boolean fromStore;
         private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
 
-        TxAck(IQueue<Long, ?> queue, long storeTracking, Transaction tx) {
+        TxAck(IQueue<Long, ?> queue, long removalKey, Transaction tx) {
             this.queue = queue;
-            this.storeTracking = storeTracking;
+            this.queueSequence = removalKey;
             this.tx = tx;
         }
 
@@ -290,7 +308,7 @@
          * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
          */
         public final void onCommit() {
-            //TODO
+            queue.remove(queueSequence);
 
         }
 
@@ -347,12 +365,12 @@
             AsciiBuffer queueName = queue.getDescriptor().getQueueName();
             DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
             try {
-				baos.writeShort(queueName.length);
-				baos.write(queueName.data, queueName.offset, queueName.length);
-				baos.writeLong(queueSequence);
-			} catch (IOException shouldNotHappen) {
-				throw new RuntimeException(shouldNotHappen);
-			}
+                baos.writeShort(queueName.length);
+                baos.write(queueName.data, queueName.offset, queueName.length);
+                baos.writeLong(queueSequence);
+            } catch (IOException shouldNotHappen) {
+                throw new RuntimeException(shouldNotHappen);
+            }
             return baos.toBuffer();
         }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Tue Jun 30 17:11:16 2009
@@ -58,9 +58,10 @@
 
     private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
     private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+    private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
 
     private final VirtualHost host;
-    private final BrokerDatabase database;
+    private BrokerDatabase database;
 
     private final AtomicLong tidGen = new AtomicLong(0);
     private final TransactionStore txStore;
@@ -155,7 +156,7 @@
      * @param xid
      * @return
      */
-    public final Transaction createTransaction(Buffer xid) {
+    public synchronized final Transaction createTransaction(Buffer xid) {
         Transaction ret;
 
         long tid = tidGen.incrementAndGet();
@@ -164,7 +165,9 @@
         if (xid == null) {
             ret = new LocalTransaction(this, tid, opQueue);
         } else {
-            ret = new XATransaction(this, tid, xid, opQueue);
+            XATransaction xat = new XATransaction(this, tid, xid, opQueue);
+            ret = xat;
+            xaTransactions.put(xid, xat);
         }
 
         transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
@@ -174,10 +177,18 @@
     }
 
     /**
+     * @param buffer
+     * @return
+     */
+    public synchronized Transaction getXATransaction(Buffer buffer) {
+        return xaTransactions.get(buffer);
+    }
+
+    /**
      * 
      * @throws Exception
      */
-    public void loadTransactions() throws Exception {
+    public synchronized void loadTransactions() throws Exception {
 
         tidGen.set(database.allocateStoreTracking());
 
@@ -196,26 +207,28 @@
 
             IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
             Transaction tx = loadTransaction(b, queue);
-            
+
             //TODO if we recover a tx that isn't committed then, we should discard it.
             if (tx.getState() < Transaction.FINISHED_STATE) {
                 LOG.warn("Recovered unfinished transaction: " + tx);
             }
             transactions.put(tx.getTid(), tx);
+            if (tx instanceof XATransaction) {
+                XATransaction xat = XATransaction.class.cast(tx);
+                xaTransactions.put(xat.getXid(), xat);
+            }
 
             LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
         }
 
-        if (txns.isEmpty()) {
+        if (!txns.isEmpty()) {
             //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be 
             //deleted:
             LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
-
         }
     }
 
-    private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException
-    {
+    private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
         //TODO move the serialization into the transaction itself:
         DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
         byte type = bais.readByte();
@@ -228,9 +241,9 @@
             tx = new LocalTransaction(this, tid, queue);
             break;
         case Transaction.TYPE_XA:
-        	int length = bais.readByte() & 0xFF;
-        	Buffer xid = new Buffer(new byte[length]);
-        	bais.readFully(xid.data);
+            int length = bais.readByte() & 0xFF;
+            Buffer xid = new Buffer(new byte[length]);
+            bais.readFully(xid.data);
             tx = new XATransaction(this, tid, xid, queue);
             break;
         default:
@@ -241,22 +254,21 @@
         return tx;
 
     }
-    
+
     public OperationContext persistTransaction(Transaction tx) throws IOException {
-        
+
         //TODO move the serialization into the transaction itself:
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
         baos.writeByte(tx.getType());
         baos.writeByte(tx.getState());
         baos.writeLong(tx.getTid());
-        if(tx.getType() == Transaction.TYPE_XA)
-        {
-        	Buffer xid = ((XATransaction)tx).getXid();
-        	// An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
-        	baos.writeByte( xid.length & 0xFF );
-        	baos.write(xid.data, xid.offset, xid.length);
+        if (tx.getType() == Transaction.TYPE_XA) {
+            Buffer xid = ((XATransaction) tx).getXid();
+            // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
+            baos.writeByte(xid.length & 0xFF);
+            baos.write(xid.data, xid.offset, xid.length);
         }
-        
+
         return database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
     }
 
@@ -320,7 +332,6 @@
          */
         public void addQueue(QueueDescriptor queue) {
             database.addQueue(queue);
-
         }
 
         /*
@@ -341,9 +352,8 @@
          * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
          * .activemq.queue.QueueDescriptor, java.lang.Object)
          */
-        public void deleteQueueElement(QueueDescriptor queue, TxOp element) {
-            database.deleteQueueElement(element.getStoreTracking(), queue);
-
+        public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
+            database.deleteQueueElement(sqe);
         }
 
         /*

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Tue Jun 30 17:11:16 2009
@@ -40,24 +40,25 @@
     final private HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
     final private HashMap<String, DurableSubscription> durableSubs = new HashMap<String, DurableSubscription>();
     final private Router router = new Router();
-    
+
     private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
     private Broker broker;
     private boolean started;
     private BrokerDatabase database;
+    private TransactionManager txnManager;
 
     public VirtualHost() {
         this.router.setVirtualHost(this);
     }
 
     public VirtualHost(String name) {
-    	this();
-    	addHostName(new AsciiBuffer(name));
-	}
+        this();
+        addHostName(new AsciiBuffer(name));
+    }
 
-	public AsciiBuffer getHostName() {
+    public AsciiBuffer getHostName() {
         if (hostNames.size() > 0) {
-        	return hostNames.get(0);
+            return hostNames.get(0);
         }
         return null;
     }
@@ -65,12 +66,15 @@
     public List<AsciiBuffer> getHostNames() {
         return hostNames;
     }
+
     public void setHostNames(List<AsciiBuffer> hostNames) {
         this.hostNames = new ArrayList<AsciiBuffer>(hostNames);
     }
+
     public void addHostName(AsciiBuffer hostName) {
         this.hostNames.add(hostName);
     }
+
     public void removeHostName(AsciiBuffer hostName) {
         this.hostNames.remove(hostName);
     }
@@ -78,13 +82,15 @@
     public Router getRouter() {
         return router;
     }
-    
+
     public BrokerDatabase getDatabase() {
         return database;
     }
+
     public void setDatabase(BrokerDatabase database) {
         this.database = database;
     }
+
     public void setStore(Store store) {
         database = new BrokerDatabase(store);
     }
@@ -95,18 +101,21 @@
             return;
         }
 
-		if ( database == null ) {
-			Store store = createDefaultStore();
-			database = new BrokerDatabase(store);
-		}
-		
-	    database.setDispatcher(broker.getDispatcher());
-	    database.start();
+        if (database == null) {
+            Store store = createDefaultStore();
+            database = new BrokerDatabase(store);
+        }
+
+        database.setDispatcher(broker.getDispatcher());
+        database.start();
 
         router.setDatabase(database);
+
+        //Recover queues:
         queueStore.setDatabase(database);
         queueStore.setDispatcher(broker.getDispatcher());
         queueStore.loadQueues();
+
         // Create Queue instances
         for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
             Queue queue = new Queue(iQueue);
@@ -119,19 +128,24 @@
         for (Queue queue : queues.values()) {
             queue.start();
         }
+
+        //Recover transactions:
+        txnManager = new TransactionManager(this);
+        txnManager.loadTransactions();
+
         started = true;
     }
 
-	public Store createDefaultStore() throws Exception {
-		Store store = StoreFactory.createStore("kaha-db");
-		if( store.getStoreDirectory() == null ) {
-			File baseDir = broker.getDataDirectory();
-			String hostName = getHostName().toString();
-			String subDir = IOHelper.toFileSystemDirectorySafeName(hostName);
-			store.setStoreDirectory( new File(baseDir, subDir ) );
-		}
-		return store;
-	}
+    public Store createDefaultStore() throws Exception {
+        Store store = StoreFactory.createStore("kaha-db");
+        if (store.getStoreDirectory() == null) {
+            File baseDir = broker.getDataDirectory();
+            String hostName = getHostName().toString();
+            String subDir = IOHelper.toFileSystemDirectorySafeName(hostName);
+            store.setStoreDirectory(new File(baseDir, subDir));
+        }
+        return store;
+    }
 
     public synchronized void stop() throws Exception {
         if (!started) {
@@ -144,7 +158,7 @@
         for (IQueue<Long, MessageDelivery> queue : queueStore.getDurableQueues()) {
             queue.shutdown(true);
         }
-        
+
         database.stop();
         started = false;
     }
@@ -164,7 +178,7 @@
             Domain domain = router.getDomain(dest.getDomain());
             domain.bind(dest.getName(), queue);
             queues.put(dest.getName(), queue);
-            
+
             for (QueueLifecyleListener l : queueLifecyleListeners) {
                 l.onCreate(queue);
             }
@@ -173,6 +187,10 @@
         return queue;
     }
 
+    public TransactionManager getTransactionManager() {
+        return txnManager;
+    }
+
     public BrokerQueueStore getQueueStore() {
         return queueStore;
     }
@@ -182,20 +200,20 @@
     }
 
     public BrokerSubscription createSubscription(ConsumerContext consumer, Destination destination) throws Exception {
-        
+
         // First handle composite destinations..  
         Collection<Destination> destinations = destination.getDestinations();
-        if(destinations != null) {
+        if (destinations != null) {
             ArrayList<BrokerSubscription> subs = new ArrayList<BrokerSubscription>(destinations.size());
             for (Destination childDest : destinations) {
                 subs.add(createSubscription(consumer, childDest));
             }
             return new CompositeSubscription(destination, subs);
         }
-                
+
         // If it's a Topic...
-        if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN) ) {
-            
+        if (destination.getDomain().equals(Router.TOPIC_DOMAIN) || destination.getDomain().equals(Router.TEMP_TOPIC_DOMAIN)) {
+
             // It might be a durable subscription on the topic
             if (consumer.isDurable()) {
                 DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
@@ -211,51 +229,53 @@
             // return a standard subscription
             return new TopicSubscription(this, destination, consumer.getSelectorExpression());
         }
-        
+
         // It looks like a wild card subscription on a queue.. 
-        if( PathFilter.containsWildCards(destination.getName()) ){
+        if (PathFilter.containsWildCards(destination.getName())) {
             return new WildcardQueueSubscription(this, destination, consumer);
         }
 
         // It has to be a Queue subscription then..
         Queue queue = queues.get(destination.getName());
-        if( queue == null ) {
-            if( consumer.autoCreateDestination() ) {
+        if (queue == null) {
+            if (consumer.autoCreateDestination()) {
                 queue = createQueue(destination);
             } else {
-                throw new IllegalStateException("The queue does not exist: "+destination.getName());
+                throw new IllegalStateException("The queue does not exist: " + destination.getName());
             }
         }
         return new Queue.QueueSubscription(queue);
     }
 
-	public Broker getBroker() {
-		return broker;
-	}
-
-	public void setBroker(Broker broker) {
-		this.broker = broker;
-	}
-	
-	interface QueueLifecyleListener {
-	    
-	    /**
-	     * A destination has bean created
-	     * @param destination
-	     */
+    public Broker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(Broker broker) {
+        this.broker = broker;
+    }
+
+    interface QueueLifecyleListener {
+
+        /**
+         * A destination has bean created
+         * 
+         * @param destination
+         */
         public void onCreate(Queue queue);
-        
+
         /**
-         * A destination has bean destroyed 
+         * A destination has bean destroyed
+         * 
          * @param destination
          */
         public void onDestroy(Queue queue);
-                
-	}
-	
-	ArrayList<QueueLifecyleListener> queueLifecyleListeners = new ArrayList<QueueLifecyleListener>();
 
-	synchronized public void addDestinationLifecyleListener(QueueLifecyleListener listener) {
+    }
+
+    ArrayList<QueueLifecyleListener> queueLifecyleListeners = new ArrayList<QueueLifecyleListener>();
+
+    synchronized public void addDestinationLifecyleListener(QueueLifecyleListener listener) {
         queueLifecyleListeners.add(listener);
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java Tue Jun 30 17:11:16 2009
@@ -20,6 +20,7 @@
 
 import javax.transaction.xa.XAException;
 
+import org.apache.activemq.apollo.broker.Transaction.TransactionListener;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.util.buffer.Buffer;
 
@@ -51,7 +52,7 @@
      * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
      */
     @Override
-    public void commit(boolean onePhase) throws XAException, IOException {
+    public void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException {
         // TODO Auto-generated method stub
 
     }
@@ -62,7 +63,7 @@
      * @see org.apache.activemq.apollo.broker.Transaction#prepare()
      */
     @Override
-    public int prepare() throws XAException, IOException {
+    public int prepare(TransactionListener listener) throws XAException, IOException {
         // TODO Auto-generated method stub
         return 0;
     }
@@ -73,7 +74,7 @@
      * @see org.apache.activemq.apollo.broker.Transaction#rollback()
      */
     @Override
-    public void rollback() throws XAException, IOException {
+    public void rollback(TransactionListener listener) throws XAException, IOException {
         // TODO Auto-generated method stub
 
     }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java Tue Jun 30 17:11:16 2009
@@ -23,6 +23,7 @@
 import javax.transaction.xa.Xid;
 
 import org.apache.activemq.util.buffer.Buffer;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
 
 /**
  * An implementation of JTA transaction identifier (javax.transaction.xa.Xid).
@@ -244,4 +245,21 @@
         in.readFully(data);
         setBranchQualifier(data);
     }
+
+    /**
+     * @param tid
+     * @return
+     */
+    public static Buffer toBuffer(Xid xid) {
+        XidImpl x = new XidImpl(xid);
+        DataByteArrayOutputStream baos = new DataByteArrayOutputStream(x.getMemorySize());
+        try {
+            x.writebody(baos);
+        } catch (IOException e) {
+            //Shouldn't happen:
+            throw new RuntimeException(e);
+        }
+        return baos.toBuffer();
+        
+    }
 } // class XidImpl

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Tue Jun 30 17:11:16 2009
@@ -229,15 +229,23 @@
         }
     }
 
-    public boolean remove(Transaction tx, long msgKey) throws IOException {
-        Long queueKey = trackingIndex.remove(tx, msgKey);
-        if (queueKey != null) {
-            QueueRecord qr = queueIndex.remove(tx, queueKey);
+    /**
+     * Removes a queue record returning the corresponding element tracking number.
+     * @param tx The transaction under which to do the removal
+     * @param queueKey The queue key
+     * @return The store tracking. 
+     * @throws IOException
+     */
+    public long remove(Transaction tx, long queueKey) throws IOException {
+        QueueRecord qr = queueIndex.remove(tx, queueKey);
+        if(qr != null)
+        {
+            trackingIndex.remove(tx, qr.getMessageKey());
             getMetaData(tx).update(-1, -qr.getSize());
             tx.store(metaData, META_DATA_MARSHALLER, true);
-            return true;
+            return qr.getMessageKey();
         }
-        return false;
+        return -1;
     }
 
     public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, Long maxQueueKey, final int max) throws IOException {

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Tue Jun 30 17:11:16 2009
@@ -335,11 +335,13 @@
                     if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
                         uow = pageFile.tx();
                     } else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
-                        rootEntity.setLastUpdate(recoveryPosition);
-                        uow.commit();
-                        redoCounter += uowCounter;
-                        uowCounter = 0;
-                        uow = null;
+                        if (uow != null) {
+                            rootEntity.setLastUpdate(recoveryPosition);
+                            uow.commit();
+                            redoCounter += uowCounter;
+                            uowCounter = 0;
+                            uow = null;
+                        }
                     } else if (data.length == 1 && data.data[0] == CANCEL_UNIT_OF_WORK) {
                         uow.rollback();
                         uow = null;
@@ -718,8 +720,9 @@
         qd.setQueueName(command.getQueueName());
         DestinationEntity destination = rootEntity.getDestination(qd);
         if (destination != null) {
-            if (destination.remove(tx, command.getMessageKey())) {
-                rootEntity.removeMessageRef(tx, command.getQueueName(), command.getMessageKey());
+            long messageKey = destination.remove(tx, command.getQueueKey());
+            if (messageKey >= 0) {
+                rootEntity.removeMessageRef(tx, command.getQueueName(), command.getQueueKey());
             }
         }
     }
@@ -940,9 +943,9 @@
             addUpdate(bean);
         }
 
-        public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
+        public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException {
             QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
-            bean.setMessageKey(messageKey);
+            bean.setQueueKey(queueKey);
             bean.setQueueName(queue.getQueueName());
             addUpdate(bean);
         }

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto Tue Jun 30 17:11:16 2009
@@ -85,7 +85,7 @@
 }  
 message QueueRemoveMessage {
   optional bytes queueName = 1 [java_override_type = "AsciiBuffer"];
-  optional int64 messageKey=2;
+  optional int64 queueKey=2;
 }  
 
 

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -37,6 +37,8 @@
     private PersistListener persistListener = null;
     private final int size;
 
+    private long tid = -1;
+
     public interface PersistListener {
         public void onMessagePersisted(OpenWireMessageDelivery delivery);
     }
@@ -120,9 +122,15 @@
         return record;
     }
 
-    public Buffer getTransactionId() {
-        // TODO Auto-generated method stub
-        return null;
+    public long getTransactionId() {
+        return tid;
+    }
+    
+    /**
+     * @param tid
+     */
+    public void setTransactionId(long tid) {
+        this.tid  = tid;
     }
 
     public void setStoreWireFormat(OpenWireFormat wireFormat) {
@@ -145,4 +153,6 @@
     public String toString() {
         return message.getMessageId().toString();
     }
+
+    
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Tue Jun 30 17:11:16 2009
@@ -20,6 +20,11 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
 
 import org.apache.activemq.apollo.WindowLimiter;
 import org.apache.activemq.apollo.broker.Broker;
@@ -30,7 +35,9 @@
 import org.apache.activemq.apollo.broker.MessageDelivery;
 import org.apache.activemq.apollo.broker.ProtocolHandler;
 import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.apollo.broker.Transaction;
 import org.apache.activemq.apollo.broker.VirtualHost;
+import org.apache.activemq.apollo.broker.XidImpl;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -64,6 +71,7 @@
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.BooleanExpression;
@@ -93,6 +101,8 @@
     protected final HashMap<ProducerId, ProducerContext> producers = new HashMap<ProducerId, ProducerContext>();
     protected final HashMap<ConsumerId, ConsumerContext> consumers = new HashMap<ConsumerId, ConsumerContext>();
 
+    protected final ConcurrentHashMap<TransactionId, Transaction> transactions = new ConcurrentHashMap<TransactionId, Transaction>();
+
     protected BrokerConnection connection;
     private OpenWireFormat wireFormat;
     private OpenWireFormat storeWireFormat;
@@ -122,7 +132,7 @@
                         }
                     };
                     connections.put(info.getConnectionId(), connection);
-                    
+
                     // Connections have an implicitly created "default" session identified by session id = -1
                     SessionId sessionId = new SessionId(info.getConnectionId(), -1);
                     addSession(sessionId, connection);
@@ -228,7 +238,13 @@
 
                 OpenWireMessageDelivery md = new OpenWireMessageDelivery(info);
                 md.setStoreWireFormat(storeWireFormat);
-                md.setPersistListener(OpenwireProtocolHandler.this);
+                TransactionId tid = info.getTransactionId();
+                if (tid != null) {
+                    Transaction t = locateTransaction(tid);
+                    md.setTransactionId(t.getTid());
+                } else {
+                    md.setPersistListener(OpenwireProtocolHandler.this);
+                }
 
                 // Only producers that are not using a window will block,
                 // and if it blocks.
@@ -242,7 +258,12 @@
                 while (!producerContext.controller.offer(md, null)) {
                     producerContext.controller.waitForFlowUnblock();
                 }
-                return null;
+
+                if (tid != null) {
+                    return ack(info);
+                } else {
+                    return null;
+                }
             }
 
             public Response processMessageAck(MessageAck info) throws Exception {
@@ -298,6 +319,11 @@
             }
 
             public Response processConnectionControl(ConnectionControl info) throws Exception {
+                if (info != null) {
+                    if (info.isFaultTolerant()) {
+                        throw new UnsupportedOperationException("Fault Tolerance");
+                    }
+                }
                 return ack(info);
             }
 
@@ -339,34 +365,71 @@
             // Methods for transaction management
             // /////////////////////////////////////////////////////////////////
             public Response processBeginTransaction(TransactionInfo info) throws Exception {
-                throw new UnsupportedOperationException();
+                TransactionId tid = info.getTransactionId();
+
+                Transaction t = locateTransaction(tid);
+                if (t == null) {
+
+                    Buffer xid = null;
+                    if (tid.isXATransaction()) {
+                        xid = XidImpl.toBuffer((Xid) tid);
+                    }
+                    t = host.getTransactionManager().createTransaction(xid);
+                    transactions.put(tid, t);
+                }
+
+                return ack(info);
             }
 
-            public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
+            public Response processCommitTransactionOnePhase(final TransactionInfo info) throws Exception {
+                TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid);
+                t.commit(true, new Transaction.TransactionListener()
+                {
+                    
+                });
+                transactions.remove(tid);
                 throw new UnsupportedOperationException();
             }
 
             public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
+                TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid);
+                t.commit(false, null);
+                transactions.remove(tid);
                 throw new UnsupportedOperationException();
             }
 
             public Response processEndTransaction(TransactionInfo info) throws Exception {
+                //Shouldn't actually do anything, send by client to ensure that it is
+                //in sync with broker transaction state. 
+                //TODO need to investigate whether this should wait for prior transaction
+                //state to flush out?
                 throw new UnsupportedOperationException();
             }
 
             public Response processForgetTransaction(TransactionInfo info) throws Exception {
-                throw new UnsupportedOperationException();
+                return processRollbackTransaction(info);
             }
 
             public Response processPrepareTransaction(TransactionInfo info) throws Exception {
+                TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid);
+                t.prepare(null);
                 throw new UnsupportedOperationException();
             }
 
             public Response processRecoverTransactions(TransactionInfo info) throws Exception {
+                //TODO
                 throw new UnsupportedOperationException();
             }
 
             public Response processRollbackTransaction(TransactionInfo info) throws Exception {
+                TransactionId tid = info.getTransactionId();
+                Transaction t = locateTransaction(tid);
+                t.rollback(null);
+                transactions.remove(tid);
+                //TODO need to respond to this when the rollback completes
                 throw new UnsupportedOperationException();
             }
 
@@ -393,6 +456,27 @@
         };
     }
 
+    private Transaction locateTransaction(TransactionId tid) throws XAException, JMSException {
+        Transaction t;
+
+        if (tid.isLocalTransaction()) {
+            t = transactions.get(tid);
+        } else {
+            t = host.getTransactionManager().getXATransaction(XidImpl.toBuffer((Xid) tid));
+        }
+
+        if (t == null) {
+            if (tid.isXATransaction()) {
+                XAException e = new XAException("Transaction '" + tid + "' has not been started.");
+                e.errorCode = XAException.XAER_NOTA;
+                throw e;
+            } else {
+                throw new JMSException("Transaction '" + tid + "' has not been started.");
+            }
+        }
+        return t;
+    }
+
     public void start() throws Exception {
 
     }
@@ -594,17 +678,23 @@
             }
         }
 
-        public void ack(MessageAck info) {
+        public void ack(MessageAck info) throws XAException, JMSException {
             // TODO: The pending message queue could probably be optimized to
             // avoid having to create a new list here.
             int flowCredit = info.getMessageCount();
-            if( info.isDeliveredAck() ) {
+            if (info.isDeliveredAck()) {
                 // This ack is just trying to expand the flow control window size without actually 
                 // acking the message.  Keep track of how many limiter credits we borrow since they need
                 // to get paid back with real acks later.
                 borrowedLimterCredits += flowCredit;
                 limiter.onProtocolCredit(flowCredit);
-            } else if(info.isStandardAck()) {
+            } else if (info.isStandardAck()) {
+                TransactionId tid = info.getTransactionId();
+                Transaction transaction = null;
+                if (tid != null) {
+                    transaction = locateTransaction(tid);
+                }
+
                 LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
                 synchronized (this) {
                     MessageId id = info.getLastMessageId();
@@ -619,27 +709,35 @@
                             }
                         }
                     }
-                    
+
                     // Did we have DeliveredAcks previously sent?  Then the 
                     // the flow window has already been credited.  We need to 
                     // pay back the borrowed limiter credits before giving 
                     // credits directly to the limiter.
-                    if(borrowedLimterCredits>0) {
-                        if( flowCredit > borrowedLimterCredits ) {
+                    if (borrowedLimterCredits > 0) {
+                        if (flowCredit > borrowedLimterCredits) {
                             flowCredit -= borrowedLimterCredits;
-                            borrowedLimterCredits=0;
+                            borrowedLimterCredits = 0;
                         } else {
                             borrowedLimterCredits -= flowCredit;
-                            flowCredit=0;
+                            flowCredit = 0;
                         }
                     }
                     limiter.onProtocolCredit(flowCredit);
                 }
 
-                // Delete outside of synchronization on queue to avoid contention
-                // with enqueueing threads.
-                for (SubscriptionDelivery<MessageDelivery> callback : acked) {
-                    callback.acknowledge();
+                if (transaction == null) {
+                    // Delete outside of synchronization on queue to avoid contention
+                    // with enqueueing threads.
+                    for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+                        callback.acknowledge();
+                    }
+                } else {
+                    // Delete outside of synchronization on queue to avoid contention
+                    // with enqueueing threads.
+                    for (SubscriptionDelivery<MessageDelivery> callback : acked) {
+                        transaction.addAck(callback);
+                    }
                 }
             }
         }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Tue Jun 30 17:11:16 2009
@@ -70,7 +70,7 @@
     BrokerDatabase database;
     BrokerQueueStore queueStore;
     private static final boolean USE_KAHA_DB = true;
-    private static final boolean PERSISTENT = false;
+    private static final boolean PERSISTENT = true;
     private static final boolean PURGE_STORE = true;
     // Producers send sync and operations are never canceled. 
     private static final boolean TEST_MAX_STORE_LATENCY = false;

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Tue Jun 30 17:11:16 2009
@@ -124,6 +124,22 @@
         expirator.elementAdded(qe);
     }
 
+    public void remove(long sequence) {
+        QueueElement<V> qe = queue.lower(sequence, true);
+        if (qe == null) {
+            return;
+        } else if (qe.getSequence() == sequence) {
+            qe.acknowledge();
+        }
+        //Otherwise if the element is paged out, create a new
+        //holder and mark it for deletion
+        else if (persistencePolicy.isPageOutPlaceHolders()) {
+            //FIXME, need to track this delete otherwise an in flight message restore
+            //might load this element back in.
+            getQueueStore().deleteQueueElement(new QueueElement<V>(null, sequence, this));
+        }
+    }
+
     /**
      * @return True if the queue needs dispatching.
      */
@@ -865,9 +881,11 @@
                 }
 
                 if (saved) {
-                    queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem);
+                    queue.getQueueStore().deleteQueueElement(this);
                 }
 
+                //FIXME need to track deletions when paging is enabled
+                //otherwise an in process restore might reload the element
                 elem = null;
                 unload(null);
 
@@ -1157,8 +1175,11 @@
             return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.activemq.queue.Subscription.SubscriptionDelivery#getSourceQueueRemovalKey()
+        /*
+         * (non-Javadoc)
+         * 
+         * @seeorg.apache.activemq.queue.Subscription.SubscriptionDelivery#
+         * getSourceQueueRemovalKey()
          */
         public long getSourceQueueRemovalKey() {
             return sequence;

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Tue Jun 30 17:11:16 2009
@@ -96,6 +96,10 @@
         };
     }
 
+    public synchronized void remove(long key) {
+        queue.remove(key);
+    }
+
     /*
      * (non-Javadoc)
      * 

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Tue Jun 30 17:11:16 2009
@@ -93,5 +93,11 @@
      *            queue shutdown will proceed asynchronously.
      */
     public void shutdown(boolean sync);
+    
+    /**
+     * Removes the element with the given sequence from this queue
+     * @param key The sequence key. 
+     */
+    public void remove(long sequence);
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/PartitionedQueue.java Tue Jun 30 17:11:16 2009
@@ -71,10 +71,9 @@
 
         return rc;
     }
-    
-    
+
     abstract public IQueue<K, V> createPartition(int partitionKey);
-    
+
     /*
      * (non-Javadoc)
      * 
@@ -92,7 +91,7 @@
             }
         }
     }
-    
+
     public int getEnqueuedCount() {
         checkShutdown();
         synchronized (this) {
@@ -118,7 +117,6 @@
         }
     }
 
-
     public void setStore(QueueStore<K, V> store) {
         this.store = store;
     }
@@ -149,7 +147,6 @@
             persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
         }
     }
-    
 
     public synchronized void start() {
         if (!started) {
@@ -172,9 +169,8 @@
         }
     }
 
-
     public void shutdown(boolean sync) {
-        Collection <IQueue<K, V>> partitions = null;
+        Collection<IQueue<K, V>> partitions = null;
         synchronized (this) {
             if (!shutdown) {
                 shutdown = true;
@@ -225,7 +221,6 @@
         return partitionMapper;
     }
 
-    
     public void add(V value, ISourceController<?> source) {
         int partitionKey = partitionMapper.map(value);
         getPartition(partitionKey).add(value, source);
@@ -235,7 +230,7 @@
         int partitionKey = partitionMapper.map(value);
         return getPartition(partitionKey).offer(value, source);
     }
-    
+
     public void setKeyMapper(Mapper<K, V> keyMapper) {
         this.keyMapper = keyMapper;
     }
@@ -254,9 +249,8 @@
             }
         }
     }
-    
-    protected Collection<IQueue<K, V>> getPartitions()
-    {
+
+    protected Collection<IQueue<K, V>> getPartitions() {
         return partitions.values();
     }
 
@@ -265,8 +259,10 @@
             throw new IllegalStateException(this + " is shutdown");
         }
     }
-    
-    /* (non-Javadoc)
+
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.activemq.queue.IPollableFlowSource#isDispatchReady()
      */
     public boolean isDispatchReady() {
@@ -274,7 +270,9 @@
         throw new UnsupportedOperationException();
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.activemq.queue.IPollableFlowSource#poll()
      */
     public V poll() {
@@ -282,7 +280,9 @@
         throw new UnsupportedOperationException();
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.activemq.queue.IPollableFlowSource#pollingDispatch()
      */
     public boolean pollingDispatch() {
@@ -290,12 +290,19 @@
         throw new UnsupportedOperationException();
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted(org.apache.activemq.flow.ISourceController, java.lang.Object)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.activemq.flow.ISinkController.FlowControllable#flowElemAccepted
+     * (org.apache.activemq.flow.ISourceController, java.lang.Object)
      */
     public void flowElemAccepted(ISourceController<V> source, V elem) {
         // TODO Remove
         throw new UnsupportedOperationException();
-        
+    }
+
+    public void remove(long key) {
+        throw new UnsupportedOperationException();
     }
 }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java Tue Jun 30 17:11:16 2009
@@ -95,12 +95,10 @@
     /**
      * Asynchronously deletes an element from the store.
      * 
-     * @param descriptor
-     *            The queue descriptor
      * @param element
      *            The element to delete.
      */
-    public void deleteQueueElement(QueueDescriptor descriptor, V element);
+    public void deleteQueueElement(SaveableQueueElement<V> elem);
 
     /**
      * Asynchronously saves the given element to the store

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Tue Jun 30 17:11:16 2009
@@ -351,6 +351,12 @@
         }
     }
 
+    public void remove(long key) {
+        synchronized (mutex) {
+            queue.remove(key);
+        }
+    }
+
     public boolean offer(V elem, ISourceController<?> source) {
         synchronized (mutex) {
 
@@ -542,11 +548,10 @@
                 } else {
                     cursor.reset(queue.getFirstSequence());
                 }
-                
+
                 if (DEBUG)
                     System.out.println("Starting " + this + " at " + cursor);
 
-
                 updateDispatchList();
             }
         }
@@ -701,17 +706,16 @@
             SubscriptionDelivery<V> callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
             // If the sub is a browser don't pass it a callback since it does not need to 
             // delete messages
-            if( sub.isBrowser() ) { 
+            if (sub.isBrowser()) {
                 callback = null;
             }
-            
+
             // See if the sink has room:
             qe.setAcquired(sub);
             if (sub.offer(qe.elem, this, callback)) {
                 if (DEBUG)
                     System.out.println("Dispatched " + qe.getElement() + " to " + this);
 
-                
                 if (!sub.isBrowser()) {
 
                     // If remove on dispatch acknowledge now:

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueueOld.java Tue Jun 30 17:11:16 2009
@@ -172,6 +172,11 @@
         return sinkController.offer(elem, source);
     }
     
+
+    public void remove(long key) {
+        throw new UnsupportedOperationException();
+    }
+    
     /**
      * Called when the controller accepts a message for this queue.
      */

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Tue Jun 30 17:11:16 2009
@@ -208,7 +208,7 @@
 
         }
 
-        public final void deleteQueueElement(QueueDescriptor descriptor, Message elem) {
+        public final void deleteQueueElement(SaveableQueueElement<Message> elem) {
 
         }
 

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Tue Jun 30 17:11:16 2009
@@ -23,7 +23,6 @@
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompFrame;
 import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
 
 public class StompMessageDelivery extends BrokerMessageDelivery {
 
@@ -38,6 +37,8 @@
     private PersistListener persistListener = null;
     private long tte = Long.MIN_VALUE;
 
+    private long tid = -1;
+
     public interface PersistListener {
         public void onMessagePersisted(StompMessageDelivery delivery);
     }
@@ -152,9 +153,12 @@
         return record;
     }
 
-    public Buffer getTransactionId() {
-        // TODO Auto-generated method stub
-        return null;
+    public long getTransactionId() {
+        return tid;
+    }
+
+    public void setTransactionId(long tid) {
+        this.tid = tid;
     }
 
     public MessageEvaluationContext createMessageEvaluationContext() {

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Tue Jun 30 17:11:16 2009
@@ -639,7 +639,13 @@
          */
         public void queueAddMessage(QueueDescriptor queue, QueueRecord record) throws KeyNotFoundException;
 
-        public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException;
+        /**
+         * Deletes an element from a queue.
+         * @param queue The queue
+         * @param queueKey The element's queue key as given by {@link QueueRecord#setQueueKey(Long)}
+         * @throws KeyNotFoundException
+         */
+        public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException;
 
         public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
 

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Jun 30 17:11:16 2009
@@ -98,8 +98,6 @@
         QueueDescriptor descriptor;
 
         TreeMap<Long, QueueRecord> records = new TreeMap<Long, QueueRecord>(Comparators.LONG_COMPARATOR);
-        // Maps tracking to sequence number:
-        HashMap<Long, Long> trackingMap = new HashMap<Long, Long>();
         int count = 0;
         long size = 0;
         HashMap<QueueDescriptor, StoredQueue> partitions;
@@ -111,20 +109,18 @@
 
         public void add(QueueRecord record) {
             records.put(record.getQueueKey(), record);
-            trackingMap.put(record.getMessageKey(), record.getQueueKey());
             count++;
             size += record.getSize();
         }
 
-        public boolean remove(Long msgKey) {
-            Long sequenceKey = trackingMap.remove(msgKey);
-            if (sequenceKey != null) {
-                QueueRecord record = records.remove(sequenceKey);
+        public long remove(Long queueKey) {
+            QueueRecord record = records.remove(queueKey);
+            if (record != null) {
                 count--;
                 size -= record.getSize();
-                return true;
+                return record.getMessageKey();
             }
-            return false;
+            return -1;
         }
 
         public Iterator<QueueRecord> list(Long firstQueueKey, long maxSequence, int max) {
@@ -230,11 +226,11 @@
 
     static private class RemoveOp {
         QueueDescriptor queue;
-        Long messageKey;
+        Long queueKey;
 
-        public RemoveOp(QueueDescriptor queue, Long messageKey) {
+        public RemoveOp(QueueDescriptor queue, Long queueKey) {
             this.queue = queue;
-            this.messageKey = messageKey;
+            this.queueKey = queueKey;
         }
     }
 
@@ -244,7 +240,7 @@
 
         public void commit(MemorySession session) throws KeyNotFoundException {
             for (RemoveOp op : removes) {
-                session.queueRemoveMessage(op.queue, op.messageKey);
+                session.queueRemoveMessage(op.queue, op.queueKey);
             }
         }
 
@@ -258,8 +254,8 @@
             adds.add(messageKey);
         }
 
-        public void removeMessage(QueueDescriptor queue, Long messageKey) {
-            removes.add(new RemoveOp(queue, messageKey));
+        public void removeMessage(QueueDescriptor queue, Long queueKey) {
+            removes.add(new RemoveOp(queue, queueKey));
         }
     }
 
@@ -517,8 +513,10 @@
             }
         }
 
-        public void queueRemoveMessage(QueueDescriptor queue, Long msgKey) throws KeyNotFoundException {
-            if (get(queues, queue.getQueueName()).remove(msgKey)) {
+        public void queueRemoveMessage(QueueDescriptor queue, Long queueKey) throws KeyNotFoundException {
+            long msgKey = get(queues, queue.getQueueName()).remove(queueKey);
+            if(msgKey >= 0)
+            {   
                 deleteMessageReference(msgKey);
             }
         }

Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Tue Jun 30 17:11:16 2009
@@ -324,7 +324,7 @@
                             for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
                                 QueueRecord r = iterator.next();
                                 records.add(session.messageGetRecord(r.getMessageKey()));
-                                session.queueRemoveMessage(queueId, r.messageKey);
+                                session.queueRemoveMessage(queueId, r.queueKey);
                             }
                         }
                     };

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java?rev=789831&r1=789830&r2=789831&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SortedLinkedList.java Tue Jun 30 17:11:16 2009
@@ -57,6 +57,13 @@
             prev.linkAfter(node);
         }
     }
+    
+    /**
+     * @param sequence The sequence number of the element to get.
+     */
+    public T get(long sequence) {
+        return index.get(sequence);
+    }
 
     public T lower(long sequence, boolean inclusive) {
         Entry<Long, T> lower = index.floorEntry(sequence);
@@ -154,4 +161,6 @@
         }
         return rc;
     }
+
+    
 }