You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC

svn commit: r1567616 [7/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/ qpid/cpp/bindings/qpid/ruby/ qp...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java Wed Feb 12 13:27:51 2014
@@ -70,7 +70,7 @@ public interface Transaction
 
     public static interface Record
     {
-        TransactionLogResource getQueue();
+        TransactionLogResource getResource();
         EnqueueableMessage getMessage();
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Wed Feb 12 13:27:51 2014
@@ -24,5 +24,7 @@ import java.util.UUID;
 
 public interface TransactionLogResource
 {
+    String getName();
     public UUID getId();
+    boolean isDurable();
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Wed Feb 12 13:27:51 2014
@@ -25,12 +25,14 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 import java.util.Collection;
 import java.util.List;
@@ -88,7 +90,7 @@ public class AsyncAutoCommitTransaction 
 
     }
 
-    public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try
@@ -158,15 +160,15 @@ public class AsyncAutoCommitTransaction 
         }
     }
 
-    public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+    public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
     {
         Transaction txn = null;
         try
         {
-            for(QueueEntry entry : queueEntries)
+            for(MessageInstance entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
-                BaseQueue queue = entry.getQueue();
+                TransactionLogResource queue = entry.getOwningResource();
 
                 if(message.isPersistent() && queue.isDurable())
                 {
@@ -210,7 +212,7 @@ public class AsyncAutoCommitTransaction 
     }
 
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed Feb 12 13:27:51 2014
@@ -25,11 +25,13 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 import java.util.Collection;
 import java.util.List;
@@ -73,7 +75,7 @@ public class AutoCommitTransaction imple
         immediateAction.postCommit();
     }
 
-    public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try
@@ -105,15 +107,15 @@ public class AutoCommitTransaction imple
 
     }
 
-    public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+    public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
     {
         Transaction txn = null;
         try
         {
-            for(QueueEntry entry : queueEntries)
+            for(MessageInstance entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
-                BaseQueue queue = entry.getQueue();
+                TransactionLogResource queue = entry.getOwningResource();
 
                 if(message.isPersistent() && queue.isDurable())
                 {
@@ -152,7 +154,7 @@ public class AutoCommitTransaction imple
     }
 
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Wed Feb 12 13:27:51 2014
@@ -22,10 +22,12 @@
 package org.apache.qpid.server.txn;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Xid;
 
@@ -74,7 +76,7 @@ public class DistributedTransaction impl
         }
     }
 
-    public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         if(_branch != null)
         {
@@ -87,13 +89,13 @@ public class DistributedTransaction impl
         }
     }
 
-    public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction)
+    public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction)
     {
         if(_branch != null)
         {
-            for(QueueEntry entry : messages)
+            for(MessageInstance entry : messages)
             {
-                _branch.dequeue(entry.getQueue(), entry.getMessage());
+                _branch.dequeue(entry.getOwningResource(), entry.getMessage());
             }
             _branch.addPostTransactionAction(postTransactionAction);
         }
@@ -103,13 +105,12 @@ public class DistributedTransaction impl
         }
     }
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         if(_branch != null)
         {
             _branch.enqueue(queue, message);
             _branch.addPostTransactionAction(postTransactionAction);
-            enqueue(Collections.singletonList(queue), message, postTransactionAction);
         }
         else
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Wed Feb 12 13:27:51 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Xid;
 
@@ -335,7 +336,7 @@ public class DtxBranch
         {
             if(enqueue.isDurable())
             {
-                _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage());
+                _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
             }
         }
 
@@ -344,7 +345,7 @@ public class DtxBranch
         {
             if(enqueue.isDurable())
             {
-                _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage());
+                _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage());
             }
         }
     }
@@ -356,31 +357,31 @@ public class DtxBranch
     }
 
 
-    public void dequeue(BaseQueue queue, EnqueueableMessage message)
+    public void dequeue(TransactionLogResource resource, EnqueueableMessage message)
     {
-        _dequeueRecords.add(new Record(queue, message));
+        _dequeueRecords.add(new Record(resource, message));
     }
 
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
     {
         _enqueueRecords.add(new Record(queue, message));
     }
 
     private static final class Record implements Transaction.Record
     {
-        private final BaseQueue _queue;
+        private final TransactionLogResource _resource;
         private final EnqueueableMessage _message;
 
-        public Record(BaseQueue queue, EnqueueableMessage message)
+        public Record(TransactionLogResource resource, EnqueueableMessage message)
         {
-            _queue = queue;
+            _resource = resource;
             _message = message;
         }
 
-        public BaseQueue getQueue()
+        public TransactionLogResource getResource()
         {
-            return _queue;
+            return _resource;
         }
 
         public EnqueueableMessage getMessage()
@@ -390,7 +391,7 @@ public class DtxBranch
 
         public boolean isDurable()
         {
-            return _message.isPersistent() && _queue.isDurable();
+            return _message.isPersistent() && _resource.isDurable();
         }
     }
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed Feb 12 13:27:51 2014
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.txn;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +93,7 @@ public class LocalTransaction implements
         _postTransactionActions.add(postTransactionAction);
     }
 
-    public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
@@ -118,7 +120,7 @@ public class LocalTransaction implements
         }
     }
 
