You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 09:32:09 UTC

svn commit: r1765973 [2/7] - in /qpid/java/branches/transfer-queue: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/stor...

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Fri Oct 21 09:32:07 2016
@@ -21,15 +21,12 @@
 package org.apache.qpid.server.message;
 
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.store.MessageEnqueueRecord;
-import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
 
-public interface MessageInstance
+public interface MessageInstance extends BaseMessageInstance
 {
 
 
@@ -50,39 +47,37 @@ public interface MessageInstance
 
     boolean acquiredByConsumer();
 
-    boolean isAcquiredBy(ConsumerImpl consumer);
+    boolean isAcquiredBy(MessageInstanceConsumer consumer);
 
-    boolean removeAcquisitionFromConsumer(ConsumerImpl consumer);
+    boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer);
 
     void setRedelivered();
 
     boolean isRedelivered();
 
-    ConsumerImpl getDeliveredConsumer();
+    MessageInstanceConsumer getDeliveredConsumer();
 
     void reject();
 
-    boolean isRejectedBy(ConsumerImpl consumer);
-
-    boolean getDeliveredToConsumer();
+    boolean isRejectedBy(MessageInstanceConsumer consumer);
 
     boolean expired();
 
-    boolean acquire(ConsumerImpl sub);
+    boolean acquire(MessageInstanceConsumer sub);
 
-    boolean makeAcquisitionUnstealable(final ConsumerImpl consumer);
+    boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer);
 
     boolean makeAcquisitionStealable();
 
     int getMaximumDeliveryCount();
 
-    int routeToAlternate(Action<? super MessageInstance> action, ServerTransaction txn);
+    int routeToAlternate(Action<? super BaseMessageInstance> action, ServerTransaction txn);
 
     Filterable asFilterable();
 
-    ConsumerImpl getAcquiringConsumer();
+    MessageInstanceConsumer getAcquiringConsumer();
 
-    MessageEnqueueRecord getEnqueueRecord();
+    InstanceProperties getInstanceProperties();
 
     enum State
     {
@@ -171,7 +166,7 @@ public interface MessageInstance
         }
     }
 
-    abstract class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState
+    abstract class ConsumerAcquiredState<C extends AcquiringMessageInstanceConsumer<C,?>> extends EntryState
     {
         public abstract C getConsumer();
 
@@ -188,7 +183,7 @@ public interface MessageInstance
         }
     }
 
-    final class StealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+    final class StealableConsumerAcquiredState<C extends AcquiringMessageInstanceConsumer<C,?>> extends ConsumerAcquiredState
     {
         private final C _consumer;
         private final UnstealableConsumerAcquiredState<C> _unstealableState;
@@ -211,7 +206,7 @@ public interface MessageInstance
         }
     }
 
