You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/28 21:37:07 UTC

svn commit: r789143 [1/2] - in /activemq/sandbox/activemq-flow: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-openwire/src/main/java/org/a...

Author: cmacnaug
Date: Sun Jun 28 19:37:06 2009
New Revision: 789143

URL: http://svn.apache.org/viewvc?rev=789143&view=rev
Log:
Adding the beginnings of Transaction support for the broker.
Also added support for Maps in the activemq-kaha store implementation.

Not yet functional or tested. This commit adds the TransactionManager and related transaction objects, and the backing queue/store logic. This still needs to be tested, and the actual commit and rollback still need to be implemented.

Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java   (with props)
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java   (with props)
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/pom.xml
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
    activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/pom.xml?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/pom.xml Sun Jun 28 19:37:06 2009
@@ -226,7 +226,6 @@
         </dependency>
       </dependencies>
     </profile>
-    <!--
     <profile>
       <id>default-tools.jar</id>
       <activation>
@@ -246,7 +245,6 @@
         </dependency>
       </dependencies>
     </profile>
-  -->
   </profiles>
 
 </project>

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Sun Jun 28 19:37:06 2009
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -45,6 +46,8 @@
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueDescriptor;
 import org.apache.activemq.queue.RestoreListener;
 import org.apache.activemq.queue.RestoredElement;
@@ -67,8 +70,6 @@
     private AtomicBoolean running = new AtomicBoolean(false);
     private DatabaseListener listener;
 
-    private HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
-
     private final LinkedNodeList<OperationBase> opQueue;
     private AtomicBoolean notify = new AtomicBoolean(false);
     private Semaphore opsReady = new Semaphore(0);
@@ -92,6 +93,23 @@
         public void onDatabaseException(IOException ioe);
     }
 
+    public static interface MessageRecordMarshaller<V> {
+        MessageRecord marshal(V element);
+
+        /**
+         * Called when a queue element is recovered from the store for a
+         * particular queue.
+         * 
+         * @param mRecord
+         *            The message record
+         * @param queue
+         *            The queue that the element is being restored to (or null
+         *            if not being restored for a queue)
+         * @return
+         */
+        V unMarshall(MessageRecord mRecord, QueueDescriptor queue);
+    }
+
     public BrokerDatabase(Store store) {
         this.store = store;
         this.opQueue = new LinkedNodeList<OperationBase>();
@@ -169,6 +187,16 @@
         }
     }
 
+    /**
+     * A blocking operation that lists all queues of a given type:
+     * 
+     * @param type
+     *            The queue type
+     * @return A list of queues.
+     * 
+     * @throws Exception
+     *             If there was an error listing the queues.
+     */
     public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
         return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
 
@@ -180,6 +208,42 @@
     }
 
     /**
+     * A blocking operation that lists all entries in the specified map
+     * 
+     * @param map
+     *            The map to list
+     * @return A list of map entries
+     * 
+     * @throws Exception
+     *             If there was an error listing the queues.
+     */
+    public Map<AsciiBuffer, Buffer> listMapEntries(final AsciiBuffer map) throws Exception {
+        return store.execute(new Callback<Map<AsciiBuffer, Buffer>, Exception>() {
+
+            public Map<AsciiBuffer, Buffer> execute(Session session) throws Exception {
+                HashMap<AsciiBuffer, Buffer> ret = new HashMap<AsciiBuffer, Buffer>();
+                Iterator<AsciiBuffer> keys = session.mapEntryListKeys(map, null, -1);
+                while (keys.hasNext()) {
+                    AsciiBuffer key = keys.next();
+                    ret.put(key, session.mapEntryGet(map, key));
+                }
+
+                return ret;
+            }
+
+        }, null);
+    }
+
+    /**
+     * @param map The name of the map to update.
+     * @param key The key in the map to update.
+     * @param value The value to insert.
+     */
+    public OperationContext updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+        return add(new MapUpdateOperation(map, key, value), null, false) ;
+    }
+
+    /**
      * Executes user supplied {@link Operation}. If the {@link Operation} does
      * not throw any Exceptions, all updates to the store are committed,
      * otherwise they are rolled back. Any exceptions thrown by the
@@ -481,20 +545,20 @@
      * @return The {@link OperationContext} associated with the operation
      */
     public OperationContext saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
-        return add(new AddMessageOperation(queueElement), source, delayable);
+        return add(new AddMessageOperation(queueElement), source, !delayable);
     }
 
     /**
      * Deletes the given message from the store for the given queue.
      * 
-     * @param delivery
-     *            The delivery.
+     * @param storeTracking
+     *            The tracking number of the element being deleted
      * @param queue
      *            The queue.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext deleteMessage(MessageDelivery delivery, QueueDescriptor queue) {
-        return add(new DeleteMessageOperation(delivery.getStoreTracking(), queue), null, false);
+    public OperationContext deleteQueueElement(long storeTracking, QueueDescriptor queue) {
+        return add(new DeleteOperation(storeTracking, queue), null, false);
     }
 
     /**
@@ -519,8 +583,9 @@
      *            The listener to which messags should be passed.
      * @return The {@link OperationContext} associated with the operation
      */
-    public OperationContext restoreMessages(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
-        return add(new RestoreMessageOperation(queue, recordsOnly, first, maxCount, maxSequence, listener), null, true);
+    public <T> OperationContext restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<T> listener,
+            MessageRecordMarshaller<T> marshaller) {
+        return add(new RestoreElementsOperation<T>(queue, recordsOnly, first, maxCount, maxSequence, listener, marshaller), null, true);
     }
 
     private void onDatabaseException(IOException ioe) {
@@ -783,18 +848,17 @@
         }
     }
 