-    public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+    public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
@@ -126,10 +128,10 @@ public class LocalTransaction implements
 
         try
         {
-            for(QueueEntry entry : queueEntries)
+            for(MessageInstance entry : queueEntries)
             {
                 ServerMessage message = entry.getMessage();
-                BaseQueue queue = entry.getQueue();
+                TransactionLogResource queue = entry.getOwningResource();
 
                 if(message.isPersistent() && queue.isDurable())
                 {
@@ -195,7 +197,7 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed Feb 12 13:27:51 2014
@@ -24,8 +24,9 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.TransactionLogResource;
 
 
 /**
@@ -79,21 +80,21 @@ public interface ServerTransaction
      * 
      * A store operation will result only for a persistent message on a durable queue.
      */
-    void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+    void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
 
     /** 
      * Dequeue a message(s) from queue(s) registering a post transaction action.
      * 
      * Store operations will result only for a persistent messages on durable queues.
      */
-    void dequeue(Collection<QueueEntry> messages, Action postTransactionAction);
+    void dequeue(Collection<MessageInstance> messages, Action postTransactionAction);
 
     /** 
      * Enqueue a message to a queue registering a post transaction action.
      * 
      * A store operation will result only for a persistent message on a durable queue.
      */
-    void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+    void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
 
     /** 
      * Enqueue a message(s) to queue(s) registering a post transaction action.

Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/
            ('svn:mergeinfo' removed)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Feb 12 13:27:51 2014
@@ -52,8 +52,13 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.plugin.SystemNodeCreator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.LinkRegistry;
@@ -105,8 +110,8 @@ public abstract class AbstractVirtualHos
 
     private final DtxRegistry _dtxRegistry;
     private final AMQQueueFactory _queueFactory;
-
-    private final org.apache.qpid.server.model.VirtualHost _virtualHost;
+    private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry();
+    private final org.apache.qpid.server.model.VirtualHost _model;
 
     private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
 
@@ -125,6 +130,13 @@ public abstract class AbstractVirtualHos
     private volatile State _state = State.INITIALISING;
     private volatile ScheduledThreadPoolExecutor _houseKeepingTasks;
 
+    private final Map<String, MessageDestination> _systemNodeDestinations =
+            Collections.synchronizedMap(new HashMap<String,MessageDestination>());
+
+    private final Map<String, MessageSource> _systemNodeSources =
+            Collections.synchronizedMap(new HashMap<String,MessageSource>());
+
+
     public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
                                StatisticsGatherer brokerStatisticsGatherer,
                                SecurityManager parentSecurityManager,
@@ -146,6 +158,7 @@ public abstract class AbstractVirtualHos
         _vhostConfig = hostConfig;
         _name = _vhostConfig.getName();
         _dtxRegistry = new DtxRegistry();
+        _model = virtualHost;
 
         _id = UUIDGenerator.generateVhostUUID(_name);
 
@@ -162,7 +175,7 @@ public abstract class AbstractVirtualHos
 
         _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
 
-        _virtualHost = virtualHost;
+        registerSystemNodes();
 
         initialiseStatistics();
 
@@ -179,7 +192,7 @@ public abstract class AbstractVirtualHos
 
             try
             {
-                initialiseStorage(_vhostConfig, _virtualHost);
+                initialiseStorage(_vhostConfig, _model);
 
                 getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
                 getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
@@ -205,6 +218,16 @@ public abstract class AbstractVirtualHos
         setState(State.QUIESCED);
     }
 
+    private void registerSystemNodes()
+    {
+        QpidServiceLoader<SystemNodeCreator> qpidServiceLoader = new QpidServiceLoader<SystemNodeCreator>();
+        Iterable<SystemNodeCreator> factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class);
+        for(SystemNodeCreator creator : factories)
+        {
+            creator.register(_systemNodeRegistry);
+        }
+    }
+
     abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
                                               org.apache.qpid.server.model.VirtualHost virtualHost) throws Exception;
 
@@ -494,6 +517,13 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public MessageSource getMessageSource(final String name)
+    {
+        MessageSource systemSource = _systemNodeSources.get(name);
+        return systemSource == null ? getQueue(name) : systemSource;
+    }
+
+    @Override
     public AMQQueue getQueue(UUID id)
     {
         return _queueRegistry.getQueue(id);
@@ -577,6 +607,14 @@ public abstract class AbstractVirtualHos
 
     }
 
+
+    @Override
+    public MessageDestination getMessageDestination(final String name)
+    {
+        MessageDestination destination = _systemNodeDestinations.get(name);
+        return destination == null ? getExchange(name) : destination;
+    }
+
     @Override
     public Exchange getExchange(String name)
     {
@@ -690,7 +728,7 @@ public abstract class AbstractVirtualHos
 
     protected void passivate(String reason)
     {
-        _virtualHost.removeChangeListener(this);
+        _model.removeChangeListener(this);
         removeHouseKeepingTasks();
 
         //Stop Connections
@@ -907,7 +945,7 @@ public abstract class AbstractVirtualHos
         try
         {
             initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
-            _virtualHost.addChangeListener(this);
+            _model.addChangeListener(this);
             finalState = State.ACTIVE;
         }
         finally
@@ -1059,4 +1097,44 @@ public abstract class AbstractVirtualHos
         }
     }
 
+    private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry
+    {
+        @Override
+        public void registerSystemNode(final MessageNode node)
+        {
+            if(node instanceof MessageDestination)
+            {
+                _systemNodeDestinations.put(node.getName(), (MessageDestination) node);
+            }
+            if(node instanceof MessageSource)
+            {
+                _systemNodeSources.put(node.getName(), (MessageSource)node);
+            }
+        }
+
+        @Override
+        public void removeSystemNode(final MessageNode node)
+        {
+            if(node instanceof MessageDestination)
+            {
+                _systemNodeDestinations.remove(node.getName());
+            }
+            if(node instanceof MessageSource)
+            {
+                _systemNodeSources.remove(node.getName());
+            }
+        }
+
+        @Override
+        public VirtualHost getVirtualHost()
+        {
+            return AbstractVirtualHost.this;
+        }
+
+        @Override
+        public org.apache.qpid.server.model.VirtualHost getVirtualHostModel()
+        {
+            return _model;
+        }
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Feb 12 13:27:51 2014
@@ -29,6 +29,9 @@ import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageNode;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -48,6 +51,7 @@ public interface VirtualHost extends Dur
     String getName();
 
     AMQQueue getQueue(String name);
+    MessageSource getMessageSource(String name);
 
     AMQQueue getQueue(UUID id);
 
@@ -75,6 +79,8 @@ public interface VirtualHost extends Dur
 
     void removeExchange(Exchange exchange, boolean force) throws AMQException;
 
+    MessageDestination getMessageDestination(String name);
+
     Exchange getExchange(String name);
     Exchange getExchange(UUID id);
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Wed Feb 12 13:27:51 2014
@@ -119,7 +119,7 @@ public class VirtualHostConfigRecoveryHa
         }
         for(Transaction.Record record : enqueues)
         {
-            final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
+            final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
             if(queue != null)
             {
                 final long messageId = record.getMessage().getMessageNumber();
@@ -141,7 +141,7 @@ public class VirtualHostConfigRecoveryHa
                             try
                             {
 
-                                queue.enqueue(message, true, null);
+                                queue.enqueue(message, null);
                                 ref.release();
                             }
                             catch (AMQException e)
@@ -173,13 +173,13 @@ public class VirtualHostConfigRecoveryHa
                 StringBuilder xidString = xidAsString(id);
                 CurrentActor.get().message(_logSubject,
                                            TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                                      record.getQueue().getId().toString()));
+                                                                                      record.getResource().getId().toString()));
 
             }
         }
         for(Transaction.Record record : dequeues)
         {
-            final AMQQueue queue = _virtualHost.getQueue(record.getQueue().getId());
+            final AMQQueue queue = _virtualHost.getQueue(record.getResource().getId());
             if(queue != null)
             {
                 final long messageId = record.getMessage().getMessageNumber();
@@ -223,7 +223,7 @@ public class VirtualHostConfigRecoveryHa
                 StringBuilder xidString = xidAsString(id);
                 CurrentActor.get().message(_logSubject,
                                            TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                                      record.getQueue().getId().toString()));
+                                                                                      record.getResource().getId().toString()));
             }
 
         }
@@ -292,7 +292,7 @@ public class VirtualHostConfigRecoveryHa
                         count = 0;
                     }
 
-                    queue.enqueue(message);
+                    queue.enqueue(message,null);
 
                     _queueRecoveries.put(queueName, ++count);
                 }
@@ -312,10 +312,22 @@ public class VirtualHostConfigRecoveryHa
                         new TransactionLogResource()
                         {
                             @Override
+                            public String getName()
+                            {
+                                return "<<UNKNOWN>>";
+                            }
+
+                            @Override
                             public UUID getId()
                             {
                                 return queueId;
                             }
+
+                            @Override
+                            public boolean isDurable()
+                            {
+                                return false;
+                            }
                         };
                 txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
                 txn.commitTranAsync();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Wed Feb 12 13:27:51 2014
@@ -27,7 +27,7 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.XMLConfiguration;
 
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.PriorityQueue;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.server.util.BrokerTestHelper;
@@ -113,17 +113,17 @@ public class VirtualHostConfigurationTes
 
         // Check that atest was a priority queue with 5 priorities
         AMQQueue atest = vhost.getQueue("atest");
-        assertTrue(atest instanceof AMQPriorityQueue);
-        assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
+        assertTrue(atest instanceof PriorityQueue);
+        assertEquals(5, ((PriorityQueue) atest).getPriorities());
 
         // Check that ptest was a priority queue with 10 priorities
         AMQQueue ptest = vhost.getQueue("ptest");
-        assertTrue(ptest instanceof AMQPriorityQueue);
-        assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
+        assertTrue(ptest instanceof PriorityQueue);
+        assertEquals(10, ((PriorityQueue) ptest).getPriorities());
 
         // Check that ntest wasn't a priority queue
         AMQQueue ntest = vhost.getQueue("ntest");
-        assertFalse(ntest instanceof AMQPriorityQueue);
+        assertFalse(ntest instanceof PriorityQueue);
     }
 
     public void testQueueAlerts() throws Exception

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -73,7 +74,7 @@ public class TopicExchangeTest extends Q
 
     public void testNoRoute() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
@@ -85,7 +86,7 @@ public class TopicExchangeTest extends Q
 
     public void testDirectMatch() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
 
@@ -108,7 +109,7 @@ public class TopicExchangeTest extends Q
 
     public void testStarMatch() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, false, null);
         _exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
 
 
@@ -139,7 +140,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashMatch() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, false, null);
         _exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
 
 
@@ -190,7 +191,7 @@ public class TopicExchangeTest extends Q
 
     public void testMidHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
 
@@ -216,7 +217,7 @@ public class TopicExchangeTest extends Q
 
     public void testMatchAfterHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
 
@@ -255,7 +256,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashAfterHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
 
@@ -277,7 +278,7 @@ public class TopicExchangeTest extends Q
 
     public void testHashHash() throws AMQException
     {
-        AMQQueue queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
+        AMQQueue<?,?,?> queue = _vhost.createQueue(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false,
                 false, null);
         _exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
 
@@ -321,7 +322,7 @@ public class TopicExchangeTest extends Q
         when(message.getMessageNumber()).thenReturn(messageNumber);
         for(BaseQueue q : queues)
         {
-            q.enqueue(message);
+            q.enqueue(message, null);
         }
 
         return queues.size();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java Wed Feb 12 13:27:51 2014
@@ -38,7 +38,7 @@ public class VirtualHostMessagesTest ext
         validateLogMessage(log, "VHT-1001", expected);
     }
 
-    public void testSubscriptionClosed()
+    public void testVirtualhostClosed()
     {
         _logMessage = VirtualHostMessages.CLOSED();
         List<Object> log = performLog();

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java Wed Feb 12 13:27:51 2014
@@ -231,7 +231,7 @@ public class AMQQueueFactoryTest extends
                 false,
                 attributes);
 
-        assertEquals("Queue not a priority queue", AMQPriorityQueue.class, queue.getClass());
+        assertEquals("Queue not a priority queue", PriorityQueue.class, queue.getClass());
         verifyQueueRegistered("testPriorityQueue");
         verifyRegisteredQueueCount(1);
     }
@@ -246,7 +246,7 @@ public class AMQQueueFactoryTest extends
                 false,
                 false,
                 null);
-        assertEquals("Queue not a simple queue", SimpleAMQQueue.class, queue.getClass());
+        assertEquals("Queue not a simple queue", StandardQueue.class, queue.getClass());
         verifyQueueRegistered(queueName);
 
         //verify that no alternate exchange or DLQ were produced

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java Wed Feb 12 13:27:51 2014
@@ -28,6 +28,9 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Collections;
+import java.util.UUID;
+
 public class ConflationQueueListTest extends TestCase
 {
     private static final String CONFLATION_KEY = "CONFLATION_KEY";
@@ -37,13 +40,15 @@ public class ConflationQueueListTest ext
     private static final String TEST_KEY_VALUE2 = "testKeyValue2";
 
     private ConflationQueueList _list;
-    private AMQQueue _queue = createTestQueue();
+    private ConflationQueue _queue;
 
     @Override
     protected void setUp() throws Exception
     {
         super.setUp();
-        _list = new ConflationQueueList(_queue, CONFLATION_KEY);
+        _queue = new ConflationQueue(UUID.randomUUID(), getName(), false, null, false, false, mock(VirtualHost.class),
+                                     Collections.<String,Object>emptyMap(),CONFLATION_KEY);
+        _list = _queue.getEntries();
     }
 
     public void testListHasNoEntries()
@@ -175,7 +180,8 @@ public class ConflationQueueListTest ext
 
     private int countEntries(ConflationQueueList list)
     {
-        QueueEntryIterator<SimpleQueueEntryImpl> iterator = list.iterator();
+        QueueEntryIterator<ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList,QueueConsumer<?,ConflationQueueList.ConflationQueueEntry, ConflationQueue, ConflationQueueList>> iterator =
+                list.iterator();
         int count = 0;
         while(iterator.advance())
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb 12 13:27:51 2014
@@ -24,15 +24,23 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
@@ -171,6 +179,8 @@ public class MockAMQQueue implements AMQ
         return null;
     }
 
+
+
     public boolean isDurable()
     {
         return false;
@@ -202,32 +212,67 @@ public class MockAMQQueue implements AMQ
         return _virtualhost;
     }
 
-    public String getName()
+    @Override
+    public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
     {
-        return _name;
+        return false;
     }
 
-    public void registerSubscription(Subscription subscription, boolean exclusive) throws AMQException
+    @Override
+    public void addQueueDeleteTask(final Action task)
+    {
+
+    }
+
+    @Override
+    public void enqueue(final ServerMessage message, final Action action) throws AMQException
     {
 
     }
 
-    public void unregisterSubscription(Subscription subscription) throws AMQException
+    @Override
+    public int compareTo(final Object o)
     {
+        return 0;
+    }
+
+    @Override
+    public Consumer addConsumer(final ConsumerTarget target,
+                                final FilterManager filters,
+                                final Class messageClass,
+                                final String consumerName,
+                                final EnumSet options) throws AMQException
+    {
+        return new QueueConsumer(filters, messageClass, options.contains(Consumer.Option.ACQUIRES),
+                                 options.contains(Consumer.Option.SEES_REQUEUES), consumerName,
+                                 options.contains(Consumer.Option.TRANSIENT), target );
+    }
 
+
+    public String getName()
+    {
+        return _name;
     }
 
-    public Collection<Subscription> getConsumers()
+    public  int send(final ServerMessage message,
+                     final InstanceProperties instanceProperties,
+                     final ServerTransaction txn,
+                     final Action postEnqueueAction)
+    {
+        return 0;
+    }
+
+    public Collection<QueueConsumer> getConsumers()
     {
         return Collections.emptyList();
     }
 
-    public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
     {
 
     }
 
-    public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+    public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
     {
 
     }
@@ -242,7 +287,7 @@ public class MockAMQQueue implements AMQ
         return 0;
     }
 
-    public boolean hasExclusiveSubscriber()
+    public boolean hasExclusiveConsumer()
     {
         return false;
     }
@@ -293,50 +338,39 @@ public class MockAMQQueue implements AMQ
        return getMessageCount();
     }
 
-    public void enqueue(ServerMessage message) throws AMQException
-    {
-    }
-
-    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
-    {
-    }
-
-
-    public void enqueue(ServerMessage message, boolean sync, PostEnqueueAction action) throws AMQException
-    {
-    }
 
     public void requeue(QueueEntry entry)
     {
     }
 
-    public void requeue(QueueEntryImpl storeContext, Subscription subscription)
+    public void dequeue(QueueEntry entry)
     {
     }
 
-    public void dequeue(QueueEntry entry, Subscription sub)
-    {
-    }
-
-    public boolean resend(QueueEntry entry, Subscription subscription) throws AMQException
+    public boolean resend(QueueEntry entry, QueueConsumer consumer) throws AMQException
     {
         return false;
     }
 
-    public void addQueueDeleteTask(Task task)
+    @Override
+    public void removeQueueDeleteTask(final Action task)
     {
+
     }
 
-    public void removeQueueDeleteTask(final Task task)
+    @Override
+    public void decrementUnackedMsgCount(final QueueEntry queueEntry)
     {
+
     }
 
-    public List<QueueEntry> getMessagesOnTheQueue()
+    @Override
+    public List getMessagesOnTheQueue()
     {
         return null;
     }
 
-    public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+    public List getMessagesOnTheQueue(long fromMessageId, long toMessageId)
     {
         return null;
     }
@@ -356,7 +390,7 @@ public class MockAMQQueue implements AMQ
         return null;
     }
 
-    public List<QueueEntry> getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
+    public List getMessagesRangeOnTheQueue(long fromPosition, long toPosition)
     {
         return null;
     }
@@ -427,12 +461,12 @@ public class MockAMQQueue implements AMQ
         return null;
     }
 
-    public void flushSubscription(Subscription sub) throws AMQException
+    public void flushConsumer(Consumer sub) throws AMQException
     {
 
     }
 
-    public void deliverAsync(Subscription sub)
+    public void deliverAsync(Consumer sub)
     {
 
     }
@@ -563,10 +597,6 @@ public class MockAMQQueue implements AMQ
         return 0;
     }
 
-    public void decrementUnackedMsgCount(QueueEntry queueEntry)
-    {
-
-    }
 
     public long getUnackedMessageCount()
     {
@@ -610,4 +640,5 @@ public class MockAMQQueue implements AMQ
     {
         return null;
     }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java Wed Feb 12 13:27:51 2014
@@ -26,12 +26,16 @@ import static org.mockito.Mockito.when;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
 
+import java.util.Collections;
+import java.util.UUID;
+
 public class PriorityQueueListTest extends QpidTestCase
 {
     private static final byte[] PRIORITIES = {4, 5, 5, 4};
-    PriorityQueueList _list = new PriorityQueueList(null, 10);
+    PriorityQueueList _list;
 
     private QueueEntry _priority4message1;
     private QueueEntry _priority4message2;
@@ -42,6 +46,17 @@ public class PriorityQueueListTest exten
     {
         QueueEntry[] entries = new QueueEntry[PRIORITIES.length];
 
+        PriorityQueue queue = new PriorityQueue(UUID.randomUUID(),
+                                                getName(),
+                                                false,
+                                                null,
+                                                false,
+                                                false,
+                                                mock(VirtualHost.class),
+                                                Collections.<String,Object>emptyMap(),
+                                                10);
+        _list = queue.getEntries();
+
         for (int i = 0; i < PRIORITIES.length; i++)
         {
             ServerMessage<?> message = mock(ServerMessage.class);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java Wed Feb 12 13:27:51 2014
@@ -21,13 +21,19 @@ package org.apache.qpid.server.queue;
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.QueueEntry.EntryState;
-import org.apache.qpid.server.subscription.MockSubscription;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.UUID;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -57,6 +63,15 @@ public abstract class QueueEntryImplTest
         _queueEntry3 = getQueueEntryImpl(3);
     }
 
+
+    protected void mockLogging()
+    {
+        final LogActor logActor = mock(LogActor.class);
+        when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+        CurrentActor.setDefault(logActor);
+    }
+
+
     public void testAcquire()
     {
         assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
@@ -113,11 +128,19 @@ public abstract class QueueEntryImplTest
      */
     private void acquire()
     {
-        _queueEntry.acquire(new MockSubscription());
+        _queueEntry.acquire(newConsumer());
         assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
                 _queueEntry.isAcquired());
     }
 
+    private QueueConsumer newConsumer()
+    {
+        final ConsumerTarget target = mock(ConsumerTarget.class);
+        when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
+        final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
+        return consumer;
+    }
+
     /**
      * A helper method to get entry state
      *
@@ -140,36 +163,34 @@ public abstract class QueueEntryImplTest
     }
 
     /**
-     * Tests rejecting a queue entry records the Subscription ID
-     * for later verification by isRejectedBy(subscriptionId).
+     * Tests rejecting a queue entry records the Consumer ID
+     * for later verification by isRejectedBy(consumerId).
      */
     public void testRejectAndRejectedBy()
     {
-        Subscription sub = new MockSubscription();
-        long subId = sub.getSubscriptionID();
+        QueueConsumer sub = newConsumer();
 
-        assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
-        assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired());
+        assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+        assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
 
-        //acquire, reject, and release the message using the subscription
+        //acquire, reject, and release the message using the consumer
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
         _queueEntry.reject();
         _queueEntry.release();
 
         //verify the rejection is recorded
-        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
+        assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
 
-        //repeat rejection using a second subscription
-        Subscription sub2 = new MockSubscription();
-        long sub2Id = sub2.getSubscriptionID();
+        //repeat rejection using a second consumer
+        QueueConsumer sub2 = newConsumer();
 
-        assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+        assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
         assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
         _queueEntry.reject();
 
-        //verify it still records being rejected by both subscriptions
-        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId));
-        assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id));
+        //verify it still records being rejected by both consumers
+        assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
+        assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
     }
 
     /**
@@ -179,7 +200,9 @@ public abstract class QueueEntryImplTest
     {
         int numberOfEntries = 5;
         QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
-        SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+        StandardQueue queue = new StandardQueue(UUID.randomUUID(), getName(), false, null, false, false,
+                                                mock(VirtualHost.class), Collections.<String,Object>emptyMap());
+        OrderedQueueEntryList queueEntryList = queue.getEntries();
 
         // create test entries
         for(int i = 0; i < numberOfEntries ; i++)

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java Wed Feb 12 13:27:51 2014
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.consumer.Consumer;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -32,20 +33,21 @@ import static org.mockito.Mockito.when;
 /**
  * Abstract test class for QueueEntryList implementations.
  */
