You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:40:20 UTC

svn commit: r961068 [2/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache...

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,1092 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.util.{LinkedHashMap, HashMap}
+
+class TransactionManagerConfig {
+
+  def apply(host:VirtualHost): TransactionManager = {
+    new TransactionManager(host, this);
+  }
+}
+
+
+class TransactionManager(val virtualHost:VirtualHost, config:TransactionManagerConfig) {
+// TODO:
+//    private static final Log LOG = LogFactory.getLog(TransactionManager.class);
+//    private static final String TX_QUEUE_PREFIX = "TX-";
+//    private static final AsciiBuffer TXN_MAP = new AsciiBuffer("TXMAP");
+//
+//    private final HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
+//    private final HashMap<AsciiBuffer, Transaction> transactionsByQueue = new HashMap<AsciiBuffer, Transaction>();
+//    private final HashMap<Buffer, XATransaction> xaTransactions = new HashMap<Buffer, XATransaction>();
+//
+//    private final VirtualHost host;
+//    private BrokerDatabase database;
+//
+//    private final AtomicLong tidGen = new AtomicLong(0);
+//    private final TransactionStore txStore;
+//
+//    private static final int DEFAULT_TX_QUEUE_PAGING_THRESHOLD = 1024 * 64;
+//    private static final int DEFAULT_TX_QUEUE_RESUME_THRESHOLD = 1;
+//    // Be default we don't page out elements to disk.
+//    private static final int DEFAULT_TX_QUEUE_SIZE = DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//    //private static final int DEFAULT_TX_QUEUE_SIZE = Integer.MAX_VALUE;
+//
+//    private static final PersistencePolicy<TxOp> DEFAULT_TX_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<TxOp>() {
+//
+//        private static final boolean PAGING_ENABLED = DEFAULT_TX_QUEUE_SIZE > DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//
+//        public boolean isPersistent(TxOp elem) {
+//            return elem.isPersistent();
+//        }
+//
+//        public boolean isPageOutPlaceHolders() {
+//            return false;
+//        }
+//
+//        public boolean isPagingEnabled() {
+//            return PAGING_ENABLED;
+//        }
+//
+//        public int getPagingInMemorySize() {
+//            return DEFAULT_TX_QUEUE_PAGING_THRESHOLD;
+//        }
+//
+//        public boolean isThrottleSourcesToMemoryLimit() {
+//            // Keep the queue in memory.
+//            return true;
+//        }
+//
+//        public int getDisconnectedThrottleRate() {
+//            // By default don't throttle consumers when disconnected.
+//            return 0;
+//        }
+//
+//        public int getRecoveryBias() {
+//            return 8;
+//        }
+//    };
+//
+//    private static final Mapper<Long, TxOp> EXPIRATION_MAPPER = new Mapper<Long, TxOp>() {
+//        public Long map(TxOp element) {
+//            return element.getExpiration();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, TxOp> SIZE_MAPPER = new Mapper<Integer, TxOp>() {
+//        public Integer map(TxOp element) {
+//            return element.getLimiterSize();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, TxOp> PRIORITY_MAPPER = new Mapper<Integer, TxOp>() {
+//        public Integer map(TxOp element) {
+//            return element.getPriority();
+//        }
+//    };
+//
+//    private static final Mapper<Long, TxOp> KEY_MAPPER = new Mapper<Long, TxOp>() {
+//        public Long map(TxOp element) {
+//            return element.getStoreTracking();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, TxOp> PARTITION_MAPPER = new Mapper<Integer, TxOp>() {
+//        public Integer map(TxOp element) {
+//            return 1;
+//        }
+//    };
+//
+//    TransactionManager(VirtualHost host) {
+//        this.host = host;
+//        txStore = new TransactionStore(host.getDatabase());
+//        database = host.getDatabase();
+//    }
+//
+//    /**
+//     * @return The TM's virtual host
+//     */
+//    public final VirtualHost getVirtualHost() {
+//        return host;
+//    }
+//
+//    /**
+//     * @param msg
+//     * @param controller
+//     */
+//    public void newMessage(MessageDelivery msg, ISourceController<?> controller) {
+//        if (msg.getStoreTracking() == -1) {
+//            msg.setStoreTracking(host.getDatabase().allocateStoreTracking());
+//        }
+//        transactions.get(msg.getTransactionId()).addMessage(msg, controller);
+//    }
+//
+//    /**
+//     * Creates a transaction.
+//     *
+//     * @param xid
+//     * @return
+//     */
+//    public synchronized final Transaction createTransaction(Buffer xid) {
+//        Transaction ret;
+//
+//        long tid = tidGen.incrementAndGet();
+//        IQueue<Long, TxOp> opQueue = createTransactionQueue(tid);
+//
+//        if (xid == null) {
+//            ret = new LocalTransaction(this, tid, opQueue);
+//        } else {
+//            XATransaction xat = new XATransaction(this, tid, xid, opQueue);
+//            ret = xat;
+//            xaTransactions.put(xid, xat);
+//        }
+//
+//        transactionsByQueue.put(opQueue.getDescriptor().getQueueName(), ret);
+//        transactions.put(ret.getTid(), ret);
+//
+//        return ret;
+//    }
+//
+//    /**
+//     * @param buffer
+//     * @return
+//     */
+//    public synchronized Transaction getXATransaction(Buffer buffer) {
+//        return xaTransactions.get(buffer);
+//    }
+//
+//    /**
+//     *
+//     * @throws Exception
+//     */
+//    public synchronized void loadTransactions() throws Exception {
+//
+//        tidGen.set(database.allocateStoreTracking());
+//
+//        Map<AsciiBuffer, Buffer> txns = database.listMapEntries(TXN_MAP);
+//
+//        // Load shared queues
+//        Iterator<QueueQueryResult> results = database.listQueues(BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+//        while (results.hasNext()) {
+//            QueueQueryResult loaded = results.next();
+//
+//            Buffer b = txns.remove(loaded.getDescriptor().getQueueName());
+//            if (b == null) {
+//                LOG.warn("Recovered orphaned transaction queue: " + loaded.getDescriptor() + " elements: " + loaded.getCount());
+//                database.deleteQueue(loaded.getDescriptor());
+//            }
+//
+//            IQueue<Long, TxOp> queue = createRestoredTxQueue(loaded);
+//            Transaction tx = loadTransaction(b, queue);
+//
+//            //TODO if we recover a tx that isn't committed then, we should discard it.
+//            if (tx.getState() < Transaction.COMMITED_STATE) {
+//                LOG.warn("Recovered unfinished transaction: " + tx);
+//            }
+//            transactions.put(tx.getTid(), tx);
+//            if (tx instanceof XATransaction) {
+//                XATransaction xat = XATransaction.class.cast(tx);
+//                xaTransactions.put(xat.getXid(), xat);
+//            }
+//
+//            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//        }
+//
+//        if (!txns.isEmpty()) {
+//            //TODO Based on transaction state this is generally ok, anyway the orphaned entries should be
+//            //deleted:
+//            LOG.warn("Recovered transactions without backing queues: " + txns.keySet());
+//        }
+//    }
+//
+//    private Transaction loadTransaction(Buffer b, IQueue<Long, TxOp> queue) throws IOException {
+//        //TODO move the serialization into the transaction itself:
+//        DataByteArrayInputStream bais = new DataByteArrayInputStream(b.getData());
+//        byte type = bais.readByte();
+//        byte state = bais.readByte();
+//        long tid = bais.readLong();
+//
+//        Transaction tx = null;
+//        switch (type) {
+//        case Transaction.TYPE_LOCAL:
+//            tx = new LocalTransaction(this, tid, queue);
+//            break;
+//        case Transaction.TYPE_XA:
+//            int length = bais.readByte() & 0xFF;
+//            Buffer xid = new Buffer(new byte[length]);
+//            bais.readFully(xid.data);
+//            tx = new XATransaction(this, tid, xid, queue);
+//            break;
+//        default:
+//            throw new IOException("Invalid transaction type: " + type);
+//
+//        }
+//        tx.setState(state, null);
+//        return tx;
+//
+//    }
+//
+//    public ListenableFuture<?> persistTransaction(Transaction tx) {
+//
+//        //TODO move the serialization into the transaction itself:
+//        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
+//        try {
+//            baos.writeByte(tx.getType());
+//            baos.writeByte(tx.getState());
+//            baos.writeLong(tx.getTid());
+//            if (tx.getType() == Transaction.TYPE_XA) {
+//                Buffer xid = ((XATransaction) tx).getXid();
+//                // An XID max size is around 140 bytes, byte SHOULD be big enough to frame it.
+//                baos.writeByte(xid.length & 0xFF);
+//                baos.write(xid.data, xid.offset, xid.length);
+//            }
+//            OperationContext<?> ctx = database.updateMapEntry(TXN_MAP, tx.getBackingQueueName(), new Buffer(baos.getData(), 0, baos.size()));
+//            ctx.requestFlush();
+//            return ctx;
+//        } catch (IOException ioe) {
+//            //Shouldn't happen
+//            throw new RuntimeException(ioe);
+//        }
+//    }
+//
+//    private IQueue<Long, TxOp> createRestoredTxQueue(QueueQueryResult loaded) throws IOException {
+//
+//        IQueue<Long, TxOp> queue = createTxQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//        return queue;
+//    }
+//
+//    private final IQueue<Long, TxOp> createTransactionQueue(long tid) {
+//        IQueue<Long, TxOp> queue = createTxQueueInternal(TX_QUEUE_PREFIX + tid, BrokerQueueStore.TRANSACTION_QUEUE_TYPE);
+//        queue.initialize(0, 0, 0, 0);
+//        txStore.addQueue(queue.getDescriptor());
+//        return queue;
+//    }
+//
+//    private IQueue<Long, TxOp> createTxQueueInternal(final String name, short type) {
+//        ExclusivePersistentQueue<Long, TxOp> queue;
+//
+//        SizeLimiter<TxOp> limiter = new SizeLimiter<TxOp>(DEFAULT_TX_QUEUE_SIZE, DEFAULT_TX_QUEUE_RESUME_THRESHOLD) {
+//            @Override
+//            public int getElementSize(TxOp elem) {
+//                return elem.getLimiterSize();
+//            }
+//        };
+//        queue = new ExclusivePersistentQueue<Long, TxOp>(name, limiter);
+//        queue.setStore(txStore);
+//        queue.setPersistencePolicy(DEFAULT_TX_QUEUE_PERSISTENCE_POLICY);
+//        queue.setExpirationMapper(EXPIRATION_MAPPER);
+//        queue.getDescriptor().setApplicationType(type);
+//        return queue;
+//    }
+//
+//    final QueueStore<Long, Transaction.TxOp> getTxnStore() {
+//        return txStore;
+//    }
+//
+//    private class TransactionStore implements QueueStore<Long, Transaction.TxOp> {
+//        private final BrokerDatabase database;
+//
+//        private final BrokerDatabase.MessageRecordMarshaller<TxOp> TX_OP_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<TxOp>() {
+//            public MessageRecord marshal(TxOp element) {
+//                return element.createMessageRecord();
+//            }
+//
+//            public TxOp unMarshall(MessageRecord record, QueueDescriptor queue) {
+//                Transaction t = transactionsByQueue.get(queue.getQueueName());
+//                return Transaction.createTxOp(record, t);
+//            }
+//        };
+//
+//        TransactionStore(BrokerDatabase database) {
+//            this.database = database;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#addQueue(org.apache.activemq
+//         * .queue.QueueDescriptor)
+//         */
+//        public void addQueue(QueueDescriptor queue) {
+//            database.addQueue(queue);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#deleteQueue(org.apache.activemq
+//         * .queue.QueueDescriptor)
+//         */
+//        public void deleteQueue(QueueDescriptor queue) {
+//            database.deleteQueue(queue);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#deleteQueueElement(org.apache
+//         * .activemq.queue.QueueDescriptor, java.lang.Object)
+//         */
+//        public void deleteQueueElement(SaveableQueueElement<TxOp> sqe) {
+//            database.deleteQueueElement(sqe);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#isFromStore(java.lang.Object)
+//         */
+//        public boolean isFromStore(TxOp elem) {
+//            return elem.isFromStore();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#persistQueueElement(org.apache
+//         * .activemq.queue.SaveableQueueElement,
+//         * org.apache.activemq.flow.ISourceController, boolean)
+//         */
+//        public void persistQueueElement(SaveableQueueElement<TxOp> sqe, ISourceController<?> source, boolean delayable) {
+//            database.saveQeueuElement(sqe, source, false, TX_OP_MARSHALLER);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore#restoreQueueElements(org.apache
+//         * .activemq.queue.QueueDescriptor, boolean, long, long, int,
+//         * org.apache.activemq.queue.RestoreListener)
+//         */
+//        public void restoreQueueElements(QueueDescriptor queue, boolean recordOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<TxOp> listener) {
+//            database.restoreQueueElements(queue, recordOnly, firstSequence, maxSequence, maxCount, listener, TX_OP_MARSHALLER);
+//        }
+//    }
+
+    def loadTransactions() = {
+    }
+}
+
+
+/**
+ * Keeps track of all the actions the need to be done when a transaction does a
+ * commit or rollback.
+ */
+abstract class Transaction {
+
+// TODO:
+//    private static final Log LOG = LogFactory.getLog(Transaction.class);
+//
+//    public static final byte START_STATE = 0; // can go to: 1,2,3
+//    public static final byte IN_USE_STATE = 1; // can go to: 2,3, 4
+//    public static final byte PREPARED_STATE = 2; // can go to: 3, 4
+//    public static final byte COMMITED_STATE = 3;
+//    public static final byte ROLLBACK_STATE = 4;
+//
+//    static final byte TYPE_LOCAL = 0;
+//    static final byte TYPE_XA = 1;
+//
+//    private byte state = START_STATE;
+//    private final TransactionManager manager;
+//    private final long tid;
+//    private final IQueue<Long, TxOp> opQueue;
+//    protected HashSet<TransactionListener> listeners;
+//
+//    private TxProcessor processor;
+//
+//    Transaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+//        this.manager = manager;
+//        this.opQueue = opQueue;
+//        opQueue.start();
+//        this.tid = tid;
+//    }
+//
+//    /**
+//     * @return the unique identifier used by the {@link TransactionManager} to
+//     *         identify this {@link Transaction}
+//     *
+//     */
+//    public long getTid() {
+//        return tid;
+//    }
+//
+//    public AsciiBuffer getBackingQueueName() {
+//        return opQueue.getDescriptor().getQueueName();
+//    }
+//
+//    /**
+//     * @return The transaction type e.g. {@link Transaction#TYPE_LOCAL}
+//     */
+//    public abstract byte getType();
+//
+//    public void addMessage(MessageDelivery m, ISourceController<?> source) {
+//
+//        synchronized (this) {
+//            switch (state) {
+//            case START_STATE:
+//            case IN_USE_STATE:
+//                opQueue.add(new TxMessage(m, this), source);
+//                break;
+//            default: {
+//                throw new IllegalStateException("Can't add message to finished or prepared transaction");
+//            }
+//            }
+//        }
+//    }
+//
+//    public void addAck(SubscriptionDelivery<MessageDelivery> toAck) {
+//        synchronized (this) {
+//            switch (state) {
+//            case START_STATE:
+//            case IN_USE_STATE:
+//                IQueue<Long, MessageDelivery> target = manager.getVirtualHost().getQueueStore().getQueue(toAck.getQueueDescriptor().getQueueName());
+//                //Queue could be null if it was just deleted:
+//                if (target != null) {
+//                    long tracking = manager.getVirtualHost().getDatabase().allocateStoreTracking();
+//                    opQueue.add(new TxAck(target, toAck.getSourceQueueRemovalKey(), tracking, this), null);
+//                }
+//                break;
+//            default: {
+//                throw new IllegalStateException("Can't add message to finished or prepared transaction");
+//            }
+//            }
+//        }
+//    }
+//
+//    public byte getState() {
+//        return state;
+//    }
+//
+//    public void setState(byte state, FutureListener<? super Object> listener) {
+//        this.state = state;
+//        ListenableFuture<?> future = manager.persistTransaction(this);
+//        future.setFutureListener(listener);
+//    }
+//
+//    public void prePrepare() throws Exception {
+//
+//        // Is it ok to call prepare now given the state of the
+//        // transaction?
+//        switch (state) {
+//        case START_STATE:
+//        case IN_USE_STATE:
+//            break;
+//        default:
+//            XAException xae = new XAException("Prepare cannot be called now.");
+//            xae.errorCode = XAException.XAER_PROTO;
+//            throw xae;
+//        }
+//    }
+//
+//    protected void fireAfterCommit() throws Exception {
+//
+//        synchronized (this) {
+//            for (TransactionListener listener : listeners) {
+//                listener.onCommit(this);
+//            }
+//        }
+//    }
+//
+//    public void fireAfterRollback() throws Exception {
+//        synchronized (this) {
+//            for (TransactionListener listener : listeners) {
+//                listener.onRollback(this);
+//            }
+//        }
+//    }
+//
+//    public void fireAfterPrepare() throws Exception {
+//        synchronized (this) {
+//            for (TransactionListener listener : listeners) {
+//                listener.onPrepared(this);
+//            }
+//        }
+//    }
+//
+//    public String toString() {
+//        return super.toString() + "[queue=" + opQueue + "]";
+//    }
+//
+//    public abstract void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException;
+//
+//    public abstract void rollback(TransactionListener listener) throws XAException, IOException;
+//
+//    public abstract int prepare(TransactionListener listener) throws XAException, IOException;
+//
+//    public boolean isPrepared() {
+//        return getState() == PREPARED_STATE;
+//    }
+//
+//    public long size() {
+//        return opQueue.getEnqueuedCount();
+//    }
+//
+//    public static abstract class TransactionListener {
+//        public void onRollback(Transaction t) {
+//
+//        }
+//
+//        public void onCommit(Transaction t) {
+//
+//        }
+//
+//        public void onPrepared(Transaction t) {
+//
+//        }
+//    }
+//
+//    interface TxOp {
+//        public static final short TYPE_MESSAGE = 0;
+//        public static final short TYPE_ACK = 1;
+//
+//        public short getType();
+//
+//        public <T> T asType(Class<T> type);
+//
+//        public void onRollback(ISourceController<?> controller);
+//
+//        public void onCommit(ISourceController<?> controller);
+//
+//        public int getLimiterSize();
+//
+//        public boolean isFromStore();
+//
+//        public long getStoreTracking();
+//
+//        public MessageRecord createMessageRecord();
+//
+//        /**
+//         * @return
+//         */
+//        public boolean isPersistent();
+//
+//        /**
+//         * @return
+//         */
+//        public Long getExpiration();
+//
+//        public int getPriority();
+//    }
+//
+//    static class TxMessage implements TxOp {
+//        MessageDelivery message;
+//        Transaction tx;
+//        private boolean fromStore;
+//
+//        /**
+//         * @param m
+//         * @param transaction
+//         */
+//        public TxMessage(MessageDelivery m, Transaction tx) {
+//            message = m;
+//            this.tx = tx;
+//        }
+//
+//        public <T> T asType(Class<T> type) {
+//            if (type == TxMessage.class) {
+//                return type.cast(this);
+//            } else {
+//                return null;
+//            }
+//        }
+//
+//        public final short getType() {
+//            return TYPE_MESSAGE;
+//        }
+//
+//        public final int getLimiterSize() {
+//            return message.getFlowLimiterSize();
+//        }
+//
+//        public final void onCommit(ISourceController<?> controller) {
+//            message.clearTransactionId();
+//            tx.manager.getVirtualHost().getRouter().route(message, controller, true);
+//        }
+//
+//        public final void onRollback(ISourceController<?> controller) {
+//            //Nothing to do here, message just gets dropped:
+//            return;
+//        }
+//
+//        public final boolean isFromStore() {
+//            return fromStore;
+//        }
+//
+//        public final MessageRecord createMessageRecord() {
+//            return message.createMessageRecord();
+//        }
+//
+//        public final long getStoreTracking() {
+//            return message.getStoreTracking();
+//        }
+//
+//        public final boolean isPersistent() {
+//            return message.isPersistent();
+//        }
+//
+//        public final Long getExpiration() {
+//            return message.getExpiration();
+//        }
+//
+//        public final int getPriority() {
+//            return message.getPriority();
+//        }
+//    }
+//
+//    static class TxAck implements TxOp {
+//        public static AsciiBuffer ENCODING = new AsciiBuffer("txack");
+//        Transaction tx;
+//        IQueue<Long, ?> queue; //Desriptor of the queue on which to delete.
+//        long queueSequence; //Sequence number of the element on the queue from which to delete.
+//        final long storeTracking; //Store tracking of this delete op.
+//        private boolean fromStore;
+//        private static final int MEM_SIZE = 8 + 8 + 8 + 8 + 1;
+//
+//        TxAck(IQueue<Long, ?> queue, long removalKey, long storeTracking, Transaction tx) {
+//            this.queue = queue;
+//            this.queueSequence = removalKey;
+//            this.tx = tx;
+//            this.storeTracking = storeTracking;
+//        }
+//
+//        public final short getType() {
+//            return TYPE_ACK;
+//        }
+//
+//        public <T> T asType(Class<T> type) {
+//            if (type == TxAck.class) {
+//                return type.cast(this);
+//            } else {
+//                return null;
+//            }
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onCommit()
+//         */
+//        public final void onCommit(ISourceController<?> controller) {
+//            queue.remove(queueSequence);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.apollo.broker.Transaction.TxOp#onRollback()
+//         */
+//        public final void onRollback(ISourceController<?> controller) {
+//            //No-Op for now, it is possible that we'd want to unaquire these
+//            //in the queue if the client weren't to keep these
+//            //around
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.Transaction.TxOp#getLimiterSize()
+//         */
+//        public final int getLimiterSize() {
+//            return MEM_SIZE;
+//        }
+//
+//        public final boolean isFromStore() {
+//            return fromStore;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.Transaction.TxOp#getStoreTracking()
+//         */
+//        public final long getStoreTracking() {
+//            return storeTracking;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.Transaction.TxOp#createMessageRecord
+//         * ()
+//         */
+//        public final MessageRecord createMessageRecord() {
+//            MessageRecord ret = new MessageRecord();
+//            ret.setEncoding(TxAck.ENCODING);
+//            ret.setKey(storeTracking);
+//            ret.setSize(MEM_SIZE);
+//            ret.setBuffer(new Buffer(toBytes().getData()));
+//            return null;
+//        }
+//
+//        private final Buffer toBytes() {
+//            AsciiBuffer queueName = queue.getDescriptor().getQueueName();
+//            DataByteArrayOutputStream baos = new DataByteArrayOutputStream(2 + queueName.length + 8);
+//            try {
+//                baos.writeShort(queueName.length);
+//                baos.write(queueName.data, queueName.offset, queueName.length);
+//                baos.writeLong(queueSequence);
+//            } catch (IOException shouldNotHappen) {
+//                throw new RuntimeException(shouldNotHappen);
+//            }
+//            return baos.toBuffer();
+//        }
+//
+//        private final void fromBytes(byte[] bytes) {
+//            DataByteArrayInputStream baos = new DataByteArrayInputStream(bytes);
+//            byte[] queueBytes = new byte[baos.readShort()];
+//            baos.readFully(queueBytes);
+//            AsciiBuffer queueName = new AsciiBuffer(queueBytes);
+//            queue = tx.manager.getVirtualHost().getQueueStore().getQueue(queueName);
+//            queueSequence = baos.readLong();
+//
+//        }
+//
+//        public final static TxAck createFromMessageRecord(MessageRecord record, Transaction tx) {
+//            TxAck ret = new TxAck(null, -1, record.getKey(), tx);
+//            ret.fromBytes(record.getBuffer().getData());
+//            return ret;
+//        }
+//
+//        public final boolean isPersistent() {
+//            //TODO This could probably be relaxed when the ack is for non persistent
+//            //elements
+//            return true;
+//        }
+//
+//        public final Long getExpiration() {
+//            return -1L;
+//        }
+//
+//        public final int getPriority() {
+//            return 0;
+//        }
+//    }
+//
+//    /**
+//     * @param record
+//     * @return
+//     */
+//    public static TxOp createTxOp(MessageRecord record, Transaction tx) {
+//        if (record.getEncoding().equals(TxAck.ENCODING)) {
+//            return TxAck.createFromMessageRecord(record, tx);
+//        } else {
+//            MessageDelivery delivery = tx.manager.getVirtualHost().getQueueStore().getMessageMarshaller().unMarshall(record, tx.opQueue.getDescriptor());
+//            return new TxMessage(delivery, tx);
+//        }
+//    }
+//
+//    protected void startTransactionProcessor()
+//    {
+//        synchronized(this)
+//        {
+//            if(processor == null)
+//            {
+//                processor = new TxProcessor();
+//                opQueue.addSubscription(processor);
+//            }
+//        }
+//    }
+//
+//
+//    /**
+//     * TxProcessor
+//     * <p>
+//     * Description: The tx processor processes the transaction queue after
+//     * commit or rollback.
+//     * </p>
+//     *
+//     * @author cmacnaug
+//     * @version 1.0
+//     */
+//    private class TxProcessor implements Subscription<TxOp> {
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#add(java.lang.Object,
+//         * org.apache.activemq.flow.ISourceController,
+//         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+//         */
+//        public void add(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+//
+//            switch (state) {
+//            case COMMITED_STATE: {
+//                element.onCommit(controller);
+//                if (callback != null) {
+//                    callback.acknowledge();
+//                }
+//                break;
+//            }
+//            case ROLLBACK_STATE: {
+//                element.onRollback(controller);
+//                if (callback != null) {
+//                    callback.acknowledge();
+//                }
+//                break;
+//            }
+//            default: {
+//                LOG.error("Illegal state for transaction dispatch: " + this + " state: " + state);
+//            }
+//            }
+//
+//            //If we've reached the end of the op queue
+//            if (opQueue.getEnqueuedCount() == 0) {
+//                opQueue.shutdown(null);
+//            }
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#hasSelector()
+//         */
+//        public boolean hasSelector() {
+//            return false;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#isBrowser()
+//         */
+//        public boolean isBrowser() {
+//            return false;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#isExclusive()
+//         */
+//        public boolean isExclusive() {
+//            return true;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.Subscription#isRemoveOnDispatch(java.lang
+//         * .Object)
+//         */
+//        public boolean isRemoveOnDispatch(TxOp elem) {
+//            return false;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#matches(java.lang.Object)
+//         */
+//        public boolean matches(TxOp elem) {
+//            return true;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object,
+//         * org.apache.activemq.flow.ISourceController,
+//         * org.apache.activemq.queue.Subscription.SubscriptionDelivery)
+//         */
+//        public boolean offer(TxOp element, ISourceController<?> controller, SubscriptionDelivery<TxOp> callback) {
+//            add(element, controller, callback);
+//            return true;
+//        }
+//
+//    }
+}
+
+
+/**
+ * LocalTransaction
+ * <p>
+ * Description:
+ * </p>
+ *
+ * @author cmacnaug
+ * @version 1.0
+ */
+class LocalTransaction extends Transaction {
+
+//    TODO:
+//    LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+//        super(manager, tid, opQueue);
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+//     */
+//    @Override
+//    public void commit(boolean onePhase, final TransactionListener listener) throws XAException, IOException {
+//        if (LOG.isDebugEnabled()) {
+//            LOG.debug("commit: " + this);
+//        }
+//
+//        synchronized(this)
+//        {
+//            // Get ready for commit.
+//            try {
+//                prePrepare();
+//            } catch (XAException e) {
+//                throw e;
+//            } catch (Throwable e) {
+//                LOG.warn("COMMIT FAILED: ", e);
+//                rollback(null);
+//                // Let them know we rolled back.
+//                XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
+//                xae.errorCode = XAException.XA_RBOTHER;
+//                xae.initCause(e);
+//                throw xae;
+//            }
+//
+//            //Add the listener for commit
+//            if(listeners == null)
+//            {
+//                listeners = new HashSet<TransactionListener>();
+//            }
+//            listeners.add(listener);
+//
+//            //Update the transaction state to committed,
+//            //and on complete process the commit:
+//            setState(COMMITED_STATE, new FutureListener<Object>()
+//            {
+//                public void onFutureComplete(Future<? extends Object> dbCommitResult) {
+//                    try {
+//                        fireAfterCommit();
+//                        startTransactionProcessor();
+//                    } catch (InterruptedException e) {
+//                        //Shouldn't happen
+//                        LOG.warn(new AssertionError(e));
+//                    } catch (ExecutionException e) {
+//                        LOG.warn("COMMIT FAILED: ", e);
+//                    }
+//                    catch (Exception e)
+//                    {
+//                    }
+//                }
+//            });
+//        }
+//    }
+//
+//
+//    public int prepare(TransactionListener listener) throws XAException {
+//        XAException xae = new XAException("Prepare not implemented on Local Transactions.");
+//        xae.errorCode = XAException.XAER_RMERR;
+//        throw xae;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+//     */
+//    @Override
+//    public void rollback(TransactionListener listener) throws XAException, IOException {
+//        // TODO Auto-generated method stub
+//        throw new UnsupportedOperationException("Not yet implemnted");
+//    }
+//
+//    /* (non-Javadoc)
+//     * @see org.apache.activemq.apollo.broker.Transaction#getType()
+//     */
+//    @Override
+//    public byte getType() {
+//        return TYPE_LOCAL;
+//    }
+
+}
+
+/**
+ * XATransaction
+ * <p>
+ * Description:
+ * </p>
+ *
+ * @author cmacnaug
+ * @version 1.0
+ */
+class XATransaction extends Transaction {
+// TODO:
+//    private final Buffer xid;
+//
+//    XATransaction(TransactionManager manager, long tid, Buffer xid, IQueue<Long, TxOp> opQueue) {
+//        super(manager, tid, opQueue);
+//        this.xid = xid;
+//    }
+//
+//    public Buffer getXid() {
+//        return xid;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+//     */
+//    @Override
+//    public void commit(boolean onePhase, TransactionListener listener) throws XAException, IOException {
+//        // TODO Auto-generated method stub
+//
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#prepare()
+//     */
+//    @Override
+//    public int prepare(TransactionListener listener) throws XAException, IOException {
+//        // TODO Auto-generated method stub
+//        return 0;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+//     */
+//    @Override
+//    public void rollback(TransactionListener listener) throws XAException, IOException {
+//        // TODO Auto-generated method stub
+//
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#getType()
+//     */
+//    @Override
+//    public byte getType() {
+//        return TYPE_XA;
+//    }
+}
+