-    private class DeleteMessageOperation extends OperationBase {
+    private class DeleteOperation extends OperationBase {
         private final long storeTracking;
         private QueueDescriptor queue;
 
-        public DeleteMessageOperation(long tracking, QueueDescriptor queue) {
+        public DeleteOperation(long tracking, QueueDescriptor queue) {
             this.storeTracking = tracking;
             this.queue = queue;
         }
 
         @Override
         public int getLimiterSize() {
-            // Might consider bumping this up to avoid too much accumulation?
             return BASE_MEM_SIZE + 8;
         }
 
@@ -821,22 +885,24 @@
         }
     }
 
-    private class RestoreMessageOperation extends OperationBase {
+    private class RestoreElementsOperation<V> extends OperationBase {
         private QueueDescriptor queue;
         private long firstKey;
         private int maxRecords;
         private long maxSequence;
         private boolean recordsOnly;
-        private RestoreListener<MessageDelivery> listener;
-        private Collection<RestoredElement<MessageDelivery>> msgs = null;
+        private RestoreListener<V> listener;
+        private Collection<RestoredElement<V>> msgs = null;
+        private MessageRecordMarshaller<V> marshaller;
 
-        RestoreMessageOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<MessageDelivery> listener) {
+        RestoreElementsOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<V> listener, MessageRecordMarshaller<V> marshaller) {
             this.queue = queue;
             this.recordsOnly = recordsOnly;
             this.firstKey = firstKey;
             this.maxRecords = maxRecords;
             this.maxSequence = maxSequence;
             this.listener = listener;
+            this.marshaller = marshaller;
         }
 
         @Override
@@ -850,9 +916,9 @@
             Iterator<QueueRecord> records = null;
             try {
                 records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords);
-                msgs = new LinkedList<RestoredElement<MessageDelivery>>();
+                msgs = new LinkedList<RestoredElement<V>>();
             } catch (KeyNotFoundException e) {
-                msgs = new ArrayList<RestoredElement<MessageDelivery>>(0);
+                msgs = new ArrayList<RestoredElement<V>>(0);
                 return;
             }
 
@@ -863,9 +929,10 @@
             }
 
             while (qRecord != null) {
-                RestoredMessageImpl rm = new RestoredMessageImpl();
+                RestoredElementImpl<V> rm = new RestoredElementImpl<V>();
                 // TODO should update jms redelivery here.
                 rm.qRecord = qRecord;
+                rm.queue = queue;
                 count++;
 
                 // Set the next sequence number:
@@ -890,15 +957,7 @@
                 if (!recordsOnly) {
                     try {
                         rm.mRecord = session.messageGetRecord(rm.qRecord.getMessageKey());
-                        rm.handler = protocolHandlers.get(rm.mRecord.getEncoding().toString());
-                        if (rm.handler == null) {
-                            try {
-                                rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.getEncoding().toString());
-                                protocolHandlers.put(rm.mRecord.getEncoding().toString(), rm.handler);
-                            } catch (Throwable thrown) {
-                                throw new RuntimeException("Unknown message format" + rm.mRecord.getEncoding().toString(), thrown);
-                            }
-                        }
+                        rm.marshaller = marshaller;
                         msgs.add(rm);
                     } catch (KeyNotFoundException shouldNotHappen) {
                         shouldNotHappen.printStackTrace();
@@ -1037,20 +1096,51 @@
         }
     }
 
-    private class RestoredMessageImpl implements RestoredElement<MessageDelivery> {
+    private class MapUpdateOperation extends OperationBase {
+        final AsciiBuffer map;
+        final AsciiBuffer key;
+        final Buffer value;
+
+        MapUpdateOperation(AsciiBuffer mapName, AsciiBuffer key, Buffer value) {
+            this.map = mapName;
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public int getLimiterSize() {
+            return BASE_MEM_SIZE + map.length + key.length + value.length;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.apollo.broker.BrokerDatabase.OperationBase#doExcecute
+         * (org.apache.activemq.broker.store.Store.Session)
+         */
+        @Override
+        protected void doExcecute(Session session) {
+            try {
+                session.mapEntryPut(map, key, value);
+            } catch (KeyNotFoundException e) {
+                throw new Store.FatalStoreException(e);
+            }
+        }
+    }
+
+    private class RestoredElementImpl<T> implements RestoredElement<T> {
         QueueRecord qRecord;
+        QueueDescriptor queue;
         MessageRecord mRecord;
-        ProtocolHandler handler;
+        MessageRecordMarshaller<T> marshaller;
         long nextSequence;
 
-        public MessageDelivery getElement() throws IOException {
+        public T getElement() throws IOException {
             if (mRecord == null) {
                 return null;
             }
-
-            BrokerMessageDelivery delivery = handler.createMessageDelivery(mRecord);
-            delivery.setFromDatabase(BrokerDatabase.this, mRecord);
-            return delivery;
+            return marshaller.unMarshall(mRecord, queue);
         }
 
         /*
@@ -1092,7 +1182,6 @@
          * org.apache.activemq.queue.QueueStore.RestoredElement#getElementSize()
          */
         public int getElementSize() {
-            // TODO Auto-generated method stub
             return qRecord.getSize();
         }
 
@@ -1105,11 +1194,9 @@
         public long getExpiration() {
             return qRecord.getTte();
         }
-
     }
 
     public long allocateStoreTracking() {
-        // TODO Auto-generated method stub
         return store.allocateStoreTracking();
     }
 
@@ -1121,8 +1208,72 @@
         this.dispatcher = dispatcher;
     }
 
-	public Store getStore() {
-		return store;
-	}
+    public Store getStore() {
+        return store;
+    }
+
+    /**
+     * @param sqe
+     * @param source
+     * @param delayable
+     */
+    public <T> OperationContext saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+        return add(new AddElementOpOperation<T>(sqe, delayable, marshaller), source, !delayable);
+    }
+
+    private class AddElementOpOperation<T> extends OperationBase {
+
+        private final SaveableQueueElement<T> op;
+        private MessageRecord record;
+        private boolean delayable;
+        private final MessageRecordMarshaller<T> marshaller;
+
+        public AddElementOpOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+            this.op = op;
+            this.delayable = delayable;
+            if (!delayable) {
+                record = marshaller.marshal(op.getElement());
+                this.marshaller = null;
+            } else {
+                this.marshaller = marshaller;
+            }
+        }
+
+        public boolean isDelayable() {
+            return delayable;
+        }
+
+        @Override
+        public int getLimiterSize() {
+            return record.getSize() + BASE_MEM_SIZE + 40;
+        }
+
+        @Override
+        protected void doExcecute(Session session) {
+
+            if (record == null) {
+                record = marshaller.marshal(op.getElement());
+            }
 
+            session.messageAdd(record);
+            try {
+                QueueRecord queueRecord = new QueueRecord();
+                queueRecord.setAttachment(null);
+                queueRecord.setMessageKey(record.getKey());
+                queueRecord.setSize(record.getSize());
+                queueRecord.setQueueKey(op.getSequenceNumber());
+                session.queueAddMessage(op.getQueueDescriptor(), queueRecord);
+            } catch (KeyNotFoundException e) {
+                e.printStackTrace();
+            }
+        }
+
+        @Override
+        public void onCommit() {
+        }
+
+        public String toString() {
+            return "AddTxOpOperation " + record.getKey() + super.toString();
+        }
+    }
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Sun Jun 28 19:37:06 2009
@@ -127,7 +127,7 @@
         }
 
         if (!deleted) {
-            store.deleteMessage(this, queue);
+            store.deleteQueueElement(getStoreTracking(), queue);
         }
 
         if (firePersistListener) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Sun Jun 28 19:37:06 2009
@@ -21,13 +21,14 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PrioritySizeLimiter;
 import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.IPartitionedQueue;
 import org.apache.activemq.queue.IQueue;
@@ -53,6 +54,48 @@
     private BrokerDatabase database;
     private IDispatcher dispatcher;
 
