You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/04/27 20:40:49 UTC

svn commit: r769099 [1/5] - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/openwire/ main/java/org/apache/activemq/broker/protocol/ main/java/org/apache/activemq/broker/stomp/ main/jav...

Author: chirino
Date: Mon Apr 27 18:40:44 2009
New Revision: 769099

URL: http://svn.apache.org/viewvc?rev=769099&view=rev
Log:
Applying colins patch for https://issues.apache.org/activemq/browse/AMQ-2230

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/MessageDeliveryStoreHelper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityLinkedList.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityMap.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/SimpleLoadBalancer.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/IFlowSink.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/ISourceController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/NoOpFlowController.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/PrioritySizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/SizeLimiter.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/IQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/LoadBalancedFlowQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Mapper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/MemoryStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PartitionedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/QueueStoreHelper.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Store.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/Subscription.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/TreeMemoryStore.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/util/HashList.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/wireformat/StatefulWireFormat.java
    activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStorePerformance.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/kahadb/KahaDBStoreTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/memory/MemoryStoreTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/AbstractTestConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Message.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -17,129 +17,173 @@
 package org.apache.activemq.broker;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.activemq.broker.store.BrokerDatabase;
 import org.apache.activemq.broker.store.BrokerDatabase.OperationContext;
