You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/05 12:59:50 UTC

svn commit: r1564721 [1/2] - in /qpid/branches/java-broker-amqp-1-0-management/java: broker-core/src/main/java/org/apache/qpid/server/exchange/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/p...

Author: rgodfrey
Date: Wed Feb  5 11:59:49 2014
New Revision: 1564721

URL: http://svn.apache.org/r1564721
Log:
Use abstractions for sources and destinations for message ingress and egress in all protocol transports

Added:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
      - copied, changed from r1564581, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
Modified:
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Feb  5 11:59:49 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.BindingLogSubject;
 import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -429,7 +430,7 @@ public abstract class AbstractExchange i
     public final int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final Action<QueueEntry> postEnqueueAction)
+                          final Action<MessageInstance> postEnqueueAction)
     {
         List<? extends BaseQueue> queues = route(message, instanceProperties);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Wed Feb  5 11:59:49 2014
@@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -336,7 +337,7 @@ public class DefaultExchange implements 
     public final int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final Action<QueueEntry> postEnqueueAction)
+                          final Action<MessageInstance> postEnqueueAction)
     {
         final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
         if(q == null)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Wed Feb  5 11:59:49 2014
@@ -24,22 +24,16 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-public interface Exchange extends ExchangeReferrer
+public interface Exchange extends ExchangeReferrer, MessageDestination
 {
     void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
             throws AMQException;
@@ -97,19 +91,6 @@ public interface Exchange extends Exchan
     void close() throws AMQException;
 
     /**
-     * Routes a message
-     * @param message the message to be routed
-     * @param instanceProperties the instance properties
-     * @param txn the transaction to enqueue within
-     * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
-     * @return the number of queues in which the message was enqueued performed
-     */
-    int send(ServerMessage message,
-             InstanceProperties instanceProperties,
-             ServerTransaction txn,
-             Action<QueueEntry> postEnqueueAction);
-
-    /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
      * @param bindingKey
      * @param arguments

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java?rev=1564721&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java Wed Feb  5 11:59:49 2014
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public interface MessageDestination
+{
+
+    public String getName();
+
+    /**
+     * Routes a message
+     * @param message the message to be routed
+     * @param instanceProperties the instance properties
+     * @param txn the transaction to enqueue within
+     * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
+     * @return the number of queues in which the message was enqueued performed
+     */
+    int send(ServerMessage message,
+             InstanceProperties instanceProperties,
+             ServerTransaction txn,
+             Action<MessageInstance> postEnqueueAction);
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Wed Feb  5 11:59:49 2014
@@ -45,9 +45,9 @@ public interface MessageInstance
 
     void decrementDeliveryCount();
 
-    void addStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+    void addStateChangeListener(StateChangeListener<MessageInstance, State> listener);
 
-    boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener);
+    boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener);
 
     boolean acquiredByConsumer();
 
@@ -71,7 +71,7 @@ public interface MessageInstance
 
     int getMaximumDeliveryCount();
 
-    int routeToAlternate(Action<QueueEntry> action, ServerTransaction txn);
+    int routeToAlternate(Action<MessageInstance> action, ServerTransaction txn);
 
     Filterable asFilterable();
 

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1564721&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Wed Feb  5 11:59:49 2014
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.store.TransactionLogResource;
+
+import java.util.Collection;
+import java.util.EnumSet;
+
+public interface MessageSource extends TransactionLogResource
+{
+    Consumer addConsumer(ConsumerTarget target, FilterManager filters,
+                         Class<? extends ServerMessage> messageClass,
+                         String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
+
+    Collection<Consumer> getConsumers();
+
+    void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
+
+    void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
+
+    AuthorizationHolder getAuthorizationHolder();
+
+    void setAuthorizationHolder(AuthorizationHolder principalHolder);
+
+    void setExclusiveOwningSession(AMQSessionModel owner);
+
+    AMQSessionModel getExclusiveOwningSession();
+
+    boolean isExclusive();
+
+    void enqueue(ServerMessage message) throws AMQException;
+
+    interface ConsumerRegistrationListener
+    {
+        void consumerAdded(AMQQueue queue, Consumer consumer);
+        void consumerRemoved(AMQQueue queue, Consumer consumer);
+    }
+
+    /**
+     * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
+     * already exists.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
+     * </table>
+     *
+     * @todo Not an AMQP exception as no status code.
+     *
+     * @todo Move to top level, used outside this class.
+     */
+    static final class ExistingExclusiveConsumer extends AMQException
+    {
+
+        public ExistingExclusiveConsumer()
+        {
+            super("");
+        }
+    }
+
+    /**
+     * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
+     * already exists.
+     *
+     * <p/><table id="crc"><caption>CRC Card</caption>
+     * <tr><th> Responsibilities <th> Collaborations
+     * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
+     * </table>
+     *
+     * @todo Not an AMQP exception as no status code.
+     *
+     * @todo Move to top level, used outside this class.
+     */
+    static final class ExistingConsumerPreventsExclusive extends AMQException
+    {
+        public ExistingConsumerPreventsExclusive()
+        {
+            super("");
+        }
+    }
+}

Added: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1564721&view=auto
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (added)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Wed Feb  5 11:59:49 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface CapacityChecker
+{
+    void checkCapacity(AMQSessionModel channel);
+}

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Feb  5 11:59:49 2014
@@ -25,25 +25,19 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.consumer.Consumer;
-import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker
 {
-    String getName();
 
     public interface NotificationListener
     {
@@ -75,29 +69,9 @@ public interface AMQQueue extends Compar
     boolean isAutoDelete();
 
     String getOwner();
-    AuthorizationHolder getAuthorizationHolder();
-    void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
-    void setExclusiveOwningSession(AMQSessionModel owner);
-    AMQSessionModel getExclusiveOwningSession();
 
     VirtualHost getVirtualHost();
 
-    Consumer addConsumer(final ConsumerTarget target, final FilterManager filters,
-                         final Class<? extends ServerMessage> messageClass,
-                         final String consumerName, EnumSet<Consumer.Option> options) throws AMQException;
-
-    Collection<Consumer> getConsumers();
-
-    interface ConsumerRegistrationListener
-    {
-        void consumerAdded(AMQQueue queue, Consumer consumer);
-        void consumerRemoved(AMQQueue queue, Consumer consumer);
-    }
-
-    void addConsumerRegistrationListener(ConsumerRegistrationListener listener);
-    void removeConsumerRegistrationListener(ConsumerRegistrationListener listener);
-
 
     int getConsumerCount();
 
@@ -215,8 +189,6 @@ public interface AMQQueue extends Compar
 
     void stop();
 
-    boolean isExclusive();
-
     Exchange getAlternateExchange();
 
     void setAlternateExchange(Exchange exchange);
@@ -224,51 +196,6 @@ public interface AMQQueue extends Compar
     Collection<String> getAvailableAttributes();
     Object getAttribute(String attrName);
 
-    void checkCapacity(AMQSessionModel channel);
-
-    /**
-     * ExistingExclusiveConsumer signals a failure to create a consumer, because an exclusive consumer
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create a consumer, because an exclusive consumer already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    static final class ExistingExclusiveConsumer extends AMQException
-    {
-
-        public ExistingExclusiveConsumer()
-        {
-            super("");
-        }
-    }
-
-    /**
-     * ExistingConsumerPreventsExclusive signals a failure to create an exclusive consumer, as a consumer
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create an exclusive consumer, as a consumer already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    static final class ExistingConsumerPreventsExclusive extends AMQException
-    {
-        public ExistingConsumerPreventsExclusive()
-        {
-            super("");
-        }
-    }
-
     void configure(QueueConfiguration config);
 
     void setExclusive(boolean exclusive);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Wed Feb  5 11:59:49 2014
@@ -22,6 +22,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.util.Action;
@@ -29,7 +30,7 @@ import org.apache.qpid.server.util.Actio
 public interface BaseQueue extends TransactionLogResource
 {
     void enqueue(ServerMessage message) throws AMQException;
-    void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException;
+    void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException;
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Wed Feb  5 11:59:49 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -240,19 +241,19 @@ public class DefinedGroupMessageGroupMan
         return groupVal;
     }
 
-    private class GroupStateChangeListener implements StateChangeListener<QueueEntry, QueueEntry.State>
+    private class GroupStateChangeListener implements StateChangeListener<MessageInstance, QueueEntry.State>
     {
         private final Group _group;
 
         public GroupStateChangeListener(final Group group,
-                                        final QueueEntry entry)
+                                        final MessageInstance entry)
         {
             _group = group;
         }
 
-        public void stateChanged(final QueueEntry entry,
-                                 final QueueEntry.State oldState,
-                                 final QueueEntry.State newState)
+        public void stateChanged(final MessageInstance entry,
+                                 final MessageInstance.State oldState,
+                                 final MessageInstance.State newState)
         {
             synchronized (DefinedGroupMessageGroupManager.this)
             {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb  5 11:59:49 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -62,7 +63,7 @@ public abstract class QueueEntryImpl imp
         (QueueEntryImpl.class, EntryState.class, "_state");
 
 
-    private volatile Set<StateChangeListener<QueueEntry, State>> _stateChangeListeners;
+    private volatile Set<StateChangeListener<MessageInstance, State>> _stateChangeListeners;
 
     private static final
         AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -332,7 +333,7 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final State oldState, final State newState)
     {
-        for(StateChangeListener<QueueEntry, State> l : _stateChangeListeners)
+        for(StateChangeListener<MessageInstance, State> l : _stateChangeListeners)
         {
             l.stateChanged(this, oldState, newState);
         }
@@ -363,7 +364,7 @@ public abstract class QueueEntryImpl imp
         dispose();
     }
 
-    public int routeToAlternate(final Action<QueueEntry> action, ServerTransaction txn)
+    public int routeToAlternate(final Action<MessageInstance> action, ServerTransaction txn)
     {
         final AMQQueue currentQueue = getQueue();
         Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -408,21 +409,21 @@ public abstract class QueueEntryImpl imp
         return getQueue().isDeleted();
     }
 
-    public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+    public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
     {
-        Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
         if(listeners == null)
         {
-            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<QueueEntry, State>>());
+            _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance, State>>());
             listeners = _stateChangeListeners;
         }
 
         listeners.add(listener);
     }
 
-    public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
     {
-        Set<StateChangeListener<QueueEntry, State>> listeners = _stateChangeListeners;
+        Set<StateChangeListener<MessageInstance, State>> listeners = _stateChangeListeners;
         if(listeners != null)
         {
             return listeners.remove(listener);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Feb  5 11:59:49 2014
@@ -42,6 +42,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.QueueActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -635,7 +636,7 @@ public class SimpleAMQQueue implements A
         enqueue(message, null);
     }
 
-    public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
     {
         incrementQueueCount();
         incrementQueueSize(message);
@@ -1967,7 +1968,7 @@ public class SimpleAMQQueue implements A
         return _notificationChecks;
     }
 
-    private final class QueueEntryListener implements StateChangeListener<QueueEntry, QueueEntry.State>
+    private final class QueueEntryListener implements StateChangeListener<MessageInstance, QueueEntry.State>
     {
 
         private final QueueConsumer _sub;
@@ -1988,7 +1989,7 @@ public class SimpleAMQQueue implements A
             return System.identityHashCode(_sub);
         }
 
-        public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+        public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
         {
             entry.removeStateChangeListener(this);
             deliverAsync(_sub);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed Feb  5 11:59:49 2014
@@ -20,6 +20,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -49,7 +50,7 @@ public class SortedQueue extends OutOfOr
         return _sortedPropertyName;
     }
 
-    public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
     {
         synchronized (_sortedQueueLock)
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Wed Feb  5 11:59:49 2014
@@ -212,7 +212,7 @@ public class AsyncAutoCommitTransaction 
     }
 
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed Feb  5 11:59:49 2014
@@ -154,7 +154,7 @@ public class AutoCommitTransaction imple
     }
 
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Wed Feb  5 11:59:49 2014
@@ -105,7 +105,7 @@ public class DistributedTransaction impl
         }
     }
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         if(_branch != null)
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Wed Feb  5 11:59:49 2014
@@ -363,7 +363,7 @@ public class DtxBranch
     }
 
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
     {
         _enqueueRecords.add(new Record(queue, message));
     }

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed Feb  5 11:59:49 2014
@@ -197,7 +197,7 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+    public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed Feb  5 11:59:49 2014
@@ -94,7 +94,7 @@ public interface ServerTransaction
      * 
      * A store operation will result only for a persistent message on a durable queue.
      */
-    void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction);
+    void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction);
 
     /** 
      * Enqueue a message(s) to queue(s) registering a post transaction action.

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Feb  5 11:59:49 2014
@@ -47,6 +47,8 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -441,6 +443,12 @@ public abstract class AbstractVirtualHos
     }
 
     @Override
+    public MessageSource getMessageSource(final String name)
+    {
+        return getQueue(name);
+    }
+
+    @Override
     public AMQQueue getQueue(UUID id)
     {
         return _queueRegistry.getQueue(id);
@@ -524,6 +532,13 @@ public abstract class AbstractVirtualHos
 
     }
 
+
+    @Override
+    public MessageDestination getMessageDestination(final String name)
+    {
+        return getExchange(name);
+    }
+
     @Override
     public Exchange getExchange(String name)
     {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Feb  5 11:59:49 2014
@@ -30,6 +30,8 @@ import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -49,6 +51,7 @@ public interface VirtualHost extends Dur
     String getName();
 
     AMQQueue getQueue(String name);
+    MessageSource getMessageSource(String name);
 
     AMQQueue getQueue(UUID id);
 
@@ -76,6 +79,7 @@ public interface VirtualHost extends Dur
 
     void removeExchange(Exchange exchange, boolean force) throws AMQException;
 
+    MessageDestination getMessageDestination(String name);
     Exchange getExchange(String name);
     Exchange getExchange(UUID id);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb  5 11:59:49 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
@@ -305,7 +306,7 @@ public class MockAMQQueue implements AMQ
     {
     }
 
-    public void enqueue(ServerMessage message, Action<QueueEntry> action) throws AMQException
+    public void enqueue(ServerMessage message, Action<MessageInstance> action) throws AMQException
     {
     }
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Wed Feb  5 11:59:49 2014
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.consumer.Consumer;
@@ -62,7 +63,7 @@ public class MockQueueEntry implements Q
         return false;
     }
 
-    public void addStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+    public void addStateChangeListener(StateChangeListener<MessageInstance, State> listener)
     {
 
     }
@@ -72,7 +73,7 @@ public class MockQueueEntry implements Q
 
     }
 
-    public int routeToAlternate(final Action<QueueEntry> action, final ServerTransaction txn)
+    public int routeToAlternate(final Action<MessageInstance> action, final ServerTransaction txn)
     {
         return 0;
     }
@@ -152,7 +153,7 @@ public class MockQueueEntry implements Q
     }
 
 
-    public boolean removeStateChangeListener(StateChangeListener<QueueEntry, State> listener)
+    public boolean removeStateChangeListener(StateChangeListener<MessageInstance, State> listener)
     {
 
         return false;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Feb  5 11:59:49 2014
@@ -245,11 +245,11 @@ public class SimpleAMQQueueTest extends 
                                                       Consumer.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
         {
-            public void performAction(QueueEntry entry)
+            public void performAction(MessageInstance entry)
             {
-                queueEntries.add(entry);
+                queueEntries.add((QueueEntry) entry);
             }
         };
 
@@ -298,11 +298,11 @@ public class SimpleAMQQueueTest extends 
                                                       Consumer.Option.ACQUIRES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
         {
-            public void performAction(QueueEntry entry)
+            public void performAction(MessageInstance entry)
             {
-                queueEntries.add(entry);
+                queueEntries.add((QueueEntry) entry);
             }
         };
 
@@ -356,11 +356,11 @@ public class SimpleAMQQueueTest extends 
                                                       Consumer.Option.SEES_REQUEUES));
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
         {
-            public void performAction(QueueEntry entry)
+            public void performAction(MessageInstance entry)
             {
-                queueEntries.add(entry);
+                queueEntries.add((QueueEntry) entry);
             }
         };
 
@@ -420,11 +420,11 @@ public class SimpleAMQQueueTest extends 
 
 
         final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        Action<QueueEntry> postEnqueueAction = new Action<QueueEntry>()
+        Action<MessageInstance> postEnqueueAction = new Action<MessageInstance>()
         {
-            public void performAction(QueueEntry entry)
+            public void performAction(MessageInstance entry)
             {
-                queueEntries.add(entry);
+                queueEntries.add((QueueEntry)entry);
             }
         };
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Wed Feb  5 11:59:49 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -127,6 +129,12 @@ public class MockVirtualHost implements 
     }
 
     @Override
+    public MessageSource getMessageSource(final String name)
+    {
+        return null;
+    }
+
+    @Override
     public AMQQueue getQueue(UUID id)
     {
         return null;
@@ -174,6 +182,12 @@ public class MockVirtualHost implements 
     }
 
     @Override
+    public MessageDestination getMessageDestination(final String name)
+    {
+        return null;
+    }
+
+    @Override
     public Exchange getExchange(String name)
     {
         return null;

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Wed Feb  5 11:59:49 2014
@@ -385,13 +385,13 @@ public class ConsumerTarget_0_10 extends
         final LogActor logActor = CurrentActor.get();
         final ServerMessage msg = entry.getMessage();
 
-        int requeues = entry.routeToAlternate(new Action<QueueEntry>()
+        int requeues = entry.routeToAlternate(new Action<MessageInstance>()
                     {
                         @Override
-                        public void performAction(final QueueEntry requeueEntry)
+                        public void performAction(final MessageInstance requeueEntry)
                         {
                             logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-                                                                            requeueEntry.getQueue().getName()));
+                                                                            requeueEntry.getOwningResource().getName()));
                         }
                     }, null);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Wed Feb  5 11:59:49 2014
@@ -46,7 +46,6 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -55,14 +54,16 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.DistributedTransaction;
@@ -104,12 +105,16 @@ public class ServerSession extends Sessi
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
-    private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>()
+    private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>()
     {
         @Override
-        public void performAction(final QueueEntry entry)
+        public void performAction(final MessageInstance entry)
         {
-            entry.getQueue().checkCapacity(ServerSession.this);
+            TransactionLogResource queue = entry.getOwningResource();
+            if(queue instanceof CapacityChecker)
+            {
+                ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+            }
         }
     };
 
@@ -188,7 +193,7 @@ public class ServerSession extends Sessi
 
     public int enqueue(final MessageTransferMessage message,
                        final InstanceProperties instanceProperties,
-                       final Exchange exchange)
+                       final MessageDestination exchange)
     {
         if(_outstandingCredit.get() != UNLIMITED_CREDIT
                 && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Wed Feb  5 11:59:49 2014
@@ -35,7 +35,9 @@ import org.apache.qpid.server.filter.Fil
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
@@ -196,7 +198,7 @@ public class ServerSessionDelegate exten
                 String queueName = method.getQueue();
                 VirtualHost vhost = getVirtualHost(session);
 
-                final AMQQueue queue = vhost.getQueue(queueName);
+                final MessageSource queue = vhost.getMessageSource(queueName);
 
                 if(queue == null)
                 {
@@ -308,7 +310,7 @@ public class ServerSessionDelegate exten
     @Override
     public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
-        final Exchange exchange = getExchangeForMessage(ssn, xfr);
+        final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
 
         final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
         if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -327,7 +329,6 @@ public class ServerSessionDelegate exten
             return;
         }
 
-        final Exchange exchangeInUse;
         final MessageStore store = getVirtualHost(ssn).getMessageStore();
         final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
         final ServerSession serverSession = (ServerSession) ssn;
@@ -829,24 +830,24 @@ public class ServerSessionDelegate exten
         return getVirtualHost(session).getExchange(exchangeName);
     }
 
-    private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+    private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
     {
         VirtualHost virtualHost = getVirtualHost(ssn);
 
-        Exchange exchange;
+        MessageDestination destination;
         if(xfr.hasDestination())
         {
-            exchange = virtualHost.getExchange(xfr.getDestination());
-            if(exchange == null)
+            destination = virtualHost.getMessageDestination(xfr.getDestination());
+            if(destination == null)
             {
-                exchange = virtualHost.getDefaultExchange();
+                destination = virtualHost.getDefaultExchange();
             }
         }
         else
         {
-            exchange = virtualHost.getDefaultExchange();
+            destination = virtualHost.getDefaultExchange();
         }
-        return exchange;
+        return destination;
     }
 
     private VirtualHost getVirtualHost(Session session)

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb  5 11:59:49 2014
@@ -56,9 +56,12 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -256,7 +259,7 @@ public class AMQChannel implements AMQSe
         return _channelId;
     }
 
-    public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+    public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
     {
         String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
         SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -265,7 +268,7 @@ public class AMQChannel implements AMQSe
             throw new AMQSecurityException("Permission denied: " + e.getName());
         }
         _currentMessage = new IncomingMessage(info);
-        _currentMessage.setExchange(e);
+        _currentMessage.setMessageDestination(e);
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -350,7 +353,7 @@ public class AMQChannel implements AMQSe
                                     }
                                 };
 
-                        int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+                        int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
                                                                           immediate ? _immediateAction : _capacityCheckAction);
                         if(enqueues == 0)
                         {
@@ -497,19 +500,19 @@ public class AMQChannel implements AMQSe
      * Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
      * up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
      *
+     *
      * @param tag       the tag chosen by the client (if null, server will generate one)
-     * @param queue     the queue to subscribe to
+     * @param source     the queue to subscribe to
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
-     * @param noLocal   Flag stopping own messages being received.
      * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
      *
      * @throws AMQException                  if something goes wrong
      */
-    public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
-                                           FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+    public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+                                            FieldTable filters, boolean exclusive) throws AMQException
     {
         if (tag == null)
         {
@@ -557,7 +560,7 @@ public class AMQChannel implements AMQSe
         try
         {
             Consumer sub =
-                    queue.addConsumer(target,
+                    source.addConsumer(target,
                                       FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
                                       AMQMessage.class,
                                       AMQShortString.toString(tag),
@@ -1189,16 +1192,16 @@ public class AMQChannel implements AMQSe
     }
 
 
-    private class ImmediateAction implements Action<QueueEntry>
+    private class ImmediateAction implements Action<MessageInstance>
     {
 
         public ImmediateAction()
         {
         }
 
-        public void performAction(QueueEntry entry)
+        public void performAction(MessageInstance entry)
         {
-            AMQQueue queue = entry.getQueue();
+            TransactionLogResource queue = entry.getOwningResource();
 
             if (!entry.getDeliveredToConsumer() && entry.acquire())
             {
@@ -1246,19 +1249,25 @@ public class AMQChannel implements AMQSe
             }
             else
             {
-                queue.checkCapacity(AMQChannel.this);
+                if(queue instanceof CapacityChecker)
+                {
+                    ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+                }
             }
 
         }
     }
 
-    private final class CapacityCheckAction implements Action<QueueEntry>
+    private final class CapacityCheckAction implements Action<MessageInstance>
     {
         @Override
-        public void performAction(final QueueEntry entry)
+        public void performAction(final MessageInstance entry)
         {
-            AMQQueue queue = entry.getQueue();
-            queue.checkCapacity(AMQChannel.this);
+            TransactionLogResource queue = entry.getOwningResource();
+            if(queue instanceof CapacityChecker)
+            {
+                ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+            }
         }
     }
 
@@ -1477,13 +1486,13 @@ public class AMQChannel implements AMQSe
             final ServerMessage msg = rejectedQueueEntry.getMessage();
             final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
 
-            int requeues = rejectedQueueEntry.routeToAlternate(new Action<QueueEntry>()
+            int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
                 {
                     @Override
-                    public void performAction(final QueueEntry requeueEntry)
+                    public void performAction(final MessageInstance requeueEntry)
                     {
                         _actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
-                                                                                   requeueEntry.getQueue().getName()));
+                                                                                   requeueEntry.getOwningResource().getName()));
                     }
                 }, null);
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Wed Feb  5 11:59:49 2014
@@ -49,13 +49,13 @@ import java.util.concurrent.atomic.Atomi
 public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
 {
 
-    private final StateChangeListener<QueueEntry, QueueEntry.State> _entryReleaseListener =
-            new StateChangeListener<QueueEntry, QueueEntry.State>()
+    private final StateChangeListener<MessageInstance, MessageInstance.State> _entryReleaseListener =
+            new StateChangeListener<MessageInstance, MessageInstance.State>()
             {
                 @Override
-                public void stateChanged(final QueueEntry entry,
-                                         final QueueEntry.State oldSate,
-                                         final QueueEntry.State newState)
+                public void stateChanged(final MessageInstance entry,
+                                         final MessageInstance.State oldSate,
+                                         final MessageInstance.State newState)
                 {
                     if (oldSate == QueueEntry.State.ACQUIRED && (newState == QueueEntry.State.AVAILABLE || newState == QueueEntry.State.DEQUEUED))
                     {
@@ -463,7 +463,7 @@ public abstract class ConsumerTarget_0_8
         _creditManager.restoreCredit(1, message.getSize());
     }
 
-    protected final StateChangeListener<QueueEntry, QueueEntry.State> getReleasedStateChangeListener()
+    protected final StateChangeListener<MessageInstance, MessageInstance.State> getReleasedStateChangeListener()
     {
         return _entryReleaseListener;
     }
@@ -526,11 +526,11 @@ public abstract class ConsumerTarget_0_8
         final long size = entry.getMessage().getSize();
         _unacknowledgedBytes.addAndGet(size);
         _unacknowledgedCount.incrementAndGet();
-        entry.addStateChangeListener(new StateChangeListener<QueueEntry, QueueEntry.State>()
+        entry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.State>()
         {
-            public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+            public void stateChanged(MessageInstance entry, MessageInstance.State oldState, MessageInstance.State newState)
             {
-                if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+                if(oldState.equals(MessageInstance.State.ACQUIRED) && !newState.equals(MessageInstance.State.ACQUIRED))
                 {
                     _unacknowledgedBytes.addAndGet(-size);
                     _unacknowledgedCount.decrementAndGet();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Wed Feb  5 11:59:49 2014
@@ -20,15 +20,12 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
 
     private final MessagePublishInfo _messagePublishInfo;
     private ContentHeaderBody _contentHeaderBody;
-    private Exchange _exchange;
+    private MessageDestination _messageDestination;
 
     /**
      * Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
         return _messagePublishInfo.getExchange();
     }
 
-    public Exchange getExchange()
+    public MessageDestination getDestination()
     {
-        return _exchange;
+        return _messageDestination;
     }
 
     public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
         return getContentHeader().getBodySize();
     }
 
-    public void setExchange(final Exchange e)
+    public void setMessageDestination(final MessageDestination e)
     {
-        _exchange = e;
+        _messageDestination = e;
     }
 
     public int getBodyCount() throws AMQException

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Wed Feb  5 11:59:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicConsumeBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler i
                               " args:" + body.getArguments());
             }
 
-            AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+            MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
 
             if (queue == null)
             {
@@ -120,8 +121,11 @@ public class BasicConsumeMethodHandler i
                     if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
                     {
 
-                        AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
-                                                                              body.getArguments(), body.getNoLocal(), body.getExclusive());
+                        AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+                                                                               queue,
+                                                                               !body.getNoAck(),
+                                                                               body.getArguments(),
+                                                                               body.getExclusive());
                         if (!body.getNowait())
                         {
                             MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Wed Feb  5 11:59:49 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.protocol.v0_8.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler i
         }
 
         VirtualHost vHost = session.getVirtualHost();
-        Exchange exch = vHost.getExchange(exchangeName.toString());
+        MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
         // if the exchange does not exist we raise a channel exception
         if (exch == null)
         {

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Wed Feb  5 11:59:49 2014
@@ -140,7 +140,7 @@ public class AcknowledgeTest extends Qpi
         assertEquals("Channel should have no unacked msgs ", 0, getChannel().getUnacknowledgedMessageMap().size());
 
         //Subscribe to the queue
-        AMQShortString subscriber = _channel.subscribeToQueue(null, _queue, true, null, false, true);
+        AMQShortString subscriber = _channel.consumeFromSource(null, _queue, true, null, true);
 
         getQueue().deliverAsync();
 

Modified: qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java?rev=1564721&r1=1564720&r2=1564721&view=diff
==============================================================================
--- qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java (original)
+++ qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java Wed Feb  5 11:59:49 2014
@@ -144,6 +144,6 @@ public class QueueBrowserUsesNoAckTest e
         FieldTable filters = new FieldTable();
         filters.put(AMQPFilterTypes.NO_CONSUME.getValue(), true);
 
-        return channel.subscribeToQueue(null, queue, true, filters, false, true);
+        return channel.consumeFromSource(null, queue, true, filters, true);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org