+    private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+    private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+         * #marshal(java.lang.Object)
+         */
+        public MessageRecord marshal(MessageDelivery element) {
+            return element.createMessageRecord();
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+         * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
+         */
+        public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
+            ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
+            if (handler == null) {
+                try {
+                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
+                    protocolHandlers.put(record.getEncoding().toString(), handler);
+                } catch (Throwable thrown) {
+                    throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
+                }
+            }
+            try {
+                return handler.createMessageDelivery(record);
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe);
+            }
+        }
+    };
+
+    final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
+        return MESSAGE_MARSHALLER;
+    }
+
     private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
         public Long map(MessageDelivery element) {
             return element.getExpiration();
@@ -63,14 +106,35 @@
         public Integer map(MessageDelivery element) {
             return element.getFlowLimiterSize();
         }
+    };    
+
+    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            return element.getPriority();
+        }
     };
 
-    private final short PARTITION_TYPE = 0;
-    private final short SHARED_QUEUE_TYPE = 1;
-    private final short DURABLE_QUEUE_TYPE = 2;
+    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+        public Long map(MessageDelivery element) {
+            return element.getStoreTracking();
+        }
+    };
+
+    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+        public Integer map(MessageDelivery element) {
+            // we modulo 10 to have at most 10 partitions which the producers
+            // gets split across.
+            return (int) (element.getProducerId().hashCode() % 10);
+        }
+    };
+    
+    public static final short SUBPARTITION_TYPE = 0;
+    public static final short SHARED_QUEUE_TYPE = 1;
+    public static final short DURABLE_QUEUE_TYPE = 2;
+    public static short TRANSACTION_QUEUE_TYPE = 3;
 
     private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-    private final HashMap<String, ExclusivePersistentQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, ExclusivePersistentQueue<Long, MessageDelivery>>();
+    private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
 
     private Mapper<Integer, MessageDelivery> partitionMapper;
 
@@ -79,7 +143,7 @@
     // Be default we don't page out elements to disk.
     private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
     //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
-    
+
     private static long dynamicQueueCounter = 0;
 
     private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
@@ -122,7 +186,7 @@
     // Be default we don't page out elements to disk.
     //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
     private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
-    
+
     private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
 
         private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
@@ -158,26 +222,6 @@
         }
     };
 
-    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            return element.getPriority();
-        }
-    };
-
-    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
-        public Long map(MessageDelivery element) {
-            return element.getStoreTracking();
-        }
-    };
-
-    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            // we modulo 10 to have at most 10 partitions which the producers
-            // gets split across.
-            return (int) (element.getProducerId().hashCode() % 10);
-        }
-    };
-
     public void setDatabase(BrokerDatabase database) {
         this.database = database;
     }
@@ -201,13 +245,13 @@
         results = database.listQueues(DURABLE_QUEUE_TYPE);
         while (results.hasNext()) {
             QueueQueryResult loaded = results.next();
-            ExclusivePersistentQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
+            IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
             durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
             LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
 
         }
     }
-
+    
     private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
 
         IQueue<Long, MessageDelivery> queue;
@@ -237,7 +281,7 @@
 
     }
 
-    private ExclusivePersistentQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
+    private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
 
         ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
         queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
@@ -258,6 +302,11 @@
 
     }
 
+    public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
+        //TODO
+        return null;
+    }
+
     public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
         synchronized (this) {
             Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
@@ -267,8 +316,8 @@
         }
     }
 
-    public ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueue(String name) {
-        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+    public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
+        IQueue<Long, MessageDelivery> queue = null;
         synchronized (this) {
             queue = durableQueues.get(name);
             if (queue == null) {
@@ -283,12 +332,10 @@
         return queue;
     }
 
-    
-    
     public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
         ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
         synchronized (this) {
-            String name = "temp:"+(dynamicQueueCounter++);
+            String name = "temp:" + (dynamicQueueCounter++);
             queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
             queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
             queue.initialize(0, 0, 0, 0);
@@ -296,12 +343,11 @@
         }
         return queue;
     }
-    
-    
-    public Collection<ExclusivePersistentQueue<Long, MessageDelivery>> getDurableQueues() {
+
+    public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
         synchronized (this) {
-            Collection<ExclusivePersistentQueue<Long, MessageDelivery>> c = durableQueues.values();
-            ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>> ret = new ArrayList<ExclusivePersistentQueue<Long, MessageDelivery>>(c.size());
+            Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
+            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
             ret.addAll(c);
             return ret;
         }
@@ -397,7 +443,7 @@
             throw new IllegalArgumentException("Unknown queue type" + type);
         }
         }
-        ret.getDescriptor().setApplicationType(PARTITION_TYPE);
+        ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
         ret.setDispatcher(dispatcher);
         ret.setStore(this);
         ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
@@ -419,8 +465,8 @@
     }
 
     public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount,
