You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:39:04 UTC

svn commit: r961067 [3/5] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/java/org/apache/activemq/apollo/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/test/java/org/apache/activemq/broker/...

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerMessageDelivery.java Wed Jul  7 03:39:03 2010
@@ -21,262 +21,260 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 
-import org.apache.activemq.apollo.broker.BrokerDatabase.OperationContext;
 import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.MessageRecord;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.activemq.util.buffer.Buffer;
 
 public abstract class BrokerMessageDelivery implements MessageDelivery {
 
-    // True while the message is being dispatched to the delivery targets:
-    boolean dispatching = false;
-
-    // A non null pending save indicates that the message is the
-    // saver queue and that the message
-    OperationContext<?> pendingSave;
-
-    // List of persistent targets for which the message should be saved
-    // when dispatch is complete:
-    HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
-    SaveableQueueElement<MessageDelivery> singleTarget;
-
-    long storeTracking = -1;
-    BrokerDatabase store;
-    boolean fromStore = false;
-    boolean enableFlushDelay = true;
-    private int limiterSize = -1;
-    private long tid=-1;
-
-    public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
-        fromStore = true;
-        store = database;
-        storeTracking = mRecord.getKey();
-        limiterSize = mRecord.getSize();
-    }
-
-    public final int getFlowLimiterSize() {
-        if (limiterSize == -1) {
-            limiterSize = getMemorySize();
-        }
-        return limiterSize;
-    }
-
-    /**
-     * When an application wishes to include a message in a broker transaction
-     * it must set this the tid returned by {@link Transaction#getTid()}
-     * 
-     * @param tid
-     *            Sets the tid used to identify the transaction at the broker.
-     */
-    public void setTransactionId(long tid) {
-        this.tid = tid;
-    }
-
-    /**
-     * @return The tid used to identify the transaction at the broker.
-     */
-    public final long getTransactionId() {
-        return tid;
-    }
-
-    public final void clearTransactionId() {
-        tid = -1;
-    }
-
-    /**
-     * Subclass must implement this to return their current memory size
-     * estimate.
-     * 
-     * @return The memory size of the message.
-     */
-    public abstract int getMemorySize();
-
-    public final boolean isFromStore() {
-        return fromStore;
-    }
-
-    public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
-        synchronized (this) {
-            // Can flush of this message to the store be delayed?
-            if (enableFlushDelay && !delayable) {
-                enableFlushDelay = false;
-            }
-            // If this message is being dispatched then add the queue to the
-            // list of queues for which to save the message when dispatch is
-            // finished:
-            if (dispatching) {
-                addPersistentTarget(sqe);
-                return;
-            }
-            // Otherwise, if it is still in the saver queue, we can add this
-            // queue to the queue list:
-            else if (pendingSave != null) {
-                addPersistentTarget(sqe);
-                if (!delayable) {
-                    pendingSave.requestFlush();
-                }
-                return;
-            }
-        }
-
-        store.saveMessage(sqe, controller, delayable);
-    }
-
-    public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
-        boolean firePersistListener = false;
-        boolean deleted = false;
-        synchronized (this) {
-            // If the message hasn't been saved to the database
-            // then we don't need to issue a delete:
-            if (dispatching || pendingSave != null) {
-
-                deleted = true;
-
-                removePersistentTarget(sqe.getQueueDescriptor());
-                // We get a save context when we place the message in the
-                // database queue. If it has been added to the queue,
-                // and we've removed the last queue, see if we can cancel
-                // the save:
-                if (pendingSave != null && !hasPersistentTargets()) {
-                    if (pendingSave.cancel()) {
-                        pendingSave = null;
-                        if (isPersistent()) {
-                            firePersistListener = true;
-                        }
-                    }
-                }
-            }
-        }
-
-        if (!deleted) {
-            store.deleteQueueElement(sqe);
-        }
-
-        if (firePersistListener) {
-            onMessagePersisted();
-        }
-
-    }
-
-    public final void setStoreTracking(long tracking) {
-        if (storeTracking == -1) {
-            storeTracking = tracking;
-        }
-    }
-
-    public final void beginDispatch(BrokerDatabase database) {
-        this.store = database;
-        dispatching = true;
-        setStoreTracking(database.allocateStoreTracking());
-    }
-
-    public long getStoreTracking() {
-        return storeTracking;
-    }
-
-    public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
-        if (singleTarget != null) {
-            ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
-            list.add(singleTarget);
-            return list;
-        } else if (persistentTargets != null) {
-            return persistentTargets.values();
-        }
-        return null;
-    }
-
-    public void beginStore() {
-        synchronized (this) {
-            pendingSave = null;
-        }
-    }
-
-    private final boolean hasPersistentTargets() {
-        return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
-    }
-
-    private final void removePersistentTarget(QueueDescriptor queue) {
-        if (persistentTargets != null) {
-            persistentTargets.remove(queue);
-            return;
-        }
-
-        if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
-            singleTarget = null;
-        }
-    }
-
-    private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
-        if (persistentTargets != null) {
-            persistentTargets.put(elem.getQueueDescriptor(), elem);
-            return;
-        }
-
-        if (singleTarget == null) {
-            singleTarget = elem;
-            return;
-        }
-
-        if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
-            persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
-            persistentTargets.put(elem.getQueueDescriptor(), elem);
-            persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
-            singleTarget = null;
-        }
-    }
-
-    public final void finishDispatch(ISourceController<?> controller) throws IOException {
-        boolean firePersistListener = false;
-        synchronized (this) {
-            // If any of the targets requested save then save the message
-            // Note that this could be the case even if the message isn't
-            // persistent if a target requested that the message be spooled
-            // for some other reason such as queue memory overflow.
-            if (hasPersistentTargets()) {
-                pendingSave = store.persistReceivedMessage(this, controller);
-            }
-
-            // If none of the targets required persistence, then fire the
-            // persist listener:
-            if (pendingSave == null || !isPersistent()) {
-                firePersistListener = true;
-            }
-            dispatching = false;
-        }
-
-        if (firePersistListener) {
-            onMessagePersisted();
-        }
-    }
-
-    public final MessageRecord createMessageRecord() {
-
-        MessageRecord record = new MessageRecord();
-        record.setEncoding(getStoreEncoding());
-        record.setBuffer(getStoreEncoded());
-        record.setStreamKey((long) 0);
-        record.setMessageId(getMsgId());
-        record.setSize(getFlowLimiterSize());
-        record.setKey(getStoreTracking());
-        return record;
-    }
-
-    /**
-     * @return A buffer representation of the message to be stored in the store.
-     * @throws  
-     */
-    protected abstract Buffer getStoreEncoded();
-
-    /**
-     * @return The encoding scheme used to store the message.
-     */
-    protected abstract AsciiBuffer getStoreEncoding();
-
-    public boolean isFlushDelayable() {
-        // TODO Auto-generated method stub
-        return enableFlushDelay;
-    }
+// TODO:    
+//    // True while the message is being dispatched to the delivery targets:
+//    boolean dispatching = false;
+//
+//    // A non-null pending save indicates that the message is still in the
+//    // saver queue, where a cancel or flush of its save may still be requested.
+//    OperationContext<?> pendingSave;
+//
+//    // List of persistent targets for which the message should be saved
+//    // when dispatch is complete:
+//    HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+//    SaveableQueueElement<MessageDelivery> singleTarget;
+//
+//    long storeTracking = -1;
+//    BrokerDatabase store;
+//    boolean fromStore = false;
+//    boolean enableFlushDelay = true;
+//    private int limiterSize = -1;
+//    private long tid=-1;
+//
+//    public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
+//        fromStore = true;
+//        store = database;
+//        storeTracking = mRecord.getKey();
+//        limiterSize = mRecord.getSize();
+//    }
+//
+//    public final int getFlowLimiterSize() {
+//        if (limiterSize == -1) {
+//            limiterSize = getMemorySize();
+//        }
+//        return limiterSize;
+//    }
+//
+//    /**
+//     * When an application wishes to include a message in a broker transaction
+//     * it must set this to the tid returned by {@link Transaction#getTid()}.
+//     *
+//     * @param tid
+//     *            Sets the tid used to identify the transaction at the broker.
+//     */
+//    public void setTransactionId(long tid) {
+//        this.tid = tid;
+//    }
+//
+//    /**
+//     * @return The tid used to identify the transaction at the broker.
+//     */
+//    public final long getTransactionId() {
+//        return tid;
+//    }
+//
+//    public final void clearTransactionId() {
+//        tid = -1;
+//    }
+//
+//    /**
+//     * Subclass must implement this to return their current memory size
+//     * estimate.
+//     *
+//     * @return The memory size of the message.
+//     */
+//    public abstract int getMemorySize();
+//
+//    public final boolean isFromStore() {
+//        return fromStore;
+//    }
+//
+//    public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
+//        synchronized (this) {
+//            // Can flush of this message to the store be delayed?
+//            if (enableFlushDelay && !delayable) {
+//                enableFlushDelay = false;
+//            }
+//            // If this message is being dispatched then add the queue to the
+//            // list of queues for which to save the message when dispatch is
+//            // finished:
+//            if (dispatching) {
+//                addPersistentTarget(sqe);
+//                return;
+//            }
+//            // Otherwise, if it is still in the saver queue, we can add this
+//            // queue to the queue list:
+//            else if (pendingSave != null) {
+//                addPersistentTarget(sqe);
+//                if (!delayable) {
+//                    pendingSave.requestFlush();
+//                }
+//                return;
+//            }
+//        }
+//
+//        store.saveMessage(sqe, controller, delayable);
+//    }
+//
+//    public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+//        boolean firePersistListener = false;
+//        boolean deleted = false;
+//        synchronized (this) {
+//            // If the message hasn't been saved to the database
+//            // then we don't need to issue a delete:
+//            if (dispatching || pendingSave != null) {
+//
+//                deleted = true;
+//
+//                removePersistentTarget(sqe.getQueueDescriptor());
+//                // We get a save context when we place the message in the
+//                // database queue. If it has been added to the queue,
+//                // and we've removed the last queue, see if we can cancel
+//                // the save:
+//                if (pendingSave != null && !hasPersistentTargets()) {
+//                    if (pendingSave.cancel()) {
+//                        pendingSave = null;
+//                        if (isPersistent()) {
+//                            firePersistListener = true;
+//                        }
+//                    }
+//                }
+//            }
+//        }
+//
+//        if (!deleted) {
+//            store.deleteQueueElement(sqe);
+//        }
+//
+//        if (firePersistListener) {
+//            onMessagePersisted();
+//        }
+//
+//    }
+//
+//    public final void setStoreTracking(long tracking) {
+//        if (storeTracking == -1) {
+//            storeTracking = tracking;
+//        }
+//    }
+//
+//    public final void beginDispatch(BrokerDatabase database) {
+//        this.store = database;
+//        dispatching = true;
+//        setStoreTracking(database.allocateStoreTracking());
+//    }
+//
+//    public long getStoreTracking() {
+//        return storeTracking;
+//    }
+//
+//    public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
+//        if (singleTarget != null) {
+//            ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
+//            list.add(singleTarget);
+//            return list;
+//        } else if (persistentTargets != null) {
+//            return persistentTargets.values();
+//        }
+//        return null;
+//    }
+//
+//    public void beginStore() {
+//        synchronized (this) {
+//            pendingSave = null;
+//        }
+//    }
+//
+//    private final boolean hasPersistentTargets() {
+//        return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
+//    }
+//
+//    private final void removePersistentTarget(QueueDescriptor queue) {
+//        if (persistentTargets != null) {
+//            persistentTargets.remove(queue);
+//            return;
+//        }
+//
+//        if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
+//            singleTarget = null;
+//        }
+//    }
+//
+//    private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
+//        if (persistentTargets != null) {
+//            persistentTargets.put(elem.getQueueDescriptor(), elem);
+//            return;
+//        }
+//
+//        if (singleTarget == null) {
+//            singleTarget = elem;
+//            return;
+//        }
+//
+//        if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
+//            persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+//            persistentTargets.put(elem.getQueueDescriptor(), elem);
+//            persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
+//            singleTarget = null;
+//        }
+//    }
+//
+//    public final void finishDispatch(ISourceController<?> controller) throws IOException {
+//        boolean firePersistListener = false;
+//        synchronized (this) {
+//            // If any of the targets requested save then save the message
+//            // Note that this could be the case even if the message isn't
+//            // persistent if a target requested that the message be spooled
+//            // for some other reason such as queue memory overflow.
+//            if (hasPersistentTargets()) {
+//                pendingSave = store.persistReceivedMessage(this, controller);
+//            }
+//
+//            // If none of the targets required persistence, then fire the
+//            // persist listener:
+//            if (pendingSave == null || !isPersistent()) {
+//                firePersistListener = true;
+//            }
+//            dispatching = false;
+//        }
+//
+//        if (firePersistListener) {
+//            onMessagePersisted();
+//        }
+//    }
+//
+//    public final MessageRecord createMessageRecord() {
+//
+//        MessageRecord record = new MessageRecord();
+//        record.setEncoding(getStoreEncoding());
+//        record.setBuffer(getStoreEncoded());
+//        record.setStreamKey((long) 0);
+//        record.setMessageId(getMsgId());
+//        record.setSize(getFlowLimiterSize());
+//        record.setKey(getStoreTracking());
+//        return record;
+//    }
+//
+//    /**
+//     * @return A buffer representation of the message to be stored in the store.
+//     * @throws
+//     */
+//    protected abstract Buffer getStoreEncoded();
+//
+//    /**
+//     * @return The encoding scheme used to store the message.
+//     */
+//    protected abstract AsciiBuffer getStoreEncoding();
+//
+//    public boolean isFlushDelayable() {
+//        // TODO Auto-generated method stub
+//        return enableFlushDelay;
+//    }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerQueueStore.java Wed Jul  7 03:39:03 2010
@@ -25,460 +25,455 @@ import java.util.Iterator;
 import org.apache.activemq.broker.store.QueueDescriptor;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.PrioritySizeLimiter;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.queue.ExclusivePersistentQueue;
