You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:57 UTC

svn commit: r961157 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/resources/

Author: chirino
Date: Wed Jul  7 04:11:57 2010
New Revision: 961157

URL: http://svn.apache.org/viewvc?rev=961157&view=rev
Log:
broker is now reblancing connections so that they are grouped.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961157&r1=961156&r2=961157&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:11:57 2010
@@ -269,7 +269,7 @@ class Queue(val host: VirtualHost, val d
         check_counter += 1
 
         if( (check_counter%10)==0  ) {
-          display_stats
+//          display_stats
         }
 
 //        if( (check_counter%100)==0 ) {
@@ -298,15 +298,15 @@ class Queue(val host: VirtualHost, val d
             // it's been parking and cursoring through the data at the tune_slow_subscription_rate 
             if( (sub.tail_parked && sub.tail_parkings==0) || ( sub.tail_parkings > 0 && cursor_delta >= slow_cursor_delta ) ) {
               if( sub.slow ) {
-                info("subscription is now fast: %s", sub)
+                debug("subscription is now fast: %s", sub)
                 sub.slow_intervals = 0
               }
             } else {
               if( !sub.slow ) {
-                info("slow interval: %d, %d, %d", sub.slow_intervals, sub.tail_parkings, cursor_delta)
+                trace("slow interval: %d, %d, %d", sub.slow_intervals, sub.tail_parkings, cursor_delta)
                 sub.slow_intervals += 1
                 if( sub.slow ) {
-                  info("subscription is now slow: %s", sub)
+                  debug("subscription is now slow: %s", sub)
                 }
               }
             }
@@ -538,6 +538,13 @@ class Queue(val host: VirtualHost, val d
     }
   }
 
+  def collocate(value:DispatchQueue):Unit = {
+    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
+      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
+    }
+  }
+  
 }
 
 object QueueEntry extends Sizer[QueueEntry] {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961157&r1=961156&r2=961157&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 04:11:57 2010
@@ -22,8 +22,8 @@ import _root_.org.fusesource.hawtdispatc
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 
 import java.util.HashMap
-import collection.JavaConversions
 import path.PathMap
+import collection.JavaConversions
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -201,7 +201,7 @@ class Router(val host:VirtualHost) exten
 
 
    def each(proc:(Destination, DestinationNode)=>Unit) = {
-     import JavaConversions._;
+     import JavaConversions._
      for( (destination, node) <- destinations ) {
         proc(destination, node)
      }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961157&r1=961156&r2=961157&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:11:57 2010
@@ -32,6 +32,7 @@ import org.fusesource.hawtbuf.proto.Wire
 import org.apache.activemq.apollo.store.{StoreFactory, QueueRecord}
 import org.apache.activemq.apollo.dto.{HawtDBStoreDTO, CassandraStoreDTO, VirtualHostDTO}
 import java.io.File
+import java.util.concurrent.TimeUnit
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -90,7 +91,6 @@ class VirtualHost(val broker: Broker) ex
   override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
 
   var config:VirtualHostDTO = _
-  private val queueStore = new BrokerQueueStore()
   private val queues = new HashMap[AsciiBuffer, Queue]()
   private val durableSubs = new HashMap[String, DurableSubscription]()
   val router = new Router(this)
@@ -173,6 +173,8 @@ class VirtualHost(val broker: Broker) ex
     transactionManager.loadTransactions();
 
     tracker.callback(onCompleted)
+
+    schedualConnectionRegroup
   }
 
 
@@ -214,6 +216,49 @@ class VirtualHost(val broker: Broker) ex
   } |>>: dispatchQueue
 
 
+
+  // Try to periodically re-balance connections so that consumers/producers
+  // are grouped onto the same thread.
+  def schedualConnectionRegroup:Unit = {
+    def connectionRegroup = {
+      router.each { (destination, node)=>
+        node match {
+          case x:router.TopicDestinationNode=>
+
+            // 1->1 is the easy case...
+            if( node.targets.size==1 && node.routes.size==1 ) {
+              // move the producer to the consumer thread.
+              node.routes.head.producer.collocate( node.targets.head.dispatchQueue )
+            } else {
+              // we need to get fancy perhaps look at rates
+              // to figure out how to be group the connections.
+            }
+
+          case x:router.QueueDestinationNode=>
+
+            if( node.targets.size==1 ) {
+              // move the queue to the consumer
+              x.queue.collocate( node.targets.head.dispatchQueue )
+            } else {
+              // we need to get fancy perhaps look at rates
+              // to figure out how to be group the connections.
+            }
+
+            if( node.routes.size==1 ) {
+              // move the producer to the queue.
+              node.routes.head.producer.collocate( x.queue.dispatchQueue )
+            } else {
+              // we need to get fancy perhaps look at rates
+              // to figure out how to be group the connections.
+            }
+        }
+      }
+      schedualConnectionRegroup
+    }
+    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{ if(serviceState.isStarted) { connectionRegroup } } )
+  }
+
+
   def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
     val name = DestinationParser.toBuffer(dest, destination_parser_options)
     if( store!=null ) {
@@ -309,1802 +354,3 @@ class VirtualHost(val broker: Broker) ex
       queueLifecyleListeners.add(listener);
   }
 }