-            org.apache.activemq.queue.RestoreListener<MessageDelivery> listener) {
-        database.restoreMessages(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener);
+            RestoreListener<MessageDelivery> listener) {
+        database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
     }
 
     public final void addQueue(QueueDescriptor queue) {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Sun Jun 28 19:37:06 2009
@@ -22,19 +22,19 @@
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.Subscription;
 
 public class DurableSubscription implements BrokerSubscription, DeliveryTarget {
 
-    private final ExclusivePersistentQueue<Long, MessageDelivery> queue;
+    private final IQueue<Long, MessageDelivery> queue;
     private final VirtualHost host;
     private final Destination destination;
     private Subscription<MessageDelivery> connectedSub;
     boolean started = false;
     BooleanExpression selector;
 
-    DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, ExclusivePersistentQueue<Long, MessageDelivery> queue) {
+    DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, IQueue<Long, MessageDelivery> queue) {
         this.host = host;
         this.queue = queue;
         this.destination = destination;

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.queue.IQueue;
+
+/**
+ * A {@link Transaction} whose type is {@link Transaction#TYPE_LOCAL}.
+ * <p>
+ * Commit, prepare and rollback are placeholders that perform no work yet.
+ * </p>
+ * 
+ * @author cmacnaug
+ * @version 1.0
+ */
+public class LocalTransaction extends Transaction {
+
+    LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+        super(manager, tid, opQueue);
+    }
+
+    /*
+     * Placeholder: currently does nothing, the queued operations are not
+     * applied.
+     * 
+     * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+     */
+    @Override
+    public void commit(boolean onePhase) throws XAException, IOException {
+        // TODO Not implemented yet: commit is a no-op.
+
+    }
+
+    /*
+     * Placeholder: performs no work and always returns 0.
+     * NOTE(review): 0 presumably stands for XAResource.XA_OK -- confirm.
+     * 
+     * @see org.apache.activemq.apollo.broker.Transaction#prepare()
+     */
+    @Override
+    public int prepare() throws XAException, IOException {
+        // TODO Not implemented yet: nothing is prepared.
+        return 0;
+    }
+
+    /*
+     * Placeholder: currently does nothing, the queued operations are not
+     * undone.
+     * 
+     * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+     */
+    @Override
+    public void rollback() throws XAException, IOException {
+        // TODO Not implemented yet: rollback is a no-op.
+
+    }
+
+    /* Identifies this transaction as a local one.
+     * @see org.apache.activemq.apollo.broker.Transaction#getType()
+     */
+    @Override
+    public byte getType() {
+        return TYPE_LOCAL;
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Sun Jun 28 19:37:06 2009
@@ -124,4 +124,5 @@
      * @return
      */
     public MessageEvaluationContext createMessageEvaluationContext();
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Sun Jun 28 19:37:06 2009
@@ -26,7 +26,7 @@
 
 /**
  * @author cmacnaug
- *
+ * 
  */
 public class MessageDeliveryWrapper implements MessageDelivery {
 
@@ -151,9 +151,10 @@
     public boolean isPersistent() {
         return delegate.isPersistent();
     }
-    
-    
-    /** (non-Javadoc)
+
+    /**
+     * (non-Javadoc)
+     * 
      * @see org.apache.activemq.apollo.broker.MessageDelivery#getExpiration()
      */
     public long getExpiration() {
@@ -203,5 +204,4 @@
     MessageDeliveryWrapper(MessageDelivery delivery) {
         delegate = delivery;
     }
-
 }

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+/**
+ * Keeps track of all the actions that need to be done when a transaction does a
+ * commit or rollback.
+ * 
+ * @version $Revision: 1.5 $
+ */
+public abstract class Transaction {
+
+    public static final byte START_STATE = 0; // can go to: 1,2,3
+    public static final byte IN_USE_STATE = 1; // can go to: 2,3
+    public static final byte PREPARED_STATE = 2; // can go to: 3
+    public static final byte FINISHED_STATE = 3;
+
+    static final byte TYPE_LOCAL = 0;
+    static final byte TYPE_XA = 1;
+
+    private byte state = START_STATE;
+    private final TransactionManager manager;
+    private final long tid;
+    private final IQueue<Long, TxOp> opQueue;
+
+    Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+        this.manager = manager;
+        this.opQueue = opQueue;
+        this.tid = tid;
+    }
+
+    /**
+     * @return the unique identifier used by the {@link TransactionManager} to
+     *         identify this {@link Transaction}
+     * 
+     */
+    public long getTid() {
+        return tid;
+    }
+
+    public AsciiBuffer getBackingQueueName() {
+        return opQueue.getDescriptor().getQueueName();
+    }
+
+    /**
+     * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
+     */
+    public abstract byte getType();
+
+    public void addMessage(BrokerMessageDelivery m, ISourceController<?> source) {
+
+        synchronized (this) {
+            switch (state) {
+            case START_STATE:
+            case IN_USE_STATE:
+                opQueue.add(new TxMessage(m, this), source);
+                break;
+            default: {
+                throw new IllegalStateException("Can't add message to finished or prepared transaction");
+            }
+            }
+        }
+    }
+
+    /**
+     * Records an acknowledgement as an operation of this transaction.
+     * <p>
+     * The acked queue is looked up by name in the virtual host's queue store;
+     * when the lookup yields null the ack is dropped without error.
+     * </p>
+     * 
+     * @param toAck
+     *            the delivery being acknowledged
+     * @throws IllegalStateException
+     *             if the transaction is neither in the start nor the in-use
+     *             state
+     */
+    public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
+        synchronized (this) {
+            switch (state) {
+            case START_STATE:
+            case IN_USE_STATE:
+                IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
+                //Queue could be null if it was just deleted:
+                if (target != null) {
+                    // NOTE(review): the source queue removal key is handed to TxAck as its
+                    // storeTracking while TxAck.queueSequence is never assigned -- confirm.
+                    opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), this), null);
+                }
+                break;
+            default: {
+                throw new IllegalStateException("Can't add ack to finished or prepared transaction");
+            }
+            }
+        }
+    }
+
+    public byte getState() {
+        return state;
+    }
+
+    public void setState(byte state) {
+        this.state = state;
+    }
+
+    public void prePrepare() throws Exception {
+
+        // Is it ok to call prepare now given the state of the
+        // transaction?
+        switch (state) {
+        case START_STATE:
+        case IN_USE_STATE:
+            break;
+        default:
+            XAException xae = new XAException("Prepare cannot be called now.");
+            xae.errorCode = XAException.XAER_PROTO;
+            throw xae;
+        }
+
+        //TODO:
+    }
+
+    protected void fireAfterCommit() throws Exception {
+
+        //TODO
+    }
+
+    public void fireAfterRollback() throws Exception {
+        //TODO
+    }
+
+    public String toString() {
+        return super.toString() + "[queue=" + opQueue + "]";
+    }
+
+    public abstract void commit(boolean onePhase) throws XAException, IOException;
+
+    public abstract void rollback() throws XAException, IOException;
+
+    public abstract int prepare() throws XAException, IOException;
+
+    public boolean isPrepared() {
+        return getState() == PREPARED_STATE;
+    }
+
+    public long size() {
+        return opQueue.getEnqueuedCount();
+    }
+
+    interface TxOp {
+        public static final short TYPE_MESSAGE = 0;
+        public static final short TYPE_ACK = 1;
+
+        public short getType();
+
+        public <T> T asType(Class<T> type);
+
+        public void onRollback();
+
+        public void onCommit();
+
+        public int getLimiterSize();
+
+        public boolean isFromStore();
+
+        public long getStoreTracking();
+
+        public MessageRecord createMessageRecord();
+
+        /**
+         * @return
+         */
+        public boolean isPersistent();
+
+        /**
+         * @return
+         */
+        public Long getExpiration();
+
+        public int getPriority();
+    }
+
+    static class TxMessage implements TxOp {
+        MessageDelivery message;
+        Transaction tx;
+        private boolean fromStore;
+
+        /**
+         * @param m
+         * @param transaction
+         */
+        public TxMessage(MessageDelivery m, Transaction tx) {
+            message = m;
+            this.tx = tx;
+        }
+
+        public <T> T asType(Class<T> type) {
+            if (type == TxMessage.class) {
+                return type.cast(this);
+            } else {
+                return null;
+            }
+        }
+
+        public final short getType() {
+            return TYPE_MESSAGE;
+        }
+
+        public final int getLimiterSize() {
+            // TODO Auto-generated method stub
+            return message.getFlowLimiterSize();
+        }
+
+        public final void onCommit() {
+
+        }
+
+        public final void onRollback() {
+            // TODO Auto-generated method stub
+
+        }
+
+        public final boolean isFromStore() {
+            return fromStore;
+        }
+
+        public final MessageRecord createMessageRecord() {
+            return message.createMessageRecord();
+        }
+
+        public final long getStoreTracking() {
+            return message.getStoreTracking();
+        }
+
+        public final boolean isPersistent() {
+            return message.isPersistent();
+        }
+
+        public final Long getExpiration() {
+            return message.getExpiration();
+        }
+
+        public final int getPriority() {
+            return message.getPriority();
+        }
+    }
+
+    static class TxAck implements TxOp {
+        public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
+        Transaction tx;
+        IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
+        long queueSequence; //Sequence number of the element on the queue from which to delete.
+        long storeTracking; //Store tracking of this delete op.
+        private boolean fromStore;
+        private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
+
+        TxAck(IQueue<Long, ?> queue, long storeTracking, Transaction tx) {
+            this.queue = queue;
+            this.storeTracking = storeTracking;
+            this.tx = tx;
+        }
+
+        public final short getType() {
+            return TYPE_ACK;
+        }
+
+        public <T> T asType(Class<T> type) {
+            if (type == TxAck.class) {
+                return type.cast(this);
+            } else {
+                return null;
+            }
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
+         */
+        public final void onCommit() {
+            //TODO
+
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
+         */
+        public final int getLimiterSize() {
+            return MEM_SIZE;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
+         */
+        public final void onRollback() {
+            // TODO unaquire the element.
+        }
+
+        public final boolean isFromStore() {
+            return fromStore;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
+         */
+        public final long getStoreTracking() {
+            return storeTracking;
+        }
+
+        /**
+         * Builds the store record for this ack: the {@link #ENCODING} marker,
+         * the store tracking number as key, the fixed {@link #MEM_SIZE} as size
+         * and the bytes produced by {@link #toBytes()} as payload.
+         * 
+         * @return the populated record, never null
+         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord()
+         */
+        public final MessageRecord createMessageRecord() {
+            MessageRecord ret = new MessageRecord();
+            ret.setEncoding(TxAck.ENCODING);
+            ret.setKey(storeTracking);
+            ret.setSize(MEM_SIZE);
+            ret.setBuffer(new Buffer(toBytes().getData()));
+            // Hand back the record that was just filled in.
+            return ret;
+        }
+
+        private final ByteSequence toBytes() {
+            AsciiBuffer queueName = queue.getDescriptor().getQueueName();
+            DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
+            baos.writeShort(queueName.length);
+            baos.write(queueName.data, queueName.offset, queueName.length);
+            baos.writeLong(queueSequence);
+            return baos.toByteSequence();
+        }
+
+        /**
+         * Restores this ack from the layout written by {@link #toBytes()}: a
+         * short holding the queue name length, the queue name bytes, then the
+         * queue sequence as a long.
+         * <p>
+         * The target queue is resolved by name through the virtual host's queue
+         * store and may be null.
+         * </p>
+         * 
+         * @param bytes
+         *            the serialized ack
+         */
+        private final void fromBytes(byte[] bytes) {
+            DataByteArrayInputStream in = new DataByteArrayInputStream(bytes);
+            byte[] queueBytes = new byte[in.readShort()];
+            in.readFully(queueBytes);
+            AsciiBuffer queueName = new AsciiBuffer(queueBytes);
+            queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
+            // Read the trailing long as well, so the sequence written by toBytes() is restored.
+            queueSequence = in.readLong();
+        }
+
+        public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
+            TxAck ret = new TxAck(null, record.getKey(), tx);
+            ret.fromBytes(record.getBuffer().getData());
+            return ret;
+        }
+
+        public final boolean isPersistent() {
+            //TODO This could probably be relaxed when the ack is for non persistent
+            //elements
+            return true;
+        }
+
+        public final Long getExpiration() {
+            return -1L;
+        }
+
+        public final int getPriority() {
+            return 0;
+        }
+    }
+
+    /**
+     * Rebuilds a transaction operation from a stored record. A record whose
+     * encoding equals {@link TxAck#ENCODING} becomes a {@link TxAck}; any other
+     * record is unmarshalled as a message delivery and wrapped in a
+     * {@link TxMessage}.
+     * 
+     * @param record
+     *            the stored record to decode
+     * @param tx
+     *            the transaction the operation belongs to; it is dereferenced
+     *            on the message path, so it must not be null there
+     * @return the restored operation
+     */
+    public static TxOp createTxOp(MessageRecord record, Transaction tx) {
+        if (record.getEncoding().equals(TxAck.ENCODING)) {
+            return TxAck.createFromMessageRecord(record, tx);
+        } else {
+            MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
+            return new TxMessage(delivery, tx);
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Transaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
+import org.apache.activemq.apollo.broker.Transaction.TxOp;
+import org.apache.activemq.broker.store.Store.MessageRecord;
+import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.flow.SizeLimiter;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.ExclusivePersistentQueue;
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.queue.PersistencePolicy;
+import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.RestoreListener;
+import org.apache.activemq.queue.SaveableQueueElement;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.Mapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * TransactionManager
+ * <p>
+ * Description:
+ * </p>
+ * 
+ * @version 1.0
+ */
+public class TransactionManager {
+    private static final Log LOG = LogFactory.getLog(TransactionManager.class);
+    private static final String TX_QUEUE_PREFIX = "TX-";
+    private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+
+    private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
+    private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+
+    private final VirtualHost host;
+    private final BrokerDatabase database;
+
+    private final AtomicLong tidGen = new AtomicLong(0);
+    private final TransactionStore txStore;
+
+    private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
+    private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
+    // By default we don't page out elements to disk.
+    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+    private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
+
+    private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
+
+        private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+
+        public boolean isPersistent(TxOp elem) {
+            return elem.isPersistent();
+        }
+
+        public boolean isPageOutPlaceHolders() {
+            return false;
+        }
+
+        public boolean isPagingEnabled() {
+            return PAGING_ENABLED;
+        }
+
+        public int getPagingInMemorySize() {
+            return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+        }
+
+        public boolean isThrottleSourcesToMemoryLimit() {
+            // Keep the queue in memory.
+            return true;
+        }
+
+        public int getDisconnectedThrottleRate() {
+            // By default don't throttle consumers when disconnected.
+            return 0;
+        }
+
+        public int getRecoveryBias() {
+            return 8;
+        }
+    };
+
+    private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
+        public Long map(TxOp element) {
+            return element.getExpiration();
+        }
+    };
+
+    private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
+        public Integer map(TxOp element) {
+            return element.getLimiterSize();
+        }
+    };
+
+    private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
+        public Integer map(TxOp element) {
+            return element.getPriority();
+        }
+    };
+
+    private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
+        public Long map(TxOp element) {
+            return element.getStoreTracking();
+        }
+    };
+
+    private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
+        public Integer map(TxOp element) {
+            return 1;
+        }
+    };
+
+    TransactionManager(VirtualHost host) {
+        this.host = host;
+        txStore = new TransactionStore(host.getDatabase());
+        database = host.getDatabase();
+    }
+
+    /**
+     * @return The TM's virtual host
+     */
+    public final VirtualHost getVirtualHost() {
+        return host;
+    }
+
+    /**
+     * Creates a transaction.
+     * 
+     * @param xid
+     * @return
+     */
+    public final Transaction createTransaction(Xid xid) {
+        Transaction ret;
+
+        long tid = tidGen.incrementAndGet();
+        IQueue<Long, TxOp> opQueue = createTranscationQueue(tid);
+
+        if (xid == null) {
+            ret = new LocalTransaction(this, tid, opQueue);
+        } else {
+            ret = new XATransaction(this, tid, xid, opQueue);
+        }
+
+        transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
+        transactions.put(ret.getTid(), ret);
+
+        return ret;
+    }
+
+    /**
+     * Restores transactions from the database. Reads the entries of the
+     * {@code TXMAP} map, walks the stored transaction queues, and for every
+     * queue that has a map entry rebuilds the queue, decodes the transaction
+     * and registers it with this manager. A queue without a map entry is
+     * treated as orphaned and deleted.
+     * 
+     * @throws Exception
+     *             propagated from the database calls or from decoding a
+     *             transaction
+     */
+    public void loadTransactions() throws Exception {
+
+        tidGen.set(database.allocateStoreTracking());
+
+        Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
+
+        // Load transaction queues
+        Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+        while (results.hasNext()) {
+            QueueQueryResult loaded = results.next();
+
+            Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
+            if (b == null) {
+                LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
+                database.deleteQueue(loaded.getDescriptor());
+                // There is no stored transaction to decode for an orphaned queue.
+                continue;
+            }
+
+            IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
+            Transaction tx = loadTransaction(b, queue);
+
+            //TODO if we recover a tx that isn't committed then, we should discard it.
+            if (tx.getState() < Transaction.FINISHED_STATE) {
+                LOG.warn("Recovered unfinished transaction: " + tx);
+            }
+            // Register under both keys, as createTransaction does: the transaction
+            // store's unmarshaller resolves the transaction by backing queue name.
+            transactionsByQueue.put(tx.getBackingQueueName(), tx);
+            transactions.put(tx.getTid(), tx);
+
+            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+        }
+
+        // Whatever is left in the map had no matching queue in the store.
+        if (!txns.isEmpty()) {
+            //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be 
+            //deleted:
+            LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
+
+        }
+    }
+
+    /**
+     * Decodes a transaction from its stored form: a type byte, a state byte
+     * and the tid as a long, followed by the Xid body when the type is
+     * {@link Transaction#TYPE_XA}. The decoded state is applied to the new
+     * transaction.
+     * 
+     * @param b
+     *            the stored bytes; dereferenced unconditionally, so it must
+     *            not be null
+     * @param queue
+     *            the backing operation queue for the transaction
+     * @return the restored transaction
+     * @throws IOException
+     *             if the type byte is neither local nor XA
+     */
+    private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException
+    {
+        //TODO move the serialization into the transaction itself:
+        DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
+        byte type = bais.readByte();
+        byte state = bais.readByte();
+        long tid = bais.readLong();
+
+        Transaction tx = null;
+        switch (type) {
+        case Transaction.TYPE_LOCAL:
+            tx = new LocalTransaction(this, tid, queue);
+            break;
+        case Transaction.TYPE_XA:
+            XidImpl xid = new XidImpl();
+            xid.readbody(bais);
+            tx = new XATransaction(this, tid, xid, queue);
+            break;
+        default:
+            throw new IOException("Invalid transaction type: " + type);
+
+        }
+        tx.setState(state);
+        return tx;
+
+    }
+    
+    /**
+     * Serializes a transaction and stores it in the {@code TXMAP} map under
+     * the name of the transaction's backing queue. The layout is a type byte,
+     * a state byte and the tid as a long, followed by the Xid body when the
+     * type is {@link Transaction#TYPE_XA}.
+     * 
+     * @param tx
+     *            the transaction to store
+     * @return the operation context returned by the database update
+     * @throws IOException
+     *             declared by this method; NOTE(review): presumably raised
+     *             while writing the Xid body -- confirm
+     */
+    public OperationContext persistTransaction(Transaction tx) throws IOException {
+        
+        //TODO move the serialization into the transaction itself:
+        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+        baos.writeByte(tx.getType());
+        baos.writeByte(tx.getState());
+        baos.writeLong(tx.getTid());
+        if(tx.getType() == Transaction.TYPE_XA)
+        {
+            ((XATransaction)tx).getXid().writebody(baos);
+        }
+        
+        return database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
+    }
+
+    /**
+     * Re-creates the queue described by a stored query result and initializes
+     * it with the stored first sequence, last sequence, element count and
+     * size.
+     * <p>
+     * NOTE(review): the descriptor's queue type is passed to
+     * createTxQueueInternal, which applies its second argument as the
+     * application type -- confirm this is intended.
+     * </p>
+     * 
+     * @param loaded
+     *            the stored queue information
+     * @return the initialized queue
+     * @throws IOException
+     *             declared by this method; the source is not visible here
+     */
+    private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
+
+        IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+        return queue;
+    }
+
+    /**
+     * Creates the operation queue for a new transaction. The queue is named
+     * TX_QUEUE_PREFIX followed by the transaction id and is tagged with the
+     * transaction queue type.
+     *
+     * @param tid
+     *            the transaction id
+     * @return the new queue
+     */
+    private final IQueue<Long, TxOp> createTranscationQueue(long tid) {
+        final String queueName = TX_QUEUE_PREFIX + tid;
+        return createTxQueueInternal(queueName, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+    }
+
+    private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
+        ExclusivePersistentQueue<Long, TxOp> queue;
+
+        SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
+            @Override
+            public int getElementSize(TxOp elem) {
+                return elem.getLimiterSize();
+            }
+        };
+        queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
+        queue.setDispatcher(host.getBroker().getDispatcher());
+        queue.setStore(txStore);
+        queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
+        queue.setExpirationMapper(EXPIRATION_MAPPER);
+        queue.getDescriptor().setApplicationType(type);
+        return queue;
+    }
+
+    /**
+     * @return the queue store that transaction operation queues are wired to
+     */
+    final QueueStore<Long, Transaction.TxOp> getTxnStore() {
+        return this.txStore;
+    }
+
+    /**
+     * QueueStore for transaction operation queues. Every call is forwarded to
+     * the BrokerDatabase supplied at construction; queue elements are converted
+     * to and from MessageRecords by the marshaller below.
+     */
+    private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
+        private final BrokerDatabase txDatabase;
+
+        /** Converts TxOp elements to MessageRecords and back. */
+        private final BrokerDatabase.MessageRecordMarshaller<TxOp> txOpMarshaller = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
+            public MessageRecord marshal(TxOp op) {
+                return op.createMessageRecord();
+            }
+
+            public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
+                // The owning transaction is looked up by the name of the queue being restored.
+                Transaction owner = transactionsByQueue.get(queue.getQueueName());
+                return Transaction.createTxOp(record, owner);
+            }
+        };
+
+        TransactionStore(BrokerDatabase database) {
+            this.txDatabase = database;
+        }
+
+        /**
+         * Registers the queue with the database.
+         *
+         * @see org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq.queue.QueueDescriptor)
+         */
+        public void addQueue(QueueDescriptor queue) {
+            txDatabase.addQueue(queue);
+        }
+
+        /**
+         * Removes the queue from the database.
+         *
+         * @see org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq.queue.QueueDescriptor)
+         */
+        public void deleteQueue(QueueDescriptor queue) {
+            txDatabase.deleteQueue(queue);
+        }
+
+        /**
+         * Deletes one element from the queue, identified to the database by the
+         * element's store tracking value.
+         *
+         * @see org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache.activemq.queue.QueueDescriptor,
+         *      java.lang.Object)
+         */
+        public void deleteQueueElement(QueueDescriptor queue, TxOp element) {
+            txDatabase.deleteQueueElement(element.getStoreTracking(), queue);
+        }
+
+        /**
+         * Reports what the element itself says about having come from the store.
+         *
+         * @see org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
+         */
+        public boolean isFromStore(TxOp elem) {
+            return elem.isFromStore();
+        }
+
+        /**
+         * Hands the element to the database for saving, using the TxOp
+         * marshaller.
+         *
+         * @see org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache.activemq.queue.SaveableQueueElement,
+         *      org.apache.activemq.flow.ISourceController, boolean)
+         */
+        public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
+            txDatabase.saveQeueuElement(sqe, source, delayable, txOpMarshaller);
+        }
+
+        /**
+         * Asks the database to restore queue elements, using the TxOp
+         * marshaller, and to report them to the listener.
+         *
+         * @see org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache.activemq.queue.QueueDescriptor,
+         *      boolean, long, long, int, org.apache.activemq.queue.RestoreListener)
+         */
+        public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
+            txDatabase.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, txOpMarshaller);
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/TransactionManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Sun Jun 28 19:37:06 2009
@@ -28,8 +28,6 @@
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.StoreFactory;
 import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.AbstractFlowQueue;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.util.IOHelper;
 