-public abstract class QueueEntryListTestBase extends TestCase
+public abstract class QueueEntryListTestBase<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer> extends TestCase
 {
-    protected static final AMQQueue _testQueue = new MockAMQQueue("test");
-    public abstract QueueEntryList<QueueEntry> getTestList();
-    public abstract QueueEntryList<QueueEntry> getTestList(boolean newList);
+    public abstract L getTestList();
+    public abstract L getTestList(boolean newList);
     public abstract long getExpectedFirstMsgId();
     public abstract int getExpectedListLength();
     public abstract ServerMessage getTestMessageToAdd() throws AMQException;
 
     public void testGetQueue()
     {
-        assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), _testQueue);
+        assertEquals("Unexpected head entry returned by getHead()", getTestList().getQueue(), getTestQueue());
     }
 
+    protected abstract Q getTestQueue();
+
     /**
      * Test to add a message with properties specific to the queue type.
      * @see QueueEntryListTestBase#getTestList()
@@ -54,10 +56,10 @@ public abstract class QueueEntryListTest
      */
     public void testAddSpecificMessage() throws AMQException
     {
-        final QueueEntryList<QueueEntry> list = getTestList();
+        final L list = getTestList();
         list.add(getTestMessageToAdd());
 
-        final QueueEntryIterator<?> iter = list.iterator();
+        final QueueEntryIterator<E,Q,L,C> iter = list.iterator();
         int count = 0;
         while(iter.advance())
         {
@@ -75,11 +77,11 @@ public abstract class QueueEntryListTest
      */
     public void testAddGenericMessage() throws AMQException
     {
-        final QueueEntryList<QueueEntry> list = getTestList();
+        final L list = getTestList();
         final ServerMessage message = createServerMessage(666l);
         list.add(message);
 
-        final QueueEntryIterator<?> iter = list.iterator();
+        final QueueEntryIterator<E,Q,L,C> iter = list.iterator();
         int count = 0;
         while(iter.advance())
         {
@@ -109,8 +111,8 @@ public abstract class QueueEntryListTest
      */
     public void testListNext()
     {
-        final QueueEntryList<QueueEntry> entryList = getTestList();
-        QueueEntry entry = entryList.getHead();
+        final L entryList = getTestList();
+        E entry = entryList.getHead();
         int count = 0;
         while(entryList.next(entry) != null)
         {
@@ -127,7 +129,7 @@ public abstract class QueueEntryListTest
      */
     public void testIterator()
     {
-        final QueueEntryIterator<?> iter = getTestList().iterator();
+        final QueueEntryIterator<E,Q,L,C> iter = getTestList().iterator();
         int count = 0;
         while(iter.advance())
         {
@@ -145,10 +147,10 @@ public abstract class QueueEntryListTest
     public void testDequeuedMessagedNotPresentInIterator() throws Exception
     {
         final int numberOfMessages = getExpectedListLength();
-        final QueueEntryList<QueueEntry> entryList = getTestList();
+        final L entryList = getTestList();
 
         // dequeue all even messages
-        final QueueEntryIterator<?> it1 = entryList.iterator();
+        final QueueEntryIterator<E,Q,L,C> it1 = entryList.iterator();
         int counter = 0;
         while (it1.advance())
         {
@@ -161,7 +163,7 @@ public abstract class QueueEntryListTest
         }
 
         // iterate and check that dequeued messages are not returned by iterator
-        final QueueEntryIterator<?> it2 = entryList.iterator();
+        final QueueEntryIterator<E,Q,L,C> it2 = entryList.iterator();
         int counter2 = 0;
         while(it2.advance())
         {
@@ -180,7 +182,7 @@ public abstract class QueueEntryListTest
      */
     public void testGetHead()
     {
-        final QueueEntry head = getTestList().getHead();
+        final E head = getTestList().getHead();
         assertNull("Head entry should not contain an actual message", head.getMessage());
         assertEquals("Unexpected message id for first list entry", getExpectedFirstMsgId(), getTestList().next(head)
                         .getMessage().getMessageNumber());
@@ -192,16 +194,16 @@ public abstract class QueueEntryListTest
      */
     public void testEntryDeleted()
     {
-        final QueueEntry head = getTestList().getHead();
+        final E head = getTestList().getHead();
 
-        final QueueEntry first = getTestList().next(head);
+        final E first = getTestList().next(head);
         first.delete();
 
-        final QueueEntry second = getTestList().next(head);
+        final E second = getTestList().next(head);
         assertNotSame("After deletion the next entry should be different", first.getMessage().getMessageNumber(), second
                         .getMessage().getMessageNumber());
 
-        final QueueEntry third = getTestList().next(first);
+        final E third = getTestList().next(first);
         assertEquals("After deletion the deleted nodes next node should be the same as the next from head", second
                         .getMessage().getMessageNumber(), third.getMessage().getMessageNumber());
     }
@@ -215,11 +217,11 @@ public abstract class QueueEntryListTest
      */
     public void testIteratorIgnoresDeletedFinalNode() throws Exception
     {
-        QueueEntryList<QueueEntry> list = getTestList(true);
+        L list = getTestList(true);
         int i = 0;
 
-        QueueEntry queueEntry1 = list.add(createServerMessage(i++));
-        QueueEntry queueEntry2 = list.add(createServerMessage(i++));
+        E queueEntry1 = list.add(createServerMessage(i++));
+        E queueEntry2 = list.add(createServerMessage(i++));
 
         assertSame(queueEntry2, list.next(queueEntry1));
         assertNull(list.next(queueEntry2));
@@ -228,7 +230,7 @@ public abstract class QueueEntryListTest
         queueEntry2.delete();
         assertTrue("Deleting node should have succeeded", queueEntry2.isDeleted());
 
-        QueueEntryIterator<QueueEntry> iter = list.iterator();
+        QueueEntryIterator<E,Q,L,C> iter = list.iterator();
 
         //verify the iterator isn't 'atTail', can advance, and returns the 1st QueueEntry
         assertFalse("Iterator should not have been 'atTail'", iter.atTail());

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Wed Feb 12 13:27:51 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
 import junit.framework.Assert;
 
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
+import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
 
 /**
  * Test extension of SortedQueueEntryList that provides data structure validation tests.
@@ -30,22 +30,28 @@ import org.apache.qpid.server.queue.Sort
  */
 public class SelfValidatingSortedQueueEntryList extends SortedQueueEntryList
 {
-    public SelfValidatingSortedQueueEntryList(AMQQueue queue, String propertyName)
+    public SelfValidatingSortedQueueEntryList(SortedQueue queue, String propertyName)
     {
         super(queue, propertyName);
     }
 
+    @Override
+    public SortedQueue getQueue()
+    {
+        return super.getQueue();
+    }
+
     @Override /** Overridden to automatically check queue properties before and after. */
-    public SortedQueueEntryImpl add(final ServerMessage message)
+    public SortedQueueEntry add(final ServerMessage message)
     {
         assertQueueProperties(); //before add
-        final SortedQueueEntryImpl result = super.add(message);
+        final SortedQueueEntry result = super.add(message);
         assertQueueProperties(); //after add
         return result;
     }
 
     @Override /** Overridden to automatically check queue properties before and after. */
-    public void entryDeleted(SortedQueueEntryImpl entry)
+    public void entryDeleted(SortedQueueEntry entry)
     {
         assertQueueProperties(); //before delete
         super.entryDeleted(entry);
@@ -73,7 +79,7 @@ public class SelfValidatingSortedQueueEn
         assertTreeIntegrity(getRoot());
     }
 
-    public void assertTreeIntegrity(final SortedQueueEntryImpl node)
+    public void assertTreeIntegrity(final SortedQueueEntry node)
     {
         if(node == null)
         {
@@ -109,7 +115,7 @@ public class SelfValidatingSortedQueueEn
         assertLeavesSameBlackPath(getRoot());
     }
 
-    public int assertLeavesSameBlackPath(final SortedQueueEntryImpl node)
+    public int assertLeavesSameBlackPath(final SortedQueueEntry node)
     {
         if(node == null)
         {
@@ -133,7 +139,7 @@ public class SelfValidatingSortedQueueEn
         assertChildrenOfRedAreBlack(getRoot());
     }
 
-    public void assertChildrenOfRedAreBlack(final SortedQueueEntryImpl node)
+    public void assertChildrenOfRedAreBlack(final SortedQueueEntry node)
     {
         if(node == null)
         {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java Wed Feb 12 13:27:51 2014
@@ -21,8 +21,14 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.UUID;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -30,7 +36,20 @@ import static org.mockito.Mockito.when;
 public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
 {
 
-    private SimpleQueueEntryList queueEntryList = new SimpleQueueEntryList(new MockAMQQueue("test"));
+    private OrderedQueueEntryList queueEntryList;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        mockLogging();
+
+        StandardQueue queue = new StandardQueue(UUID.randomUUID(), "SimpleQueueEntryImplTest", false, null,false, false, mock(VirtualHost.class),null);
+
+        queueEntryList = queue.getEntries();
+
+        super.setUp();
+    }
+
 
     public QueueEntryImpl getQueueEntryImpl(int msgId) throws AMQException
     {

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java Wed Feb 12 13:27:51 2014
@@ -21,20 +21,26 @@ package org.apache.qpid.server.queue;
 
 import java.util.Collections;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Arrays;
+import java.util.UUID;
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class SortedQueueEntryListTest extends QueueEntryListTestBase
+public class SortedQueueEntryListTest extends QueueEntryListTestBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>>
 {
     private static SelfValidatingSortedQueueEntryList _sqel;
 
+
     public final static String keys[] = { " 73", " 18", " 11", "127", "166", "163", " 69", " 60", "191", "144",
                                           " 17", "161", "145", "140", "157", " 47", "136", " 56", "176", " 81",
                                           "195", " 96", "  2", " 68", "101", "141", "159", "187", "149", " 45",
@@ -62,16 +68,30 @@ public class SortedQueueEntryListTest ex
 
     private final static String keysSorted[] = keys.clone();
 
+    private SortedQueue _testQueue;
+
     @Override
     protected void setUp() throws Exception
     {
+        mockLogging();
+
+        // Create test list
+        _testQueue = new SortedQueue(UUID.randomUUID(), getName(), false, null, false,false, mock(VirtualHost.class), null, "KEY", new QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>()
+        {
+
+            @Override
+            public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+            {
+                return new SelfValidatingSortedQueueEntryList(queue, "KEY");
+            }
+        });
+        _sqel = (SelfValidatingSortedQueueEntryList) _testQueue.getEntries();
+
         super.setUp();
 
         // Create result array
         Arrays.sort(keysSorted);
 
-        // Create test list
-        _sqel = new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
 
         // Build test list
         long messageId = 0L;
@@ -83,14 +103,22 @@ public class SortedQueueEntryListTest ex
 
     }
 
+    protected void mockLogging()
+    {
+        final LogActor logActor = mock(LogActor.class);
+        when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
+        CurrentActor.setDefault(logActor);
+    }
+
+
     @Override
-    public QueueEntryList getTestList()
+    public SortedQueueEntryList getTestList()
     {
         return getTestList(false);
     }
 
     @Override
-    public QueueEntryList getTestList(boolean newList)
+    public SortedQueueEntryList getTestList(boolean newList)
     {
         if(newList)
         {
@@ -117,6 +145,12 @@ public class SortedQueueEntryListTest ex
         return generateTestMessage(1, "test value");
     }
 
+    @Override
+    protected SortedQueue getTestQueue()
+    {
+        return _testQueue;
+    }
+
     private ServerMessage generateTestMessage(final long id, final String keyValue) throws AMQException
     {
         final ServerMessage message = mock(ServerMessage.class);
@@ -138,7 +172,7 @@ public class SortedQueueEntryListTest ex
         super.testIterator();
 
         // Test sorted order of list
-        final QueueEntryIterator<?> iter = getTestList().iterator();
+        final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
         int count = 0;
         while(iter.advance())
         {
@@ -147,12 +181,12 @@ public class SortedQueueEntryListTest ex
         }
     }
 
-    private Object getSortedKeyValue(QueueEntryIterator<?> iter)
+    private Object getSortedKeyValue(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
     {
         return (iter.getNode()).getMessage().getMessageHeader().getHeader("KEY");
     }
 
-    private Long getMessageId(QueueEntryIterator<?> iter)
+    private Long getMessageId(QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter)
     {
         return (iter.getNode()).getMessage().getMessageNumber();
     }
@@ -169,7 +203,7 @@ public class SortedQueueEntryListTest ex
             _sqel.add(msg);
         }
 
-        final QueueEntryIterator<?> iter = getTestList().iterator();
+        final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
         int count=0;
         while(iter.advance())
         {
@@ -190,12 +224,13 @@ public class SortedQueueEntryListTest ex
             _sqel.add(msg);
         }
 
-        final QueueEntryIterator<?> iter = getTestList().iterator();
+        final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
         int count=0;
         while(iter.advance())
         {
             assertNull("Sorted queue entry value is not as expected", getSortedKeyValue(iter));
-            assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));        }
+            assertEquals("Message id not as expected", Long.valueOf(count++), getMessageId(iter));
+        }
     }
 
     public void testAscendingSortKeys() throws Exception
@@ -211,7 +246,7 @@ public class SortedQueueEntryListTest ex
             _sqel.add(msg);
         }
 
-        final QueueEntryIterator<?> iter = getTestList().iterator();
+        final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
         int count=0;
         while(iter.advance())
         {
@@ -234,7 +269,7 @@ public class SortedQueueEntryListTest ex
             _sqel.add(msg);
         }
 
-        final QueueEntryIterator<?> iter = getTestList().iterator();
+        final QueueEntryIterator<SortedQueueEntry, SortedQueue, SortedQueueEntryList, QueueConsumer<?,SortedQueueEntry, SortedQueue, SortedQueueEntryList>> iter = getTestList().iterator();
         int count=0;
         while(iter.advance())
         {
@@ -251,7 +286,7 @@ public class SortedQueueEntryListTest ex
         ServerMessage msg = generateTestMessage(1, "A");
         _sqel.add(msg);
 
-        SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+        SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
 
         msg = generateTestMessage(2, "B");
@@ -271,7 +306,7 @@ public class SortedQueueEntryListTest ex
         ServerMessage msg = generateTestMessage(1, "B");
         _sqel.add(msg);
 
-        SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+        SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "B", 1);
 
         msg = generateTestMessage(2, "A");
@@ -290,7 +325,7 @@ public class SortedQueueEntryListTest ex
 
         ServerMessage msg = generateTestMessage(1, "A");
         _sqel.add(msg);
-        SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+        SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "A", 1);
 
         msg = generateTestMessage(2, "C");
@@ -322,7 +357,7 @@ public class SortedQueueEntryListTest ex
         ServerMessage msg = generateTestMessage(1, "B");
         _sqel.add(msg);
 
-        SortedQueueEntryImpl entry = _sqel.next(_sqel.getHead());
+        SortedQueueEntry entry = _sqel.next(_sqel.getHead());
         validateEntry(entry, "B", 1);
 
         msg = generateTestMessage(2, "D");
@@ -362,7 +397,7 @@ public class SortedQueueEntryListTest ex
         validateEntry(entry, "D", 2);
     }
 
-    private void validateEntry(final SortedQueueEntryImpl entry, final String expectedSortKey, final long expectedMessageId)
+    private void validateEntry(final SortedQueueEntry entry, final String expectedSortKey, final long expectedMessageId)
     {
         assertEquals("Sorted queue entry value is not as expected",
                         expectedSortKey, entry.getMessage().getMessageHeader().getHeader("KEY"));

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Wed Feb 12 13:27:51 2014
@@ -464,7 +464,7 @@ public abstract class AbstractDurableCon
         }
 
         @Override
-        public TransactionLogResource getQueue()
+        public TransactionLogResource getResource()
         {
             return _queue;
         }
@@ -505,7 +505,7 @@ public abstract class AbstractDurableCon
             {
                 return false;
             }
-            if (_queue == null && other.getQueue() != null)
+            if (_queue == null && other.getResource() != null)
             {
                 return false;
             }
@@ -513,7 +513,7 @@ public abstract class AbstractDurableCon
             {
                 return false;
             }
-            return _queue.getId().equals(other.getQueue().getId());
+            return _queue.getId().equals(other.getResource().getId());
         }
 
     }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Wed Feb 12 13:27:51 2014
@@ -158,6 +158,12 @@ public abstract class MessageStoreQuotaE
         return _transactionResource;
     }
 
+    @Override
+    public boolean isDurable()
+    {
+        return true;
+    }
+
     private static class TestMessage implements EnqueueableMessage
     {
         private final StoredMessage<?> _handle;

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java Wed Feb 12 13:27:51 2014
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.qpid.framing.EncodingUtils;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.util.ByteBufferOutputStream;
 
 public class TestMessageMetaData implements StorableMessageMetaData
@@ -72,7 +71,7 @@ public class TestMessageMetaData impleme
     }
 
     @Override
-    public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+    public int writeToBuffer(ByteBuffer dest)
     {
         int oldPosition = dest.position();
         try

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Wed Feb 12 13:27:51 2014
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.server.txn;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.MockQueueEntry;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.MockMessageInstance;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.MockStoreTransaction.TransactionState;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -47,7 +48,7 @@ public class AutoCommitTransactionTest e
     private MessageStore _transactionLog;
     private AMQQueue _queue;
     private List<AMQQueue> _queues;
-    private Collection<QueueEntry> _queueEntries;
+    private Collection<MessageInstance> _queueEntries;
     private ServerMessage _message;
     private MockAction _action;
     private MockStoreTransaction _storeTransaction;
@@ -373,9 +374,9 @@ public class AutoCommitTransactionTest e
         assertFalse("Rollback action must be fired",  _action.isRollbackActionFired());
     }  
     
-    private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+    private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
     {
-        Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+        Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
         
         assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
         
@@ -384,7 +385,7 @@ public class AutoCommitTransactionTest e
             final AMQQueue queue = createTestAMQQueue(queueDurableFlags[i]);
             final ServerMessage message = createTestMessage(messagePersistentFlags[i]);
             
-            queueEntries.add(new MockQueueEntry()
+            queueEntries.add(new MockMessageInstance()
             {
 
                 @Override
@@ -394,7 +395,7 @@ public class AutoCommitTransactionTest e
                 }
 
                 @Override
-                public AMQQueue getQueue()
+                public TransactionLogResource getOwningResource()
                 {
                     return queue;
                 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org