-    final class UnstealableConsumerAcquiredState<C extends ConsumerImpl> extends ConsumerAcquiredState
+    final class UnstealableConsumerAcquiredState<C extends AcquiringMessageInstanceConsumer<C,?>> extends ConsumerAcquiredState
     {
         private final StealableConsumerAcquiredState<C> _stealableState;
 
@@ -240,25 +235,15 @@ public interface MessageInstance
 
     boolean isAvailable();
 
-    boolean acquire();
-
     boolean isAcquired();
 
     void release();
 
-    void release(ConsumerImpl release);
+    void release(MessageInstanceConsumer release);
 
     boolean resend();
 
-    void delete();
-
     boolean isDeleted();
 
     boolean isHeld();
-
-    ServerMessage getMessage();
-
-    InstanceProperties getInstanceProperties();
-
-    TransactionLogResource getOwningResource();
 }

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+public interface MessageInstanceConsumer
+{
+
+    boolean isClosed();
+
+    boolean acquires();
+
+    String getName();
+
+    void close();
+
+    void flush();
+
+    void externalStateChange();
+
+    Object getIdentifier();
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstanceConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java Fri Oct 21 09:32:07 2016
@@ -23,23 +23,22 @@ package org.apache.qpid.server.message;
 import java.util.Collection;
 import java.util.EnumSet;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.store.TransactionLogResource;
 
-public interface MessageSource extends TransactionLogResource, MessageNode
+public interface MessageSource<X extends MessageInstanceConsumer> extends TransactionLogResource, MessageNode
 {
-     ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters,
-                              Class<? extends ServerMessage> messageClass,
-                              String consumerName,
-                              EnumSet<ConsumerImpl.Option> options,
-                              Integer priority)
-            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
-                   ConsumerAccessRefused;
+    X addConsumer(ConsumerTarget target,
+                  FilterManager filters,
+                  Class<? extends ServerMessage> messageClass,
+                  String consumerName,
+                  EnumSet<ConsumerOption> options,
+                  Integer priority)
+            throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused;
 
-    Collection<? extends ConsumerImpl> getConsumers();
+    Collection<X> getConsumers();
 
     boolean verifySessionAccess(AMQSessionModel<?> session);
 

Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java (from r1765972, qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/NoopConfigurationChangeListener.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java&p1=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/NoopConfigurationChangeListener.java&r1=1765972&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/NoopConfigurationChangeListener.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfigurationChangeListener.java Fri Oct 21 09:32:07 2016
@@ -20,7 +20,7 @@
  */
 package org.apache.qpid.server.model;
 
-public class NoopConfigurationChangeListener implements ConfigurationChangeListener
+public abstract class AbstractConfigurationChangeListener implements ConfigurationChangeListener
 {
     @Override
     public void stateChanged(ConfiguredObject<?> object, State oldState, State newState)

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java Fri Oct 21 09:32:07 2016
@@ -2596,11 +2596,11 @@ public abstract class AbstractConfigured
     public ListenableFuture<Void> setAttributesAsync(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
     {
         final Map<String,Object> updateAttributes = new HashMap<>(attributes);
-        Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
-        runTask(new Task<Void, RuntimeException>()
+        final Object desiredState = updateAttributes.remove(ConfiguredObject.DESIRED_STATE);
+        return doOnConfigThread(new Task<ListenableFuture<Void>, RuntimeException>()
         {
             @Override
-            public Void execute()
+            public ListenableFuture<Void> execute()
             {
                 authoriseSetAttributes(createProxyForValidation(attributes), attributes.keySet());
                 if (!isSystemProcess())
@@ -2609,7 +2609,29 @@ public abstract class AbstractConfigured
                 }
 
                 changeAttributes(updateAttributes);
-                return null;
+                if(desiredState != null)
+                {
+                    State state;
+                    if(desiredState instanceof State)
+                    {
+                        state = (State)desiredState;
+                    }
+                    else if(desiredState instanceof String)
+                    {
+                        state = State.valueOf((String)desiredState);
+                    }
+                    else
+                    {
+                        throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
+                    }
+                    return setDesiredState(state);
+                }
+                else
+                {
+                    return Futures.immediateFuture(null);
+                }
+
+
             }
 
             @Override
@@ -2630,27 +2652,6 @@ public abstract class AbstractConfigured
                 return "attributes number=" + attributes.size();
             }
         });
-        if(desiredState != null)
-        {
-            State state;
-            if(desiredState instanceof State)
-            {
-                state = (State)desiredState;
-            }
-            else if(desiredState instanceof String)
-            {
-                state = State.valueOf((String)desiredState);
-            }
-            else
-            {
-                throw new IllegalArgumentException("Cannot convert an object of type " + desiredState.getClass().getName() + " to a State");
-            }
-            return setDesiredState(state);
-        }
-        else
-        {
-            return Futures.immediateFuture(null);
-        }
     }
 
     public void forceUpdateAllSecureAttributes()

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java Fri Oct 21 09:32:07 2016
@@ -469,11 +469,13 @@ public abstract class AbstractSystemConf
         return preferenceStoreFactory.createInstance(this, attributes);
     }
 
+    @Override
     protected final Principal getSystemPrincipal()
     {
         return _systemPrincipal;
     }
 
+    @Override
     public Runnable getOnContainerResolveTask()
     {
         return _onContainerResolveTask;
@@ -485,11 +487,13 @@ public abstract class AbstractSystemConf
         _onContainerResolveTask = onContainerResolveTask;
     }
 
+    @Override
     public Runnable getOnContainerCloseTask()
     {
         return _onContainerCloseTask;
     }
 
+    @Override
     public void setOnContainerCloseTask(final Runnable onContainerCloseTask)
     {
         _onContainerCloseTask = onContainerCloseTask;
@@ -497,6 +501,7 @@ public abstract class AbstractSystemConf
 
     private class ShutdownService implements Runnable
     {
+        @Override
         public void run()
         {
             Subject.doAs(getSystemTaskSubject("Shutdown"),

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedObject(category = false, type = AnonymousCredential.ANONYMOUS)
+public interface AnonymousCredential<X extends AnonymousCredential<X>> extends Credential<X>
+{
+    String ANONYMOUS = "Anonymous";
+
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/AnonymousCredential.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerImpl.java Fri Oct 21 09:32:07 2016
@@ -120,7 +120,7 @@ public class BrokerImpl extends Abstract
         @Override
         public Result authorise(final SecurityToken token,
                                 final Operation operation,
-                                final ConfiguredObject<?> configuredObject)
+                                final PermissionedObject configuredObject)
         {
             return isSystemProcess() ? Result.ALLOWED : Result.DEFER;
         }
@@ -128,7 +128,7 @@ public class BrokerImpl extends Abstract
         @Override
         public Result authorise(final SecurityToken token,
                                 final Operation operation,
-                                final ConfiguredObject<?> configuredObject,
+                                final PermissionedObject configuredObject,
                                 final Map<String, Object> arguments)
         {
             return isSystemProcess() ? Result.ALLOWED : Result.DEFER;
@@ -1135,17 +1135,11 @@ public class BrokerImpl extends Abstract
     }
 
 
-    private final class AccessControlProviderListener implements ConfigurationChangeListener
+    private final class AccessControlProviderListener extends AbstractConfigurationChangeListener
     {
         private final Set<ConfiguredObject<?>> _bulkChanges = new HashSet<>();
 
         @Override
-        public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
-        {
-
-        }
-
-        @Override
         public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
         {
             if(object.getCategoryClass() == Broker.class && child.getCategoryClass() == AccessControlProvider.class)

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java Fri Oct 21 09:32:07 2016
@@ -96,6 +96,11 @@ public final class BrokerModel extends M
         addRelationship(VirtualHost.class, VirtualHostAccessControlProvider.class);
         addRelationship(VirtualHost.class, Exchange.class);
         addRelationship(VirtualHost.class, Queue.class);
+        addRelationship(VirtualHost.class, RemoteHost.class);
+
+
+        addRelationship(RemoteHost.class, RemoteHostAddress.class);
+        addRelationship(RemoteHost.class, Credential.class);
 
         addRelationship(VirtualHostLogger.class, VirtualHostLogInclusionRule.class);
 

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java Fri Oct 21 09:32:07 2016
@@ -40,7 +40,8 @@ import org.apache.qpid.server.store.Conf
 /**
  * An object that can be "managed" (eg via the web interface) and usually read from configuration.
  */
-public interface ConfiguredObject<X extends ConfiguredObject<X>> extends ContextProvider, TaskExecutorProvider
+public interface ConfiguredObject<X extends ConfiguredObject<X>> extends ContextProvider, TaskExecutorProvider,
+                                                                         PermissionedObject
 {
     String OVER_SIZED_ATTRIBUTE_ALTERNATIVE_TEXT = "Value is too long to display";
 

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Fri Oct 21 09:32:07 2016
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.server.model;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.protocol.AMQSessionModel;
 
 @ManagedObject
-public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>, ConsumerImpl
+public interface Consumer<X extends Consumer<X>> extends ConfiguredObject<X>
 {
     String DISTRIBUTION_MODE = "distributionMode";
     String EXCLUSIVE = "exclusive";
@@ -36,7 +38,7 @@ public interface Consumer<X extends Cons
     String SUSPEND_NOTIFICATION_PERIOD = "consumer.suspendNotificationPeriod";
 
     @ManagedContextDefault( name = SUSPEND_NOTIFICATION_PERIOD)
-    long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;
+    long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0);
 
     @ManagedAttribute
     String getDistributionMode();
@@ -70,4 +72,21 @@ public interface Consumer<X extends Cons
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, label = "Prefetch")
     long getUnacknowledgedMessages();
 
+
+    AMQSessionModel getSessionModel();
+
+    long getConsumerNumber();
+
+    boolean isSuspended();
+
+    boolean seesRequeues();
+
+    boolean trySendLock();
+
+
+    void getSendLock();
+
+    void releaseSendLock();
+
+    boolean isActive();
 }

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.List;
+
+import javax.security.sasl.SaslClient;
+
+@ManagedObject(creatable = false)
+public interface Credential<X extends Credential<X>> extends ConfiguredObject<X>
+{
+
+    SaslClient getSaslClient(List<String> mechanisms);
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Credential.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedObject(category = false, type = ExternalCredential.EXTERNAL)
+public interface ExternalCredential<X extends ExternalCredential<X>> extends Credential<X>
+{
+    String EXTERNAL = "External";
+
+    @ManagedAttribute
+    String getUsername();
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/ExternalCredential.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public interface PermissionedObject
+{
+    Class<? extends ConfiguredObject> getCategoryClass();
+
+    String getName();
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/PermissionedObject.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Fri Oct 21 09:32:07 2016
@@ -34,11 +34,11 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.CapacityChecker;
-import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.NotificationCheck;
 import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.queue.RecoverableBaseQueue;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.util.Deletable;
@@ -46,8 +46,8 @@ import org.apache.qpid.server.util.Delet
 @ManagedObject( defaultType = "standard", description = Queue.CLASS_DESCRIPTION )
 public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
                                                    Comparable<X>, ExchangeReferrer,
-                                                   BaseQueue,
-                                                   MessageSource,
+                                                   RecoverableBaseQueue,
+                                                   MessageSource<QueueConsumer<?>>,
                                                    CapacityChecker,
                                                    MessageDestination,
                                                    Deletable<X>
@@ -220,7 +220,7 @@ public interface Queue<X extends Queue<X
     Collection<? extends Binding<?>> getBindings();
 
 
-    Collection<? extends Consumer<?>> getConsumers();
+    Collection<QueueConsumer<?>> getConsumers();
 
     //operations
 

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+
+@ManagedObject(defaultType = RemoteHost.REMOTE_HOST_TYPE)
+public interface RemoteHost<X extends RemoteHost<X>> extends ConfiguredObject<X>
+{
+
+    String REMOTE_HOST_TYPE = "Standard";
+
+    @ManagedAttribute(defaultValue = "10")
+    int getRetryPeriod();
+
+    @ManagedAttribute(defaultValue = "true")
+    boolean isRedirectFollowed();
+
+    @ManagedAttribute(defaultValue = "[]")
+    Collection<String> getRoutableAddresses();
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHost.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+import java.util.List;
+
+@ManagedObject( defaultType = RemoteHostAddress.REMOTE_HOST_ADDRESS_TYPE)
+public interface RemoteHostAddress<X extends RemoteHostAddress<X>> extends ConfiguredObject<X>
+{
+
+    String REMOTE_HOST_ADDRESS_TYPE = "Standard";
+
+    @ManagedAttribute(mandatory = true)
+    String getAddress();
+
+    @ManagedAttribute(mandatory = true)
+    int getPort();
+
+    @ManagedAttribute
+    String getHostName();
+
+    @ManagedAttribute
+    Protocol getProtocol();
+
+    @ManagedAttribute( defaultValue = "TCP" )
+    Transport getTransport();
+
+    @ManagedAttribute
+    KeyStore getKeyStore();
+
+    @ManagedAttribute
+    Collection<TrustStore> getTrustStores();
+
+    @ManagedAttribute(defaultValue = "0")
+    int getDesiredHeartbeatInterval();
+
+
+    @DerivedAttribute
+    List<String> getTlsProtocolWhiteList();
+
+    @DerivedAttribute
+    List<String> getTlsProtocolBlackList();
+
+    @DerivedAttribute
+    List<String> getTlsCipherSuiteWhiteList();
+
+    @DerivedAttribute
+    List<String> getTlsCipherSuiteBlackList();
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/RemoteHostAddress.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+@ManagedObject(category = false, type = UsernamePasswordCredential.USERNAME_PASSWORD)
+public interface UsernamePasswordCredential<X extends UsernamePasswordCredential<X>> extends Credential<X>
+{
+    String USERNAME_PASSWORD = "UsernamePassword";
+
+    @ManagedAttribute
+    String getUsername();
+
+    @ManagedAttribute(secure = true)
+    String getPassword();
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/UsernamePasswordCredential.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Fri Oct 21 09:32:07 2016
@@ -37,7 +37,9 @@ import org.apache.qpid.server.stats.Stat
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.preferences.UserPreferencesCreator;
+import org.apache.qpid.server.transfer.TransferQueue;
 import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.HouseKeepingTask;
 import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
 
@@ -261,8 +263,12 @@ public interface VirtualHost<X extends V
 
     String getLocalAddress(String routingAddress);
 
+    TransferQueue getTransferQueue();
+
     void setFirstOpening(boolean firstOpening);
 
+    boolean makeConnection(RemoteHostAddress<?> address, final Action<Boolean> onConnectionLoss);
+
     interface Transaction
     {
         void dequeue(QueueEntry entry);

Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java (from r1755476, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/plugin/OutboundProtocolEngineCreator.java Fri Oct 21 09:32:07 2016
@@ -19,25 +19,16 @@ package org.apache.qpid.server.plugin;/*
  *
  */
 
-import org.apache.qpid.server.transport.ProtocolEngine;
-import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.federation.OutboundProtocolEngine;
 import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.server.model.RemoteHostAddress;
+import org.apache.qpid.server.model.VirtualHost;
 
-public interface ProtocolEngineCreator extends Pluggable
+public interface OutboundProtocolEngineCreator extends Pluggable
 {
     Protocol getVersion();
-    byte[] getHeaderIdentifier();
-    ProtocolEngine newProtocolEngine(Broker<?> broker,
-                                     ServerNetworkConnection network,
-                                     AmqpPort<?> port,
-                                     Transport transport,
-                                     long id,
-                                     final AggregateTicker aggregateTicker);
+    OutboundProtocolEngine newProtocolEngine(RemoteHostAddress<?> address,
+                                             VirtualHost<?> virtualHost);
 
-    byte[] getSuggestedAlternativeHeader();
 }
 

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Fri Oct 21 09:32:07 2016
@@ -33,7 +33,6 @@ import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -73,7 +72,6 @@ import org.apache.qpid.filter.selector.T
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.JMSSelectorFilter;
@@ -83,6 +81,8 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageInfo;
@@ -724,7 +724,7 @@ public abstract class AbstractQueue<X ex
                                          final FilterManager filters,
                                          final Class<? extends ServerMessage> messageClass,
                                          final String consumerName,
-                                         final EnumSet<ConsumerImpl.Option> optionSet,
+                                         final EnumSet<ConsumerOption> optionSet,
                                          final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused
@@ -777,7 +777,7 @@ public abstract class AbstractQueue<X ex
                                                   FilterManager filters,
                                                   final Class<? extends ServerMessage> messageClass,
                                                   final String consumerName,
-                                                  EnumSet<ConsumerImpl.Option> optionSet,
+                                                  EnumSet<ConsumerOption> optionSet,
                                                   final Integer priority)
             throws ExistingExclusiveConsumer, ConsumerAccessRefused,
                    ExistingConsumerPreventsExclusive
@@ -787,89 +787,30 @@ public abstract class AbstractQueue<X ex
             throw new ExistingExclusiveConsumer();
         }
 
-        Object exclusiveOwner = _exclusiveOwner;
-        switch(_exclusive)
+        if(_noLocal && !optionSet.contains(ConsumerOption.NO_LOCAL))
         {
-            case CONNECTION:
-                if(exclusiveOwner == null)
-                {
-                    exclusiveOwner = target.getSessionModel().getAMQPConnection();
-                    addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
-                }
-                else
-                {
-                    if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
-                    {
-                        throw new ConsumerAccessRefused();
-                    }
-                }
-                break;
-            case SESSION:
-                if(exclusiveOwner == null)
-                {
-                    exclusiveOwner = target.getSessionModel();
-                    addExclusivityConstraint(target.getSessionModel());
-                }
-                else
-                {
-                    if(exclusiveOwner != target.getSessionModel())
-                    {
-                        throw new ConsumerAccessRefused();
-                    }
-                }
-                break;
-            case LINK:
-                if(getConsumerCount() != 0)
-                {
-                    throw new ConsumerAccessRefused();
-                }
-                break;
-            case PRINCIPAL:
-                Principal currentAuthorizedPrincipal = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
-                if(exclusiveOwner == null)
-                {
-                    exclusiveOwner = currentAuthorizedPrincipal;
-                }
-                else
-                {
-                    if(!Objects.equals(((Principal) exclusiveOwner).getName(), currentAuthorizedPrincipal.getName()))
-                    {
-                        throw new ConsumerAccessRefused();
-                    }
-                }
-                break;
-            case CONTAINER:
-                if(exclusiveOwner == null)
-                {
-                    exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
-                }
-                else
-                {
-                    if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
-                    {
-                        throw new ConsumerAccessRefused();
-                    }
-                }
-                break;
-            case NONE:
-                break;
-            default:
-                throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
+            optionSet = EnumSet.copyOf(optionSet);
+            optionSet.add(ConsumerOption.NO_LOCAL);
         }
 
-        boolean exclusive =  optionSet.contains(ConsumerImpl.Option.EXCLUSIVE);
-        boolean isTransient =  optionSet.contains(ConsumerImpl.Option.TRANSIENT);
-
-        if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL))
+        if(_ensureNondestructiveConsumers)
         {
             optionSet = EnumSet.copyOf(optionSet);
-            optionSet.add(ConsumerImpl.Option.NO_LOCAL);
+            optionSet.removeAll(EnumSet.of(ConsumerOption.SEES_REQUEUES, ConsumerOption.ACQUIRES));
         }
 
+
+
+        boolean exclusive =  optionSet.contains(ConsumerOption.EXCLUSIVE);
+        boolean isTransient =  optionSet.contains(ConsumerOption.TRANSIENT);
+
+
+
         if(exclusive && getConsumerCount() != 0)
         {
             throw new ExistingConsumerPreventsExclusive();
         }
+
         if(!_defaultFiltersMap.isEmpty())
         {
             if(filters == null)
@@ -902,11 +843,6 @@ public abstract class AbstractQueue<X ex
             }
         }
 
-        if(_ensureNondestructiveConsumers)
-        {
-            optionSet = EnumSet.copyOf(optionSet);
-            optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
-        }
 
         QueueConsumerImpl consumer = new QueueConsumerImpl(this,
                                                            target,
@@ -916,7 +852,11 @@ public abstract class AbstractQueue<X ex
                                                            optionSet,
                                                            priority);
 
-        _exclusiveOwner = exclusiveOwner;
+        checkExclusivity(target);
+
+
+        consumer.open();
+
         target.consumerAdded(consumer);
 
 
@@ -964,6 +904,80 @@ public abstract class AbstractQueue<X ex
         return consumer;
     }
 
+    private void checkExclusivity(final ConsumerTarget target) throws ConsumerAccessRefused
+    {
+        Object exclusiveOwner = _exclusiveOwner;
+        switch(_exclusive)
+        {
+            case CONNECTION:
+                if(exclusiveOwner == null)
+                {
+                    exclusiveOwner = target.getSessionModel().getAMQPConnection();
+                    addExclusivityConstraint(target.getSessionModel().getAMQPConnection());
+                }
+                else
+                {
+                    if(exclusiveOwner != target.getSessionModel().getAMQPConnection())
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case SESSION:
+                if(exclusiveOwner == null)
+                {
+                    exclusiveOwner = target.getSessionModel();
+                    addExclusivityConstraint(target.getSessionModel());
+                }
+                else
+                {
+                    if(exclusiveOwner != target.getSessionModel())
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case LINK:
+                if(getConsumerCount() != 0)
+                {
+                    throw new ConsumerAccessRefused();
+                }
+                break;
+            case PRINCIPAL:
+                Principal currentAuthorizedPrincipal = target.getSessionModel().getAMQPConnection().getAuthorizedPrincipal();
+                if(exclusiveOwner == null)
+                {
+                    exclusiveOwner = currentAuthorizedPrincipal;
+                }
+                else
+                {
+                    if(!Objects.equals(((Principal) exclusiveOwner).getName(), currentAuthorizedPrincipal.getName()))
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case CONTAINER:
+                if(exclusiveOwner == null)
+                {
+                    exclusiveOwner = target.getSessionModel().getAMQPConnection().getRemoteContainerName();
+                }
+                else
+                {
+                    if(!exclusiveOwner.equals(target.getSessionModel().getAMQPConnection().getRemoteContainerName()))
+                    {
+                        throw new ConsumerAccessRefused();
+                    }
+                }
+                break;
+            case NONE:
+                break;
+            default:
+                throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
+        }
+        _exclusiveOwner = exclusiveOwner;
+    }
+
     @Override
     protected ListenableFuture<Void> beforeClose()
     {
@@ -1112,7 +1126,7 @@ public abstract class AbstractQueue<X ex
 
     // ------ Enqueue / Dequeue
 
-    public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+    public final void enqueue(ServerMessage message, Action<? super BaseMessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
         incrementQueueCount();
         incrementQueueSize(message);
@@ -2057,6 +2071,57 @@ public abstract class AbstractQueue<X ex
         txn.commit();
     }
 
+    int routeToAlternate(final QueueEntry queueEntry, final Action<? super BaseMessageInstance> action, ServerTransaction txn)
+    {
+        if (!queueEntry.isAcquired())
+        {
+            throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
+        }
+
+        Exchange<?> alternateExchange = getAlternateExchange();
+        boolean autocommit =  txn == null;
+        int enqueues;
+
+        if(autocommit)
+        {
+            txn = new LocalTransaction(getVirtualHost().getMessageStore());
+        }
+
+        if (alternateExchange != null)
+        {
+            enqueues = alternateExchange.send(queueEntry.getMessage(),
+                                              queueEntry.getMessage().getInitialRoutingAddress(),
+                                              queueEntry.getInstanceProperties(),
+                                              txn,
+                                              action);
+        }
+        else
+        {
+            enqueues = 0;
+        }
+
+        txn.dequeue(queueEntry.getEnqueueRecord(), new ServerTransaction.Action()
+        {
+            public void postCommit()
+            {
+                queueEntry.delete();
+            }
+
+            public void onRollback()
+            {
+
+            }
+        });
+
+        if(autocommit)
+        {
+            txn.commit();
+        }
+
+        return enqueues;
+    }
+
+
     private void performQueueDeleteTasks()
     {
         for (Action<? super X> task : _deleteTaskList)
@@ -3038,7 +3103,7 @@ public abstract class AbstractQueue<X ex
                                                                                         final String routingAddress,
                                                                                         final InstanceProperties instanceProperties,
                                                                                         final ServerTransaction txn,
-                                                                                        final Action<? super MessageInstance> postEnqueueAction)
+                                                                                        final Action<? super BaseMessageInstance> postEnqueueAction)
     {
         if (_virtualHost.getState() != State.ACTIVE)
         {
@@ -3169,7 +3234,7 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case CONNECTION:
                 AMQSessionModel session = null;
-                for(ConsumerImpl c : getConsumers())
+                for(QueueConsumer c : getConsumers())
                 {
                     if(session == null)
                     {
@@ -3195,7 +3260,7 @@ public abstract class AbstractQueue<X ex
             case CONTAINER:
             case PRINCIPAL:
                 AMQPConnection con = null;
-                for(ConsumerImpl c : getConsumers())
+                for(QueueConsumer c : getConsumers())
                 {
                     if(con == null)
                     {
@@ -3223,7 +3288,7 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case PRINCIPAL:
                 String containerID = null;
-                for(ConsumerImpl c : getConsumers())
+                for(QueueConsumer c : getConsumers())
                 {
                     if(containerID == null)
                     {
@@ -3254,7 +3319,7 @@ public abstract class AbstractQueue<X ex
             case NONE:
             case CONTAINER:
                 Principal principal = null;
-                for(ConsumerImpl c : getConsumers())
+                for(QueueConsumer c : getConsumers())
                 {
                     if(principal == null)
                     {
@@ -3494,7 +3559,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private class DeletedChildListener implements ConfigurationChangeListener
+    private class DeletedChildListener extends AbstractConfigurationChangeListener
     {
         @Override
         public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
@@ -3504,39 +3569,6 @@ public abstract class AbstractQueue<X ex
                 AbstractQueue.this.childRemoved(object);
             }
         }
-
-        @Override
-        public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
-        {
-
-        }
-
-        @Override
-        public void attributeSet(final ConfiguredObject object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-
-        }
     }
 
     private static class EnqueueRequest

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Fri Oct 21 09:32:07 2016
@@ -21,7 +21,7 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
@@ -29,7 +29,7 @@ import org.apache.qpid.server.util.Actio
 
 public interface BaseQueue extends TransactionLogResource
 {
-    void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord record);
+    void enqueue(ServerMessage message, Action<? super BaseMessageInstance> action, MessageEnqueueRecord record);
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java Fri Oct 21 09:32:07 2016
@@ -20,21 +20,20 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
-import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.util.StateChangeListener;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
 import org.apache.qpid.server.message.ServerMessage;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import org.apache.qpid.server.util.StateChangeListener;
 
 public class DefinedGroupMessageGroupManager implements MessageGroupManager
 {
@@ -183,7 +182,7 @@ public class DefinedGroupMessageGroupMan
             }
         }
 
-        ConsumerImpl assignedSub = group.getConsumer();
+        QueueConsumer<?> assignedSub = group.getConsumer();
 
         if(assignedSub == sub)
         {

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Fri Oct 21 09:32:07 2016
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -220,7 +220,7 @@ public class LastValueQueueList extends
         }
 
         @Override
-        public void release(ConsumerImpl consumer)
+        public void release(MessageInstanceConsumer consumer)
         {
             super.release(consumer);
 

Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java (from r1765972, qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java&p1=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java&r1=1765972&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedBaseQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -25,23 +25,22 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 
-public abstract class OrderedQueueEntryList implements QueueEntryList
+public abstract class OrderedBaseQueueEntryList<T extends RecoverableBaseQueue> implements QueueEntryList
 {
 
     private final OrderedQueueEntry _head;
 
     private volatile OrderedQueueEntry _tail;
 
-    static final AtomicReferenceFieldUpdater<OrderedQueueEntryList, OrderedQueueEntry>
+    static final AtomicReferenceFieldUpdater<OrderedBaseQueueEntryList, OrderedQueueEntry>
             _tailUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail");
+                (OrderedBaseQueueEntryList.class, OrderedQueueEntry.class, "_tail");
 
 
-    private final Queue<?> _queue;
+    private final T _queue;
 
     static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
                 _nextUpdater = OrderedQueueEntry._nextUpdater;
@@ -51,7 +50,7 @@ public abstract class OrderedQueueEntryL
     private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
 
 
-    public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
+    public OrderedBaseQueueEntryList(T queue, HeadCreator headCreator)
     {
         _queue = queue;
         _head = headCreator.createHead(this);
@@ -73,7 +72,7 @@ public abstract class OrderedQueueEntryL
     }
 
 
-    public Queue<?> getQueue()
+    public T getQueue()
     {
         return _queue;
     }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java Fri Oct 21 09:32:07 2016
@@ -34,12 +34,12 @@ public abstract class OrderedQueueEntry
 
     private volatile OrderedQueueEntry _next;
 
-    public OrderedQueueEntry(OrderedQueueEntryList queueEntryList)
+    public OrderedQueueEntry(OrderedBaseQueueEntryList queueEntryList)
     {
         super(queueEntryList);
     }
 
-    public OrderedQueueEntry(OrderedQueueEntryList queueEntryList,
+    public OrderedQueueEntry(OrderedBaseQueueEntryList queueEntryList,
                              ServerMessage message,
                              final MessageEnqueueRecord messageEnqueueRecord)
     {
@@ -63,7 +63,7 @@ public abstract class OrderedQueueEntry
             final OrderedQueueEntry newNext = next.getNextNode();
             if(newNext != null)
             {
-                OrderedQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
+                OrderedBaseQueueEntryList._nextUpdater.compareAndSet(this, next, newNext);
                 next = getNextNode();
             }
             else

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public abstract class OrderedQueueEntryList extends OrderedBaseQueueEntryList<AbstractQueue<?>>
+{
+    public OrderedQueueEntryList(final AbstractQueue<?> queue,
+                                 final HeadCreator headCreator)
+    {
+        super(queue, headCreator);
+    }
+
+    @Override
+    public void onAcquiredByConsumer(final QueueEntry queueEntry, final MessageInstanceConsumer consumer)
+    {
+        getQueue().incrementUnackedMsgCount(queueEntry);
+    }
+
+    @Override
+    public void onNoLongerAcquiredByConsumer(final QueueEntry queueEntry)
+    {
+        getQueue().decrementUnackedMsgCount(queueEntry);
+    }
+
+    @Override
+    public void requeue(final QueueEntry queueEntry)
+    {
+        getQueue().requeue(queueEntry);
+    }
+
+    @Override
+    public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+    {
+        return getQueue().isHeld(queueEntry, evaluationTime);
+    }
+
+    @Override
+    public void dequeue(final QueueEntry queueEntry)
+    {
+        getQueue().dequeue(queueEntry);
+    }
+
+    @Override
+    public int routeToAlternate(final QueueEntry queueEntry, final Action<? super BaseMessageInstance> action, ServerTransaction txn)
+    {
+        return getQueue().routeToAlternate(queueEntry, action, txn);
+    }
+
+    @Override
+    public int getMaximumDeliveryAttempts()
+    {
+        return getQueue().getMaximumDeliveryAttempts();
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Fri Oct 21 09:32:07 2016
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Queue;
 
-public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer<X>
+public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer<X>, AcquiringMessageInstanceConsumer<X, ConsumerTarget>
 {
     void flushBatched();
 

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Oct 21 09:32:07 2016
@@ -46,9 +46,9 @@ import org.apache.qpid.server.logging.Ev
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
@@ -123,7 +123,7 @@ class QueueConsumerImpl
                       ConsumerTarget target, final String consumerName,
                       final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
-                      EnumSet<Option> optionSet,
+                      EnumSet<ConsumerOption> optionSet,
                       final Integer priority)
     {
         super(parentsMap(queue, target.getSessionModel().getModelObject()),
@@ -132,18 +132,16 @@ class QueueConsumerImpl
         _sessionReference = target.getSessionModel().getConnectionReference();
         _consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
         _filters = filters;
-        _acquires = optionSet.contains(Option.ACQUIRES);
-        _seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
-        _isTransient = optionSet.contains(Option.TRANSIENT);
+        _acquires = optionSet.contains(ConsumerOption.ACQUIRES);
+        _seesRequeues = optionSet.contains(ConsumerOption.SEES_REQUEUES);
+        _isTransient = optionSet.contains(ConsumerOption.TRANSIENT);
         _target = target;
         _queue = queue;
 
         // Access control
         authorise(Operation.CREATE);
 
-        open();
 
-        setupLogging();
 
         _listener = new StateChangeListener<ConsumerTarget, ConsumerTarget.State>()
         {
@@ -165,20 +163,29 @@ class QueueConsumerImpl
                 getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
             }
         };
+
+
+    }
+
+    @Override
+    protected void onOpen()
+    {
+        super.onOpen();
+        setupLogging();
     }
 
     private static Map<String, Object> createAttributeMap(String name,
                                                           FilterManager filters,
-                                                          EnumSet<Option> optionSet,
+                                                          EnumSet<ConsumerOption> optionSet,
                                                           Integer priority)
     {
         Map<String,Object> attributes = new HashMap<String, Object>();
         attributes.put(ID, UUID.randomUUID());
         attributes.put(NAME, name);
-        attributes.put(EXCLUSIVE, optionSet.contains(Option.EXCLUSIVE));
-        attributes.put(NO_LOCAL, optionSet.contains(Option.NO_LOCAL));
-        attributes.put(DISTRIBUTION_MODE, optionSet.contains(Option.ACQUIRES) ? "MOVE" : "COPY");
-        attributes.put(DURABLE,optionSet.contains(Option.DURABLE));
+        attributes.put(EXCLUSIVE, optionSet.contains(ConsumerOption.EXCLUSIVE));
+        attributes.put(NO_LOCAL, optionSet.contains(ConsumerOption.NO_LOCAL));
+        attributes.put(DISTRIBUTION_MODE, optionSet.contains(ConsumerOption.ACQUIRES) ? "MOVE" : "COPY");
+        attributes.put(DURABLE,optionSet.contains(ConsumerOption.DURABLE));
         attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
         if(priority != null)
         {
@@ -279,9 +286,9 @@ class QueueConsumerImpl
     }
 
     @Override
-    public MessageSource getMessageSource()
+    public Object getIdentifier()
     {
-        return _queue;
+        return getConsumerNumber();
     }
 
     @Override

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Oct 21 09:32:07 2016
@@ -20,20 +20,20 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.model.Queue;
 
 public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
 {
 
-    Queue<?> getQueue();
+    RecoverableBaseQueue getQueue();
 
     long getSize();
 
     boolean acquireOrSteal(final Runnable delayedAcquisitionTask);
 
-    QueueConsumer getDeliveredConsumer();
+    AcquiringMessageInstanceConsumer<?,?> getDeliveredConsumer();
 
     boolean isQueueDeleted();
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org