-import org.apache.activemq.queue.IPartitionedQueue;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.PartitionedQueue;
-import org.apache.activemq.queue.PersistencePolicy;
-import org.apache.activemq.queue.QueueStore;
-import org.apache.activemq.queue.RestoreListener;
-import org.apache.activemq.queue.SaveableQueueElement;
-import org.apache.activemq.queue.SharedPriorityQueue;
-import org.apache.activemq.queue.SharedQueue;
-import org.apache.activemq.queue.SharedQueueOld;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.fusesource.hawtdispatch.DispatchQueue;
 
-public class BrokerQueueStore implements QueueStore<Long, MessageDelivery> {
-
-    private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
-    private static final boolean USE_OLD_QUEUE = false;
-    private static final boolean USE_PRIORITY_QUEUES = true;
-
-    private BrokerDatabase database;
-    private DispatchQueue dispatchQueue;
-
-    private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
-    private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
-         * #marshal(java.lang.Object)
-         */
-        public MessageRecord marshal(MessageDelivery element) {
-            return element.createMessageRecord();
-        }
-
-        /*
-         * (non-Javadoc)
-         * 
-         * @see
-         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
-         * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
-         */
-        public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
-            ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
-            if (handler == null) {
-                try {
-                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
-                    protocolHandlers.put(record.getEncoding().toString(), handler);
-                } catch (Throwable thrown) {
-                    throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
-                }
-            }
-            try {
-                return handler.createMessageDelivery(record);
-            } catch (IOException ioe) {
-                throw new RuntimeException(ioe);
-            }
-        }
-    };
-
-    final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
-        return MESSAGE_MARSHALLER;
-    }
-
-    private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
-        public Long map(MessageDelivery element) {
-            return element.getExpiration();
-        }
-    };
-
-    private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            return element.getFlowLimiterSize();
-        }
-    };
-
-    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            return element.getPriority();
-        }
-    };
-
-    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
-        public Long map(MessageDelivery element) {
-            return element.getStoreTracking();
-        }
-    };
-
-    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            // we modulo 10 to have at most 10 partitions which the producers
-            // gets split across.
-            return (int) (element.getProducerId().hashCode() % 10);
-        }
-    };
-
-    public static final short SUBPARTITION_TYPE = 0;
-    public static final short SHARED_QUEUE_TYPE = 1;
-    public static final short DURABLE_QUEUE_TYPE = 2;
-    public static short TRANSACTION_QUEUE_TYPE = 3;
-
-    private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-    private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
-
-    private Mapper<Integer, MessageDelivery> partitionMapper;
-
-    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
-    private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
-    // Be default we don't page out elements to disk.
-    private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-    //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
-
-    private static long dynamicQueueCounter = 0;
-
-    private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
-
-        private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-
-        public boolean isPersistent(MessageDelivery elem) {
-            return elem.isPersistent();
-        }
-
-        public boolean isPageOutPlaceHolders() {
-            return true;
-        }
-
-        public boolean isPagingEnabled() {
-            return PAGING_ENABLED;
-        }
-
-        public int getPagingInMemorySize() {
-            return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
-        }
-
-        public boolean isThrottleSourcesToMemoryLimit() {
-            // Keep the queue in memory.
-            return true;
-        }
-
-        public int getDisconnectedThrottleRate() {
-            // By default don't throttle consumers when disconnected.
-            return 0;
-        }
-
-        public int getRecoveryBias() {
-            return 8;
-        }
-    };
-
-    private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
-    private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
-    // Be default we don't page out elements to disk.
-    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-    private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
-
-    private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
-
-        private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-
-        public boolean isPersistent(MessageDelivery elem) {
-            return elem.isPersistent();
-        }
-
-        public boolean isPageOutPlaceHolders() {
-            return true;
-        }
-
-        public boolean isPagingEnabled() {
-            return PAGING_ENABLED;
-        }
-
-        public int getPagingInMemorySize() {
-            return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
-        }
-
-        public boolean isThrottleSourcesToMemoryLimit() {
-            // Keep the queue in memory.
-            return true;
-        }
-
-        public int getDisconnectedThrottleRate() {
-            // By default don't throttle consumers when disconnected.
-            return 0;
-        }
-
-        public int getRecoveryBias() {
-            return 8;
-        }
-    };
+public class BrokerQueueStore { // implements QueueStore<Long, MessageDelivery> {
+// TODO:
+//    private static final Log LOG = LogFactory.getLog(BrokerQueueStore.class);
+//    private static final boolean USE_OLD_QUEUE = false;
+//    private static final boolean USE_PRIORITY_QUEUES = true;
+//
+//    private BrokerDatabase database;
+//    private DispatchQueue dispatchQueue;
+//
+//    private static HashMap<String, ProtocolHandler> protocolHandlers = new HashMap<String, ProtocolHandler>();
+//    private static final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> MESSAGE_MARSHALLER = new BrokerDatabase.MessageRecordMarshaller<MessageDelivery>() {
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+//         * #marshal(java.lang.Object)
+//         */
+//        public MessageRecord marshal(MessageDelivery element) {
+//            return element.createMessageRecord();
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.apollo.broker.BrokerDatabase.MessageRecordMarshaller
+//         * #unMarshall(org.apache.activemq.broker.store.Store.MessageRecord)
+//         */
+//        public MessageDelivery unMarshall(MessageRecord record, QueueDescriptor queue) {
+//            ProtocolHandler handler = protocolHandlers.get(record.getEncoding().toString());
+//            if (handler == null) {
+//                try {
+//                    handler = ProtocolHandlerFactory.createProtocolHandler(record.getEncoding().toString());
+//                    protocolHandlers.put(record.getEncoding().toString(), handler);
+//                } catch (Throwable thrown) {
+//                    throw new RuntimeException("Unknown message format" + record.getEncoding().toString(), thrown);
+//                }
+//            }
+//            try {
+//                return handler.createMessageDelivery(record);
+//            } catch (IOException ioe) {
+//                throw new RuntimeException(ioe);
+//            }
+//        }
+//    };
+//
+//    final BrokerDatabase.MessageRecordMarshaller<MessageDelivery> getMessageMarshaller() {
+//        return MESSAGE_MARSHALLER;
+//    }
+//
+//    private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
+//        public Long map(MessageDelivery element) {
+//            return element.getExpiration();
+//        }
+//    };
+//
+//    private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
+//        public Integer map(MessageDelivery element) {
+//            return element.getFlowLimiterSize();
+//        }
+//    };
+//
+//    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
+//        public Integer map(MessageDelivery element) {
+//            return element.getPriority();
+//        }
+//    };
+//
+//    static public final Mapper<Long, MessageDelivery> KEY_MAPPER = new Mapper<Long, MessageDelivery>() {
+//        public Long map(MessageDelivery element) {
+//            return element.getStoreTracking();
+//        }
+//    };
+//
+//    static public final Mapper<Integer, MessageDelivery> PARTITION_MAPPER = new Mapper<Integer, MessageDelivery>() {
+//        public Integer map(MessageDelivery element) {
+//            // We take the hash modulo 10, intending at most 10 partitions for the
+//            // producers to be split across (NOTE: % is negative for negative hashes).
+//            return (int) (element.getProducerId().hashCode() % 10);
+//        }
+//    };
+//
+//    public static final short SUBPARTITION_TYPE = 0;
+//    public static final short SHARED_QUEUE_TYPE = 1;
+//    public static final short DURABLE_QUEUE_TYPE = 2;
+//    public static short TRANSACTION_QUEUE_TYPE = 3;
+//
+//    private final HashMap<String, IQueue<Long, MessageDelivery>> sharedQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+//    private final HashMap<String, IQueue<Long, MessageDelivery>> durableQueues = new HashMap<String, IQueue<Long, MessageDelivery>>();
+//
+//    private Mapper<Integer, MessageDelivery> partitionMapper;
+//
+//    private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 1024 * 1024 * 1;
+//    private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
+//    // By default we don't page out elements to disk.
+//    private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//    //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
+//
+//    private static long dynamicQueueCounter = 0;
+//
+//    private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+//
+//        private static final boolean PAGING_ENABLED = DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//
+//        public boolean isPersistent(MessageDelivery elem) {
+//            return elem.isPersistent();
+//        }
+//
+//        public boolean isPageOutPlaceHolders() {
+//            return true;
+//        }
+//
+//        public boolean isPagingEnabled() {
+//            return PAGING_ENABLED;
+//        }
+//
+//        public int getPagingInMemorySize() {
+//            return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+//        }
+//
+//        public boolean isThrottleSourcesToMemoryLimit() {
+//            // Keep the queue in memory.
+//            return true;
+//        }
+//
+//        public int getDisconnectedThrottleRate() {
+//            // By default don't throttle consumers when disconnected.
+//            return 0;
+//        }
+//
+//        public int getRecoveryBias() {
+//            return 8;
+//        }
+//    };
+//
+//    private static final int DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;
+//    private static final int DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD = 1;
+//    // By default we don't page out elements to disk.
+//    //private static final int DEFAULT_DURABLE_QUEUE_SIZE = DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//    private static final int DEFAULT_DURABLE_QUEUE_SIZE = 1024 * 1024 * 10;
+//
+//    private static final PersistencePolicy<MessageDelivery> DURABLE_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
+//
+//        private static final boolean PAGING_ENABLED = DEFAULT_DURABLE_QUEUE_SIZE > DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//
+//        public boolean isPersistent(MessageDelivery elem) {
+//            return elem.isPersistent();
+//        }
+//
+//        public boolean isPageOutPlaceHolders() {
+//            return true;
+//        }
+//
+//        public boolean isPagingEnabled() {
+//            return PAGING_ENABLED;
+//        }
+//
+//        public int getPagingInMemorySize() {
+//            return DEFAULT_DURABLE_QUEUE_PAGING_THRESHOLD;
+//        }
+//
+//        public boolean isThrottleSourcesToMemoryLimit() {
+//            // Keep the queue in memory.
+//            return true;
+//        }
+//
+//        public int getDisconnectedThrottleRate() {
+//            // By default don't throttle consumers when disconnected.
+//            return 0;
+//        }
+//
+//        public int getRecoveryBias() {
+//            return 8;
+//        }
+//    };
+//
+//    public void setDatabase(BrokerDatabase database) {
+//        this.database = database;
+//    }
+//
+//    public void setDispatchQueue(DispatchQueue dispatchQueue) {
+//        this.dispatchQueue = dispatchQueue;
+//    }
+//
+//    public void loadQueues() throws Exception {
+//
+//        // Load shared queues
+//        Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
+//        while (results.hasNext()) {
+//            QueueQueryResult loaded = results.next();
+//            IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
+//            sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+//            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//        }
+//
+//        // Load durable queues
+//        results = database.listQueues(DURABLE_QUEUE_TYPE);
+//        while (results.hasNext()) {
+//            QueueQueryResult loaded = results.next();
+//            IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
+//            durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
+//            LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
+//
+//        }
+//    }
+//
+//    private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
+//
+//        IQueue<Long, MessageDelivery> queue;
+//        if (parent != null) {
+//            queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
+//        } else {
+//            queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+//        }
+//
+//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//
+//        // Create the child queues
+//        Collection<QueueQueryResult> children = loaded.getPartitions();
+//        if (children != null) {
+//            try {
+//                IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
+//                for (QueueQueryResult child : children) {
+//                    createRestoredQueue(pQueue, child);
+//                }
+//            } catch (ClassCastException cce) {
+//                LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
+//                throw cce;
+//            }
+//        }
+//
+//        return queue;
+//
+//    }
+//
+//    private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
+//
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
+//        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
+//
+//        //TODO implement this for priority queue:
+//        // Create the child queues
+//        /*
+//         * Collection<QueueQueryResult> children = loaded.getPartitions(); if
+//         * (children != null) { try { IPartitionedQueue<Long, MessageDelivery>
+//         * pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue; for
+//         * (QueueQueryResult child : children) { createRestoredQueue(pQueue,
+//         * child); } } catch (ClassCastException cce) {
+//         * LOG.error("Loaded partition for unpartitionable queue: " +
+//         * queue.getResourceName()); throw cce; } }
+//         */
+//
+//        return queue;
+//
+//    }
+//
+//    public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
+//        //TODO
+//        return null;
+//    }
+//
+//    public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
+//        synchronized (this) {
+//            Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
+//            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+//            ret.addAll(c);
+//            return ret;
+//        }
+//    }
+//
+//    public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
+//        IQueue<Long, MessageDelivery> queue = null;
+//        synchronized (this) {
+//            queue = durableQueues.get(name);
+//            if (queue == null) {
+//                queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//                queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+//                queue.initialize(0, 0, 0, 0);
+//                durableQueues.put(name, queue);
+//                addQueue(queue.getDescriptor());
+//            }
+//        }
+//
+//        return queue;
+//    }
+//
+//    public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
+//        synchronized (this) {
+//            String name = "temp:" + (dynamicQueueCounter++);
+//            queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//            queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
+//            queue.initialize(0, 0, 0, 0);
+//            addQueue(queue.getDescriptor());
+//        }
+//        return queue;
+//    }
+//
+//    public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
+//        synchronized (this) {
+//            Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
+//            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
+//            ret.addAll(c);
+//            return ret;
+//        }
+//    }
+//
+//    public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
+//
+//        IQueue<Long, MessageDelivery> queue = null;
+//        synchronized (this) {
+//            queue = sharedQueues.get(name);
+//            if (queue == null) {
+//                queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//                queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
+//                queue.initialize(0, 0, 0, 0);
+//                sharedQueues.put(name, queue);
+//                addQueue(queue.getDescriptor());
+//            }
+//        }
+//
+//        return queue;
+//    }
+//
+//    private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
+//        ExclusivePersistentQueue<Long, MessageDelivery> queue;
+//
+//        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
+//            @Override
+//            public int getElementSize(MessageDelivery elem) {
+//                return elem.getFlowLimiterSize();
+//            }
+//        };
+//        queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
+//        queue.setStore(this);
+//        queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
+//        queue.setExpirationMapper(EXPIRATION_MAPPER);
+//        return queue;
+//    }
+//
+//    private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
+//
+//        IQueue<Long, MessageDelivery> ret;
+//
+//        switch (type) {
+//        case QueueDescriptor.PARTITIONED: {
+//            PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
+//                @Override
+//                public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
+//                    IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
+//                    queue.getDescriptor().setPartitionId(partitionKey);
+//                    queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
+//                    return queue;
+//                }
+//
+//            };
+//            queue.setPartitionMapper(partitionMapper);
+//
+//            ret = queue;
+//            break;
+//        }
+//        case QueueDescriptor.SHARED_PRIORITY: {
+//            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
+//            limiter.setPriorityMapper(PRIORITY_MAPPER);
+//            limiter.setSizeMapper(SIZE_MAPPER);
+//            SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
+//            ret = queue;
+//            queue.setKeyMapper(KEY_MAPPER);
+//            queue.setAutoRelease(true);
+//            break;
+//        }
+//        case QueueDescriptor.SHARED: {
+//            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) {
+//                @Override
+//                public int getElementSize(MessageDelivery elem) {
+//                    return elem.getFlowLimiterSize();
+//                }
+//            };
+//
+//            if (!USE_OLD_QUEUE) {
+//                SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
+//                sQueue.setKeyMapper(KEY_MAPPER);
+//                sQueue.setAutoRelease(true);
+//                ret = sQueue;
+//            } else {
+//                SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
+//                sQueue.setKeyMapper(KEY_MAPPER);
+//                sQueue.setAutoRelease(true);
+//                ret = sQueue;
+//            }
+//            break;
+//        }
+//        default: {
+//            throw new IllegalArgumentException("Unknown queue type" + type);
+//        }
+//        }
+//        ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
+//        ret.setStore(this);
+//        ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
+//        ret.setExpirationMapper(EXPIRATION_MAPPER);
+//
+//        return ret;
+//    }
+//
+//    public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
+//        MessageDelivery md = sqe.getElement();
+//        //If the message delivery isn't null, funnel through it
+//        //since the message may not yet be in the store:
+//        if (md != null) {
+//            md.acknowledge(sqe);
+//        } else {
+//            database.deleteQueueElement(sqe);
+//        }
+//
+//    }
+//
+//    public final boolean isFromStore(MessageDelivery elem) {
+//        return elem.isFromStore();
+//    }
+//
+//    public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+//        elem.getElement().persist(elem, controller, delayable);
+//    }
+//
+//    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
+//        database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
+//    }
+//
+//    public final void addQueue(QueueDescriptor queue) {
+//        database.addQueue(queue);
+//    }
+//
+//    public final void deleteQueue(QueueDescriptor queue) {
+//        database.deleteQueue(queue);
+//    }
 
     public void setDatabase(BrokerDatabase database) {
-        this.database = database;
     }
 
     public void setDispatchQueue(DispatchQueue dispatchQueue) {
-        this.dispatchQueue = dispatchQueue;
-    }
-
-    public void loadQueues() throws Exception {
-
-        // Load shared queues
-        Iterator<QueueQueryResult> results = database.listQueues(SHARED_QUEUE_TYPE);
-        while (results.hasNext()) {
-            QueueQueryResult loaded = results.next();
-            IQueue<Long, MessageDelivery> queue = createRestoredQueue(null, loaded);
-            sharedQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
-            LOG.info("Loaded Queue " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-        }
-
-        // Load durable queues
-        results = database.listQueues(DURABLE_QUEUE_TYPE);
-        while (results.hasNext()) {
-            QueueQueryResult loaded = results.next();
-            IQueue<Long, MessageDelivery> queue = createRestoredDurableQueue(loaded);
-            durableQueues.put(queue.getDescriptor().getQueueName().toString(), queue);
-            LOG.info("Loaded Durable " + queue.getResourceName() + " Messages: " + queue.getEnqueuedCount() + " Size: " + queue.getEnqueuedSize());
-
-        }
-    }
-
-    private IQueue<Long, MessageDelivery> createRestoredQueue(IPartitionedQueue<Long, MessageDelivery> parent, QueueQueryResult loaded) throws IOException {
-
-        IQueue<Long, MessageDelivery> queue;
-        if (parent != null) {
-            queue = parent.createPartition(loaded.getDescriptor().getPartitionKey());
-        } else {
-            queue = createSharedQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
-        }
-
-        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-
-        // Creat the child queues
-        Collection<QueueQueryResult> children = loaded.getPartitions();
-        if (children != null) {
-            try {
-                IPartitionedQueue<Long, MessageDelivery> pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue;
-                for (QueueQueryResult child : children) {
-                    createRestoredQueue(pQueue, child);
-                }
-            } catch (ClassCastException cce) {
-                LOG.error("Loaded partition for unpartitionable queue: " + queue.getResourceName());
-                throw cce;
-            }
-        }
-
-        return queue;
-
-    }
-
-    private IQueue<Long, MessageDelivery> createRestoredDurableQueue(QueueQueryResult loaded) throws IOException {
-
-        ExclusivePersistentQueue<Long, MessageDelivery> queue = createDurableQueueInternal(loaded.getDescriptor().getQueueName().toString(), loaded.getDescriptor().getQueueType());
-        queue.initialize(loaded.getFirstSequence(), loaded.getLastSequence(), loaded.getCount(), loaded.getSize());
-
-        //TODO implement this for priority queue:
-        // Create the child queues
-        /*
-         * Collection<QueueQueryResult> children = loaded.getPartitions(); if
-         * (children != null) { try { IPartitionedQueue<Long, MessageDelivery>
-         * pQueue = (IPartitionedQueue<Long, MessageDelivery>) queue; for
-         * (QueueQueryResult child : children) { createRestoredQueue(pQueue,
-         * child); } } catch (ClassCastException cce) {
-         * LOG.error("Loaded partition for unpartitionable queue: " +
-         * queue.getResourceName()); throw cce; } }
-         */
-
-        return queue;
-
-    }
-
-    public IQueue<Long, MessageDelivery> getQueue(AsciiBuffer queueName) {
-        //TODO
-        return null;
-    }
-
-    public Collection<IQueue<Long, MessageDelivery>> getSharedQueues() {
-        synchronized (this) {
-            Collection<IQueue<Long, MessageDelivery>> c = sharedQueues.values();
-            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
-            ret.addAll(c);
-            return ret;
-        }
-    }
-
-    public IQueue<Long, MessageDelivery> createDurableQueue(String name) {
-        IQueue<Long, MessageDelivery> queue = null;
-        synchronized (this) {
-            queue = durableQueues.get(name);
-            if (queue == null) {
-                queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-                queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
-                queue.initialize(0, 0, 0, 0);
-                durableQueues.put(name, queue);
-                addQueue(queue.getDescriptor());
-            }
-        }
-
-        return queue;
-    }
-
-    public ExclusivePersistentQueue<Long, MessageDelivery> createExclusivePersistentQueue() {
-        ExclusivePersistentQueue<Long, MessageDelivery> queue = null;
-        synchronized (this) {
-            String name = "temp:" + (dynamicQueueCounter++);
-            queue = createDurableQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-            queue.getDescriptor().setApplicationType(DURABLE_QUEUE_TYPE);
-            queue.initialize(0, 0, 0, 0);
-            addQueue(queue.getDescriptor());
-        }
-        return queue;
-    }
-
-    public Collection<IQueue<Long, MessageDelivery>> getDurableQueues() {
-        synchronized (this) {
-            Collection<IQueue<Long, MessageDelivery>> c = durableQueues.values();
-            ArrayList<IQueue<Long, MessageDelivery>> ret = new ArrayList<IQueue<Long, MessageDelivery>>(c.size());
-            ret.addAll(c);
-            return ret;
-        }
-    }
-
-    public IQueue<Long, MessageDelivery> createSharedQueue(String name) {
-
-        IQueue<Long, MessageDelivery> queue = null;
-        synchronized (this) {
-            queue = sharedQueues.get(name);
-            if (queue == null) {
-                queue = createSharedQueueInternal(name, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-                queue.getDescriptor().setApplicationType(SHARED_QUEUE_TYPE);
-                queue.initialize(0, 0, 0, 0);
-                sharedQueues.put(name, queue);
-                addQueue(queue.getDescriptor());
-            }
-        }
-
-        return queue;
-    }
-
-    private ExclusivePersistentQueue<Long, MessageDelivery> createDurableQueueInternal(final String name, short type) {
-        ExclusivePersistentQueue<Long, MessageDelivery> queue;
-
-        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_DURABLE_QUEUE_SIZE, DEFAULT_DURABLE_QUEUE_RESUME_THRESHOLD) {
-            @Override
-            public int getElementSize(MessageDelivery elem) {
-                return elem.getFlowLimiterSize();
-            }
-        };
-        queue = new ExclusivePersistentQueue<Long, MessageDelivery>(name, limiter);
-        queue.setStore(this);
-        queue.setPersistencePolicy(DURABLE_QUEUE_PERSISTENCE_POLICY);
-        queue.setExpirationMapper(EXPIRATION_MAPPER);
-        return queue;
-    }
-
-    private IQueue<Long, MessageDelivery> createSharedQueueInternal(final String name, short type) {
-
-        IQueue<Long, MessageDelivery> ret;
-
-        switch (type) {
-        case QueueDescriptor.PARTITIONED: {
-            PartitionedQueue<Long, MessageDelivery> queue = new PartitionedQueue<Long, MessageDelivery>(name) {
-                @Override
-                public IQueue<Long, MessageDelivery> createPartition(int partitionKey) {
-                    IQueue<Long, MessageDelivery> queue = createSharedQueueInternal(name + "$" + partitionKey, USE_PRIORITY_QUEUES ? QueueDescriptor.SHARED_PRIORITY : QueueDescriptor.SHARED);
-                    queue.getDescriptor().setPartitionId(partitionKey);
-                    queue.getDescriptor().setParent(this.getDescriptor().getQueueName());
-                    return queue;
-                }
-
-            };
-            queue.setPartitionMapper(partitionMapper);
-
-            ret = queue;
-            break;
-        }
-        case QueueDescriptor.SHARED_PRIORITY: {
-            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, 10);
-            limiter.setPriorityMapper(PRIORITY_MAPPER);
-            limiter.setSizeMapper(SIZE_MAPPER);
-            SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
-            ret = queue;
-            queue.setKeyMapper(KEY_MAPPER);
-            queue.setAutoRelease(true);
-            break;
-        }
-        case QueueDescriptor.SHARED: {
-            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD) {
-                @Override
-                public int getElementSize(MessageDelivery elem) {
-                    return elem.getFlowLimiterSize();
-                }
-            };
-
-            if (!USE_OLD_QUEUE) {
-                SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
-                sQueue.setKeyMapper(KEY_MAPPER);
-                sQueue.setAutoRelease(true);
-                ret = sQueue;
-            } else {
-                SharedQueueOld<Long, MessageDelivery> sQueue = new SharedQueueOld<Long, MessageDelivery>(name, limiter);
-                sQueue.setKeyMapper(KEY_MAPPER);
-                sQueue.setAutoRelease(true);
-                ret = sQueue;
-            }
-            break;
-        }
-        default: {
-            throw new IllegalArgumentException("Unknown queue type" + type);
-        }
-        }
-        ret.getDescriptor().setApplicationType(SUBPARTITION_TYPE);
-        ret.setStore(this);
-        ret.setPersistencePolicy(SHARED_QUEUE_PERSISTENCE_POLICY);
-        ret.setExpirationMapper(EXPIRATION_MAPPER);
-
-        return ret;
-    }
-
-    public final void deleteQueueElement(SaveableQueueElement<MessageDelivery> sqe) {
-        MessageDelivery md = sqe.getElement();
-        //If the message delivery isn't null, funnel through it 
-        //since the message may not yet be in the store:
-        if (md != null) {
-            md.acknowledge(sqe);
-        } else {
-            database.deleteQueueElement(sqe);
-        }
-
-    }
-
-    public final boolean isFromStore(MessageDelivery elem) {
-        return elem.isFromStore();
-    }
-
-    public final void persistQueueElement(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
-        elem.getElement().persist(elem, controller, delayable);
-    }
-
-    public final void restoreQueueElements(QueueDescriptor queue, boolean recordsOnly, long firstSequence, long maxSequence, int maxCount, RestoreListener<MessageDelivery> listener) {
-        database.restoreQueueElements(queue, recordsOnly, firstSequence, maxSequence, maxCount, listener, MESSAGE_MARSHALLER);
-    }
-
-    public final void addQueue(QueueDescriptor queue) {
-        database.addQueue(queue);
     }
 