@@ -143,7 +141,7 @@
             queue.shutdown(true);
         }
 
-        for (AbstractFlowQueue<MessageDelivery> queue : queueStore.getDurableQueues()) {
+        for (IQueue<Long, MessageDelivery> queue : queueStore.getDurableQueues()) {
             queue.shutdown(true);
         }
         
@@ -202,7 +200,7 @@
             if (consumer.isDurable()) {
                 DurableSubscription dsub = durableSubs.get(consumer.getSubscriptionName());
                 if (dsub == null) {
-                    ExclusivePersistentQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+                    IQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
                     queue.start();
                     dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
                     durableSubs.put(consumer.getSubscriptionName(), dsub);

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.IOException;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.queue.IQueue;
+import org.apache.activemq.util.DataByteArrayInputStream;
+
+/**
+ * A transaction identified by an XA transaction id (Xid).
+ * <p>
+ * The Xid is always held as an {@link XidImpl}: an XidImpl passed to the
+ * constructor is used as is, any other Xid is copied into a new XidImpl.
+ * Commit, prepare and rollback are not implemented yet.
+ * 
+ * @author cmacnaug
+ * @version 1.0
+ */
+public class XATransaction extends Transaction {
+
+    private final XidImpl xid;
+
+    /**
+     * @param manager
+     *            the owning transaction manager
+     * @param tid
+     *            the transaction id
+     * @param xid
+     *            the XA id; copied into an XidImpl unless it already is one
+     * @param opQueue
+     *            the queue of operations for this transaction
+     */
+    XATransaction(TransactionManager manager, long tid, Xid xid, IQueue<Long, TxOp> opQueue) {
+        super(manager, tid, opQueue);
+        this.xid = (xid instanceof XidImpl) ? (XidImpl) xid : new XidImpl(xid);
+    }
+
+    /**
+     * @return the XA id of this transaction
+     */
+    public XidImpl getXid() {
+        return xid;
+    }
+
+    /**
+     * Not implemented yet; currently does nothing.
+     * 
+     * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+     */
+    @Override
+    public void commit(boolean onePhase) throws XAException, IOException {
+        // TODO implement commit
+    }
+
+    /**
+     * Not implemented yet; currently always returns 0 (the value of
+     * XAResource.XA_OK).
+     * 
+     * @see org.apache.activemq.apollo.broker.Transaction#prepare()
+     */
+    @Override
+    public int prepare() throws XAException, IOException {
+        // TODO implement prepare
+        return 0;
+    }
+
+    /**
+     * Not implemented yet; currently does nothing.
+     * 
+     * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+     */
+    @Override
+    public void rollback() throws XAException, IOException {
+        // TODO implement rollback
+    }
+
+    /**
+     * @return {@code TYPE_XA}
+     * @see org.apache.activemq.apollo.broker.Transaction#getType()
+     */
+    @Override
+    public byte getType() {
+        return TYPE_XA;
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XATransaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java?rev=789143&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java Sun Jun 28 19:37:06 2009
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * An implementation of JTA transaction idenifier (javax.transaction.xa.Xid).
+ * This is SonicMQ internal Xid. Any external Xid object will be converted to
+ * this class.
+ */
+public class XidImpl implements Xid, Cloneable, java.io.Serializable {
+
+    //fix bug #8334
+    static final long serialVersionUID = -5363901495878210611L;
+
+    // The format identifier for the XID. A value of -1 indicates the NULLXID
+    private int m_formatID = -1; // default format
+
+    private byte m_gtrid[];
+    // The number of bytes in the global transaction identfier
+    private int m_gtridLength; // Value from 1 through MAXGTRIDSIZE
+
+    private byte m_bqual[];
+    // The number of bytes in the branch qualifier
+    private int m_bqualLength; // Value from 1 through MAXBQUALSIZE
+
+    /////////////////////////////// Constructors /////////////////////////////
+    /**
+     * Constructs a new null XID.
+     * <p>
+     * After construction the data within the XID should be initialized.
+     */
+    public XidImpl() {
+        this(-1, null, null);
+    }
+
+    public XidImpl(int formatID, byte[] globalTxnID, byte[] branchID) {
+        m_formatID = formatID;
+        setGlobalTransactionId(globalTxnID);
+        setBranchQualifier(branchID);
+    }
+
+    /**
+     * Initialize an XID using another XID as the source of data.
+     * 
+     * @param from
+     *            the XID to initialize this XID from
+     */
+    public XidImpl(Xid from) {
+
+        if ((from == null) || (from.getFormatId() == -1)) {
+            m_formatID = -1;
+            setGlobalTransactionId(null);
+            setBranchQualifier(null);
+        } else {
+            m_formatID = from.getFormatId();
+            setGlobalTransactionId(from.getGlobalTransactionId());
+            setBranchQualifier(from.getBranchQualifier());
+        }
+
+    }
+
+    // used for test purpose
+    public XidImpl(String globalTxnId, String branchId) {
+
+        this(99, globalTxnId.getBytes(), branchId.getBytes());
+
+    }
+
+    //////////// Public Methods //////////////
+
+    /**
+     * Determine whether or not two objects of this type are equal.
+     * 
+     * @param o
+     *            the other XID object to be compared with this XID.
+     * 
+     * @return Returns true of the supplied object represents the same global
+     *         transaction as this, otherwise returns false.
+     */
+    public boolean equals(Object o) {
+        Xid other;
+
+        if (!(o instanceof Xid))
+            return false;
+
+        other = (Xid) o;
+
+        if (m_formatID == -1 && other.getFormatId() == -1)
+            return true;
+
+        if (m_formatID != other.getFormatId() || m_gtridLength != other.getGlobalTransactionId().length || m_bqualLength != other.getBranchQualifier().length) {
+            return false;
+        }
+
+        return isEqualGtrid(other) && isEqualBranchQualifier(other.getBranchQualifier());
+
+    }
+
+    /**
+     * Compute the hash code.
+     * 
+     * @return the computed hashcode
+     */
+
+    public int hashCode() {
+
+        if (m_formatID == -1)
+            return (-1);
+
+        return m_formatID + m_gtridLength - m_bqualLength;
+
+    }
+
+    /**
+     * Return a string representing this XID.
+     * <p>
+     * This is normally used to display the XID when debugging.
+     * 
+     * @return the string representation of this XID
+     */
+
+    public String toString() {
+        String gtString = new String(getGlobalTransactionId());
+        String brString = new String(getBranchQualifier());
+        return new String("{Xid: " + "formatID=" + m_formatID + ", " + "gtrid[" + m_gtridLength + "]=" + gtString + ", " + "brid[" + m_bqualLength + "]=" + brString + "}");
+
+    }
+
+    /**
+     * Obtain the format identifier part of the XID.
+     * 
+     * @return Format identifier. -1 indicates a null XID
+     */
+    public int getFormatId() {
+        return m_formatID;
+    }
+
+    /**
+     * Returns the global transaction identifier for this XID.
+     * 
+     * @return the global transaction identifier
+     */
+    public byte[] getGlobalTransactionId() {
+        return m_gtrid;
+    }
+
+    /**
+     * Returns the branch qualifier for this XID.
+     * 
+     * @return the branch qualifier
+     */
+    public byte[] getBranchQualifier() {
+        return m_bqual;
+    }
+
+    ///////////////////////// private methods ////////////////////////////////
+
+    /**
+     * Set the branch qualifier for this XID. Note that the branch qualifier has
+     * a maximum size.
+     * 
+     * @param qual
+     *            a Byte array containing the branch qualifier to be set. If the
+     *            size of the array exceeds MAXBQUALSIZE, only the first
+     *            MAXBQUALSIZE elements of qual will be used.
+     */
+    private void setBranchQualifier(byte[] branchID) {
+        if (branchID == null) {
+            m_bqualLength = 0;
+            m_bqual = new byte[m_bqualLength];
+        } else {
+            m_bqualLength = branchID.length > MAXBQUALSIZE ? MAXBQUALSIZE : branchID.length;
+            m_bqual = new byte[m_bqualLength];
+            System.arraycopy(branchID, 0, m_bqual, 0, m_bqualLength);
+        }
+    }
+
+    private void setGlobalTransactionId(byte[] globalTxnID) {
+        if (globalTxnID == null) {
+            m_gtridLength = 0;
+            m_gtrid = new byte[m_gtridLength];
+        } else {
+            m_gtridLength = globalTxnID.length > MAXGTRIDSIZE ? MAXGTRIDSIZE : globalTxnID.length;
+            m_gtrid = new byte[m_gtridLength];
+            System.arraycopy(globalTxnID, 0, m_gtrid, 0, m_gtridLength);
+        }
+    }
+
+    /**
+     * Return whether the Gtrid of this is equal to the Gtrid of xid
+     */
+    private boolean isEqualGtrid(Xid xid) {
+        byte[] xidGtrid = xid.getGlobalTransactionId();
+
+        if (getGlobalTransactionId() == null && xidGtrid == null)
+            return true;
+        if (getGlobalTransactionId() == null)
+            return false;
+        if (xidGtrid == null)
+            return false;
+
+        if (m_gtridLength != xidGtrid.length) {
+            return false;
+        }
+
+        for (int i = 0; i < m_gtridLength; i++) {
+            if (m_gtrid[i] != xidGtrid[i])
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Determine if an array of bytes equals the branch qualifier
+     * 
+     * @return true if equal
+     */
+    private boolean isEqualBranchQualifier(byte[] data) {
+
+        int L = data.length > MAXBQUALSIZE ? MAXBQUALSIZE : data.length;
+
+        if (L != m_bqualLength)
+            return false;
+
+        for (int i = 0; i < m_bqualLength; i++) {
+            if (data[i] != m_bqual[i])
+                return false;
+        }
+
+        return true;
+    }
+
+    public int getMemorySize() {
+        return 4 // formatId
+                + 4 // length of globalTxnId
+                + m_gtridLength // globalTxnId
+                + 4 // length of branchId
+                + m_bqualLength; // branchId
+    }
+
+    /**
+     * Writes this XidImpl's data to the DataOutput destination
+     * 
+     * @param out
+     *            The DataOutput destination
+     * @param maxbytes
+     *            Maximum number of bytes that may be written to the destination
+     * 
+     * @exception ELogEventTooLong
+     *                The data could not be written without exceeding the
+     *                maxbytes parameter. The data may have been partially
+     *                written.
+     */
+    public void writebody(DataOutput out) throws IOException {
+        out.writeInt(m_formatID); // format ID
+
+        out.writeInt(m_gtridLength); // length of global Txn ID
+        out.write(getGlobalTransactionId(), 0, m_gtridLength); // global transaction ID
+        out.writeInt(m_bqualLength); // length of branch ID
+        out.write(getBranchQualifier(), 0, m_bqualLength); // branch ID
+    }
+
+    /**
+     * read xid from an Array and set each fields.
+     * 
+     * @param in
+     *            the data input array
+     * @throws IOException
+     */
+    public void readbody(DataInput in) throws IOException {
+        m_formatID = in.readInt();
+        int gtidLen = in.readInt();
+        byte[] globalTxnId = new byte[gtidLen];
+        in.readFully(globalTxnId, 0, gtidLen);
+
+        int brlen = in.readInt();
+        byte[] branchId = new byte[brlen];
+        in.readFully(branchId, 0, brlen);
+
+        setGlobalTransactionId(globalTxnId);
+        setBranchQualifier(branchId);
+    }
+} // class XidImpl

Propchange: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native