+import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
+import org.apache.activemq.queue.QueueStore.QueueDescriptor;
 
 public abstract class BrokerMessageDelivery implements MessageDelivery {
 
-    HashSet<AsciiBuffer> persistentTargets;
-    // Indicates whether or not the message has been saved to the
-    // database, if not then in memory updates can be done.
-    boolean saved = false;
+    // True while the message is being dispatched to the delivery targets:
+    boolean dispatching = false;
+
+    // A non-null pending save indicates that the message is in the
+    // saver queue and that its store operation has not yet begun.
+    OperationContext pendingSave;
+
+    // List of persistent targets for which the message should be saved
+    // when dispatch is complete:
+    HashMap<QueueStore.QueueDescriptor, Long> persistentTargets;
+
     long storeTracking = -1;
     BrokerDatabase store;
     boolean fromStore = false;
     boolean enableFlushDelay = true;
-    OperationContext saveContext;
-    boolean cancelled = false;
+    private int limiterSize = -1;
 
-    public void setFromStore(boolean val) {
+    public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
         fromStore = true;
+        store = database;
+        storeTracking = mRecord.getKey();
+        limiterSize = mRecord.getSize();
+    }
+
+    public final int getFlowLimiterSize() {
+        if (limiterSize == -1) {
+            limiterSize = getMemorySize();
+        }
+        return limiterSize;
     }
 
+    /**
+     * Subclass must implement this to return their current memory size
+     * estimate.
+     * 
+     * @return The memory size of the message.
+     */
+    public abstract int getMemorySize();
+
     public final boolean isFromStore() {
         return fromStore;
     }
 
-    public final void persist(AsciiBuffer queue, boolean delayable) throws IOException {
-
+    public final void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long queueSequence, boolean delayable) throws IOException {
         synchronized (this) {
-            if (!saved) {
+            // A non-delayable request disables delayed flushing of this message:
+            if (enableFlushDelay && !delayable) {
+                enableFlushDelay = false;
+            }
+            // If this message is being dispatched then add the queue to the
+            // list of queues for which to save the message when dispatch is
+            // finished:
+            if (dispatching) {
                 if (persistentTargets == null) {
-                    persistentTargets = new HashSet<AsciiBuffer>();
+                    persistentTargets = new HashMap<QueueStore.QueueDescriptor, Long>();
                 }
-                persistentTargets.add(queue);
+                persistentTargets.put(queue, queueSequence);
                 return;
             }
-            if (!delayable) {
-                enableFlushDelay = false;
+            // Otherwise, if it is still in the saver queue, we can add this
+            // queue to the queue list:
+            else if (pendingSave != null) {
+                persistentTargets.put(queue, queueSequence);
+                if (!delayable) {
+                    pendingSave.requestFlush();
+                }
+                return;
             }
         }
 
-        // TODO probably need to pass in the saving queue's source controller
-        // here and treat it like it is dispatching to the saver queue.
-        store.saveMessage(this, queue, null);
+        store.saveMessage(this, queue, queueSequence, controller);
     }
 
-    public final void delete(AsciiBuffer queue) {
+    public final void acknowledge(QueueStore.QueueDescriptor queue) {
         boolean firePersistListener = false;
+        boolean deleted = false;
         synchronized (this) {
-            if (!saved) {
-                persistentTargets.remove(queue);
-                if (persistentTargets.isEmpty()) {
-                    if (saveContext != null) {
+            // If the message hasn't been saved to the database
+            // then we don't need to issue a delete:
+            if (dispatching || pendingSave != null) {
 
-                        if (!cancelled) {
-                            if (saveContext.cancel()) {
-                                cancelled = true;
-                                firePersistListener = true;
-                            }
+                // Remove the queue:
+                persistentTargets.remove(queue);
+                deleted = true;
 
-                            saved = true;
+                // We get a save context when we place the message in the
+                // database queue. If it has been added to the queue,
+                // and we've removed the last queue, see if we can cancel
+                // the save:
+                if (pendingSave != null && persistentTargets.isEmpty()) {
+                    if (pendingSave.cancel()) {
+                        pendingSave = null;
+                        if (isPersistent()) {
+                            firePersistListener = true;
                         }
                     }
                 }
-            } else {
-                store.deleteMessage(this, queue);
             }
         }
 
+        if (!deleted) {
+            store.deleteMessage(this, queue);
+        }
+
         if (firePersistListener) {
             onMessagePersisted();
         }
 
     }
 
-    public void setStoreTracking(long storeTracking) {
-        this.storeTracking = storeTracking;
+    public void beginDispatch(BrokerDatabase database) {
+        this.store = database;
+        dispatching = true;
+        if (storeTracking == -1) {
+            storeTracking = database.allocateStoreTracking();
+        }
     }
 
     public long getStoreTracking() {
         return storeTracking;
     }
 
-    public Collection<AsciiBuffer> getPersistentQueues() {
-        return persistentTargets;
+    public Set<Entry<QueueDescriptor, Long>> getPersistentQueues() {
+        return persistentTargets.entrySet();
     }
 
     public void beginStore() {
         synchronized (this) {
-            saved = true;
+            pendingSave = null;
         }
     }
 
-    public void persistIfNeeded(ISourceController<?> controller) throws IOException {
+    public void finishDispatch(ISourceController<?> controller) throws IOException {
         boolean firePersistListener = false;
         synchronized (this) {
-            boolean saveNeeded = true;
-            if (persistentTargets == null || persistentTargets.isEmpty()) {
-                saveNeeded = false;
-                saved = true;
-            }
-
             // If any of the targets requested save then save the message
             // Note that this could be the case even if the message isn't
             // persistent if a target requested that the message be spooled
             // for some other reason such as queue memory overflow.
-            if (saveNeeded) {
-                saveContext = store.persistReceivedMessage(this, controller);
+            if (persistentTargets != null && !persistentTargets.isEmpty()) {
+                pendingSave = store.persistReceivedMessage(this, controller);
             }
+
             // If none of the targets required persistence, then fire the
             // persist listener:
-            else if (isResponseRequired() && isPersistent()) {
+            if (pendingSave == null || !isPersistent()) {
                 firePersistListener = true;
             }
+            dispatching = false;
         }
 
         if (firePersistListener) {
             onMessagePersisted();
         }
-
     }
 
     public boolean isFlushDelayable() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DBQueueStore.java Mon Apr 27 18:40:44 2009
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.broker.store.BrokerDatabase;
-import org.apache.activemq.broker.store.BrokerDatabase.MessageRestoreListener;
-import org.apache.activemq.broker.store.BrokerDatabase.RestoredMessage;
-import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.dispatch.IDispatcher.DispatchContext;
-import org.apache.activemq.dispatch.IDispatcher.Dispatchable;
-import org.apache.activemq.protobuf.AsciiBuffer;
-import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.queue.Store;
-import org.apache.activemq.queue.Subscription;
-
-public class DBQueueStore<K> implements Store<K, MessageDelivery> {
-
-    private final BrokerDatabase database;
-    private final AsciiBuffer queue;
-    private final MessageRetriever retriever;
-
-    private long firstKey = -1;
-    private long lastKey = -1;
-
-    private int count = 0;
-    private boolean loading = true;
-
-    protected HashMap<K, DBStoreNode> map = new HashMap<K, DBStoreNode>();
-    protected TreeMap<Long, DBStoreNode> order = new TreeMap<Long, DBStoreNode>();
-    private Mapper<K, MessageDelivery> keyExtractor;
-
-    DBQueueStore(BrokerDatabase database, AsciiBuffer queue, IDispatcher dispatcher) {
-        this.database = database;
-        this.queue = queue;
-        retriever = new MessageRetriever(dispatcher);
-        retriever.start();
-    }
-
-    public StoreNode<K, MessageDelivery> add(K key, MessageDelivery delivery) {
-
-        // New to this queue?
-        if (delivery.getStoreTracking() > lastKey) {
-            return addInternal(key, delivery);
-        } else {
-            throw new IllegalArgumentException(this + " Duplicate key: " + delivery);
-        }
-    }
-
-    public void setKeyMapper(Mapper<K, MessageDelivery> keyExtractor) {
-        this.keyExtractor = keyExtractor;
-    }
-    
-    private DBStoreNode addInternal(K key, MessageDelivery delivery) {
-        DBStoreNode node = new DBStoreNode(delivery);
-        map.put(keyExtractor.map(delivery), node);
-        order.put(delivery.getStoreTracking(), node);
-        return node;
-    }
-
-    public boolean isEmpty() {
-        return count == 0;
-    }
-
-    public StoreCursor<K, MessageDelivery> openCursor() {
-        return new DBStoreCursor();
-    }
-
-    public StoreCursor<K, MessageDelivery> openCursorAt(StoreNode<K, MessageDelivery> next) {
-        DBStoreCursor cursor = new DBStoreCursor();
-        cursor.next = (DBStoreNode) next;
-        return cursor;
-    }
-
-    public StoreNode<K, MessageDelivery> remove(K key) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    public int size() {
-        return count;
-    }
-
-    private class DBStoreCursor implements StoreCursor<K, MessageDelivery> {
-        private long pos;
-        private long last = -1;
-        
-        private DBStoreNode node;
-        private DBStoreNode next;
-
-        public StoreNode<K, MessageDelivery> peekNext() {
-            // TODO Auto-generated method stub
-            return null;
-        }
-
-        public void setNext(StoreNode<K, MessageDelivery> node) {
-            this.next = (DBStoreNode) next;
-
-        }
-
-        public boolean hasNext() {
-            if (next != null)
-                return true;
-
-            SortedMap<Long, DBStoreNode> m = order.tailMap(last + 1);
-            if (m.isEmpty()) {
-                next = null;
-            } else {
-                next = m.get(m.firstKey());
-            }
-            return next != null;
-        }
-
-        public StoreNode<K, MessageDelivery> next() {
-            try {
-                hasNext();
-                return next;
-            } finally {
-                last = next.tracking;
-                next = null;
-            }
-        }
-
-        public boolean isReady() {
-            return !loading;
-        }
-        
-        public void remove() {
-            database.deleteMessage(node.delivery, queue);
-        }
-    }
-
-    private class DBStoreNode implements StoreNode<K, MessageDelivery> {
-        private MessageDelivery delivery;
-        private K key;
-        private long ownerId = -1;
-        private final long tracking;
-
-        DBStoreNode(MessageDelivery delivery) {
-            this.delivery = delivery;
-            tracking = delivery.getStoreTracking();
-            key = keyExtractor.map(delivery);
-            retriever.save(this);
-        }
-
-        public boolean acquire(Subscription<MessageDelivery> owner) {
-            long id = owner.getSink().getResourceId();
-            // TODO Auto-generated method stub
-            if (ownerId == -1 || id == ownerId) {
-                ownerId = owner.getSink().getResourceId();
-                return true;
-            }
-            return false;
-        }
-
-        public K getKey() {
-            return key;
-        }
-
-        public MessageDelivery getValue() {
-            return delivery;
-        }
-
-        public void unacquire() {
-            ownerId = -1;
-        }
-    }
-
-    private class MessageRetriever implements Dispatchable, MessageRestoreListener {
-
-        private final DispatchContext dispatchContext;
-        private AtomicBoolean loaded = new AtomicBoolean(false);
-
-        private long loadCursor = 0;
-        private long max = -1;
-        private long loadedCount;
-        
-        private final ConcurrentLinkedQueue<RestoredMessage> restoredMsgs = new ConcurrentLinkedQueue<RestoredMessage>();
-
-        MessageRetriever(IDispatcher dispatcher) {
-            dispatchContext = dispatcher.register(this, "MessageRetriever-" + queue);
-        }
-
-        public void save(DBStoreNode node) {
-            try {
-                node.delivery.persist(queue, false);
-            } catch (IOException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-        }
-
-        public void start() {
-            if (!loaded.get()) {
-                database.restoreMessages(queue, loadCursor, 50, this);
-            }
-        }
-
-        public boolean dispatch() {
-            while (true) {
-                RestoredMessage restored = restoredMsgs.poll();
-
-                if (restored == null) {
-                    break;
-                }
-
-                try {
-                    MessageDelivery delivery = restored.getMessageDelivery();
-                    addInternal(keyExtractor.map(delivery), delivery);
-                    if (firstKey == -1) {
-                        firstKey = delivery.getStoreTracking();
-                    }
-                    if (lastKey < delivery.getStoreTracking()) {
-                        lastKey = delivery.getStoreTracking();
-                    }
-                    loadedCount++;
-
-                } catch (IOException e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                }
-            }
-
-            if (!loaded.get()) {
-                database.restoreMessages(queue, loadCursor, 50, this);
-            }
-            return false;
-        }
-
-        public void messagesRestored(Collection<RestoredMessage> msgs) {
-            if (!msgs.isEmpty()) {
-                restoredMsgs.addAll(msgs);
-            } else {
-                loaded.set(true);
-            }
-            dispatchContext.requestDispatch();
-        }
-    }    
-}

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/DeliveryTarget.java Mon Apr 27 18:40:44 2009
@@ -19,7 +19,6 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.protobuf.AsciiBuffer;
 
 public interface DeliveryTarget {
     
@@ -27,9 +26,9 @@
     
     public IFlowSink<MessageDelivery> getSink();
     
+    public boolean hasSelector();
+    
     public boolean match(MessageDelivery message);
     
     public boolean isDurable();
-    
-    public AsciiBuffer getPersistentQueueName();
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageBroker.java Mon Apr 27 18:40:44 2009
@@ -48,6 +48,7 @@
     private String name;
     private IDispatcher dispatcher;
     private BrokerDatabase database;
+    
     private final AtomicBoolean stopping = new AtomicBoolean();
 
     public String getName() {
@@ -123,6 +124,14 @@
         this.dispatcher = dispatcher;
     }
 
+    public BrokerDatabase getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(BrokerDatabase database) {
+        this.database = database;
+    }
+    
     public String getBindUri() {
         return bindUri;
     }
@@ -149,8 +158,7 @@
     public VirtualHost getDefaultVirtualHost() {
         synchronized (virtualHosts) {
             if (defaultVirtualHost == null) {
-                defaultVirtualHost = new VirtualHost();
-                defaultVirtualHost.setDatabase(database);
+                defaultVirtualHost = new VirtualHost(this);
                 ArrayList<AsciiBuffer> names = new ArrayList<AsciiBuffer>(1);
                 names.add(new AsciiBuffer("default"));
                 defaultVirtualHost.setHostNames(names);
@@ -188,7 +196,6 @@
                 setDefaultVirtualHost(host);
             }
         }
-        host.setDatabase(database);
     }
 
     public synchronized void removeVirtualHost(VirtualHost host) throws Exception {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/MessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -19,8 +19,10 @@
 import java.io.IOException;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
 
 public interface MessageDelivery {
 
@@ -50,7 +52,7 @@
 
     /**
      * Called when the message's persistence requirements have been met. This
-     * method must not block.
+     * method must not block.
      */
     public void onMessagePersisted();
 
@@ -58,16 +60,31 @@
 
     public Buffer getTransactionId();
 
-    public void persist(AsciiBuffer queue, boolean delayable) throws IOException;
+    /**
+     * Asynchronously persists a message in the store.
+     * 
+     * @param queue
+     *            The queue against which to save the message.
+     * @param controller
+     *            The source of the message.
+     * @param sequenceNumber
+     *            The sequence number of the message in the queue
+     * @param delayable
+     *            Can be set to indicate that flush of the message can be
+     *            delayed in the hopes that an acknowledgement will negate the
+     *            need for a delete
+     * @throws IOException If there is an exception serializing the message. 
+     */
+    public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException;
 
-    public void delete(AsciiBuffer queue);
-    
     /**
-     * Sets the unique storage tracking number. 
-     * @param tracking The tracking number. 
+     * Acknowledges the message for a particular queue. This will cause it to be 
+     * deleted from the message store. 
+     * 
+     * @param queue The queue for which to acknowledge the message.
      */
-    public void setStoreTracking(long tracking);
-    
+    public void acknowledge(QueueStore.QueueDescriptor queue);
+
     /**
      * Gets the tracking number used to identify this message in the message
      * store.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Queue.java Mon Apr 27 18:40:44 2009
@@ -24,122 +24,36 @@
 import org.apache.activemq.broker.MessageDelivery;
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.PrioritySizeLimiter;
-import org.apache.activemq.flow.SizeLimiter;
-import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.IQueue;
-import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.queue.PartitionedQueue;
-import org.apache.activemq.queue.SharedPriorityQueue;
-import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
 
 public class Queue implements DeliveryTarget {
 
     HashMap<DeliveryTarget, Subscription<MessageDelivery>> subs = new HashMap<DeliveryTarget, Subscription<MessageDelivery>>();
     private Destination destination;
-    private IQueue<AsciiBuffer, MessageDelivery> queue;
-    private MessageBroker broker;
-    
-    private Mapper<Integer, MessageDelivery> partitionMapper;
-    private Mapper<AsciiBuffer, MessageDelivery> keyExtractor;
-
-    private IQueue<AsciiBuffer, MessageDelivery> createQueue() {
+    private IQueue<Long, MessageDelivery> queue;
+    private VirtualHost virtualHost;
 
-        if (partitionMapper!=null) {
-            PartitionedQueue<Integer, AsciiBuffer, MessageDelivery> queue = new PartitionedQueue<Integer, AsciiBuffer, MessageDelivery>() {
-                @Override
-                protected IQueue<AsciiBuffer, MessageDelivery> cratePartition(Integer partitionKey) {
-                    return createSharedFlowQueue();
-                }
-
-                public boolean isElementPersistent(MessageDelivery elem) {
-                    return elem.isPersistent();
-                }
-            };
-            queue.setPartitionMapper(partitionMapper);
-            queue.setResourceName(destination.getName().toString());
-            return queue;
-        } else {
-            return createSharedFlowQueue();
-        }
+    Queue(IQueue<Long, MessageDelivery> queue)
+    {
+        this.queue = queue;
     }
-
-
-    public static final Mapper<Integer, MessageDelivery> PRIORITY_MAPPER = new Mapper<Integer, MessageDelivery>() {
-        public Integer map(MessageDelivery element) {
-            return element.getPriority();
-        }
-    };
     
-    private IQueue<AsciiBuffer, MessageDelivery> createSharedFlowQueue() {
-        if (MessageBroker.MAX_PRIORITY > 1) {
-            PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
-            limiter.setPriorityMapper(PRIORITY_MAPPER);
-            SharedPriorityQueue<AsciiBuffer, MessageDelivery> queue = new SharedPriorityQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
-            queue.setKeyMapper(keyExtractor);
-            queue.setAutoRelease(true);
-            //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
-            //store.setKeyMapper(keyExtractor);
-            //queue.setStore(store);
-            queue.setDispatcher(broker.getDispatcher());
-            return queue;
-        } else {
-            SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
-            SharedQueue<AsciiBuffer, MessageDelivery> queue = new SharedQueue<AsciiBuffer, MessageDelivery>(destination.getName().toString(), limiter);
-            queue.setKeyMapper(keyExtractor);
-            queue.setAutoRelease(true);
-            //DBQueueStore<AsciiBuffer> store = new DBQueueStore<AsciiBuffer>(broker.getDefaultVirtualHost().getDatabase(), queue, broker.getDispatcher());
-            //store.setKeyMapper(keyExtractor);
-            //queue.setStore(store);
-            queue.setDispatcher(broker.getDispatcher());
-            return queue;
-        }
-    }
-
     public final void deliver(MessageDelivery delivery, ISourceController<?> source) {
-        try {
-            if(delivery.isPersistent())
-            {
-                delivery.persist(destination.getName(), true);
-            }
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-        
         queue.add(delivery, source);
     }
-    
-    public final Destination getDestination() {
-        return destination;
-    }
 
     public final void addConsumer(final DeliveryTarget dt) {
-        Subscription<MessageDelivery> sub = new Subscription<MessageDelivery>() {
-            public boolean isPreAcquired() {
-                return true;
-            }
-
-            public boolean matches(MessageDelivery message) {
-                return dt.match(message);
-            }
-
-            public boolean isRemoveOnDispatch() {
-                return true;
-            }
+        Subscription<MessageDelivery> sub = new QueueSubscription(dt);
 
-            public IFlowSink<MessageDelivery> getSink() {
-                return dt.getSink();
-            }
-
-            @Override
-            public String toString() {
-                return getSink().toString();
-            }
-        };
-        subs.put(dt, sub);
-        queue.addSubscription(sub);
+        Subscription<MessageDelivery> old = subs.put(dt, sub);
+        if (old == null) {
+            queue.addSubscription(sub);
+        } else {
+            subs.put(dt, old);
+        }
     }
 
     public boolean removeSubscirption(final DeliveryTarget dt) {
@@ -151,54 +65,108 @@
     }
 
     public void start() throws Exception {
-        queue = createQueue();
+        queue.start();
     }
 
     public void stop() throws Exception {
+        if (queue != null) {
+            queue.stop();
+        }
     }
 
     public IFlowSink<MessageDelivery> getSink() {
         return queue;
     }
 
+    public boolean hasSelector() {
+        return false;
+    }
+
     public boolean match(MessageDelivery message) {
         return true;
     }
 
-    public MessageBroker getBroker() {
-        return broker;
+    public VirtualHost getBroker() {
+        return virtualHost;
     }
 
-    public void setBroker(MessageBroker broker) {
-        this.broker = broker;
+    public void setVirtualHost(VirtualHost virtualHost) {
+        this.virtualHost = virtualHost;
     }
 
-    public Mapper<Integer, MessageDelivery> getPartitionMapper() {
-        return partitionMapper;
+    public void setDestination(Destination destination) {
+        this.destination = destination;
     }
 
-    public void setPartitionMapper(Mapper<Integer, MessageDelivery> partitionMapper) {
-        this.partitionMapper = partitionMapper;
+    public final Destination getDestination() {
+        return destination;
     }
-
-    public Mapper<AsciiBuffer, MessageDelivery> getKeyExtractor() {
-        return keyExtractor;
+    
+    public boolean isDurable()
+    {
+        return true;
     }
 
-    public void setKeyExtractor(Mapper<AsciiBuffer, MessageDelivery> keyExtractor) {
-        this.keyExtractor = keyExtractor;
-    }
+    public static class QueueSubscription implements Subscription<MessageDelivery> {
+        final DeliveryTarget target;
 
-    public void setDestination(Destination destination) {
-        this.destination = destination;
-    }
+        public QueueSubscription(DeliveryTarget dt) {
+            this.target = dt;
+        }
+
+        public boolean isPreAcquired() {
+            return true;
+        }
+
+        public boolean matches(MessageDelivery message) {
+            return target.match(message);
+        }
+
+        public boolean hasSelector() {
+            return target.hasSelector();
+        }
+
+        public boolean isRemoveOnDispatch() {
+            return false;
+        }
 
-    public AsciiBuffer getPersistentQueueName() {
-        // TODO Auto-generated method stub
-        return destination.getName();
+        public IFlowSink<MessageDelivery> getSink() {
+            return target.getSink();
+        }
+
+        @Override
+        public String toString() {
+            return target.getSink().toString();
+        }
+
+        public boolean offer(MessageDelivery elem, ISourceController<MessageDelivery> controller, SubscriptionDeliveryCallback callback) {
+            return target.getSink().offer(new QueueDelivery(elem, callback), controller);
+        }
+
+        public boolean isBrowser() {
+            return false;
+        }
     }
 
-    public boolean isDurable() {
-        return true;
+    private static class QueueDelivery extends MessageDeliveryWrapper {
+        private final SubscriptionDeliveryCallback callback;
+
+        QueueDelivery(MessageDelivery delivery, SubscriptionDeliveryCallback callback) {
+            super(delivery);
+            this.callback = callback;
+        }
+
+        @Override
+        public void persist(QueueStore.QueueDescriptor queue, ISourceController<?> controller, long sequenceNumber, boolean delayable) throws IOException {
+            // We override this for queue deliveries as the sub needn't
+            // persist the message
+        }
+
+        public void acknowledge(QueueStore.QueueDescriptor queue) {
+            if (callback != null) {
+                callback.acknowledge();
+            }
+        }
+
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/Router.java Mon Apr 27 18:40:44 2009
@@ -89,37 +89,26 @@
         //        
         Collection<DeliveryTarget> targets = route(msg.getDestination(), msg);
 
-        msg.store = database;
-        msg.setStoreTracking(msg.store.allocateStoreTracking());
+        //Set up the delivery for persistence:
+        msg.beginDispatch(database);
 
-        // TODO:
-        // Consider doing some caching of this target list. Most producers
-        // always send to the same destination.
-        if (targets != null) {
-
-            if (msg.isResponseRequired()) {
-                // We need to ack the message once we ensure we won't loose it.
-                // We know we won't loose it once it's persisted or delivered to
-                // a consumer Setup a callback to get notifed once one of those
-                // happens.
-                if (!msg.isPersistent()) {
-                    // Let the client know the broker got the message.
-                    msg.onMessagePersisted();
+        try
+        {
+            // TODO:
+            // Consider doing some caching of this sub list. Most producers
+            // always send to the same destination.
+            if (targets != null) {
+                // The sinks will request persistence via MessageDelivery.persist()
+                // if they require persistence:
+                for (DeliveryTarget dt : targets) {
+                    dt.deliver(msg, controller);
                 }
             }
-
-            // The sinks will request persistence via MessageDelivery.persist()
-            // if they require persistence:
-            for (DeliveryTarget dt : targets) {
-                dt.deliver(msg, controller);
-                //if (dt.match(msg)) {
-                //    
-                //    dt.getSink().add(msg, controller);
-                //}
-            }
-            
+        }
+        finally
+        {
             try {
-                msg.persistIfNeeded(controller);
+                msg.finishDispatch(controller);
             } catch (IOException ioe) {
                 //TODO: Error serializing the message, this should trigger an error
                 //This is a pretty severe error as we've already delivered
@@ -128,13 +117,6 @@
                 //should persist the message prior to sending to the recips?
                 ioe.printStackTrace();
             }
-
-        } else {
-            // Let the client know we got the message even though there
-            // were no valid targets to deliver the message to.
-            if (msg.isResponseRequired()) {
-                msg.onMessagePersisted();
-            }
         }
     }
 
@@ -158,7 +140,6 @@
 
     public void setVirtualHost(VirtualHost virtualHost) {
         this.virtualHost = virtualHost;
-        this.database = virtualHost.getDatabase();
     }
 
     public VirtualHost getVirtualHost() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/VirtualHost.java Mon Apr 27 18:40:44 2009
@@ -20,66 +20,105 @@
 import java.util.HashMap;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.broker.store.BrokerDatabase;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.IQueue;
 
 /**
  * @author chirino
  */
 public class VirtualHost implements Service {
-    
-    final private HashMap<Destination, Queue> queues = new HashMap<Destination, Queue>();
+
+    final private BrokerQueueStore queueStore;
+    final private MessageBroker broker;
+    final private HashMap<AsciiBuffer, Queue> queues = new HashMap<AsciiBuffer, Queue>();
     private ArrayList<AsciiBuffer> hostNames = new ArrayList<AsciiBuffer>();
     private Router router;
-    private BrokerDatabase database;
-    
-    public VirtualHost() {
+    private boolean started;
+
+    public VirtualHost(MessageBroker broker) {
+        this.broker = broker;
         this.router = new Router();
         this.router.setVirtualHost(this);
+        this.queueStore = new BrokerQueueStore();
     }
-    
+
     public AsciiBuffer getHostName() {
-        if( hostNames.size() > 0 ) {
+        if (hostNames.size() > 0) {
             hostNames.get(0);
         }
         return null;
     }
-    
+
     public ArrayList<AsciiBuffer> getHostNames() {
         return hostNames;
     }
+
     public void setHostNames(ArrayList<AsciiBuffer> hostNames) {
         this.hostNames = hostNames;
     }
-    
+
     public Router getRouter() {
         return router;
     }
 
-    public void start() throws Exception {
+    public synchronized void start() throws Exception {
+
+        if (started) {
+            return;
+        }
+
+        router.setDatabase(broker.getDatabase());
+
+        queueStore.setDatabase(broker.getDatabase());
+        queueStore.setDispatcher(broker.getDispatcher());
+        queueStore.loadQueues();
+        // Create Queue instances
+        for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
+            Queue queue = new Queue(iQueue);
+            Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
+            Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
+            queue.setDestination(dest);
+            domain.add(dest.getName(), queue);
+            queues.put(dest.getName(), queue);
+        }
         for (Queue queue : queues.values()) {
             queue.start();
         }
+        started = true;
     }
-    public void stop() throws Exception {
+
+    public synchronized void stop() throws Exception {
+        if (!started) {
+            return;
+        }
         for (Queue queue : queues.values()) {
             queue.stop();
         }
+        started = false;
     }
 
-    public void addQueue(Queue queue) {
-        Domain domain = router.getDomain(queue.getDestination().getDomain());
-        domain.add(queue.getDestination().getName(), queue);
-    }
-
-    public BrokerDatabase getDatabase() {
-        return database;
+    public synchronized Queue createQueue(Destination dest) throws Exception {
+        if(!started)
+        {
+            //Queues from the store must be loaded before we can create new ones:
+            throw new IllegalStateException("Can't create queue on unstarted host");
+        }
+        
+        Queue queue = queues.get(dest);
+        // If the queue doesn't exist create it:
+        if (queue == null) {
+            IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
+            queue = new Queue(iQueue);
+            queue.setDestination(dest);
+            Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
+            domain.add(dest.getName(), queue);
+            queues.put(dest.getName(), queue);
+        }
+        queue.start();
+        return queue;
     }
 
-    public void setDatabase(BrokerDatabase store) {
-        this.database = store;
-        router.setDatabase(database);
+    public BrokerQueueStore getQueueStore() {
+        return queueStore;
     }
-
-
 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -36,6 +36,7 @@
     private AsciiBuffer producerId;
     private OpenWireFormat storeWireFormat;
     private PersistListener persistListener = null;
+    private final int size;
 
     public interface PersistListener {
         public void onMessagePersisted(OpenWireMessageDelivery delivery);
@@ -43,6 +44,7 @@
 
     public OpenWireMessageDelivery(Message message) {
         this.message = message;
+        this.size = message.getSize();
     }
 
     public void setPersistListener(PersistListener listener) {
@@ -56,7 +58,8 @@
         return destination;
     }
 
-    public int getFlowLimiterSize() {
+    public int getMemorySize() {
+        //return size;
         return 1;
     }
 
@@ -112,6 +115,7 @@
         record.setBuffer(new Buffer(bytes.getData(), bytes.getOffset(), bytes.getLength()));
         record.setStreamKey((long) 0);
         record.setMessageId(getMsgId());
+        record.setSize(getFlowLimiterSize());
         return record;
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Mon Apr 27 18:40:44 2009
@@ -20,12 +20,14 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
 import org.apache.activemq.WindowLimiter;
 import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.DeliveryTarget;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
@@ -83,6 +85,7 @@
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
@@ -100,6 +103,10 @@
     private OpenWireFormat storeWireFormat;
     private Router router;
 
+    public OpenwireProtocolHandler() {
+        setStoreWireFormat(new OpenWireFormat());
+    }
+
     public void start() throws Exception {
 
     }
@@ -133,7 +140,6 @@
                 public Response processAddConsumer(ConsumerInfo info) throws Exception {
                     ConsumerContext ctx = new ConsumerContext(info);
                     consumers.put(info.getConsumerId(), ctx);
-                    router.bind(convert(info.getDestination()), ctx);
                     return ack(command);
                 }
 
@@ -403,11 +409,13 @@
         private final ConsumerInfo info;
         private String name;
         private BooleanExpression selector;
-        private boolean durable;
-        private AsciiBuffer durableQueueName;
+        private boolean isDurable;
+        private boolean isQueueReceiver;
+        private QueueStore.QueueDescriptor durableQueueId;
 
         private SingleFlowRelay<MessageDelivery> queue;
         public WindowLimiter<MessageDelivery> limiter;
+        private AtomicLong deliverySequence = new AtomicLong(0);
 
         HashMap<MessageId, MessageDelivery> pendingMessages = new HashMap<MessageId, MessageDelivery>();
         LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
@@ -415,11 +423,13 @@
         public ConsumerContext(final ConsumerInfo info) throws InvalidSelectorException {
             this.info = info;
             this.name = info.getConsumerId().toString();
-            durable = info.isDurable();
-            if (durable) {
-                durableQueueName = new AsciiBuffer(info.getSubscriptionName());
+
+            isDurable = info.isDurable();
+            if (isDurable) {
+                durableQueueId = new QueueStore.QueueDescriptor();
+                durableQueueId.setQueueName(new AsciiBuffer(info.getSubscriptionName()));
                 try {
-                    connection.getBroker().getDefaultVirtualHost().getDatabase().addQueue(durableQueueName);
+                    connection.getBroker().getDefaultVirtualHost().getQueueStore().addQueue(durableQueueId);
                 } catch (Throwable thrown) {
                     thrown.printStackTrace();
                 }
@@ -442,36 +452,42 @@
                     md.setMessage(msg);
                     md.setDestination(msg.getDestination());
                     // Add to the pending list if persistent and we are durable:
-                    if (isDurable() && message.isPersistent()) {
+                    if (message.isPersistent() && (isDurable() || isQueueReceiver())) {
                         synchronized (queue) {
                             Object old = pendingMessages.put(msg.getMessageId(), message);
-                            if(old != null)
-                            {
+                            if (old != null) {
                                 new Exception("Duplicate message id: " + msg.getMessageId()).printStackTrace();
                             }
                             pendingMessageIds.add(msg.getMessageId());
                             connection.write(md);
                         }
-                    }
-                    else
-                    {
+                    } else {
+                        if (isQueueReceiver()) {
+                            message.acknowledge(durableQueueId);
+                        }
                         connection.write(md);
                     }
                 };
             });
+
+            // Subscribe
+            if (info.getDestination().isQueue()) {
+                isQueueReceiver = true;
+            }
+            router.bind(convert(info.getDestination()), this);
         }
+
         public void ack(MessageAck info) {
-            //TODO: The pending message queue could probably be optimized to avoid having
-            //to create a new list here. 
-            LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();            
+            // TODO: The pending message queue could probably be optimized to
+            // avoid having to create a new list here.
+            LinkedList<MessageDelivery> acked = new LinkedList<MessageDelivery>();
             synchronized (queue) {
-                if (isDurable()) {
+                if (isDurable() || isQueueReceiver()) {
                     MessageId id = info.getLastMessageId();
                     while (!pendingMessageIds.isEmpty()) {
                         MessageId pendingId = pendingMessageIds.getFirst();
                         MessageDelivery delivery = pendingMessages.remove(pendingId);
                         acked.add(delivery);
-                        delivery.delete(durableQueueName);
                         pendingMessageIds.removeFirst();
                         if (pendingId.equals(id)) {
                             break;
@@ -481,12 +497,11 @@
                 }
                 limiter.onProtocolCredit(info.getMessageCount());
             }
-            
-            //Delete outside of synchronization on queue to avoid contention with enqueueing
-            //threads. 
-            for(MessageDelivery delivery : acked)
-            {
-                delivery.delete(durableQueueName);
+
+            // Delete outside of synchronization on queue to avoid contention
+            // with enqueueing threads.
+            for (MessageDelivery delivery : acked) {
+                delivery.acknowledge(durableQueueId);
             }
         }
 
@@ -501,9 +516,9 @@
 
             if (isDurable() && delivery.isPersistent()) {
                 try {
-                    delivery.persist(durableQueueName, true);
+                    delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true);
                 } catch (IOException e) {
-                    // TODO Auto-generated catch block
+                    // TODO Auto-generated catch block
                     e.printStackTrace();
                 }
             }
@@ -511,6 +526,10 @@
             queue.add(delivery, source);
         }
 
+        public boolean hasSelector() {
+            return selector != null;
+        }
+
         public boolean match(MessageDelivery message) {
             Message msg = message.asType(Message.class);
             if (msg == null) {
@@ -529,7 +548,11 @@
         }
 
         public boolean isDurable() {
-            return durable;
+            return isDurable;
+        }
+
+        public boolean isQueueReceiver() {
+            return isQueueReceiver;
         }
 
         public AsciiBuffer getPersistentQueueName() {
@@ -550,8 +573,7 @@
         AsciiBuffer domain;
         if (dest.isQueue()) {
             domain = Router.QUEUE_DOMAIN;
-        }
-        if (dest.isTopic()) {
+        } else if (dest.isTopic()) {
             domain = Router.TOPIC_DOMAIN;
         } else {
             throw new IllegalArgumentException("Unsupported domain type: " + dest);
@@ -592,13 +614,18 @@
 
     public void setWireFormat(WireFormat wireFormat) {
         this.wireFormat = (OpenWireFormat) wireFormat;
-        this.storeWireFormat = this.wireFormat.copy();
+        setStoreWireFormat(this.wireFormat.copy());
+    }
+
+    private void setStoreWireFormat(OpenWireFormat wireFormat) {
+        this.storeWireFormat = wireFormat;
+        storeWireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
         storeWireFormat.setCacheEnabled(false);
         storeWireFormat.setTightEncodingEnabled(false);
         storeWireFormat.setSizePrefixDisabled(false);
     }
 
-    public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
+    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException {
         Buffer buf = record.getBuffer();
         Message message = (Message) storeWireFormat.unmarshal(new ByteSequence(buf.data, buf.offset, buf.length));
         OpenWireMessageDelivery delivery = new OpenWireMessageDelivery(message);

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/protocol/ProtocolHandler.java Mon Apr 27 18:40:44 2009
@@ -20,7 +20,7 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.BrokerConnection;
-import org.apache.activemq.broker.MessageDelivery;
+import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -31,6 +31,6 @@
     public void onException(Exception error);
     public void setWireFormat(WireFormat wf);
     
-    public MessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
+    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) throws IOException;
 
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Mon Apr 27 18:40:44 2009
@@ -54,7 +54,7 @@
         return destination;
     }
 
-    public int getFlowLimiterSize() {
+    public int getMemorySize() {
         return frame.getContent().length;
     }
 

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=769099&r1=769098&r2=769099&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Mon Apr 27 18:40:44 2009
@@ -25,11 +25,13 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.InvalidSelectorException;
 
 import org.apache.activemq.WindowLimiter;
 import org.apache.activemq.broker.BrokerConnection;
+import org.apache.activemq.broker.BrokerMessageDelivery;
 import org.apache.activemq.broker.DeliveryTarget;
 import org.apache.activemq.broker.Destination;
 import org.apache.activemq.broker.MessageDelivery;
@@ -49,6 +51,7 @@
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.queue.QueueStore;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.transport.stomp.Stomp;
@@ -99,7 +102,7 @@
             }
         });
         actionHandlers.put(Stomp.Commands.SEND, new ActionHander() {
-            
+
             public void onStompFrame(StompFrame frame) throws Exception {
                 String dest = frame.getHeaders().get(Stomp.Headers.Send.DESTINATION);
                 Destination destination = translator(frame).convertToDestination(StompProtocolHandler.this, dest);
@@ -276,7 +279,8 @@
         private LinkedHashMap<AsciiBuffer, AsciiBuffer> sentMessageIds = new LinkedHashMap<AsciiBuffer, AsciiBuffer>();
 
         private boolean durable;
-        private AsciiBuffer durableQueueName;
+        private QueueStore.QueueDescriptor durableQueueId;
+        private AtomicLong deliverySequence = new AtomicLong(0);
 
         public ConsumerContext(final StompFrame subscribe) throws Exception {
             translator = translator(subscribe);
@@ -359,6 +363,10 @@
             return queue;
         }
 
+        public boolean hasSelector() {
+            return false;
+        }
+
         public boolean match(MessageDelivery message) {
             StompFrame stompMessage = message.asType(StompFrame.class);
             if (stompMessage == null) {
@@ -394,7 +402,7 @@
 
             if (isDurable() && delivery.isPersistent()) {
                 try {
-                    delivery.persist(durableQueueName, true);
+                    delivery.persist(durableQueueId, null, deliverySequence.incrementAndGet(), true);
                 } catch (IOException e) {
                     // TODO Auto-generated catch block
                     e.printStackTrace();
@@ -512,7 +520,7 @@
         return null;
     }
 
-    public MessageDelivery createMessageDelivery(MessageRecord record) {
+    public BrokerMessageDelivery createMessageDelivery(MessageRecord record) {
         throw new UnsupportedOperationException();
     }
 }