-    public final void deleteQueue(QueueDescriptor queue) {
-        database.deleteQueue(queue);
+    public void loadQueues() {
     }
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DeliveryTarget.java Wed Jul  7 03:39:03 2010
@@ -17,11 +17,10 @@
 package org.apache.activemq.apollo.broker;
 
 import org.apache.activemq.apollo.broker.MessageDelivery;
-import org.apache.activemq.flow.ISourceController;
 
 public interface DeliveryTarget {
     
-    public void deliver(MessageDelivery message, ISourceController<?> source);
+    public void deliver(MessageDelivery message);
     
     public boolean hasSelector();
     

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/DurableSubscription.java Wed Jul  7 03:39:03 2010
@@ -20,22 +20,19 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.FilterException;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Subscription;
 
 public class DurableSubscription implements BrokerSubscription, DeliveryTarget {
 
-    private final IQueue<Long, MessageDelivery> queue;
+//    private final IQueue<Long, MessageDelivery> queue;
     private final VirtualHost host;
     private final Destination destination;
-    private Subscription<MessageDelivery> connectedSub;
+//    private Subscription<MessageDelivery> connectedSub;
     boolean started = false;
     BooleanExpression selector;
 
-    DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector, IQueue<Long, MessageDelivery> queue) {
+    DurableSubscription(VirtualHost host, Destination destination, BooleanExpression selector) {
         this.host = host;
-        this.queue = queue;
+//        this.queue = queue;
         this.destination = destination;
         this.selector = selector;
         //TODO If a durable subscribes to a queue 
@@ -52,24 +49,27 @@ public class DurableSubscription impleme
     /* (non-Javadoc)
      * @see org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
      */
-    public void deliver(MessageDelivery message, ISourceController<?> source) {
-        queue.add(message, source);
+    public void deliver(MessageDelivery message) {
+//        TODO:
+//        queue.add(message, source);
     }
 
     public synchronized void connect(final ConsumerContext subscription) throws UserAlreadyConnectedException {
-        if (this.connectedSub == null) {
-            this.connectedSub = subscription;
-            queue.addSubscription(connectedSub);
-        } else if (connectedSub != subscription) {
-            throw new UserAlreadyConnectedException();
-        }
+//        TODO:
+//        if (this.connectedSub == null) {
+//            this.connectedSub = subscription;
+//            queue.addSubscription(connectedSub);
+//        } else if (connectedSub != subscription) {
+//            throw new UserAlreadyConnectedException();
+//        }
     }
 
     public synchronized void disconnect(final ConsumerContext subscription) {
-        if (connectedSub != null && connectedSub == subscription) {
-            queue.removeSubscription(connectedSub);
-            connectedSub = null;
-        }
+//        TODO:
+//        if (connectedSub != null && connectedSub == subscription) {
+//            queue.removeSubscription(connectedSub);
+//            connectedSub = null;
+//        }
     }
 
     public boolean matches(MessageDelivery message) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/LocalTransaction.java Wed Jul  7 03:39:03 2010
@@ -23,7 +23,6 @@ import java.util.concurrent.Future;
 
 import javax.transaction.xa.XAException;
 
-import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.util.FutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,91 +40,92 @@ public class LocalTransaction extends Tr
 
     private static final Log LOG = LogFactory.getLog(LocalTransaction.class);
     
-    LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
-        super(manager, tid, opQueue);
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
-     */
-    @Override
-    public void commit(boolean onePhase, final TransactionListener listener) throws XAException, IOException {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("commit: " + this);
-        }
-        
-        synchronized(this)
-        {
-            // Get ready for commit.
-            try {
-                prePrepare();
-            } catch (XAException e) {
-                throw e;
-            } catch (Throwable e) {
-                LOG.warn("COMMIT FAILED: ", e);
-                rollback(null);
-                // Let them know we rolled back.
-                XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
-                xae.errorCode = XAException.XA_RBOTHER;
-                xae.initCause(e);
-                throw xae;
-            }
-            
-            //Add the listener for commit
-            if(listeners == null)
-            {
-                listeners = new HashSet<TransactionListener>();
-            }
-            listeners.add(listener);
-            
-            //Update the transaction state to committed,
-            //and on complete process the commit:
-            setState(COMMITED_STATE, new FutureListener<Object>()
-            {
-                public void onFutureComplete(Future<? extends Object> dbCommitResult) {
-                    try {
-                        fireAfterCommit();
-                        startTransactionProcessor();
-                    } catch (InterruptedException e) {
-                        //Shouldn't happen
-                        LOG.warn(new AssertionError(e));
-                    } catch (ExecutionException e) {
-                        LOG.warn("COMMIT FAILED: ", e);
-                    }
-                    catch (Exception e)
-                    {
-                    }
-                }
-            });
-        }
-    }
-
-
-    public int prepare(TransactionListener listener) throws XAException {
-        XAException xae = new XAException("Prepare not implemented on Local Transactions.");
-        xae.errorCode = XAException.XAER_RMERR;
-        throw xae;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.Transaction#rollback()
-     */
-    @Override
-    public void rollback(TransactionListener listener) throws XAException, IOException {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not yet implemnted");
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.activemq.apollo.broker.Transaction#getType()
-     */
-    @Override
-    public byte getType() {
-        return TYPE_LOCAL;
-    }
+//    TODO:
+//    LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
+//        super(manager, tid, opQueue);
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#commit(boolean)
+//     */
+//    @Override
+//    public void commit(boolean onePhase, final TransactionListener listener) throws XAException, IOException {
+//        if (LOG.isDebugEnabled()) {
+//            LOG.debug("commit: " + this);
+//        }
+//
+//        synchronized(this)
+//        {
+//            // Get ready for commit.
+//            try {
+//                prePrepare();
+//            } catch (XAException e) {
+//                throw e;
+//            } catch (Throwable e) {
+//                LOG.warn("COMMIT FAILED: ", e);
+//                rollback(null);
+//                // Let them know we rolled back.
+//                XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
+//                xae.errorCode = XAException.XA_RBOTHER;
+//                xae.initCause(e);
+//                throw xae;
+//            }
+//
+//            //Add the listener for commit
+//            if(listeners == null)
+//            {
+//                listeners = new HashSet<TransactionListener>();
+//            }
+//            listeners.add(listener);
+//
+//            //Update the transaction state to committed,
+//            //and on complete process the commit:
+//            setState(COMMITED_STATE, new FutureListener<Object>()
+//            {
+//                public void onFutureComplete(Future<? extends Object> dbCommitResult) {
+//                    try {
+//                        fireAfterCommit();
+//                        startTransactionProcessor();
+//                    } catch (InterruptedException e) {
+//                        //Shouldn't happen
+//                        LOG.warn(new AssertionError(e));
+//                    } catch (ExecutionException e) {
+//                        LOG.warn("COMMIT FAILED: ", e);
+//                    }
+//                    catch (Exception e)
+//                    {
+//                    }
+//                }
+//            });
+//        }
+//    }
+//
+//
+//    public int prepare(TransactionListener listener) throws XAException {
+//        XAException xae = new XAException("Prepare not implemented on Local Transactions.");
+//        xae.errorCode = XAException.XAER_RMERR;
+//        throw xae;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see org.apache.activemq.apollo.broker.Transaction#rollback()
+//     */
+//    @Override
+//    public void rollback(TransactionListener listener) throws XAException, IOException {
+//        // TODO Auto-generated method stub
+//        throw new UnsupportedOperationException("Not yet implemnted");
+//    }
+//
+//    /* (non-Javadoc)
+//     * @see org.apache.activemq.apollo.broker.Transaction#getType()
+//     */
+//    @Override
+//    public byte getType() {
+//        return TYPE_LOCAL;
+//    }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDelivery.java Wed Jul  7 03:39:03 2010
@@ -20,8 +20,6 @@ import java.io.IOException;
 
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 
 public interface MessageDelivery {
@@ -90,7 +88,7 @@ public interface MessageDelivery {
 
     public void beginDispatch(BrokerDatabase database); 
     
-    public void finishDispatch(ISourceController<?> controller) throws IOException;
+//    public void finishDispatch(ISourceController<?> controller) throws IOException;
     
     /**
      * Sets the unique id used to identify this message in the store.
@@ -98,36 +96,36 @@ public interface MessageDelivery {
      */
     public void setStoreTracking(long tracking);
     
-    /**
-     * Called by a queue to request that the element be persisted. The save is
-     * done asynchronously, and depending on the state of the message delivery
-     * may not even be issued to the underlying persistence store until a later
-     * date. As such callers should use the acknowledge method to delete this
-     * message rather than directly issuing a delete through the message store
-     * itself. Direct delete from the message store is only safe once the
-     * message has been saved to the store, so callers should request
-     * notification of the save via the
-     * {@link SaveableQueueElement#requestSaveNotify()} method before attempting
-     * to acces the store directly.
-     * 
-     * @param sqe
-     *            The element to save
-     * @param controller
-     *            A flow controller to use in the event that there isn't room in
-     *            the database.
-     * @param delayable
-     *            Whether or not the save operation can be delayed.
-     */
-    public void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable);
-
-    /**
-     * Acknowledges the message for a particular queue. This will cause it to be
-     * deleted from the message store.
-     * 
-     * @param sqe
-     *            The queue element to delete
-     */
-    public void acknowledge(SaveableQueueElement<MessageDelivery> sqe);
+//    /**
+//     * Called by a queue to request that the element be persisted. The save is
+//     * done asynchronously, and depending on the state of the message delivery
+//     * may not even be issued to the underlying persistence store until a later
+//     * date. As such callers should use the acknowledge method to delete this
+//     * message rather than directly issuing a delete through the message store
+//     * itself. Direct delete from the message store is only safe once the
+//     * message has been saved to the store, so callers should request
+//     * notification of the save via the
+//     * {@link SaveableQueueElement#requestSaveNotify()} method before attempting
+//     * to access the store directly.
+//     *
+//     * @param sqe
+//     *            The element to save
+//     * @param controller
+//     *            A flow controller to use in the event that there isn't room in
+//     *            the database.
+//     * @param delayable
+//     *            Whether or not the save operation can be delayed.
+//     */
+//    public void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable);
+//
+//    /**
+//     * Acknowledges the message for a particular queue. This will cause it to be
+//     * deleted from the message store.
+//     *
+//     * @param sqe
+//     *            The queue element to delete
+//     */
+//    public void acknowledge(SaveableQueueElement<MessageDelivery> sqe);
 
     /**
      * Gets the tracking number used to identify this message in the message

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/MessageDeliveryWrapper.java Wed Jul  7 03:39:03 2010
@@ -20,8 +20,6 @@ import java.io.IOException;
 
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.SaveableQueueElement;
 import org.apache.activemq.util.buffer.AsciiBuffer;
 
 /**
@@ -31,117 +29,51 @@ import org.apache.activemq.util.buffer.A
 public class MessageDeliveryWrapper implements MessageDelivery {
 
     private final MessageDelivery delegate;
+//
+//    public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
+//        delegate.acknowledge(sqe);
+//    }
 
-    public void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
-        delegate.acknowledge(sqe);
-    }
-
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public <T> T asType(Class<T> type) {
         return delegate.asType(type);
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public MessageRecord createMessageRecord() {
         return delegate.createMessageRecord();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public Destination getDestination() {
         return delegate.getDestination();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public int getFlowLimiterSize() {
         return delegate.getFlowLimiterSize();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public AsciiBuffer getMsgId() {
         return delegate.getMsgId();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public int getPriority() {
         return delegate.getPriority();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public AsciiBuffer getProducerId() {
         return delegate.getProducerId();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public long getStoreTracking() {
         return delegate.getStoreTracking();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public long getTransactionId() {
         return delegate.getTransactionId();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public boolean isFromStore() {
         return delegate.isFromStore();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public boolean isPersistent() {
         return delegate.isPersistent();
     }
@@ -155,71 +87,33 @@ public class MessageDeliveryWrapper impl
         return delegate.getExpiration();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public boolean isResponseRequired() {
         return delegate.isResponseRequired();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     public void onMessagePersisted() {
         delegate.onMessagePersisted();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
-    public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
-        delegate.persist(elem, controller, delayable);
-    }
-
+//    public void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
+//        delegate.persist(elem, controller, delayable);
+//    }
+//
     public MessageEvaluationContext createMessageEvaluationContext() {
         return delegate.createMessageEvaluationContext();
     }
 
-    /**
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.apollo.broker.MessageDelivery#persist(org.apache.activemq.apollo.queue.QueueStore.SaveableQueueElement,
-     *      org.apache.activemq.flow.ISourceController, boolean)
-     */
     MessageDeliveryWrapper(MessageDelivery delivery) {
         delegate = delivery;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.apollo.broker.MessageDelivery#beginDispatch(org.apache
-     * .activemq.apollo.broker.BrokerDatabase)
-     */
     public void beginDispatch(BrokerDatabase database) {
         delegate.beginDispatch(database);
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.apache.activemq.apollo.broker.MessageDelivery#finishDispatch(org.
-     * apache.activemq.flow.ISourceController)
-     */
-    public void finishDispatch(ISourceController<?> controller) throws IOException {
-        delegate.finishDispatch(controller);
-    }
+//    public void finishDispatch(ISourceController<?> controller) throws IOException {
+//        delegate.finishDispatch(controller);
+//    }
 
     /*
      * (non-Javadoc)

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java?rev=961067&r1=961066&r2=961067&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ProtocolHandler.java Wed Jul  7 03:39:03 2010
@@ -23,98 +23,102 @@ import java.util.HashSet;
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.filter.BooleanExpression;
-import org.apache.activemq.flow.AbstractLimitedFlowResource;
-import org.apache.activemq.flow.IFlowSink;
-import org.apache.activemq.queue.Subscription;
 import org.apache.activemq.wireformat.WireFormat;
 
 public interface ProtocolHandler extends Service {
+    void onCommand(Object command);
+    void setConnection(BrokerConnection brokerConnection);
 
-    public void setConnection(BrokerConnection connection);
+    void setWireFormat(WireFormat wireformat);
 
-    public BrokerConnection getConnection();
+    void onException(Exception error);
 
-    public void onCommand(Object command);
-
-    public void onException(Exception error);
-
-    public void setWireFormat(WireFormat wf);
-
-    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
-
-    /**
-     * ClientContext
-     * <p>
-     * Description: Base interface describing a channel on a physical
-     * connection.
-     * </p>
-     * 
-     * @author cmacnaug
-     * @version 1.0
-     */
-    public interface ClientContext {
-        public ClientContext getParent();
-
-        public Collection<ClientContext> getChildren();
-
-        public void addChild(ClientContext context);
-
-        public void removeChild(ClientContext context);
-
-        public void close();
-
-    }
-
-    public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
-        protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
-        protected final ClientContext parent;
-        protected boolean closed = false;
-
-        public AbstractClientContext(String name, ClientContext parent) {
-            super(name);
-            this.parent = parent;
-            if (parent != null) {
-                parent.addChild(this);
-            }
-        }
-
-        public ClientContext getParent() {
-            return parent;
-        }
-
-        public void addChild(ClientContext child) {
-            if (!closed) {
-                children.add(child);
-            }
-        }
-
-        public void removeChild(ClientContext child) {
-            if (!closed) {
-                children.remove(child);
-            }
-        }
-
-        public Collection<ClientContext> getChildren() {
-            return children;
-        }
-
-        public void close() {
-
-            closed = true;
-            
-            for (ClientContext c : children) {
-                c.close();
-            }
-
-            if (parent != null) {
-                parent.removeChild(this);
-            }
-
-            super.close();
-        }
-    }
-
-    public interface ConsumerContext extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
+// TODO:    
+//    public void setConnection(BrokerConnection connection);
+//
+//    public BrokerConnection getConnection();
+//
+//    public void onCommand(Object command);
+//
+//    public void onException(Exception error);
+//
+//    public void setWireFormat(WireFormat wf);
+//
+//    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+//
+//    /**
+//     * ClientContext
+//     * <p>
+//     * Description: Base interface describing a channel on a physical
+//     * connection.
+//     * </p>
+//     *
+//     * @author cmacnaug
+//     * @version 1.0
+//     */
+//    public interface ClientContext {
+//        public ClientContext getParent();
+//
+//        public Collection<ClientContext> getChildren();
+//
+//        public void addChild(ClientContext context);
+//
+//        public void removeChild(ClientContext context);
+//
+//        public void close();
+//
+//    }
+//
+//    public abstract class AbstractClientContext<E extends MessageDelivery> extends AbstractLimitedFlowResource<E> implements ClientContext {
+//        protected final HashSet<ClientContext> children = new HashSet<ClientContext>();
+//        protected final ClientContext parent;
+//        protected boolean closed = false;
+//
+//        public AbstractClientContext(String name, ClientContext parent) {
+//            super(name);
+//            this.parent = parent;
+//            if (parent != null) {
+//                parent.addChild(this);
+//            }
+//        }
+//
+//        public ClientContext getParent() {
+//            return parent;
+//        }
+//
+//        public void addChild(ClientContext child) {
+//            if (!closed) {
+//                children.add(child);
+//            }
+//        }
+//
+//        public void removeChild(ClientContext child) {
+//            if (!closed) {
+//                children.remove(child);
+//            }
+//        }
+//
+//        public Collection<ClientContext> getChildren() {
+//            return children;
+//        }
+//
+//        public void close() {
+//
+//            closed = true;
+//
+//            for (ClientContext c : children) {
+//                c.close();
+//            }
+//
+//            if (parent != null) {
+//                parent.removeChild(this);
+//            }
+//
+//            super.close();
+//        }
+//    }
+//
+    public interface ConsumerContext { // extends ClientContext, Subscription<MessageDelivery>, IFlowSink<MessageDelivery> {
 
         public String getConsumerId();
 
@@ -131,7 +135,7 @@ public interface ProtocolHandler extends
         /**
          * If the destination does not exist, should it automatically be
          * created?
-         * 
+         *
          * @return
          */
         public boolean autoCreateDestination();