-
-///**
-// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
-// */
-//class BrokerDatabase() {
-//
-//  @BeanProperty
-//  var store:Store=new MemoryStore;
-//
-//  @BeanProperty
-//  var virtualHost:VirtualHost=null;
-//
-//    def start() ={
-//        //To change body of implemented methods use File | Settings | File Templates.
-//    }
-//
-//    def stop() = {
-//        //To change body of implemented methods use File | Settings | File Templates.
-//    }
-//
-// TODO: re-implement.
-//    private static final boolean DEBUG = false;
-//
-//    private final Flow databaseFlow = new Flow("database", false);
-//
-//    private final SizeLimiter<OperationBase<?>> storeLimiter;
-//    private final FlowController<OperationBase<?>> storeController;
-//    private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
-//
-//    private DispatchQueue dispatcher;
-//    private Thread flushThread;
-//    private AtomicBoolean running = new AtomicBoolean(false);
-//    private DatabaseListener listener;
-//
-//    private final LinkedNodeList<OperationBase<?>> opQueue;
-//    private AtomicBoolean notify = new AtomicBoolean(false);
-//    private Semaphore opsReady = new Semaphore(0);
-//    private long opSequenceNumber;
-//    private long flushPointer = -1; // The last seq num for which flush was
-//    // requested
-//    private long requestedDelayedFlushPointer = -1; // Set to the last sequence
-//    // num scheduled for delay
-//    private long delayedFlushPointer = 0; // The last delayable sequence num
-//    // requested.
-//    private long flushDelay = 10;
-//
-//    private final Runnable flushDelayCallback;
-//    private boolean storeBypass = true;
-//
-//    public interface DatabaseListener {
-//        /**
-//         * Called if there is a catastrophic problem with the database.
-//         *
-//         * @param ioe
-//         *            The causing exception.
-//         */
-//        public void onDatabaseException(IOException ioe);
-//    }
-//
-//    public static interface MessageRecordMarshaller<V> {
-//        MessageRecord marshal(V element);
-//
-//        /**
-//         * Called when a queue element is recovered from the store for a
-//         * particular queue.
-//         *
-//         * @param mRecord
-//         *            The message record
-//         * @param queue
-//         *            The queue that the element is being restored to (or null
-//         *            if not being restored for a queue)
-//         * @return
-//         */
-//        V unMarshall(MessageRecord mRecord, QueueDescriptor queue);
-//    }
-//
-//    public BrokerDatabase(Store store) {
-//        this.store = store;
-//        this.opQueue = new LinkedNodeList<OperationBase<?>>();
-//        storeLimiter = new SizeLimiter<OperationBase<?>>(FLUSH_QUEUE_SIZE, 0) {
-//
-//            @Override
-//            public int getElementSize(OperationBase<?> op) {
-//                return op.getLimiterSize();
-//            }
-//        };
-//
-//        storeController = new FlowController<OperationBase<?>>(new FlowControllable<OperationBase<?>>() {
-//
-//            public void flowElemAccepted(ISourceController<OperationBase<?>> controller, OperationBase<?> op) {
-//                addToOpQueue(op);
-//            }
-//
-//            public IFlowResource getFlowResource() {
-//                return BrokerDatabase.this;
-//            }
-//
-//        }, databaseFlow, storeLimiter, opQueue);
-//        storeController.useOverFlowQueue(false);
-//        super.onFlowOpened(storeController);
-//
-//        flushDelayCallback = new Runnable() {
-//            public void run() {
-//                flushDelayCallback();
-//            }
-//        };
-//    }
-//
-//    public synchronized void start() throws Exception {
-//        if (flushThread == null) {
-//
-//            running.set(true);
-//            store.start();
-//            flushThread = new Thread(new Runnable() {
-//
-//                public void run() {
-//                    processOps();
-//                }
-//
-//            }, "StoreThread");
-//            flushThread.start();
-//        }
-//    }
-//
-//    public synchronized void stop() throws Exception {
-//        if (flushThread != null) {
-//
-//            synchronized (opQueue) {
-//                updateFlushPointer(opSequenceNumber + 1);
-//            }
-//
-//            running.set(false);
-//            boolean interrupted = false;
-//            while (true) {
-//                opsReady.release();
-//                try {
-//                    flushThread.join();
-//                    break;
-//                } catch (InterruptedException e) {
-//                    interrupted = true;
-//                }
-//            }
-//
-//            store.flush();
-//            store.stop();
-//
-//            if (interrupted) {
-//                Thread.currentThread().interrupt();
-//            }
-//            flushThread = null;
-//        }
-//    }
-//
-//    /**
-//     * A blocking operation that lists all queues of a given type:
-//     *
-//     * @param type
-//     *            The queue type
-//     * @return A list of queues.
-//     *
-//     * @throws Exception
-//     *             If there was an error listing the queues.
-//     */
-//    public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
-//        return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
-//
-//            public Iterator<QueueQueryResult> execute(Session session) throws Exception {
-//                return session.queueListByType(type, null, Integer.MAX_VALUE);
-//            }
-//
-//        }, null);
-//    }
-//
-//    /**
-//     * A blocking operation that lists all entries in the specified map
-//     *
-//     * @param map
-//     *            The map to list
-//     * @return A list of map entries
-//     *
-//     * @throws Exception
-//     *             If there was an error listing the queues.
-//     */
-//    public Map<AsciiBuffer, Buffer> listMapEntries(final AsciiBuffer map) throws Exception {
-//        return store.execute(new Callback<Map<AsciiBuffer, Buffer>, Exception>() {
-//
-//            public Map<AsciiBuffer, Buffer> execute(Session session) throws Exception {
-//                HashMap<AsciiBuffer, Buffer> ret = new HashMap<AsciiBuffer, Buffer>();
-//                try {
-//                    Iterator<AsciiBuffer> keys = session.mapEntryListKeys(map, null, -1);
-//                    while (keys.hasNext()) {
-//                        AsciiBuffer key = keys.next();
-//                        ret.put(key, session.mapEntryGet(map, key));
-//                    }
-//                } catch (Store.KeyNotFoundException knfe) {
-//                    //No keys then:
-//                }
-//
-//                return ret;
-//            }
-//
-//        }, null);
-//    }
-//
-//    /**
-//     * @param map
-//     *            The name of the map to update.
-//     * @param key
-//     *            The key in the map to update.
-//     * @param value
-//     *            The value to insert.
-//     */
-//    public OperationContext<?> updateMapEntry(AsciiBuffer map, AsciiBuffer key, Buffer value) {
-//        return add(new MapUpdateOperation(map, key, value), null, false);
-//    }
-//
-//    /**
-//     * Executes user supplied {@link Operation}. If the {@link Operation} does
-//     * not throw any Exceptions, all updates to the store are committed,
-//     * otherwise they are rolled back. Any exceptions thrown by the
-//     * {@link Operation} are propagated by this method.
-//     *
-//     * If limiter space on the store processing queue is exceeded, the
-//     * controller will be blocked.
-//     *
-//     * If this method is called with flush set to
-//     * <code>false</false> there is no
-//     * guarantee made about when the operation will be executed. If <code>flush</code>
-//     * is <code>true</code> and {@link Operation#isDelayable()} is also
-//     * <code>true</code> then an attempt will be made to execute the event at
-//     * the {@link Store}'s configured delay interval.
-//     *
-//     * @param op
-//     *            The operation to execute
-//     * @param flush
-//     *            Whether or not this operation needs immediate processing.
-//     * @param controller
-//     *            the source of the operation.
-//     * @return the {@link OperationContext} associated with the operation
-//     */
-//    private <T> OperationContext<T> add(OperationBase<T> op, ISourceController<?> controller, boolean flush) {
-//
-//        op.flushRequested = flush;
-//        storeController.add(op, controller);
-//        return op;
-//    }
-//
-//    private final void addToOpQueue(OperationBase<?> op) {
-//        if (!running.get()) {
-//            throw new IllegalStateException("BrokerDatabase not started");
-//        }
-//
-//        synchronized (opQueue) {
-//            op.opSequenceNumber = opSequenceNumber++;
-//            opQueue.addLast(op);
-//            if (op.flushRequested || storeLimiter.getThrottled()) {
-//                if (op.isDelayable() && flushDelay > 0) {
-//                    scheduleDelayedFlush(op.opSequenceNumber);
-//                } else {
-//                    updateFlushPointer(op.opSequenceNumber);
-//                }
-//            }
-//        }
-//    }
-//
-//    private void updateFlushPointer(long seqNumber) {
-//        if (seqNumber > flushPointer) {
-//            flushPointer = seqNumber;
-//            OperationBase<?> op = opQueue.getHead();
-//            if (op != null && op.opSequenceNumber <= flushPointer && notify.get()) {
-//                opsReady.release();
-//            }
-//        }
-//    }
-//
-//    private void scheduleDelayedFlush(long seqNumber) {
-//        if (seqNumber < flushPointer) {
-//            return;
-//        }
-//
-//        if (seqNumber > delayedFlushPointer) {
-//            delayedFlushPointer = seqNumber;
-//        }
-//
-//        if (requestedDelayedFlushPointer == -1) {
-//            requestedDelayedFlushPointer = delayedFlushPointer;
-//            Dispatch.getGlobalQueue().dispatchAfter(flushDelay, TimeUnit.MILLISECONDS, flushDelayCallback);
-//        }
-//
-//    }
-//
-//    private final void flushDelayCallback() {
-//        synchronized (opQueue) {
-//            if (flushPointer < requestedDelayedFlushPointer) {
-//                updateFlushPointer(requestedDelayedFlushPointer);
-//
-//            }
-//
-//            // If another delayed flush has been scheduled schedule it:
-//            requestedDelayedFlushPointer = -1;
-//            // Schedule next delay if needed:
-//            if (delayedFlushPointer > flushPointer) {
-//                scheduleDelayedFlush(delayedFlushPointer);
-//            } else {
-//                delayedFlushPointer = -1;
-//            }
-//
-//        }
-//    }
-//
-//    private final OperationBase<?> getNextOp(boolean wait) {
-//        if (!wait) {
-//            synchronized (opQueue) {
-//                OperationBase<?> op = opQueue.getHead();
-//                if (op != null && (op.opSequenceNumber <= flushPointer || !op.isDelayable())) {
-//                    op.unlink();
-//                    return op;
-//                }
-//            }
-//            return null;
-//        } else {
-//            OperationBase<?> op = getNextOp(false);
-//            if (op == null) {
-//                notify.set(true);
-//                op = getNextOp(false);
-//                try {
-//                    while (running.get() && op == null) {
-//                        opsReady.acquireUninterruptibly();
-//                        op = getNextOp(false);
-//                    }
-//                } finally {
-//                    notify.set(false);
-//                    opsReady.drainPermits();
-//                }
-//            }
-//            return op;
-//        }
-//    }
-//
-//    private final void processOps() {
-//        int count = 0;
-//        Session session = store.getSession();
-//        while (running.get()) {
-//            final OperationBase<?> firstOp = getNextOp(true);
-//            if (firstOp == null) {
-//                continue;
-//            }
-//            count = 0;
-//
-//            // The first operation we get, triggers a store transaction.
-//            if (firstOp != null) {
-//                final LinkedList<Operation<?>> processedQueue = new LinkedList<Operation<?>>();
-//                boolean locked = false;
-//                try {
-//
-//                    Operation<?> op = firstOp;
-//                    while (op != null) {
-//                        final Operation<?> toExec = op;
-//                        if (toExec.beginExecute()) {
-//                            if (!locked) {
-//                                session.acquireLock();
-//                                locked = true;
-//                            }
-//                            count++;
-//                            op.execute(session);
-//                            processedQueue.add(op);
-//                            /*
-//                             * store.execute(new Store.VoidCallback<Exception>()
-//                             * {
-//                             *
-//                             * @Override public void run(Session session) throws
-//                             * Exception {
-//                             *
-//                             * // Try to execute the operation against the //
-//                             * session... try { toExec.execute(session);
-//                             * processedQueue.add(toExec); } catch
-//                             * (CancellationException ignore) { //
-//                             * System.out.println("Cancelled" + // toExec); } }
-//                             * }, null);
-//                             */
-//                        }
-//
-//                        if (count < 1000) {
-//                            op = getNextOp(false);
-//                        } else {
-//                            op = null;
-//                        }
-//                    }
-//                    // executeOps(firstOp, processedQueue, counter);
-//
-//                    // If we procecessed some ops, flush and post process:
-//                    if (!processedQueue.isEmpty()) {
-//
-//                        if (locked) {
-//                            session.commit();
-//                            session.releaseLock();
-//                            locked = false;
-//                        }
-//                        if (DEBUG)
-//                            System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
-//                        // Sync the store:
-//                        store.flush();
-//
-//                        // Post process operations
-//                        long release = 0;
-//                        for (Operation<?> processed : processedQueue) {
-//                            processed.onCommit();
-//                            // System.out.println("Processed" + processed);
-//                            release += processed.getLimiterSize();
-//                        }
-//
-//                        synchronized (opQueue) {
-//                            this.storeLimiter.remove(1, release);
-//                        }
-//                    }
-//
-//                } catch (IOException e) {
-//                    for (Operation<?> processed : processedQueue) {
-//                        processed.onRollback(e);
-//                    }
-//                    onDatabaseException(e);
-//                } catch (RuntimeException e) {
-//                    for (Operation<?> processed : processedQueue) {
-//                        processed.onRollback(e);
-//                    }
-//                    IOException ioe = new IOException(e.getMessage());
-//                    ioe.initCause(e);
-//                    onDatabaseException(ioe);
-//                } catch (Exception e) {
-//                    for (Operation<?> processed : processedQueue) {
-//                        processed.onRollback(e);
-//                    }
-//                    IOException ioe = new IOException(e.getMessage());
-//                    ioe.initCause(e);
-//                    onDatabaseException(ioe);
-//                } finally {
-//                    if (locked) {
-//                        try {
-//                            session.releaseLock();
-//                        } catch (Exception e) {
-//                            IOException ioe = new IOException(e.getMessage());
-//                            ioe.initCause(e);
-//                            onDatabaseException(ioe);
-//                        }
-//                    }
-//                }
-//            }
-//        }
-//    }
-//
-//    /*
-//     * private final void executeOps(final OperationBase op, final
-//     * LinkedList<Operation> processedQueue, final OpCounter counter) throws
-//     * FatalStoreException, Exception { store.execute(new
-//     * Store.VoidCallback<Exception>() {
-//     *
-//     * @Override public void run(Session session) throws Exception {
-//     *
-//     * // Try to execute the operation against the // session... try { if
-//     * (op.execute(session)) { processedQueue.add(op); } else { counter.count--;
-//     * } } catch (CancellationException ignore) { System.out.println("Cancelled"
-//     * + op); }
-//     *
-//     * // See if we can batch up some additional operations // in this
-//     * transaction. if (counter.count < 100) { OperationBase next =
-//     * getNextOp(false); if (next != null) { counter.count++; executeOps(next,
-//     * processedQueue, counter); } } } }, null); }
-//     */
-//
-//    /**
-//     * Adds a queue to the database
-//     *
-//     * @param queue
-//     *            The queue to add.
-//     */
-//    public void addQueue(QueueDescriptor queue) {
-//        add(new QueueAddOperation(queue), null, false);
-//    }
-//
-//    /**
-//     * Deletes a queue and all of its messages from the database
-//     *
-//     * @param queue
-//     *            The queue to delete.
-//     */
-//    public void deleteQueue(QueueDescriptor queue) {
-//        add(new QueueDeleteOperation(queue), null, false);
-//    }
-//
-//    /**
-//     * Saves a message for all of the recipients in the
-//     * {@link BrokerMessageDelivery}.
-//     *
-//     * @param delivery
-//     *            The delivery.
-//     * @param source
-//     *            The source's controller.
-//     * @throws IOException
-//     *             If there is an error marshalling the message.
-//     * @return The {@link OperationContext} associated with the operation
-//     */
-//    public OperationContext<?> persistReceivedMessage(BrokerMessageDelivery delivery, ISourceController<?> source) {
-//        return add(new AddMessageOperation(delivery), source, true);
-//    }
-//
-//    /**
-//     * Saves a Message for a single queue.
-//     *
-//     * @param queueElement
-//     *            The element to save.
-//     * @param source
-//     *            The source initiating the save or null, if there isn't one.
-//     * @throws IOException
-//     *             If there is an error marshalling the message.
-//     *
-//     * @return The {@link OperationContext} associated with the operation
-//     */
-//    public OperationContext<?> saveMessage(SaveableQueueElement<MessageDelivery> queueElement, ISourceController<?> source, boolean delayable) {
-//        return add(new AddMessageOperation(queueElement), source, !delayable);
-//    }
-//
-//    /**
-//     * Deletes the given message from the store for the given queue.
-//     *
-//     * @param queueElement
-//     * @return The {@link OperationContext} associated with the operation
-//     */
-//    public OperationContext<?> deleteQueueElement(SaveableQueueElement<?> queueElement) {
-//        return add(new DeleteOperation(queueElement.getSequenceNumber(), queueElement.getQueueDescriptor()), null, false);
-//    }
-//
-//    /**
-//     * Loads a batch of messages for the specified queue. The loaded messages
-//     * are given the provided {@link RestoreListener}.
-//     * <p>
-//     * <b><i>NOTE:</i></b> This method uses the queue sequence number for the
-//     * message not the store tracking number.
-//     *
-//     * @param queue
-//     *            The queue for which to load messages
-//     * @param recordsOnly
-//     *            True if message body shouldn't be restored
-//     * @param first
-//     *            The first queue sequence number to load (-1 starts at
-//     *            begining)
-//     * @param maxSequence
-//     *            The maximum sequence number to load (-1 if no limit)
-//     * @param maxCount
-//     *            The maximum number of messages to load (-1 if no limit)
-//     * @param listener
-//     *            The listener to which messags should be passed.
-//     * @return The {@link OperationContext} associated with the operation
-//     */
-//    public <T> OperationContext<?> restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long first, long maxSequence, int maxCount, RestoreListener<T> listener,
-//            MessageRecordMarshaller<T> marshaller) {
-//        return add(new RestoreElementsOperation<T>(queue, recordsOnly, first, maxCount, maxSequence, listener, marshaller), null, true);
-//    }
-//
-//    private void onDatabaseException(IOException ioe) {
-//        if (listener != null) {
-//            listener.onDatabaseException(ioe);
-//        } else {
-//            ioe.printStackTrace();
-//        }
-//    }
-//
-//    public interface OperationContext<V> extends ListenableFuture<V> {
-//
-//        /**
-//         * Attempts to cancel the store operation. Returns true if the operation
-//         * could be canceled or false if the operation was already executed by
-//         * the store.
-//         *
-//         * @return true if the operation could be canceled
-//         */
-//        public boolean cancel();
-//
-//        /**
-//         * Requests flush for this database operation (overriding a previous
-//         * delay)
-//         */
-//        public void requestFlush();
-//    }
-//
-//    /**
-//     * This interface is used to execute transacted code.
-//     *
-//     * It is used by the {@link Store#execute(org.apache.activemq.broker.store.Store.Callback, Runnable)} method, often as
-//     * anonymous class.
-//     */
-//    public interface Operation<V> extends OperationContext<V> {
-//
-//        /**
-//         * Called when the saver is about to execute the operation. If true is
-//         * returned the operation can no longer be canceled.
-//         *
-//         * @return false if the operation has been canceled.
-//         */
-//        public boolean beginExecute();
-//
-//        /**
-//         * Gets called by the {@link Store}
-//         * within a transactional context. If any exception is thrown including
-//         * Runtime exception, the transaction is rolled back.
-//         *
-//         * @param session
-//         *            provides you access to read and update the persistent
-//         *            data.
-//         * @throws Exception
-//         *             if an system error occured while executing the
-//         *             operations.
-//         * @throws RuntimeException
-//         *             if an system error occured while executing the
-//         *             operations.
-//         */
-//        public void execute(Session session) throws CancellationException, Exception, RuntimeException;
-//
-//        /**
-//         * Returns true if this operation can be delayed. This is useful in
-//         * cases where external events can negate the need to execute the
-//         * operation. The delay interval is not guaranteed to be honored, if
-//         * subsequent events or other store flush policy/criteria requires a
-//         * flush of subsequent events.
-//         *
-//         * @return True if the operation can be delayed.
-//         */
-//        public boolean isDelayable();
-//
-//        /**
-//         * Returns the size to be used when calculating how much space this
-//         * operation takes on the store processing queue.
-//         *
-//         * @return The limiter size to be used.
-//         */
-//        public int getLimiterSize();
-//
-//        /**
-//         * Called after {@link #execute(Session)} is called and the the
-//         * operation has been committed.
-//         */
-//        public void onCommit();
-//
-//        /**
-//         * Called after {@link #execute(Session)} is called and the the
-//         * operation has been rolled back.
-//         */
-//        public void onRollback(Throwable error);
-//    }
-//
-//    /**
-//     * This is a convenience base class that can be used to implement
-//     * Operations. It handles operation cancellation for you.
-//     */
-//    abstract class OperationBase<V> extends LinkedNode<OperationBase<?>> implements Operation<V> {
-//        public boolean flushRequested = false;
-//        public long opSequenceNumber = -1;
-//
-//        final protected AtomicBoolean executePending = new AtomicBoolean(true);
-//        final protected AtomicBoolean cancelled = new AtomicBoolean(false);
-//        final protected AtomicBoolean executed = new AtomicBoolean(false);
-//        final protected AtomicReference<FutureListener<? super V>> listener = new AtomicReference<FutureListener<? super V>>();
-//
-//        protected Throwable error;
-//
-//        public static final int BASE_MEM_SIZE = 20;
-//
-//        public boolean cancel(boolean interrupt) {
-//            return cancel();
-//        }
-//
-//        public boolean cancel() {
-//            if (storeBypass) {
-//                if (executePending.compareAndSet(true, false)) {
-//                    cancelled.set(true);
-//                    // System.out.println("Cancelled: " + this);
-//                    synchronized (opQueue) {
-//                        unlink();
-//                        storeController.elementDispatched(this);
-//                    }
-//                    fireListener();
-//                    return true;
-//                }
-//            }
-//            return cancelled.get();
-//        }
-//
-//        public final boolean isCancelled() {
-//            return cancelled.get();
-//        }
-//
-//        public final boolean isExecuted() {
-//            return executed.get();
-//        }
-//
-//        public final boolean isDone() {
-//            return isCancelled() || isExecuted();
-//        }
-//
-//        /**
-//         * Called when the saver is about to execute the operation. If true is
-//         * returned the operation can no longer be cancelled.
-//         *
-//         * @return true if operation should be executed
-//         */
-//        public final boolean beginExecute() {
-//            if (executePending.compareAndSet(true, false)) {
-//                return true;
-//            } else {
-//                return false;
-//            }
-//        }
-//
-//        /**
-//         * Gets called by the
-//         * {@link Store} method
-//         * within a transactional context. If any exception is thrown including
-//         * Runtime exception, the transaction is rolled back.
-//         *
-//         * @param session
-//         *            provides you access to read and update the persistent
-//         *            data.
-//         * @throws Exception
-//         *             if an system error occured while executing the
-//         *             operations.
-//         * @throws RuntimeException
-//         *             if an system error occured while executing the
-//         *             operations.
-//         */
-//        public void execute(Session session) throws Exception, RuntimeException {
-//            if (DEBUG)
-//                System.out.println("Executing " + this);
-//            doExcecute(session);
-//        }
-//
-//        abstract protected void doExcecute(Session session);
-//
-//        public int getLimiterSize() {
-//            return BASE_MEM_SIZE;
-//        }
-//
-//        public boolean isDelayable() {
-//            return false;
-//        }
-//
-//        /**
-//         * Requests flush for this database operation (overriding a previous
-//         * delay)
-//         */
-//        public void requestFlush() {
-//            synchronized (opQueue) {
-//                updateFlushPointer(opSequenceNumber);
-//            }
-//        }
-//
-//        public void onCommit() {
-//            executed.set(true);
-//            fireListener();
-//        }
-//
-//        /**
-//         * Called after {@link #execute(Session)} is called and the the
-//         * operation has been rolled back.
-//         */
-//        public void onRollback(Throwable error) {
-//            executed.set(true);
-//            if (!fireListener()) {
-//                error.printStackTrace();
-//            }
-//        }
-//
-//        private final boolean fireListener() {
-//            FutureListener<? super V> l = this.listener.getAndSet(null);
-//            if (l != null) {
-//                l.onFutureComplete(this);
-//                return true;
-//            }
-//            return false;
-//        }
-//
-//        public void setFutureListener(FutureListener<? super V> listener) {
-//            this.listener.set(listener);
-//            if (isDone()) {
-//                fireListener();
-//            }
-//        }
-//
-//        /**
-//         * Subclasses the return a result should override this
-//         * @return The result.
-//         */
-//        protected final V getResult() {
-//            return null;
-//        }
-//
-//        /**
-//         * Waits if necessary for the computation to complete, and then
-//         * retrieves its result.
-//         *
-//         * @return the computed result
-//         * @throws CancellationException
-//         *             if the computation was cancelled
-//         * @throws ExecutionException
-//         *             if the computation threw an exception
-//         * @throws InterruptedException
-//         *             if the current thread was interrupted while waiting
-//         */
-//        public final V get() throws ExecutionException, InterruptedException  {
-//
-//            try {
-//                return get(-1, TimeUnit.MILLISECONDS);
-//            } catch (TimeoutException e) {
-//                //Can't happen.
-//                throw new AssertionError(e);
-//            }
-//        }
-//
-//        /**
-//         * Waits if necessary for at most the given time for the computation
-//         * to complete, and then retrieves its result, if available.
-//         *
-//         * @param timeout the maximum time to wait
-//         * @param tu the time unit of the timeout argument
-//         * @return the computed result
-//         * @throws CancellationException if the computation was cancelled
-//         * @throws ExecutionException if the computation threw an
-//         * exception
-//         * @throws InterruptedException if the current thread was interrupted
-//         * while waiting
-//         * @throws TimeoutException if the wait timed out
-//         */
-//        public final V get(long timeout, TimeUnit tu) throws ExecutionException, InterruptedException, TimeoutException {
-//            if (isCancelled()) {
-//                throw new CancellationException();
-//            }
-//            if (error != null) {
-//                throw new ExecutionException(error);
-//            }
-//
-//            //TODO implement blocking?
-//            if(!isDone())
-//            {
-//                throw new UnsupportedOperationException("Blocking result retrieval not yet implemented");
-//            }
-//
-//            return getResult();
-//        }
-//
-//        public String toString() {
-//            return "DBOp seq: " + opSequenceNumber + "P/C/E: " + executePending.get() + "/" + isCancelled() + "/" + isExecuted();
-//        }
-//    }
-//
-//    private class QueueAddOperation extends OperationBase<Object> {
-//
-//        private QueueDescriptor qd;
-//
-//        QueueAddOperation(QueueDescriptor queue) {
-//            qd = queue;
-//        }
-//
-//        @Override
-//        protected void doExcecute(Session session) {
-//            try {
-//                session.queueAdd(qd);
-//            } catch (KeyNotFoundException e) {
-//                throw new FatalStoreException(e);
-//            }
-//        }
-//
-//        public String toString() {
-//            return "QueueAdd: " + qd.getQueueName().toString();
-//        }
-//    }
-//
-//    private class QueueDeleteOperation extends OperationBase<Object> {
-//
-//        private QueueDescriptor qd;
-//
-//        QueueDeleteOperation(QueueDescriptor queue) {
-//            qd = queue;
-//        }
-//
-//        @Override
-//        protected void doExcecute(Session session) {
-//            session.queueRemove(qd);
-//        }
-//
-//        public String toString() {
-//            return "QueueDelete: " + qd.getQueueName().toString();
-//        }
-//    }
-//
-//    private class DeleteOperation extends OperationBase<Object> {
-//        private final long queueKey;
-//        private QueueDescriptor queue;
-//
-//        public DeleteOperation(long queueKey, QueueDescriptor queue) {
-//            this.queueKey = queueKey;
-//            this.queue = queue;
-//        }
-//
-//        @Override
-//        public int getLimiterSize() {
-//            return BASE_MEM_SIZE + 8;
-//        }
-//
-//        @Override
-//        protected void doExcecute(Session session) {
-//            try {
-//                session.queueRemoveMessage(queue, queueKey);
-//            } catch (KeyNotFoundException e) {
-//                // TODO Probably doesn't always mean an error, it is possible
-//                // that
-//                // the queue has been deleted, in which case its messages will
-//                // have been deleted, too.
-//                e.printStackTrace();
-//            }
-//        }
-//
-//        public String toString() {
-//            return "MessageDelete: " + queue.getQueueName().toString() + " tracking: " + queueKey + " " + super.toString();
-//        }
-//    }
-//
-//    private class RestoreElementsOperation<V> extends OperationBase<V> {
-//        private QueueDescriptor queue;
-//        private long firstKey;
-//        private int maxRecords;
-//        private long maxSequence;
-//        private boolean recordsOnly;
-//        private RestoreListener<V> listener;
-//        private Collection<RestoredElement<V>> msgs = null;
-//        private MessageRecordMarshaller<V> marshaller;
-//
-//        RestoreElementsOperation(QueueDescriptor queue, boolean recordsOnly, long firstKey, int maxRecords, long maxSequence, RestoreListener<V> listener, MessageRecordMarshaller<V> marshaller) {
-//            this.queue = queue;
-//            this.recordsOnly = recordsOnly;
-//            this.firstKey = firstKey;
-//            this.maxRecords = maxRecords;
-//            this.maxSequence = maxSequence;
-//            this.listener = listener;
-//            this.marshaller = marshaller;
-//        }
-//
-//        @Override
-//        public int getLimiterSize() {
-//            return BASE_MEM_SIZE + 44;
-//        }
-//
-//        @Override
-//        protected void doExcecute(Session session) {
-//
-//            Iterator<QueueRecord> records = null;
-//            try {
-//                records = session.queueListMessagesQueue(queue, firstKey, maxSequence, maxRecords);
-//                msgs = new LinkedList<RestoredElement<V>>();
-//            } catch (KeyNotFoundException e) {
-//                msgs = new ArrayList<RestoredElement<V>>(0);
-//                return;
-//            }
-//
-//            QueueRecord qRecord = null;
-//            int count = 0;
-//            if (records.hasNext()) {
-//                qRecord = records.next();
-//            }
-//
-//            while (qRecord != null) {
-//                RestoredElementImpl<V> rm = new RestoredElementImpl<V>();
-//                // TODO should update jms redelivery here.
-//                rm.qRecord = qRecord;
-//                rm.queue = queue;
-//                count++;
-//
-//                // Set the next sequence number:
-//                if (records.hasNext()) {
-//                    qRecord = records.next();
-//                    rm.nextSequence = qRecord.getQueueKey();
-//                } else {
-//                    // Look up the next sequence number:
-//                    try {
-//                        records = session.queueListMessagesQueue(queue, qRecord.getQueueKey() + 1, -1L, 1);
-//                        if (!records.hasNext()) {
-//                            rm.nextSequence = -1;
-//                        } else {
-//                            rm.nextSequence = records.next().getQueueKey();
-//                        }
-//                    } catch (KeyNotFoundException e) {
-//                        rm.nextSequence = -1;
-//                    }
-//                    qRecord = null;
-//                }
-//
-//                if (!recordsOnly) {
-//                    try {
-//                        rm.mRecord = session.messageGetRecord(rm.qRecord.getMessageKey());
-//                        rm.marshaller = marshaller;
-//                        msgs.add(rm);
-//                    } catch (KeyNotFoundException shouldNotHappen) {
-//                        shouldNotHappen.printStackTrace();
-//                    }
-//                } else {
-//                    msgs.add(rm);
-//                }
-//            }
-//
-//            if (DEBUG)
-//                System.out.println("Restored: " + count + " messages");
-//        }
-//
-//        @Override
-//        public void onCommit() {
-//            listener.elementsRestored(msgs);
-//            super.onCommit();
-//        }
-//
-//        public String toString() {
-//            return "MessageRestore: " + queue.getQueueName().toString() + " first: " + firstKey + " max: " + maxRecords;
-//        }
-//    }
-//
-//    private class AddMessageOperation extends OperationBase<Object> {
-//
-//        private final BrokerMessageDelivery brokerDelivery;
-//        private final SaveableQueueElement<MessageDelivery> singleElement;
-//        private final MessageDelivery delivery;
-//        private MessageRecord record;
-//        private LinkedList<SaveableQueueElement<MessageDelivery>> notifyTargets;
-//        private final boolean delayable;
-//
-//        public AddMessageOperation(BrokerMessageDelivery delivery) {
-//            this.brokerDelivery = delivery;
-//            this.singleElement = null;
-//            this.delivery = delivery;
-//            this.delayable = delivery.isFlushDelayable();
-//            if (!delayable) {
-//                this.record = delivery.createMessageRecord();
-//            }
-//        }
-//
-//        public AddMessageOperation(SaveableQueueElement<MessageDelivery> queueElement) {
-//            this.brokerDelivery = null;
-//            singleElement = queueElement;
-//            delivery = queueElement.getElement();
-//            this.record = singleElement.getElement().createMessageRecord();
-//            delayable = false;
-//        }
-//
-//        public boolean isDelayable() {
-//            return delayable;
-//        }
-//
-//        @Override
-//        public int getLimiterSize() {
-//            return delivery.getFlowLimiterSize() + BASE_MEM_SIZE + 40;
-//        }
-//
-//        @Override
-//        protected void doExcecute(Session session) {
-//
-//            if (singleElement == null) {
-//                brokerDelivery.beginStore();
-//                Collection<SaveableQueueElement<MessageDelivery>> targets = brokerDelivery.getPersistentQueues();
-//
-//                if (targets != null && !targets.isEmpty()) {
-//                    if (record == null) {
-//                        record = brokerDelivery.createMessageRecord();
-//                        if (record == null) {
-//                            throw new RuntimeException("Error creating message record for " + brokerDelivery.getMsgId());
-//                        }
-//                    }
-//                    record.setId(brokerDelivery.getStoreTracking());
-//                    session.messageAdd(record);
-//
-//                    for (SaveableQueueElement<MessageDelivery> target : targets) {
-//                        try {
-//                            QueueRecord queueRecord = new QueueRecord();
-//                            queueRecord.setAttachment(null);
-//                            queueRecord.setMessageKey(record.getId());
-//                            queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
-//                            queueRecord.setQueueKey(target.getSequenceNumber());
-//                            session.queueAddMessage(target.getQueueDescriptor(), queueRecord);
-//
-//                        } catch (KeyNotFoundException e) {
-//                            e.printStackTrace();
-//                        }
-//
-//                        if (target.requestSaveNotify()) {
-//                            if (notifyTargets == null) {
-//                                notifyTargets = new LinkedList<SaveableQueueElement<MessageDelivery>>();
-//                            }
-//                            notifyTargets.add(target);
-//                        }
-//                    }
-//                } else {
-//                    // Save with no targets must have been cancelled:
-//                    // System.out.println("Skipping save for " +
-//                    // delivery.getStoreTracking());
-//                }
-//            } else {
-//
-//                session.messageAdd(record);
-//                try {
-//                    QueueRecord queueRecord = new QueueRecord();
-//                    queueRecord.setAttachment(null);
-//                    queueRecord.setMessageKey(record.getId());
-//                    queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
-//                    queueRecord.setQueueKey(singleElement.getSequenceNumber());
-//                    session.queueAddMessage(singleElement.getQueueDescriptor(), queueRecord);
-//                } catch (KeyNotFoundException e) {
-//                    e.printStackTrace();
-//                }
-//            }
-//        }
-//
-//        @Override
-//        public void onCommit() {
-//
-//            // Notify that the message was persisted.
-//            delivery.onMessagePersisted();
-//
-//            // Notify any of the targets that requested notify on save:
-//            if (singleElement != null && singleElement.requestSaveNotify()) {
-//                singleElement.notifySave();
-//            } else if (notifyTargets != null) {
-//                for (SaveableQueueElement<MessageDelivery> notify : notifyTargets) {
-//                    notify.notifySave();
-//                }
-//            }
-//
-//            super.onCommit();
-//        }
-//
-//        public String toString() {
-//            return "AddOperation " + delivery.getStoreTracking() + super.toString();
-//        }
-//    }
-//
-//    private class MapUpdateOperation extends OperationBase<Object> {
-//        final AsciiBuffer map;
-//        final AsciiBuffer key;
-//        final Buffer value;
-//
-//        MapUpdateOperation(AsciiBuffer mapName, AsciiBuffer key, Buffer value) {
-//            this.map = mapName;
-//            this.key = key;
-//            this.value = value;
-//        }
-//
-//        @Override
-//        public int getLimiterSize() {
-//            return BASE_MEM_SIZE + map.length + key.length + value.length;
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.apollo.broker.BrokerDatabase.OperationBase#doExcecute
-//         * (org.apache.activemq.broker.store.Store.Session)
-//         */
-//        @Override
-//        protected void doExcecute(Session session) {
-//            try {
-//                session.mapEntryPut(map, key, value);
-//            } catch (KeyNotFoundException e) {
-//                throw new Store.FatalStoreException(e);
-//            }
-//        }
-//    }
-//
-//    private class RestoredElementImpl<T> implements RestoredElement<T> {
-//        QueueRecord qRecord;
-//        QueueDescriptor queue;
-//        MessageRecord mRecord;
-//        MessageRecordMarshaller<T> marshaller;
-//        long nextSequence;
-//
-//        public T getElement() throws IOException {
-//            if (mRecord == null) {
-//                return null;
-//            }
-//            return marshaller.unMarshall(mRecord, queue);
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.queue.QueueStore.RestoredElement#getSequenceNumber
-//         * ()
-//         */
-//        public long getSequenceNumber() {
-//            return qRecord.getQueueKey();
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.queue.QueueStore.RestoredElement#getStoreTracking
-//         * ()
-//         */
-//        public long getStoreTracking() {
-//            return qRecord.getMessageKey();
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @seeorg.apache.activemq.queue.QueueStore.RestoredElement#
-//         * getNextSequenceNumber()
-//         */
-//        public long getNextSequenceNumber() {
-//            return nextSequence;
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.queue.QueueStore.RestoredElement#getElementSize()
-//         */
-//        public int getElementSize() {
-//            return qRecord.getSize();
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.queue.QueueStore.RestoredElement#getExpiration()
-//         */
-//        public long getExpiration() {
-//            return qRecord.getTte();
-//        }
-//    }
-//
-//    public long allocateStoreTracking() {
-//        return store.allocateStoreTracking();
-//    }
-//
-//    public void setDispatchQueue(DispatchQueue queue) {
-//        this.dispatcher = queue;
-//    }
-//
-//
-//    /**
-//     * @param sqe
-//     * @param source
-//     * @param delayable
-//     */
-//    public <T> OperationContext<?> saveQeueuElement(SaveableQueueElement<T> sqe, ISourceController<?> source, boolean delayable, MessageRecordMarshaller<T> marshaller) {
-//        return add(new AddElementOperation<T>(sqe, delayable, marshaller), source, !delayable);
-//    }
-//
-//    private class AddElementOperation<T> extends OperationBase<Object> {
-//
-//        private final SaveableQueueElement<T> op;
-//        private MessageRecord record;
-//        private boolean delayable;
-//        private final MessageRecordMarshaller<T> marshaller;
-//
-//        public AddElementOperation(SaveableQueueElement<T> op, boolean delayable, MessageRecordMarshaller<T> marshaller) {
-//            this.op = op;
-//            this.delayable = delayable;
-//            if (!delayable) {
-//                record = marshaller.marshal(op.getElement());
-//                this.marshaller = null;
-//            } else {
-//                this.marshaller = marshaller;
-//            }
-//        }
-//
-//        public boolean isDelayable() {
-//            return delayable;
-//        }
-//
-//        @Override
-//        public int getLimiterSize() {
-//            return op.getLimiterSize() + BASE_MEM_SIZE + 32;
-//        }
-//
-//        @Override
-//        protected void doExcecute(Session session) {
-//
-//            if (record == null) {
-//                record = marshaller.marshal(op.getElement());
-//            }
-//
-//            session.messageAdd(record);
-//            try {
-//                QueueRecord queueRecord = new QueueRecord();
-//                queueRecord.setAttachment(null);
-//                queueRecord.setMessageKey(record.getId());
-//                queueRecord.setSize(record.getSize());
-//                queueRecord.setQueueKey(op.getSequenceNumber());
-//                session.queueAddMessage(op.getQueueDescriptor(), queueRecord);
-//            } catch (KeyNotFoundException e) {
-//                e.printStackTrace();
-//            }
-//        }
-//
-//        public String toString() {
-//            return "AddTxOpOperation " + record.getId() + super.toString();
-//        }
-//    }
-//
-//    public long getFlushDelay() {
-//        return flushDelay;
-//    }
-//
-//    public void setFlushDelay(long flushDelay) {
-//        this.flushDelay = flushDelay;
-//    }
-//
-//    /**
-//     * @return true if operations are allowed to bypass the store.
-//     */
-//    public boolean isStoreBypass() {
-//        return storeBypass;
-//    }
-//
-//    /**
-//     * Sets if persistent operations should be allowed to bypass the store.
-//     * Defaults to true, as this will give you the best performance. In some
-//     * cases, you want to disable this as the store being used will double as an
-//     * audit log and you do not want any persistent operations to bypass the
-//     * store.
-//     *
-//     * When store bypass is disabled, all {@link Operation#cancel()} requests
-//     * will return false.
-//     *
-//     * @param enable
-//     *            if true will enable store bypass
-//     */
-//    public void setStoreBypass(boolean enable) {
-//        this.storeBypass = enable;
-//    }
-//
-//}
-
-
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class UserAlreadyConnectedException extends Exception
-
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class BrokerQueueStore { // implements QueueStore<Long, MessageDelivery> {
-// TODO:
-//    private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
-//    private static final boolean USE_OLD_QUEUE = false;
-//    private static final boolean USE_PRIORITY_QUEUES = true;
-//
-//    private BrokerDatabase database;
-//    private DispatchQueue dispatchQueue;
-//
-//    private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
-//    private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
-//         * #marshal(java.lang.Object)
-//         */
-//        public MessageRecord marshal(MessageDelivery element) {
-//            return element.createMessageRecord();
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
-//         * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
-//         */
-//        public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
-//            ProtocolHandler handler = protocolHandlers.get(record.getProtocol().toString());
-//            if (handler == null) {
-//                try {
-//                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getProtocol().toString());
-//                    protocolHandlers.put(record.getProtocol().toString(), handler);
-//                } catch (Throwable thrown) {
-//                    throw new RuntimeException("Unknown message format" + record.getProtocol().toString(), thrown);
-//                }
-//            }
-//            try {
-//                return handler.createMessageDelivery(record);
-//            } catch (IOException ioe) {
-//                throw new RuntimeException(ioe);
-//            }
-//        }
-//    };
-//
-//    final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
-//        return MESSAGE_MARSHALLER;
-//    }
-//
-//    private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
-//        public Long map(MessageDelivery element) {
-//            return element.getExpiration();
-//        }
-//    };
-//
-//    private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
-//        public Integer map(MessageDelivery element) {
-//            return element.getFlowLimiterSize();
-//        }
-//    };
-//
-//    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
-//        public Integer map(MessageDelivery element) {
-//            return element.getPriority();
-//        }
-//    };
-//
-//    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
-//        public Long map(MessageDelivery element) {
-//            return element.getStoreTracking();
-//        }
-//    };
-//
-//    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
-//        public Integer map(MessageDelivery element) {
-//            // we modulo 10 to have at most 10 partitions which the producers
-//            // gets split across.
-//            return (int) (element.getProducerId().hashCode() % 10);
-//        }
-//    };
-//
-//    public static final short SUBPARTITION_TYPE = 0;
-//    public static final short SHARED_QUEUE_TYPE = 1;
-//    public static final short DURABLE_QUEUE_TYPE = 2;
-//    public static short TRANSACTION_QUEUE_TYPE = 3;
-//
-//    private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-//    private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-//
-//    private Mapper<Integer, MessageDelivery> partitionMapper;
-//
-//    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
-//    private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
-//    // Be default we don't page out elements to disk.
-//    private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-//    //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
-//
-//    private static long dynamicQueueCounter = 0;
-//
-//    private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
-//
-//        private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-//
-//        public boolean isPersistent(MessageDelivery elem) {
-//            return elem.isPersistent();
-//        }
-//
-//        public boolean isPageOutPlaceHolders() {
-//            return true;
-//        }
-//
-//        public boolean isPagingEnabled() {
-//            return PAGING_ENABLED;
-//        }
-//
-//        public int getPagingInMemorySize() {
-//            return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-//        }
-//
-//        public boolean isThrottleSourcesToMemoryLimit() {
-//            // Keep the queue in memory.
-//            return true;
-//        }
-//
-//        public int getDisconnectedThrottleRate() {
-//            // By default don't throttle consumers when disconnected.
-//            return 0;
-//        }
-//
-//        public int getRecoveryBias() {
-//            return 8;
-//        }
-//    };
-//
-//    private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
-//    private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
-//    // Be default we don't page out elements to disk.
-//    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-//    private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
-//
-//    private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
-//
-//        private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-//
-//        public boolean isPersistent(MessageDelivery elem) {
-//            return elem.isPersistent();
-//        }
-//
-//        public boolean isPageOutPlaceHolders() {
-//            return true;
-//        }
-//
-//        public boolean isPagingEnabled() {
-//            return PAGING_ENABLED;
-//        }
-//
-//        public int getPagingInMemorySize() {
-//            return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-//        }
-//
-//        public boolean isThrottleSourcesToMemoryLimit() {
-//            // Keep the queue in memory.
-//            return true;
-//        }
-//
-//        public int getDisconnectedThrottleRate() {
-//            // By default don't throttle consumers when disconnected.
-//            return 0;
-//        }
-//
-//        public int getRecoveryBias() {
-//            return 8;
-//        }
-//    };
-//
-//    public void setDatabase(BrokerDatabase database) {
-//        this.database = database;
-//    }
-//
-//    public void setDispatchQueue(DispatchQueue dispatchQueue) {
-//        this.dispatchQueue = dispatchQueue;
-//    }
-//
-//    public void loadQueues() throws Exception {
-//
-//        // Load shared queues
-//        Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
-//        while (results.hasNext()) {
-//            QueueQueryResult loaded = results.next();
-//            IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
-//            sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
-//            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-//        }
-//
-//        // Load durable queues
-//        results = database.listQueues(DURABLE_QUEUE_TYPE);
-//        while (results.hasNext()) {
-//            QueueQueryResult loaded = results.next();
-//            IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
-//            durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
-//            LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-//
-//        }
-//    }
-//
-//    private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
-//
-//        IQueue<Long, MessageDelivery> queue;
-//        if (parent != null) {
-//            queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
-//        } else {
-//            queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
-//        }
-//
-//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-//
-//        // Creat the child queues
-//        Collection<QueueQueryResult> children = loaded.getPartitions();
-//        if (children != null) {
-//            try {
-//                IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
-//                for (QueueQueryResult child : children) {
-//                    createRestoredQueue(pQueue, child);
-//                }
-//            } catch (ClassCastException cce) {
-//                LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
-//                throw cce;
-//            }
-//        }
-//
-//        return queue;
-//
-//    }
-//
-//    private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
-//
-//        ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
-//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-//
-//        //TODO implement this for priority queue:
-//        // Create the child queues
-//        /*
-//         * Collection<QueueQueryResult> children = loaded.getPartitions(); if
-//         * (children != null) { try { IPartitionedQueue<Long, MessageDelivery>
-//         * pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue; for
-//         * (QueueQueryResult child : children) { createRestoredQueue(pQueue,
-//         * child); } } catch (ClassCastException cce) {
-//         * LOG.error("Loaded partition for unpartitionable queue: " +
-//         * queue.getResourceName()); throw cce; } }
-//         */
-//
-//        return queue;
-//
-//    }
-//
-//    public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
-//        //TODO
-//        return null;
-//    }
-//
-//    public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
-//        synchronized (this) {
-//            Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
-//            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
-//            ret.addAll(c);
-//            return ret;
-//        }
-//    }
-//
-//    public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
-//        IQueue<Long, MessageDelivery> queue = null;
-//        synchronized (this) {
-//            queue = durableQueues.get(name);
-//            if (queue == null) {
-//                queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-//                queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
-//                queue.initialize(0, 0, 0, 0);
-//                durableQueues.put(name, queue);
-//                addQueue(queue.getDescriptor());
-//            }
-//        }
-//
-//        return queue;
-//    }
-//
-//    public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
-//        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
-//        synchronized (this) {
-//            String name = "temp:" + (dynamicQueueCounter++);
-//            queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-//            queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
-//            queue.initialize(0, 0, 0, 0);
-//            addQueue(queue.getDescriptor());
-//        }
-//        return queue;
-//    }
-//
-//    public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
-//        synchronized (this) {
-//            Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
-//            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
-//            ret.addAll(c);
-//            return ret;
-//        }
-//    }
-//
-//    public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
-//
-//        IQueue<Long, MessageDelivery> queue = null;
-//        synchronized (this) {
-//            queue = sharedQueues.get(name);
-//            if (queue == null) {
-//                queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-//                queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
-//                queue.initialize(0, 0, 0, 0);
-//                sharedQueues.put(name, queue);
-//                addQueue(queue.getDescriptor());
-//            }
-//        }
-//
-//        return queue;
-//    }
-//
-//    private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
-//        ExclusivePersistentQueue<Long, MessageDelivery> queue;
-//
-//        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
-//            @Override
-//            public int getElementSize(MessageDelivery elem) {
-//                return elem.getFlowLimiterSize();
-//            }
-//        };
-//        queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
-//        queue.setStore(this);
-//        queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
-//        queue.setExpirationMapper(EXPIRATION_MAPPER);
-//        return queue;
-//    }
-//
-//    private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
-//
-//        IQueue<Long, MessageDelivery> ret;
-//
-//        switch (type) {
-//        case QueueDescriptor.PARTITIONED: {
-//            PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
-//                @Override
-//                public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
-//                    IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-//                    queue.getDescriptor().setPartitionId(partitionKey);
-//                    queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
-//                    return queue;
-//                }
-//
-//            };
-//            queue.setPartitionMapper(partitionMapper);
-//
-//            ret = queue;
-//            break;
-//        }
-//        case QueueDescriptor.SHARED_PRIORITY: {
-//            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
-//            limiter.setPriorityMapper(PRIORITY_MAPPER);
-//            limiter.setSizeMapper(SIZE_MAPPER);
-//            SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
-//            ret = queue;
-//            queue.setKeyMapper(KEY_MAPPER);
-//            queue.setAutoRelease(true);
-//            break;
-//        }
-//        case QueueDescriptor.SHARED: {
-//            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) {
-//                @Override
-//                public int getElementSize(MessageDelivery elem) {
-//                    return elem.getFlowLimiterSize();
-//                }
-//            };
-//
-//            if (!USE_OLD_QUEUE) {
-//                SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
-//                sQueue.setKeyMapper(KEY_MAPPER);
-//                sQueue.setAutoRelease(true);
-//                ret = sQueue;
-//            } else {
-//                SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
-//                sQueue.setKeyMapper(KEY_MAPPER);
-//                sQueue.setAutoRelease(true);
-//                ret = sQueue;
-//            }
-//            break;
-//        }
-//        default: {
-//            throw new IllegalArgumentException("Unknown queue type" + type);
-//        }
-//        }
-//        ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
-//        ret.setStore(this);
-//        ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
-//        ret.setExpirationMapper(EXPIRATION_MAPPER);
-//
-//        return ret;
-//    }
-//
-//    public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
-//        MessageDelivery md = sqe.getElement();
-//        //If the message delivery isn't null, funnel through it
-//        //since the message may not yet be in the store:
-//        if (md != null) {
-//            md.acknowledge(sqe);
-//        } else {
-//            database.deleteQueueElement(sqe);
-//        }
-//
-//    }
-//
-//    public final boolean isFromStore(MessageDelivery elem) {
-//        return elem.isFromStore();
-//    }
-//
-//    public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
-//        elem.getElement().persist(elem, controller, delayable);
-//    }
-//
-//    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
-//        database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
-//    }
-//
-//    public final void addQueue(QueueDescriptor queue) {
-//        database.addQueue(queue);
-//    }
-//
-//    public final void deleteQueue(QueueDescriptor queue) {
-//        database.deleteQueue(queue);
-//    }
-
-    def setDatabase(database:Store ) = {
-    }
-
-    def setDispatchQueue(dispatchQueue:DispatchQueue )= {
-    }
-
-    def loadQueues() ={
-    }
-}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties?rev=961157&r1=961156&r2=961157&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/resources/log4j.properties Wed Jul  7 04:11:57 2010
@@ -20,12 +20,13 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=TRACE
+log4j.logger.org=TRACE
 
 # Console will only display warnnings
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n
-log4j.appender.console.threshold=TRACE
+log4j.appender.console.threshold=INFO
 
 # File appender will contain all info messages
 log4j.appender.file=org.apache.log4j.FileAppender