You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:40:20 UTC

svn commit: r961068 [3/4] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache...

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961068&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:40:18 2010
@@ -0,0 +1,2000 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import _root_.java.util.{LinkedHashMap, ArrayList, HashMap}
+import _root_.org.apache.activemq.broker.store.{Store}
+import _root_.org.apache.activemq.Service
+import _root_.java.lang.{String}
+import _root_.org.apache.activemq.util.buffer.{AsciiBuffer}
+import _root_.org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue}
+import _root_.scala.collection.JavaConversions._
+import _root_.scala.reflect.BeanProperty
+
+import path.{PathFilter}
+
+object VirtualHost extends Log
+
+class VirtualHost() extends Service with Logging {
+
+  override protected def log = VirtualHost
+
+  private val queueStore = new BrokerQueueStore()
+  private val queues = new HashMap[AsciiBuffer, Queue]()
+  private val durableSubs = new HashMap[String, DurableSubscription]()
+  private val q:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
+  val router = new Router(q)
+
+  private var started = false;
+
+  @BeanProperty
+  var broker: Broker = null
+  @BeanProperty
+  var names:List[String] = Nil;
+  def setNamesArray( names:ArrayList[String]) = {
+    this.names = names.toList
+  }
+
+  @BeanProperty
+  var database:BrokerDatabase = null
+  @BeanProperty
+  var txnManager:TransactionManager = null
+
+
+  def start():Unit = {
+      if (started) {
+          return;
+      }
+
+      database.start();
+
+//      router.setDatabase(database);
+
+      //Recover queues:
+      queueStore.setDatabase(database);
+      queueStore.setDispatchQueue(q);
+      queueStore.loadQueues();
+
+      // Create Queue instances
+//        TODO:
+//        for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
+//            Queue queue = new Queue(iQueue);
+//            Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
+//            Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
+//            queue.setDestination(dest);
+//            domain.bind(dest.getName(), queue);
+//            queues.put(dest.getName(), queue);
+//        }
+//        for (Queue queue : queues.values()) {
+//            queue.start();
+//        }
+
+      //Recover transactions:
+      txnManager.loadTransactions();
+      started = true;
+  }
+
+
+
+  def stop():Unit = {
+
+      if (!started) {
+          return;
+      }
+
+//    TODO:
+//      val tmp = new ArrayList[Queue](queues.values())
+//      for (queue <-  tmp) {
+//        queue.shutdown
+//      }
+
+// TODO:
+//        ArrayList<IQueue<Long, MessageDelivery>> durableQueues = new ArrayList<IQueue<Long,MessageDelivery>>(queueStore.getDurableQueues());
+//        done = new RunnableCountDownLatch(durableQueues.size());
+//        for (IQueue<Long, MessageDelivery> queue : durableQueues) {
+//            queue.shutdown(done);
+//        }
+//        done.await();
+
+      database.stop();
+      started = false;
+  }
+
+  def createQueue(dest:Destination) :Queue = {
+      if (!started) {
+          //Queues from the store must be loaded before we can create new ones:
+          throw new IllegalStateException("Can't create queue on unstarted host");
+      }
+
+      val queue = queues.get(dest);
+//        TODO:
+//        // If the queue doesn't exist create it:
+//        if (queue == null) {
+//            IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
+//            queue = new Queue(iQueue);
+//            queue.setDestination(dest);
+//            Domain domain = router.getDomain(dest.getDomain());
+//            domain.bind(dest.getName(), queue);
+//            queues.put(dest.getName(), queue);
+//
+//            for (QueueLifecyleListener l : queueLifecyleListeners) {
+//                l.onCreate(queue);
+//            }
+//        }
+//        queue.start();
+      queue;
+  }
+
+
+  def createSubscription(consumer:ConsumerContext):BrokerSubscription = {
+      createSubscription(consumer, consumer.getDestination());
+  }
+
+  def createSubscription(consumer:ConsumerContext, destination:Destination):BrokerSubscription = {
+
+      // First handle composite destinations..
+      var destinations = destination.getDestinations();
+      if (destinations != null) {
+          var subs :List[BrokerSubscription] = Nil
+          for (childDest <- destinations) {
+              subs ::= createSubscription(consumer, childDest);
+          }
+          return new CompositeSubscription(destination, subs);
+      }
+
+      // If it's a Topic...
+//      if ( destination.getDomain == TOPIC_DOMAIN || destination.getDomain == TEMP_TOPIC_DOMAIN ) {
+//
+//          // It might be a durable subscription on the topic
+//          if (consumer.isDurable()) {
+//              var dsub = durableSubs.get(consumer.getSubscriptionName());
+//              if (dsub == null) {
+////                    TODO:
+////                    IQueue<Long, MessageDelivery> queue = queueStore.createDurableQueue(consumer.getSubscriptionName());
+////                    queue.start();
+////                    dsub = new DurableSubscription(this, destination, consumer.getSelectorExpression(), queue);
+////                    durableSubs.put(consumer.getSubscriptionName(), dsub);
+//              }
+//              return dsub;
+//          }
+//
+//          // return a standard subscription
+////            TODO:
+////            return new TopicSubscription(this, destination, consumer.getSelectorExpression());
+//          return null;
+//      }
+
+      // It looks like a wild card subscription on a queue..
+      if (PathFilter.containsWildCards(destination.getName())) {
+          return new WildcardQueueSubscription(this, destination, consumer);
+      }
+
+      // It has to be a Queue subscription then..
+      var queue = queues.get(destination.getName());
+      if (queue == null) {
+          if (consumer.autoCreateDestination()) {
+              queue = createQueue(destination);
+          } else {
+              throw new IllegalStateException("The queue does not exist: " + destination.getName());
+          }
+      }
+//        TODO:
+//        return new Queue.QueueSubscription(queue);
+      return null;
+  }
+
+
+  val queueLifecyleListeners = new ArrayList[QueueLifecyleListener]();
+
+  def addDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= {
+      queueLifecyleListeners.add(listener);
+  }
+
+  def removeDestinationLifecyleListener(listener:QueueLifecyleListener):Unit= {
+      queueLifecyleListeners.add(listener);
+  }
+}
+
+class BrokerDatabase() extends Service {
+
+  @BeanProperty
+  var store:Store=null;
+
+  @BeanProperty
+  var virtualHost:VirtualHost=null;
+
+    def start() ={
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    def stop() = {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+// TODO: re-implement.
+//    private static final boolean DEBUG = false;
+//
+//    private final Flow databaseFlow = new Flow("database", false);
+//
+//    private final SizeLimiter<OperationBase<?>> storeLimiter;
+//    private final FlowController<OperationBase<?>> storeController;
+//    private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
+//
+//    private DispatchQueue dispatcher;
+//    private Thread flushThread;
+//    private AtomicBoolean running = new AtomicBoolean(false);
+//    private DatabaseListener listener;
+//
+//    private final LinkedNodeList<OperationBase<?>> opQueue;
+//    private AtomicBoolean notify = new AtomicBoolean(false);
+//    private Semaphore opsReady = new Semaphore(0);
+//    private long opSequenceNumber;
+//    private long flushPointer = -1; // The last seq num for which flush was
+//    // requested
+//    private long requestedDelayedFlushPointer = -1; // Set to the last sequence
+//    // num scheduled for delay
+//    private long delayedFlushPointer = 0; // The last delayable sequence num
+//    // requested.
+//    private long flushDelay = 10;
+//
+//    private final Runnable flushDelayCallback;
+//    private boolean storeBypass = true;
+//
+//    public interface DatabaseListener {
+//        /**
+//         * Called if there is a catastrophic problem with the database.
+//         *
+//         * @param ioe
+//         *            The causing exception.
+//         */
+//        public void onDatabaseException(IOException ioe);
+//    }
+//
+//    public static interface MessageRecordMarshaller<V> {
+//        MessageRecord marshal(V element);
+//
+//        /**
+//         * Called when a queue element is recovered from the store for a
+//         * particular queue.
+//         *
+//         * @param mRecord
+//         *            The message record
+//         * @param queue
+//         *            The queue that the element is being restored to (or null
+//         *            if not being restored for a queue)
+//         * @return
+//         */
+//        V unMarshall(MessageRecord mRecord, QueueDescriptor queue);
+//    }
+//
+//    public BrokerDatabase(Store store) {
+//        this.store = store;
+//        this.opQueue = new LinkedNodeList<OperationBase<?>>();
+//        storeLimiter = new SizeLimiter<OperationBase<?>>(FLUSH_QUEUE_SIZE, 0) {
+//
+//            @Override
+//            public int getElementSize(OperationBase<?> op) {
+//                return op.getLimiterSize();
+//            }
+//        };
+//
+//        storeController = new FlowController<OperationBase<?>>(new FlowControllable<OperationBase<?>>() {
+//
+//            public void flowElemAccepted(ISourceController<OperationBase<?>> controller, OperationBase<?> op) {
+//                addToOpQueue(op);
+//            }
+//
+//            public IFlowResource getFlowResource() {
+//                return BrokerDatabase.this;
+//            }
+//
+//        }, databaseFlow, storeLimiter, opQueue);
+//        storeController.useOverFlowQueue(false);
+//        super.onFlowOpened(storeController);
+//
+//        flushDelayCallback = new Runnable() {
+//            public void run() {
+//                flushDelayCallback();
+//            }
+//        };
+//    }
+//
+//    public synchronized void start() throws Exception {
+//        if (flushThread == null) {
+//
+//            running.set(true);
+//            store.start();
+//            flushThread = new Thread(new Runnable() {
+//
+//                public void run() {
+//                    processOps();
+//                }
+//
+//            }, "StoreThread");
+//            flushThread.start();
+//        }
+//    }
+//
+//    public synchronized void stop() throws Exception {
+//        if (flushThread != null) {
+//
+//            synchronized (opQueue) {
+//                updateFlushPointer(opSequenceNumber + 1);
+//            }
+//
+//            running.set(false);
+//            boolean interrupted = false;
+//            while (true) {
+//                opsReady.release();
+//                try {
+//                    flushThread.join();
+//                    break;
+//                } catch (InterruptedException e) {
+//                    interrupted = true;
+//                }
+//            }
+//
+//            store.flush();
+//            store.stop();
+//
+//            if (interrupted) {
+//                Thread.currentThread().interrupt();
+//            }
+//            flushThread = null;
+//        }
+//    }
+//
+//    /**
+//     * A blocking operation that lists all queues of a given type:
+//     *
+//     * @param type
+//     *            The queue type
+//     * @return A list of queues.
+//     *
+//     * @throws Exception
+//     *             If there was an error listing the queues.
+//     */
+//    public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
+//        return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
+//
+//            public Iterator<QueueQueryResult> execute(Session session) throws Exception {
+//                return session.queueListByType(type, null, Integer.MAX_VALUE);
+//            }
+//
+//        }, null);
+//    }
+//
+//    /**
+//     * A blocking operation that lists all entries in the specified map
+//     *
+//     * @param map
+//     *            The map to list
+//     * @return A list of map entries
+//     *
+//     * @throws Exception
+//     *             If there was an error listing the queues.
+//     */
+//    public Map<AsciiBuffer, Buffer> listMapEntries(final AsciiBuffer map) throws Exception {
+//        return store.execute(new Callback<Map<AsciiBuffer, Buffer>, Exception>() {
+//
+//            public Map<AsciiBuffer, Buffer> execute(Session session) throws Exception {
+//                HashMap<AsciiBuffer, Buffer> ret = new HashMap<AsciiBuffer, Buffer>();
+//                try {
+//                    Iterator<AsciiBuffer> keys = session.mapEntryListKeys(map, null, -1);
+//                    while (keys.hasNext()) {
+//                        AsciiBuffer key = keys.next();
+//                        ret.put(key, session.mapEntryGet(map, key));
+//                    }
+//                } catch (Store.KeyNotFoundException knfe) {
+//                    //No keys then:
+//                }
+//
+//                return ret;
+//            }
+//
+//        }, null);
+//    }
+//
+//    /**
+//     * @param map
+//     *            The name of the map to update.
+//     * @param key
+//     *            The key in the map to update.
+//     * @param value
+//     *            The value to insert.
+//     */
+//    public OperationContext<?> updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+//        return add(new MapUpdateOperation(map, key, value), null, false);
+//    }
+//
+//    /**
+//     * Executes user supplied {@link Operation}. If the {@link Operation} does
+//     * not throw any Exceptions, all updates to the store are committed,
+//     * otherwise they are rolled back. Any exceptions thrown by the
+//     * {@link Operation} are propagated by this method.
+//     *
+//     * If limiter space on the store processing queue is exceeded, the
+//     * controller will be blocked.
+//     *
+//     * If this method is called with flush set to
+//     * <code>false</false> there is no
+//     * guarantee made about when the operation will be executed. If <code>flush</code>
+//     * is <code>true</code> and {@link Operation#isDelayable()} is also
+//     * <code>true</code> then an attempt will be made to execute the event at
+//     * the {@link Store}'s configured delay interval.
+//     *
+//     * @param op
+//     *            The operation to execute
+//     * @param flush
+//     *            Whether or not this operation needs immediate processing.
+//     * @param controller
+//     *            the source of the operation.
+//     * @return the {@link OperationContext} associated with the operation
+//     */
+//    private <T> OperationContext<T> add(OperationBase<T> op, ISourceController<?> controller, boolean flush) {
+//
+//        op.flushRequested = flush;
+//        storeController.add(op, controller);
+//        return op;
+//    }
+//
+//    private final void addToOpQueue(OperationBase<?> op) {
+//        if (!running.get()) {
+//            throw new IllegalStateException("BrokerDatabase not started");
+//        }
+//
+//        synchronized (opQueue) {
+//            op.opSequenceNumber = opSequenceNumber++;
+//            opQueue.addLast(op);
+//            if (op.flushRequested || storeLimiter.getThrottled()) {
+//                if (op.isDelayable() && flushDelay > 0) {
+//                    scheduleDelayedFlush(op.opSequenceNumber);
+//                } else {
+//                    updateFlushPointer(op.opSequenceNumber);
+//                }
+//            }
+//        }
+//    }
+//
+//    private void updateFlushPointer(long seqNumber) {
+//        if (seqNumber > flushPointer) {
+//            flushPointer = seqNumber;
+//            OperationBase<?> op = opQueue.getHead();
+//            if (op != null && op.opSequenceNumber <= flushPointer && notify.get()) {
+//                opsReady.release();
+//            }
+//        }
+//    }
+//
+//    private void scheduleDelayedFlush(long seqNumber) {
+//        if (seqNumber < flushPointer) {
+//            return;
+//        }
+//
+//        if (seqNumber > delayedFlushPointer) {
+//            delayedFlushPointer = seqNumber;
+//        }
+//
+//        if (requestedDelayedFlushPointer == -1) {
+//            requestedDelayedFlushPointer = delayedFlushPointer;
+//            Dispatch.getGlobalQueue().dispatchAfter(flushDelay, TimeUnit.MILLISECONDS, flushDelayCallback);
+//        }
+//
+//    }
+//
+//    private final void flushDelayCallback() {
+//        synchronized (opQueue) {
+//            if (flushPointer < requestedDelayedFlushPointer) {
+//                updateFlushPointer(requestedDelayedFlushPointer);
+//
+//            }
+//
+//            // If another delayed flush has been scheduled schedule it:
+//            requestedDelayedFlushPointer = -1;
+//            // Schedule next delay if needed:
+//            if (delayedFlushPointer > flushPointer) {
+//                scheduleDelayedFlush(delayedFlushPointer);
+//            } else {
+//                delayedFlushPointer = -1;
+//            }
+//
+//        }
+//    }
+//
+//    private final OperationBase<?> getNextOp(boolean wait) {
+//        if (!wait) {
+//            synchronized (opQueue) {
+//                OperationBase<?> op = opQueue.getHead();
+//                if (op != null && (op.opSequenceNumber <= flushPointer || !op.isDelayable())) {
+//                    op.unlink();
+//                    return op;
+//                }
+//            }
+//            return null;
+//        } else {
+//            OperationBase<?> op = getNextOp(false);
+//            if (op == null) {
+//                notify.set(true);
+//                op = getNextOp(false);
+//                try {
+//                    while (running.get() && op == null) {
+//                        opsReady.acquireUninterruptibly();
+//                        op = getNextOp(false);
+//                    }
+//                } finally {
+//                    notify.set(false);
+//                    opsReady.drainPermits();
+//                }
+//            }
+//            return op;
+//        }
+//    }
+//
+//    private final void processOps() {
+//        int count = 0;
+//        Session session = store.getSession();
+//        while (running.get()) {
+//            final OperationBase<?> firstOp = getNextOp(true);
+//            if (firstOp == null) {
+//                continue;
+//            }
+//            count = 0;
+//
+//            // The first operation we get, triggers a store transaction.
+//            if (firstOp != null) {
+//                final LinkedList<Operation<?>> processedQueue = new LinkedList<Operation<?>>();
+//                boolean locked = false;
+//                try {
+//
+//                    Operation<?> op = firstOp;
+//                    while (op != null) {
+//                        final Operation<?> toExec = op;
+//                        if (toExec.beginExecute()) {
+//                            if (!locked) {
+//                                session.acquireLock();
+//                                locked = true;
+//                            }
+//                            count++;
+//                            op.execute(session);
+//                            processedQueue.add(op);
+//                            /*
+//                             * store.execute(new Store.VoidCallback<Exception>()
+//                             * {
+//                             *
+//                             * @Override public void run(Session session) throws
+//                             * Exception {
+//                             *
+//                             * // Try to execute the operation against the //
+//                             * session... try { toExec.execute(session);
+//                             * processedQueue.add(toExec); } catch
+//                             * (CancellationException ignore) { //
+//                             * System.out.println("Cancelled" + // toExec); } }
+//                             * }, null);
+//                             */
+//                        }
+//
+//                        if (count < 1000) {
+//                            op = getNextOp(false);
+//                        } else {
+//                            op = null;
+//                        }
+//                    }
+//                    // executeOps(firstOp, processedQueue, counter);
+//
+//                    // If we procecessed some ops, flush and post process:
+//                    if (!processedQueue.isEmpty()) {
+//
+//                        if (locked) {
+//                            session.commit();
+//                            session.releaseLock();
+//                            locked = false;
+//                        }
+//                        if (DEBUG)
+//                            System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
+//                        // Sync the store:
+//                        store.flush();
+//
+//                        // Post process operations
+//                        long release = 0;
+//                        for (Operation<?> processed : processedQueue) {
+//                            processed.onCommit();
+//                            // System.out.println("Processed" + processed);
+//                            release += processed.getLimiterSize();
+//                        }
+//
+//                        synchronized (opQueue) {
+//                            this.storeLimiter.remove(1, release);
+//                        }
+//                    }
+//
+//                } catch (IOException e) {
+//                    for (Operation<?> processed : processedQueue) {
+//                        processed.onRollback(e);
+//                    }
+//                    onDatabaseException(e);
+//                } catch (RuntimeException e) {
+//                    for (Operation<?> processed : processedQueue) {
+//                        processed.onRollback(e);
+//                    }
+//                    IOException ioe = new IOException(e.getMessage());
+//                    ioe.initCause(e);
+//                    onDatabaseException(ioe);
+//                } catch (Exception e) {
+//                    for (Operation<?> processed : processedQueue) {
+//                        processed.onRollback(e);
+//                    }
+//                    IOException ioe = new IOException(e.getMessage());
+//                    ioe.initCause(e);
+//                    onDatabaseException(ioe);
+//                } finally {
+//                    if (locked) {
+//                        try {
+//                            session.releaseLock();
+//                        } catch (Exception e) {
+//                            IOException ioe = new IOException(e.getMessage());
+//                            ioe.initCause(e);
+//                            onDatabaseException(ioe);
+//                        }
+//                    }
+//                }
+//            }
+//        }
+//    }
+//
+//    /*
+//     * private final void executeOps(final OperationBase op, final
+//     * LinkedList<Operation> processedQueue, final OpCounter counter) throws
+//     * FatalStoreException, Exception { store.execute(new
+//     * Store.VoidCallback<Exception>() {
+//     *
+//     * @Override public void run(Session session) throws Exception {
+//     *
+//     * // Try to execute the operation against the // session... try { if
+//     * (op.execute(session)) { processedQueue.add(op); } else { counter.count--;
+//     * } } catch (CancellationException ignore) { System.out.println("Cancelled"
+//     * + op); }
+//     *
+//     * // See if we can batch up some additional operations // in this
+//     * transaction. if (counter.count < 100) { OperationBase next =
+//     * getNextOp(false); if (next != null) { counter.count++; executeOps(next,
+//     * processedQueue, counter); } } } }, null); }
+//     */
+//
+//    /**
+//     * Adds a queue to the database
+//     *
+//     * @param queue
+//     *            The queue to add.
+//     */
+//    public void addQueue(QueueDescriptor queue) {
+//        add(new QueueAddOperation(queue), null, false);
+//    }
+//
+//    /**
+//     * Deletes a queue and all of its messages from the database
+//     *
+//     * @param queue
+//     *            The queue to delete.
+//     */
+//    public void deleteQueue(QueueDescriptor queue) {
+//        add(new QueueDeleteOperation(queue), null, false);
+//    }
+//
+//    /**
+//     * Saves a message for all of the recipients in the
+//     * {@link BrokerMessageDelivery}.
+//     *
+//     * @param delivery
+//     *            The delivery.
+//     * @param source
+//     *            The source's controller.
+//     * @throws IOException
+//     *             If there is an error marshalling the message.
+//     * @return The {@link OperationContext} associated with the operation
+//     */
+//    public OperationContext<?> persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
+//        return add(new AddMessageOperation(delivery), source, true);
+//    }
+//
+//    /**
+//     * Saves a Message for a single queue.
+//     *
+//     * @param queueElement
+//     *            The element to save.
+//     * @param source
+//     *            The source initiating the save or null, if there isn't one.
+//     * @throws IOException
+//     *             If there is an error marshalling the message.
+//     *
+//     * @return The {@link OperationContext} associated with the operation
+//     */
+//    public OperationContext<?> saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
+//        return add(new AddMessageOperation(queueElement), source, !delayable);
+//    }
+//
+//    /**
+//     * Deletes the given message from the store for the given queue.
+//     *
+//     * @param queueElement
+//     * @return The {@link OperationContext} associated with the operation
+//     */
+//    public OperationContext<?> deleteQueueElement(SaveableQueueElement<?> queueElement) {
+//        return add(new DeleteOperation(queueElement.getSequenceNumber(), queueElement.getQueueDescriptor()), null, false);
+//    }
+//
+//    /**
+//     * Loads a batch of messages for the specified queue. The loaded messages
+//     * are given the provided {@link RestoreListener}.
+//     * <p>
+//     * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
+//     * message not the store tracking number.
+//     *
+//     * @param queue
+//     *            The queue for which to load messages
+//     * @param recordsOnly
+//     *            True if message body shouldn't be restored
+//     * @param first
+//     *            The first queue sequence number to load (-1 starts at
+//     *            begining)
+//     * @param maxSequence
+//     *            The maximum sequence number to load (-1 if no limit)
+//     * @param maxCount
+//     *            The maximum number of messages to load (-1 if no limit)
+//     * @param listener
+//     *            The listener to which messags should be passed.
+//     * @return The {@link OperationContext} associated with the operation
+//     */
+//    public <T> OperationContext<?> restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<T> listener,
+//            MessageRecordMarshaller<T> marshaller) {
+//        return add(new RestoreElementsOperation<T>(queue, recordsOnly, first, maxCount, maxSequence, listener, marshaller), null, true);
+//    }
+//
+//    private void onDatabaseException(IOException ioe) {
+//        if (listener != null) {
+//            listener.onDatabaseException(ioe);
+//        } else {
+//            ioe.printStackTrace();
+//        }
+//    }
+//
+//    public interface OperationContext<V> extends ListenableFuture<V> {
+//
+//        /**
+//         * Attempts to cancel the store operation. Returns true if the operation
+//         * could be canceled or false if the operation was already executed by
+//         * the store.
+//         *
+//         * @return true if the operation could be canceled
+//         */
+//        public boolean cancel();
+//
+//        /**
+//         * Requests flush for this database operation (overriding a previous
+//         * delay)
+//         */
+//        public void requestFlush();
+//    }
+//
+//    /**
+//     * This interface is used to execute transacted code.
+//     *
+//     * It is used by the {@link Store#execute(org.apache.activemq.broker.store.Store.Callback, Runnable)} method, often as
+//     * anonymous class.
+//     */
+//    public interface Operation<V> extends OperationContext<V> {
+//
+//        /**
+//         * Called when the saver is about to execute the operation. If true is
+//         * returned the operation can no longer be canceled.
+//         *
+//         * @return false if the operation has been canceled.
+//         */
+//        public boolean beginExecute();
+//
+//        /**
+//         * Gets called by the {@link Store}
+//         * within a transactional context. If any exception is thrown including
+//         * Runtime exception, the transaction is rolled back.
+//         *
+//         * @param session
+//         *            provides you access to read and update the persistent
+//         *            data.
+//         * @throws Exception
+//         *             if an system error occured while executing the
+//         *             operations.
+//         * @throws RuntimeException
+//         *             if an system error occured while executing the
+//         *             operations.
+//         */
+//        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
+//
+//        /**
+//         * Returns true if this operation can be delayed. This is useful in
+//         * cases where external events can negate the need to execute the
+//         * operation. The delay interval is not guaranteed to be honored, if
+//         * subsequent events or other store flush policy/criteria requires a
+//         * flush of subsequent events.
+//         *
+//         * @return True if the operation can be delayed.
+//         */
+//        public boolean isDelayable();
+//
+//        /**
+//         * Returns the size to be used when calculating how much space this
+//         * operation takes on the store processing queue.
+//         *
+//         * @return The limiter size to be used.
+//         */
+//        public int getLimiterSize();
+//
+//        /**
+//         * Called after {@link #execute(Session)} is called and the the
+//         * operation has been committed.
+//         */
+//        public void onCommit();
+//
+//        /**
+//         * Called after {@link #execute(Session)} is called and the the
+//         * operation has been rolled back.
+//         */
+//        public void onRollback(Throwable error);
+//    }
+//
+//    /**
+//     * This is a convenience base class that can be used to implement
+//     * Operations. It handles operation cancellation for you.
+//     */
+//    abstract class OperationBase<V> extends LinkedNode<OperationBase<?>> implements Operation<V> {
+//        public boolean flushRequested = false;
+//        public long opSequenceNumber = -1;
+//
+//        final protected AtomicBoolean executePending = new AtomicBoolean(true);
+//        final protected AtomicBoolean cancelled = new AtomicBoolean(false);
+//        final protected AtomicBoolean executed = new AtomicBoolean(false);
+//        final protected AtomicReference<FutureListener<? super V>> listener = new AtomicReference<FutureListener<? super V>>();
+//
+//        protected Throwable error;
+//
+//        public static final int BASE_MEM_SIZE = 20;
+//
+//        public boolean cancel(boolean interrupt) {
+//            return cancel();
+//        }
+//
+//        public boolean cancel() {
+//            if (storeBypass) {
+//                if (executePending.compareAndSet(true, false)) {
+//                    cancelled.set(true);
+//                    // System.out.println("Cancelled: " + this);
+//                    synchronized (opQueue) {
+//                        unlink();
+//                        storeController.elementDispatched(this);
+//                    }
+//                    fireListener();
+//                    return true;
+//                }
+//            }
+//            return cancelled.get();
+//        }
+//
+//        public final boolean isCancelled() {
+//            return cancelled.get();
+//        }
+//
+//        public final boolean isExecuted() {
+//            return executed.get();
+//        }
+//
+//        public final boolean isDone() {
+//            return isCancelled() || isExecuted();
+//        }
+//
+//        /**
+//         * Called when the saver is about to execute the operation. If true is
+//         * returned the operation can no longer be cancelled.
+//         *
+//         * @return true if operation should be executed
+//         */
+//        public final boolean beginExecute() {
+//            if (executePending.compareAndSet(true, false)) {
+//                return true;
+//            } else {
+//                return false;
+//            }
+//        }
+//
+//        /**
+//         * Gets called by the
+//         * {@link Store} method
+//         * within a transactional context. If any exception is thrown including
+//         * Runtime exception, the transaction is rolled back.
+//         *
+//         * @param session
+//         *            provides you access to read and update the persistent
+//         *            data.
+//         * @throws Exception
+//         *             if an system error occured while executing the
+//         *             operations.
+//         * @throws RuntimeException
+//         *             if an system error occured while executing the
+//         *             operations.
+//         */
+//        public void execute(Session session) throws Exception, RuntimeException {
+//            if (DEBUG)
+//                System.out.println("Executing " + this);
+//            doExcecute(session);
+//        }
+//
+//        abstract protected void doExcecute(Session session);
+//
+//        public int getLimiterSize() {
+//            return BASE_MEM_SIZE;
+//        }
+//
+//        public boolean isDelayable() {
+//            return false;
+//        }
+//
+//        /**
+//         * Requests flush for this database operation (overriding a previous
+//         * delay)
+//         */
+//        public void requestFlush() {
+//            synchronized (opQueue) {
+//                updateFlushPointer(opSequenceNumber);
+//            }
+//        }
+//
+//        public void onCommit() {
+//            executed.set(true);
+//            fireListener();
+//        }
+//
+//        /**
+//         * Called after {@link #execute(Session)} is called and the the
+//         * operation has been rolled back.
+//         */
+//        public void onRollback(Throwable error) {
+//            executed.set(true);
+//            if (!fireListener()) {
+//                error.printStackTrace();
+//            }
+//        }
+//
+//        private final boolean fireListener() {
+//            FutureListener<? super V> l = this.listener.getAndSet(null);
+//            if (l != null) {
+//                l.onFutureComplete(this);
+//                return true;
+//            }
+//            return false;
+//        }
+//
+//        public void setFutureListener(FutureListener<? super V> listener) {
+//            this.listener.set(listener);
+//            if (isDone()) {
+//                fireListener();
+//            }
+//        }
+//
+//        /**
+//         * Subclasses the return a result should override this
+//         * @return The result.
+//         */
+//        protected final V getResult() {
+//            return null;
+//        }
+//
+//        /**
+//         * Waits if necessary for the computation to complete, and then
+//         * retrieves its result.
+//         *
+//         * @return the computed result
+//         * @throws CancellationException
+//         *             if the computation was cancelled
+//         * @throws ExecutionException
+//         *             if the computation threw an exception
+//         * @throws InterruptedException
+//         *             if the current thread was interrupted while waiting
+//         */
+//        public final V get() throws ExecutionException, InterruptedException  {
+//
+//            try {
+//                return get(-1, TimeUnit.MILLISECONDS);
+//            } catch (TimeoutException e) {
+//                //Can't happen.
+//                throw new AssertionError(e);
+//            }
+//        }
+//
+//        /**
+//         * Waits if necessary for at most the given time for the computation
+//         * to complete, and then retrieves its result, if available.
+//         *
+//         * @param timeout the maximum time to wait
+//         * @param tu the time unit of the timeout argument
+//         * @return the computed result
+//         * @throws CancellationException if the computation was cancelled
+//         * @throws ExecutionException if the computation threw an
+//         * exception
+//         * @throws InterruptedException if the current thread was interrupted
+//         * while waiting
+//         * @throws TimeoutException if the wait timed out
+//         */
+//        public final V get(long timeout, TimeUnit tu) throws ExecutionException, InterruptedException, TimeoutException {
+//            if (isCancelled()) {
+//                throw new CancellationException();
+//            }
+//            if (error != null) {
+//                throw new ExecutionException(error);
+//            }
+//
+//            //TODO implement blocking?
+//            if(!isDone())
+//            {
+//                throw new UnsupportedOperationException("Blocking result retrieval not yet implemented");
+//            }
+//
+//            return getResult();
+//        }
+//
+//        public String toString() {
+//            return "DBOp seq: " + opSequenceNumber + "P/C/E: " + executePending.get() + "/" + isCancelled() + "/" + isExecuted();
+//        }
+//    }
+//
+//    private class QueueAddOperation extends OperationBase<Object> {
+//
+//        private QueueDescriptor qd;
+//
+//        QueueAddOperation(QueueDescriptor queue) {
+//            qd = queue;
+//        }
+//
+//        @Override
+//        protected void doExcecute(Session session) {
+//            try {
+//                session.queueAdd(qd);
+//            } catch (KeyNotFoundException e) {
+//                throw new FatalStoreException(e);
+//            }
+//        }
+//
+//        public String toString() {
+//            return "QueueAdd: " + qd.getQueueName().toString();
+//        }
+//    }
+//
+//    private class QueueDeleteOperation extends OperationBase<Object> {
+//
+//        private QueueDescriptor qd;
+//
+//        QueueDeleteOperation(QueueDescriptor queue) {
+//            qd = queue;
+//        }
+//
+//        @Override
+//        protected void doExcecute(Session session) {
+//            session.queueRemove(qd);
+//        }
+//
+//        public String toString() {
+//            return "QueueDelete: " + qd.getQueueName().toString();
+//        }
+//    }
+//
+//    private class DeleteOperation extends OperationBase<Object> {
+//        private final long queueKey;
+//        private QueueDescriptor queue;
+//
+//        public DeleteOperation(long queueKey, QueueDescriptor queue) {
+//            this.queueKey = queueKey;
+//            this.queue = queue;
+//        }
+//
+//        @Override
+//        public int getLimiterSize() {
+//            return BASE_MEM_SIZE + 8;
+//        }
+//
+//        @Override
+//        protected void doExcecute(Session session) {
+//            try {
+//                session.queueRemoveMessage(queue, queueKey);
+//            } catch (KeyNotFoundException e) {
+//                // TODO Probably doesn't always mean an error, it is possible
+//                // that
+//                // the queue has been deleted, in which case its messages will
+//                // have been deleted, too.
+//                e.printStackTrace();
+//            }
+//        }
+//
+//        public String toString() {
+//            return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + queueKey + " " + super.toString();
+//        }
+//    }
+//
+//    private class RestoreElementsOperation<V> extends OperationBase<V> {
+//        private QueueDescriptor queue;
+//        private long firstKey;
+//        private int maxRecords;
+//        private long maxSequence;
+//        private boolean recordsOnly;
+//        private RestoreListener<V> listener;
+//        private Collection<RestoredElement<V>> msgs = null;
+//        private MessageRecordMarshaller<V> marshaller;
+//
+//        RestoreElementsOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<V> listener, MessageRecordMarshaller<V> marshaller) {
+//            this.queue = queue;
+//            this.recordsOnly = recordsOnly;
+//            this.firstKey = firstKey;
+//            this.maxRecords = maxRecords;
+//            this.maxSequence = maxSequence;
+//            this.listener = listener;
+//            this.marshaller = marshaller;
+//        }
+//
+//        @Override
+//        public int getLimiterSize() {
+//            return BASE_MEM_SIZE + 44;
+//        }
+//
+//        @Override
+//        protected void doExcecute(Session session) {
+//
+//            Iterator<QueueRecord> records = null;
+//            try {
+//                records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords);
+//                msgs = new LinkedList<RestoredElement<V>>();
+//            } catch (KeyNotFoundException e) {
+//                msgs = new ArrayList<RestoredElement<V>>(0);
+//                return;
+//            }
+//
+//            QueueRecord qRecord = null;
+//            int count = 0;
+//            if (records.hasNext()) {
+//                qRecord = records.next();
+//            }
+//
+//            while (qRecord != null) {
+//                RestoredElementImpl<V> rm = new RestoredElementImpl<V>();
+//                // TODO should update jms redelivery here.
+//                rm.qRecord = qRecord;
+//                rm.queue = queue;
+//                count++;
+//
+//                // Set the next sequence number:
+//                if (records.hasNext()) {
+//                    qRecord = records.next();
+//                    rm.nextSequence = qRecord.getQueueKey();
+//                } else {
+//                    // Look up the next sequence number:
+//                    try {
+//                        records = session.queueListMessagesQueue(queue, qRecord.getQueueKey() + 1, -1L, 1);
+//                        if (!records.hasNext()) {
+//                            rm.nextSequence = -1;
+//                        } else {
+//                            rm.nextSequence = records.next().getQueueKey();
+//                        }
+//                    } catch (KeyNotFoundException e) {
+//                        rm.nextSequence = -1;
+//                    }
+//                    qRecord = null;
+//                }
+//
+//                if (!recordsOnly) {
+//                    try {
+//                        rm.mRecord = session.messageGetRecord(rm.qRecord.getMessageKey());
+//                        rm.marshaller = marshaller;
+//                        msgs.add(rm);
+//                    } catch (KeyNotFoundException shouldNotHappen) {
+//                        shouldNotHappen.printStackTrace();
+//                    }
+//                } else {
+//                    msgs.add(rm);
+//                }
+//            }
+//
+//            if (DEBUG)
+//                System.out.println("Restored: " + count + " messages");
+//        }
+//
+//        @Override
+//        public void onCommit() {
+//            listener.elementsRestored(msgs);
+//            super.onCommit();
+//        }
+//
+//        public String toString() {
+//            return "MessageRestore: " + queue.getQueueName().toString() + " first: " + firstKey + " max: " + maxRecords;
+//        }
+//    }
+//
+//    private class AddMessageOperation extends OperationBase<Object> {
+//
+//        private final BrokerMessageDelivery brokerDelivery;
+//        private final SaveableQueueElement<MessageDelivery> singleElement;
+//        private final MessageDelivery delivery;
+//        private MessageRecord record;
+//        private LinkedList<SaveableQueueElement<MessageDelivery>> notifyTargets;
+//        private final boolean delayable;
+//
+//        public AddMessageOperation(BrokerMessageDelivery delivery) {
+//            this.brokerDelivery = delivery;
+//            this.singleElement = null;
+//            this.delivery = delivery;
+//            this.delayable = delivery.isFlushDelayable();
+//            if (!delayable) {
+//                this.record = delivery.createMessageRecord();
+//            }
+//        }
+//
+//        public AddMessageOperation(SaveableQueueElement<MessageDelivery> queueElement) {
+//            this.brokerDelivery = null;
+//            singleElement = queueElement;
+//            delivery = queueElement.getElement();
+//            this.record = singleElement.getElement().createMessageRecord();
+//            delayable = false;
+//        }
+//
+//        public boolean isDelayable() {
+//            return delayable;
+//        }
+//
+//        @Override
+//        public int getLimiterSize() {
+//            return delivery.getFlowLimiterSize() + BASE_MEM_SIZE + 40;
+//        }
+//
+//        @Override
+//        protected void doExcecute(Session session) {
+//
+//            if (singleElement == null) {
+//                brokerDelivery.beginStore();
+//                Collection<SaveableQueueElement<MessageDelivery>> targets = brokerDelivery.getPersistentQueues();
+//
+//                if (targets != null && !targets.isEmpty()) {
+//                    if (record == null) {
+//                        record = brokerDelivery.createMessageRecord();
+//                        if (record == null) {
+//                            throw new RuntimeException("Error creating message record for " + brokerDelivery.getMsgId());
+//                        }
+//                    }
+//                    record.setKey(brokerDelivery.getStoreTracking());
+//                    session.messageAdd(record);
+//
+//                    for (SaveableQueueElement<MessageDelivery> target : targets) {
+//                        try {
+//                            QueueRecord queueRecord = new QueueRecord();
+//                            queueRecord.setAttachment(null);
+//                            queueRecord.setMessageKey(record.getKey());
+//                            queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
+//                            queueRecord.setQueueKey(target.getSequenceNumber());
+//                            session.queueAddMessage(target.getQueueDescriptor(), queueRecord);
+//
+//                        } catch (KeyNotFoundException e) {
+//                            e.printStackTrace();
+//                        }
+//
+//                        if (target.requestSaveNotify()) {
+//                            if (notifyTargets == null) {
+//                                notifyTargets = new LinkedList<SaveableQueueElement<MessageDelivery>>();
+//                            }
+//                            notifyTargets.add(target);
+//                        }
+//                    }
+//                } else {
+//                    // Save with no targets must have been cancelled:
+//                    // System.out.println("Skipping save for " +
+//                    // delivery.getStoreTracking());
+//                }
+//            } else {
+//
+//                session.messageAdd(record);
+//                try {
+//                    QueueRecord queueRecord = new QueueRecord();
+//                    queueRecord.setAttachment(null);
+//                    queueRecord.setMessageKey(record.getKey());
+//                    queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
+//                    queueRecord.setQueueKey(singleElement.getSequenceNumber());
+//                    session.queueAddMessage(singleElement.getQueueDescriptor(), queueRecord);
+//                } catch (KeyNotFoundException e) {
+//                    e.printStackTrace();
+//                }
+//            }
+//        }
+//
+//        @Override
+//        public void onCommit() {
+//
+//            // Notify that the message was persisted.
+//            delivery.onMessagePersisted();
+//
+//            // Notify any of the targets that requested notify on save:
+//            if (singleElement != null && singleElement.requestSaveNotify()) {
+//                singleElement.notifySave();
+//            } else if (notifyTargets != null) {
+//                for (SaveableQueueElement<MessageDelivery> notify : notifyTargets) {
+//                    notify.notifySave();
+//                }
+//            }
+//
+//            super.onCommit();
+//        }
+//
+//        public String toString() {
+//            return "AddOperation " + delivery.getStoreTracking() + super.toString();
+//        }
+//    }
+//
+//    private class MapUpdateOperation extends OperationBase<Object> {
+//        final AsciiBuffer map;
+//        final AsciiBuffer key;
+//        final Buffer value;
+//
+//        MapUpdateOperation(AsciiBuffer mapName, AsciiBuffer key, Buffer value) {
+//            this.map = mapName;
+//            this.key = key;
+//            this.value = value;
+//        }
+//
+//        @Override
+//        public int getLimiterSize() {
+//            return BASE_MEM_SIZE + map.length + key.length + value.length;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.BrokerDatabase.OperationBase#doExcecute
+//         * (org.apache.activemq.broker.store.Store.Session)
+//         */
+//        @Override
+//        protected void doExcecute(Session session) {
+//            try {
+//                session.mapEntryPut(map, key, value);
+//            } catch (KeyNotFoundException e) {
+//                throw new Store.FatalStoreException(e);
+//            }
+//        }
+//    }
+//
+//    private class RestoredElementImpl<T> implements RestoredElement<T> {
+//        QueueRecord qRecord;
+//        QueueDescriptor queue;
+//        MessageRecord mRecord;
+//        MessageRecordMarshaller<T> marshaller;
+//        long nextSequence;
+//
+//        public T getElement() throws IOException {
+//            if (mRecord == null) {
+//                return null;
+//            }
+//            return marshaller.unMarshall(mRecord, queue);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore.RestoredElement#getSequenceNumber
+//         * ()
+//         */
+//        public long getSequenceNumber() {
+//            return qRecord.getQueueKey();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore.RestoredElement#getStoreTracking
+//         * ()
+//         */
+//        public long getStoreTracking() {
+//            return qRecord.getMessageKey();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @seeorg.apache.activemq.queue.QueueStore.RestoredElement#
+//         * getNextSequenceNumber()
+//         */
+//        public long getNextSequenceNumber() {
+//            return nextSequence;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore.RestoredElement#getElementSize()
+//         */
+//        public int getElementSize() {
+//            return qRecord.getSize();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.queue.QueueStore.RestoredElement#getExpiration()
+//         */
+//        public long getExpiration() {
+//            return qRecord.getTte();
+//        }
+//    }
+//
+//    public long allocateStoreTracking() {
+//        return store.allocateStoreTracking();
+//    }
+//
+//    public void setDispatchQueue(DispatchQueue queue) {
+//        this.dispatcher = queue;
+//    }
+//
+//
+//    /**
+//     * @param sqe
+//     * @param source
+//     * @param delayable
+//     */
+//    public <T> OperationContext<?> saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+//        return add(new AddElementOperation<T>(sqe, delayable, marshaller), source, !delayable);
+//    }
+//
+//    private class AddElementOperation<T> extends OperationBase<Object> {
+//
+//        private final SaveableQueueElement<T> op;
+//        private MessageRecord record;
+//        private boolean delayable;
+//        private final MessageRecordMarshaller<T> marshaller;
+//
+//        public AddElementOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
+//            this.op = op;
+//            this.delayable = delayable;
+//            if (!delayable) {
+//                record = marshaller.marshal(op.getElement());
+//                this.marshaller = null;
+//            } else {
+//                this.marshaller = marshaller;
+//            }
+//        }
+//
+//        public boolean isDelayable() {
+//            return delayable;
+//        }
+//
+//        @Override
+//        public int getLimiterSize() {
+//            return op.getLimiterSize() + BASE_MEM_SIZE + 32;
+//        }
+//
+//        @Override
+//        protected void doExcecute(Session session) {
+//
+//            if (record == null) {
+//                record = marshaller.marshal(op.getElement());
+//            }
+//
+//            session.messageAdd(record);
+//            try {
+//                QueueRecord queueRecord = new QueueRecord();
+//                queueRecord.setAttachment(null);
+//                queueRecord.setMessageKey(record.getKey());
+//                queueRecord.setSize(record.getSize());
+//                queueRecord.setQueueKey(op.getSequenceNumber());
+//                session.queueAddMessage(op.getQueueDescriptor(), queueRecord);
+//            } catch (KeyNotFoundException e) {
+//                e.printStackTrace();
+//            }
+//        }
+//
+//        public String toString() {
+//            return "AddTxOpOperation " + record.getKey() + super.toString();
+//        }
+//    }
+//
+//    public long getFlushDelay() {
+//        return flushDelay;
+//    }
+//
+//    public void setFlushDelay(long flushDelay) {
+//        this.flushDelay = flushDelay;
+//    }
+//
+//    /**
+//     * @return true if operations are allowed to bypass the store.
+//     */
+//    public boolean isStoreBypass() {
+//        return storeBypass;
+//    }
+//
+//    /**
+//     * Sets if persistent operations should be allowed to bypass the store.
+//     * Defaults to true, as this will give you the best performance. In some
+//     * cases, you want to disable this as the store being used will double as an
+//     * audit log and you do not want any persistent operations to bypass the
+//     * store.
+//     *
+//     * When store bypass is disabled, all {@link Operation#cancel()} requests
+//     * will return false.
+//     *
+//     * @param enable
+//     *            if true will enable store bypass
+//     */
+//    public void setStoreBypass(boolean enable) {
+//        this.storeBypass = enable;
+//    }
+
+}
+
+
+
+class UserAlreadyConnectedException extends Exception
+
+
+class BrokerQueueStore { // implements QueueStore<Long, MessageDelivery> {
+// TODO:
+//    private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
+//    private static final boolean USE_OLD_QUEUE = false;
+//    private static final boolean USE_PRIORITY_QUEUES = true;
+//
+//    private BrokerDatabase database;
+//    private DispatchQueue dispatchQueue;
+//
+//    private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+//    private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+//         * #marshal(java.lang.Object)
+//         */
+//        public MessageRecord marshal(MessageDelivery element) {
+//            return element.createMessageRecord();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+//         * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
+//         */
+//        public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
+//            ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
+//            if (handler == null) {
+//                try {
+//                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
+//                    protocolHandlers.put(record.getEncoding().toString(), handler);
+//                } catch (Throwable thrown) {
+//                    throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
+//                }
+//            }
+//            try {
+//                return handler.createMessageDelivery(record);
+//            } catch (IOException ioe) {
+//                throw new RuntimeException(ioe);
+//            }
+//        }
+//    };
+//
+//    final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
+//        return MESSAGE_MARSHALLER;
+//    }
+//
+//    private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
+//        public Long map(MessageDelivery element) {
+//            return element.getExpiration();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
+//        public Integer map(MessageDelivery element) {
+//            return element.getFlowLimiterSize();
+//        }
+//    };
+//
+//    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+//        public Integer map(MessageDelivery element) {
+//            return element.getPriority();
+//        }
+//    };
+//
+//    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+//        public Long map(MessageDelivery element) {
+//            return element.getStoreTracking();
+//        }
+//    };
+//
+//    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+//        public Integer map(MessageDelivery element) {
+//            // we modulo 10 to have at most 10 partitions which the producers
+//            // gets split across.
+//            return (int) (element.getProducerId().hashCode() % 10);
+//        }
+//    };
+//
+//    public static final short SUBPARTITION_TYPE = 0;
+//    public static final short SHARED_QUEUE_TYPE = 1;
+//    public static final short DURABLE_QUEUE_TYPE = 2;
+//    public static short TRANSACTION_QUEUE_TYPE = 3;
+//
+//    private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+//    private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+//
+//    private Mapper<Integer, MessageDelivery> partitionMapper;
+//
+//    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
+//    private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
+//    // Be default we don't page out elements to disk.
+//    private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//    //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
+//
+//    private static long dynamicQueueCounter = 0;
+//
+//    private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+//
+//        private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//
+//        public boolean isPersistent(MessageDelivery elem) {
+//            return elem.isPersistent();
+//        }
+//
+//        public boolean isPageOutPlaceHolders() {
+//            return true;
+//        }
+//
+//        public boolean isPagingEnabled() {
+//            return PAGING_ENABLED;
+//        }
+//
+//        public int getPagingInMemorySize() {
+//            return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//        }
+//
+//        public boolean isThrottleSourcesToMemoryLimit() {
+//            // Keep the queue in memory.
+//            return true;
+//        }
+//
+//        public int getDisconnectedThrottleRate() {
+//            // By default don't throttle consumers when disconnected.
+//            return 0;
+//        }
+//
+//        public int getRecoveryBias() {
+//            return 8;
+//        }
+//    };
+//
+//    private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
+//    private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
+//    // Be default we don't page out elements to disk.
+//    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//    private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
+//
+//    private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+//
+//        private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//
+//        public boolean isPersistent(MessageDelivery elem) {
+//            return elem.isPersistent();
+//        }
+//
+//        public boolean isPageOutPlaceHolders() {
+//            return true;
+//        }
+//
+//        public boolean isPagingEnabled() {
+//            return PAGING_ENABLED;
+//        }
+//
+//        public int getPagingInMemorySize() {
+//            return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//        }
+//
+//        public boolean isThrottleSourcesToMemoryLimit() {
+//            // Keep the queue in memory.
+//            return true;
+//        }
+//
+//        public int getDisconnectedThrottleRate() {
+//            // By default don't throttle consumers when disconnected.
+//            return 0;
+//        }
+//
+//        public int getRecoveryBias() {
+//            return 8;
+//        }
+//    };
+//
+//    public void setDatabase(BrokerDatabase database) {
+//        this.database = database;
+//    }
+//
+//    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+//        this.dispatchQueue = dispatchQueue;
+//    }
+//
+//    public void loadQueues() throws Exception {
+//
+//        // Load shared queues
+//        Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
+//        while (results.hasNext()) {
+//            QueueQueryResult loaded = results.next();
+//            IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
+//            sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+//            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//        }
+//
+//        // Load durable queues
+//        results = database.listQueues(DURABLE_QUEUE_TYPE);
+//        while (results.hasNext()) {
+//            QueueQueryResult loaded = results.next();
+//            IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
+//            durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+//            LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//
+//        }
+//    }
+//
+//    private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
+//
+//        IQueue<Long, MessageDelivery> queue;
+//        if (parent != null) {
+//            queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
+//        } else {
+//            queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+//        }
+//
+//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//
+//        // Creat the child queues
+//        Collection<QueueQueryResult> children = loaded.getPartitions();
+//        if (children != null) {
+//            try {
+//                IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
+//                for (QueueQueryResult child : children) {
+//                    createRestoredQueue(pQueue, child);
+//                }
+//            } catch (ClassCastException cce) {
+//                LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
+//                throw cce;
+//            }
+//        }
+//
+//        return queue;
+//
+//    }
+//
+//    private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
+//
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//
+//        //TODO implement this for priority queue:
+//        // Create the child queues
+//        /*
+//         * Collection<QueueQueryResult> children = loaded.getPartitions(); if
+//         * (children != null) { try { IPartitionedQueue<Long, MessageDelivery>
+//         * pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue; for
+//         * (QueueQueryResult child : children) { createRestoredQueue(pQueue,
+//         * child); } } catch (ClassCastException cce) {
+//         * LOG.error("Loaded partition for unpartitionable queue: " +
+//         * queue.getResourceName()); throw cce; } }
+//         */
+//
+//        return queue;
+//
+//    }
+//
+//    public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
+//        //TODO
+//        return null;
+//    }
+//
+//    public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
+//        synchronized (this) {
+//            Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+//            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+//            ret.addAll(c);
+//            return ret;
+//        }
+//    }
+//
+//    public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
+//        IQueue<Long, MessageDelivery> queue = null;
+//        synchronized (this) {
+//            queue = durableQueues.get(name);
+//            if (queue == null) {
+//                queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//                queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+//                queue.initialize(0, 0, 0, 0);
+//                durableQueues.put(name, queue);
+//                addQueue(queue.getDescriptor());
+//            }
+//        }
+//
+//        return queue;
+//    }
+//
+//    public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+//        synchronized (this) {
+//            String name = "temp:" + (dynamicQueueCounter++);
+//            queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//            queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+//            queue.initialize(0, 0, 0, 0);
+//            addQueue(queue.getDescriptor());
+//        }
+//        return queue;
+//    }
+//
+//    public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
+//        synchronized (this) {
+//            Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
+//            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+//            ret.addAll(c);
+//            return ret;
+//        }
+//    }
+//
+//    public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
+//
+//        IQueue<Long, MessageDelivery> queue = null;
+//        synchronized (this) {
+//            queue = sharedQueues.get(name);
+//            if (queue == null) {
+//                queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//                queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
+//                queue.initialize(0, 0, 0, 0);
+//                sharedQueues.put(name, queue);
+//                addQueue(queue.getDescriptor());
+//            }
+//        }
+//
+//        return queue;
+//    }
+//
+//    private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue;
+//
+//        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
+//            @Override
+//            public int getElementSize(MessageDelivery elem) {
+//                return elem.getFlowLimiterSize();
+//            }
+//        };
+//        queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
+//        queue.setStore(this);
+//        queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
+//        queue.setExpirationMapper(EXPIRATION_MAPPER);
+//        return queue;
+//    }
+//
+//    private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
+//
+//        IQueue<Long, MessageDelivery> ret;
+//
+//        switch (type) {
+//        case QueueDescriptor.PARTITIONED: {
+//            PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
+//                @Override
+//                public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
+//                    IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//                    queue.getDescriptor().setPartitionId(partitionKey);
+//                    queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
+//                    return queue;
+//                }
+//
+//            };
+//            queue.setPartitionMapper(partitionMapper);
+//
+//            ret = queue;
+//            break;
+//        }
+//        case QueueDescriptor.SHARED_PRIORITY: {
+//            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
+//            limiter.setPriorityMapper(PRIORITY_MAPPER);
+//            limiter.setSizeMapper(SIZE_MAPPER);
+//            SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
+//            ret = queue;
+//            queue.setKeyMapper(KEY_MAPPER);
+//            queue.setAutoRelease(true);
+//            break;
+//        }
+//        case QueueDescriptor.SHARED: {
+//            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) {
+//                @Override
+//                public int getElementSize(MessageDelivery elem) {
+//                    return elem.getFlowLimiterSize();
+//                }
+//            };
+//
+//            if (!USE_OLD_QUEUE) {
+//                SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
+//                sQueue.setKeyMapper(KEY_MAPPER);
+//                sQueue.setAutoRelease(true);
+//                ret = sQueue;
+//            } else {
+//                SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
+//                sQueue.setKeyMapper(KEY_MAPPER);
+//                sQueue.setAutoRelease(true);
+//                ret = sQueue;
+//            }
+//            break;
+//        }
+//        default: {
+//            throw new IllegalArgumentException("Unknown queue type" + type);
+//        }
+//        }
+//        ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
+//        ret.setStore(this);
+//        ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
+//        ret.setExpirationMapper(EXPIRATION_MAPPER);
+//
+//        return ret;
+//    }
+//
+//    public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
+//        MessageDelivery md = sqe.getElement();
+//        //If the message delivery isn't null, funnel through it
+//        //since the message may not yet be in the store:
+//        if (md != null) {
+//            md.acknowledge(sqe);
+//        } else {
+//            database.deleteQueueElement(sqe);
+//        }
+//
+//    }
+//
+//    public final boolean isFromStore(MessageDelivery elem) {
+//        return elem.isFromStore();
+//    }
+//
+//    public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+//        elem.getElement().persist(elem, controller, delayable);
+//    }
+//
+//    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+//        database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
+//    }
+//
+//    public final void addQueue(QueueDescriptor queue) {
+//        database.addQueue(queue);
+//    }
+//
+//    public final void deleteQueue(QueueDescriptor queue) {
+//        database.deleteQueue(queue);
+//    }
+
+    def setDatabase(database:BrokerDatabase ) = {
+    }
+
+    def setDispatchQueue(dispatchQueue:DispatchQueue )= {
+    }
+
+    def loadQueues() ={
+    }
+}
\ No newline at end of file

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/XidImpl.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/XidImpl.java Wed Jul  7 03:40:18 2010
@@ -61,7 +61,7 @@ public class XidImpl implements Xid, Clo
 
     /**
      * Initialize an XID using another XID as the source of data.
-     * 
+     *
      * @param from
      *            the XID to initialize this XID from
      */
@@ -87,10 +87,10 @@ public class XidImpl implements Xid, Clo
 
     /**
      * Determine whether or not two objects of this type are equal.
-     * 
+     *
      * @param o
      *            the other XID object to be compared with this XID.
-     * 
+     *
      * @return Returns true of the supplied object represents the same global
      *         transaction as this, otherwise returns false.
      */
@@ -102,14 +102,14 @@ public class XidImpl implements Xid, Clo
         if (formatId == -1 && other.formatId == -1)
             return true;
 
-        return formatId == other.formatId 
+        return formatId == other.formatId
         		&& globalTransactionId.equals(other.globalTransactionId)
         		&& branchQualifier.equals(other.branchQualifier);
     }
 
     /**
      * Compute the hash code.
-     * 
+     *
      * @return the computed hashcode
      */
     public int hashCode() {
@@ -122,7 +122,7 @@ public class XidImpl implements Xid, Clo
      * Return a string representing this XID.
      * <p>
      * This is normally used to display the XID when debugging.
-     * 
+     *
      * @return the string representation of this XID
      */
 
@@ -135,7 +135,7 @@ public class XidImpl implements Xid, Clo
 
     /**
      * Obtain the format identifier part of the XID.
-     * 
+     *
      * @return Format identifier. -1 indicates a null XID
      */
     public int getFormatId() {
@@ -144,7 +144,7 @@ public class XidImpl implements Xid, Clo
 
     /**
      * Returns the global transaction identifier for this XID.
-     * 
+     *
      * @return the global transaction identifier
      */
     public byte[] getGlobalTransactionId() {
@@ -154,7 +154,7 @@ public class XidImpl implements Xid, Clo
 
     /**
      * Returns the branch qualifier for this XID.
-     * 
+     *
      * @return the branch qualifier
      */
     public byte[] getBranchQualifier() {
@@ -167,8 +167,8 @@ public class XidImpl implements Xid, Clo
     /**
      * Set the branch qualifier for this XID. Note that the branch qualifier has
      * a maximum size.
-     * 
-     * @param qual
+     *
+     * @param branchID
      *            a Byte array containing the branch qualifier to be set. If the
      *            size of the array exceeds MAXBQUALSIZE, only the first
      *            MAXBQUALSIZE elements of qual will be used.
@@ -205,16 +205,8 @@ public class XidImpl implements Xid, Clo
 
     /**
      * Writes this XidImpl's data to the DataOutput destination
-     * 
-     * @param out
-     *            The DataOutput destination
-     * @param maxbytes
-     *            Maximum number of bytes that may be written to the destination
-     * 
-     * @exception ELogEventTooLong
-     *                The data could not be written without exceeding the
-     *                maxbytes parameter. The data may have been partially
-     *                written.
+     *
+     * @param out The DataOutput destination
      */
     public void writebody(DataOutput out) throws IOException {
         out.writeInt(formatId); // format ID
@@ -227,7 +219,7 @@ public class XidImpl implements Xid, Clo
 
     /**
      * read xid from an Array and set each fields.
-     * 
+     *
      * @param in
      *            the data input array
      * @throws IOException
@@ -247,7 +239,7 @@ public class XidImpl implements Xid, Clo
     }
 
     /**
-     * @param tid
+     * @param xid
      * @return
      */
     public static Buffer toBuffer(Xid xid) {
@@ -260,6 +252,6 @@ public class XidImpl implements Xid, Clo
             throw new RuntimeException(e);
         }
         return baos.toBuffer();
-        
+
     }
 } // class XidImpl

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/AnyChildPathNode.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMap.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMap.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapEntry.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapEntry.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathMapNode.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathMapNode.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathNode.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathNode.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PathSupport.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathSupport.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java (from r961067, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PrefixPathFilter.java&r1=961067&r2=961068&rev=961068&view=diff
==============================================================================
    (empty)