You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/21 09:32:09 UTC

svn commit: r1765973 [3/7] - in /qpid/java/branches/transfer-queue: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/ bdbstore/src/test/java/org/apache/qpid/server/stor...

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Oct 21 09:32:07 2016
@@ -30,15 +30,16 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
+import org.apache.qpid.server.message.BaseMessageInstance;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.LocalTransaction;
@@ -55,7 +56,7 @@ public abstract class QueueEntryImpl imp
 
     private final MessageReference _message;
 
-    private Set<Long> _rejectedBy = null;
+    private Set<Object> _rejectedBy = null;
 
     private static final EntryState HELD_STATE = new EntryState()
     {
@@ -177,7 +178,7 @@ public abstract class QueueEntryImpl imp
         return _entryId;
     }
 
-    public Queue<?> getQueue()
+    public RecoverableBaseQueue getQueue()
     {
         return _queueEntryList.getQueue();
     }
@@ -263,7 +264,7 @@ public abstract class QueueEntryImpl imp
         boolean acquired = acquire();
         if(!acquired)
         {
-            QueueConsumer consumer = getDeliveredConsumer();
+            AcquiringMessageInstanceConsumer<?,?> consumer = getDeliveredConsumer();
             acquired = removeAcquisitionFromConsumer(consumer);
             if(acquired)
             {
@@ -304,19 +305,19 @@ public abstract class QueueEntryImpl imp
         return acquired;
     }
 
-    public boolean acquire(ConsumerImpl sub)
+    public boolean acquire(MessageInstanceConsumer sub)
     {
-        final boolean acquired = acquire(((QueueConsumer<?>) sub).getOwningState().getUnstealableState());
+        final boolean acquired = acquire(((AcquiringMessageInstanceConsumer<?,?>) sub).getOwningState().getUnstealableState());
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
-            getQueue().incrementUnackedMsgCount(this);
+            _queueEntryList.onAcquiredByConsumer(this, sub);
         }
         return acquired;
     }
 
     @Override
-    public boolean makeAcquisitionUnstealable(final ConsumerImpl consumer)
+    public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         if(state instanceof StealableConsumerAcquiredState
@@ -357,9 +358,9 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public ConsumerImpl getAcquiringConsumer()
+    public MessageInstanceConsumer getAcquiringConsumer()
     {
-        ConsumerImpl consumer;
+        AcquiringMessageInstanceConsumer<?,?> consumer;
         EntryState state = _state;
         if (state instanceof ConsumerAcquiredState)
         {
@@ -373,14 +374,14 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public boolean isAcquiredBy(ConsumerImpl consumer)
+    public boolean isAcquiredBy(MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         return (state instanceof ConsumerAcquiredState && ((ConsumerAcquiredState)state).getConsumer() == consumer);
     }
 
     @Override
-    public boolean removeAcquisitionFromConsumer(ConsumerImpl consumer)
+    public boolean removeAcquisitionFromConsumer(MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         if(state instanceof StealableConsumerAcquiredState
@@ -411,7 +412,7 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public void release(ConsumerImpl consumer)
+    public void release(MessageInstanceConsumer consumer)
     {
         EntryState state = _state;
         if(isAcquiredBy(consumer) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
@@ -424,12 +425,12 @@ public abstract class QueueEntryImpl imp
     {
         if (previousState instanceof ConsumerAcquiredState)
         {
-            getQueue().decrementUnackedMsgCount(this);
+            _queueEntryList.onNoLongerAcquiredByConsumer(this);
         }
 
         if(!getQueue().isDeleted())
         {
-            getQueue().requeue(this);
+            _queueEntryList.requeue(this);
             if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
             {
                 notifyStateChange(previousState, AVAILABLE_STATE);
@@ -448,7 +449,7 @@ public abstract class QueueEntryImpl imp
         EntryState state;
         while((state = _state).getState() == State.AVAILABLE)
         {
-            boolean isHeld = getQueue().isHeld(this, evaluationTime);
+            boolean isHeld = _queueEntryList.isHeld(this, evaluationTime);
             if(state == AVAILABLE_STATE && isHeld)
             {
                 if(!_stateUpdater.compareAndSet(this, state, HELD_STATE))
@@ -475,20 +476,20 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
-    public QueueConsumer getDeliveredConsumer()
+    public AcquiringMessageInstanceConsumer getDeliveredConsumer()
     {
-        return (QueueConsumer) getAcquiringConsumer();
+        return (AcquiringMessageInstanceConsumer) getAcquiringConsumer();
     }
 
     public void reject()
     {
-        QueueConsumer consumer = getDeliveredConsumer();
+        AcquiringMessageInstanceConsumer<?,?> consumer = getDeliveredConsumer();
 
         if (consumer != null)
         {
             if (_rejectedBy == null)
             {
-                _rejectedBy = new HashSet<Long>();
+                _rejectedBy = new HashSet<>();
             }
 
             _rejectedBy.add(consumer.getConsumerNumber());
@@ -499,12 +500,12 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public boolean isRejectedBy(ConsumerImpl consumer)
+    public boolean isRejectedBy(MessageInstanceConsumer consumer)
     {
 
         if (_rejectedBy != null) // We have consumers that rejected this message
         {
-            return _rejectedBy.contains(consumer.getConsumerNumber());
+            return _rejectedBy.contains(consumer.getIdentifier());
         }
         else // This message hasn't been rejected yet.
         {
@@ -525,10 +526,10 @@ public abstract class QueueEntryImpl imp
         {
             if (state instanceof ConsumerAcquiredState)
             {
-                getQueue().decrementUnackedMsgCount(this);
+                _queueEntryList.onNoLongerAcquiredByConsumer(this);
             }
 
-            getQueue().dequeue(this);
+            _queueEntryList.dequeue(this);
             if(_stateChangeListeners != null)
             {
                 notifyStateChange(state, DEQUEUED_STATE);
@@ -582,55 +583,10 @@ public abstract class QueueEntryImpl imp
         }
     }
 
-    public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
+    public int routeToAlternate(final Action<? super BaseMessageInstance> action, ServerTransaction txn)
     {
-        if (!isAcquired())
-        {
-            throw new IllegalStateException("Illegal queue entry state. " + this + " is not acquired.");
-        }
-
-        final Queue<?> currentQueue = getQueue();
-        Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
-        boolean autocommit =  txn == null;
-        int enqueues;
-
-        if(autocommit)
-        {
-            txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
-        }
-
-        if (alternateExchange != null)
-        {
-            enqueues = alternateExchange.send(getMessage(),
-                                              getMessage().getInitialRoutingAddress(),
-                                              getInstanceProperties(),
-                                              txn,
-                                              action);
-        }
-        else
-        {
-            enqueues = 0;
-        }
+        return _queueEntryList.routeToAlternate(this, action, txn);
 
-        txn.dequeue(getEnqueueRecord(), new ServerTransaction.Action()
-        {
-            public void postCommit()
-            {
-                delete();
-            }
-
-            public void onRollback()
-            {
-
-            }
-        });
-
-        if(autocommit)
-        {
-            txn.commit();
-        }
-
-        return enqueues;
     }
 
     public boolean isQueueDeleted()
@@ -692,7 +648,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public int getMaximumDeliveryCount()
     {
-        return getQueue().getMaximumDeliveryAttempts();
+        return _queueEntryList.getMaximumDeliveryAttempts();
     }
 
     public void incrementDeliveryCount()
@@ -723,7 +679,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public boolean resend()
     {
-        QueueConsumer sub = getDeliveredConsumer();
+        AcquiringMessageInstanceConsumer<?,?> sub = getDeliveredConsumer();
         if(sub != null)
         {
             return sub.resend(this);
@@ -734,7 +690,7 @@ public abstract class QueueEntryImpl imp
     @Override
     public TransactionLogResource getOwningResource()
     {
-        return getQueue();
+        return _queueEntryList.getQueue();
     }
 
     public void setRedelivered()

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -20,13 +20,16 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 public interface QueueEntryList
 {
-    Queue<?> getQueue();
+    RecoverableBaseQueue getQueue();
 
     QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord);
 
@@ -44,4 +47,17 @@ public interface QueueEntryList
     
     int getPriorities();
 
+    void onAcquiredByConsumer(QueueEntry queueEntry, final MessageInstanceConsumer consumer);
+
+    void onNoLongerAcquiredByConsumer(QueueEntry queueEntry);
+
+    void requeue(QueueEntry queueEntry);
+
+    boolean isHeld(QueueEntry queueEntry, long evaluationTime);
+
+    void dequeue(QueueEntry queueEntry);
+
+    int routeToAlternate(QueueEntry queueEntry, Action<? super BaseMessageInstance> action, ServerTransaction txn);
+
+    int getMaximumDeliveryAttempts();
 }

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+
+public interface RecoverableBaseQueue extends BaseQueue
+{
+    void recover(ServerMessage<?> message, MessageEnqueueRecord record);
+
+    void completeRecovery();
+
+    VirtualHost<?> getVirtualHost();
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/RecoverableBaseQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -20,9 +20,13 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.SortedQueueEntry.Colour;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 /**
  * A sorted implementation of QueueEntryList.
@@ -52,6 +56,50 @@ public class SortedQueueEntryList implem
         return _queue;
     }
 
+
+    @Override
+    public void onAcquiredByConsumer(final QueueEntry queueEntry, final MessageInstanceConsumer consumer)
+    {
+        getQueue().incrementUnackedMsgCount(queueEntry);
+    }
+
+    @Override
+    public void onNoLongerAcquiredByConsumer(final QueueEntry queueEntry)
+    {
+        getQueue().decrementUnackedMsgCount(queueEntry);
+    }
+
+    @Override
+    public void requeue(final QueueEntry queueEntry)
+    {
+        getQueue().requeue(queueEntry);
+    }
+
+    @Override
+    public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+    {
+        return getQueue().isHeld(queueEntry, evaluationTime);
+    }
+
+    @Override
+    public void dequeue(final QueueEntry queueEntry)
+    {
+        getQueue().dequeue(queueEntry);
+    }
+
+    @Override
+    public int routeToAlternate(final QueueEntry queueEntry, final Action<? super BaseMessageInstance> action, ServerTransaction txn)
+    {
+        return getQueue().routeToAlternate(queueEntry, action, txn);
+    }
+
+    @Override
+    public int getMaximumDeliveryAttempts()
+    {
+        return getQueue().getMaximumDeliveryAttempts();
+    }
+
+
     public SortedQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
         synchronized(_lock)

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AbstractKeyStore.java Fri Oct 21 09:32:07 2016
@@ -37,9 +37,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.KeyStoreMessages;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.IntegrityViolationException;
 import org.apache.qpid.server.model.KeyStore;
@@ -115,7 +115,7 @@ public abstract class AbstractKeyStore<X
         else
         {
             final int frequency = checkFrequency;
-            getBroker().addChangeListener(new ConfigurationChangeListener()
+            getBroker().addChangeListener(new AbstractConfigurationChangeListener()
             {
                 @Override
                 public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
@@ -135,38 +135,6 @@ public abstract class AbstractKeyStore<X
                     }
                 }
 
-                @Override
-                public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-                {
-
-                }
-
-                @Override
-                public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-                {
-
-                }
-
-                @Override
-                public void attributeSet(final ConfiguredObject<?> object,
-                                         final String attributeName,
-                                         final Object oldAttributeValue,
-                                         final Object newAttributeValue)
-                {
-
-                }
-
-                @Override
-                public void bulkChangeStart(final ConfiguredObject<?> object)
-                {
-
-                }
-
-                @Override
-                public void bulkChangeEnd(final ConfiguredObject<?> object)
-                {
-
-                }
             });
         }
     }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java Fri Oct 21 09:32:07 2016
@@ -22,7 +22,7 @@ import java.util.Map;
 
 import javax.security.auth.Subject;
 
-import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.PermissionedObject;
 import org.apache.qpid.server.security.access.Operation;
 
 public interface AccessControl<T extends SecurityToken>
@@ -34,9 +34,9 @@ public interface AccessControl<T extends
 
     T newToken(Subject subject);
 
-    Result authorise(T token, Operation operation, ConfiguredObject<?> configuredObject);
+    Result authorise(T token, Operation operation, PermissionedObject configuredObject);
 
-    Result authorise(T token, Operation operation, ConfiguredObject<?> configuredObject, Map<String,Object> arguments);
+    Result authorise(T token, Operation operation, PermissionedObject configuredObject, Map<String,Object> arguments);
 
     final class FixedResultAccessControl implements AccessControl<SecurityToken>
     {
@@ -68,7 +68,7 @@ public interface AccessControl<T extends
         @Override
         public Result authorise(final SecurityToken token,
                                 final Operation operation,
-                                final ConfiguredObject<?> configuredObject)
+                                final PermissionedObject configuredObject)
         {
             return _result;
         }
@@ -76,7 +76,7 @@ public interface AccessControl<T extends
         @Override
         public Result authorise(final SecurityToken token,
                                 final Operation operation,
-                                final ConfiguredObject<?> configuredObject,
+                                final PermissionedObject configuredObject,
                                 final Map<String, Object> arguments)
         {
             return _result;

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/CompoundAccessControl.java Fri Oct 21 09:32:07 2016
@@ -29,7 +29,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
-import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.PermissionedObject;
 import org.apache.qpid.server.security.access.Operation;
 
 public class CompoundAccessControl implements AccessControl<CompoundSecurityToken>
@@ -79,7 +79,7 @@ public class CompoundAccessControl imple
     @Override
     public Result authorise(final CompoundSecurityToken token,
                             final Operation operation,
-                            final ConfiguredObject<?> configuredObject)
+                            final PermissionedObject configuredObject)
     {
         return authorise(token, operation, configuredObject, Collections.<String,Object>emptyMap());
     }
@@ -87,14 +87,14 @@ public class CompoundAccessControl imple
     @Override
     public Result authorise(final CompoundSecurityToken token,
                             final Operation operation,
-                            final ConfiguredObject<?> configuredObject,
+                            final PermissionedObject configuredObject,
                             final Map<String, Object> arguments)
     {
         List<AccessControl<?>> underlying = _underlyingControls.get();
         Map<AccessControl<?>, SecurityToken> compoundToken = token == null ? null : token.getCompoundToken(underlying);
         for(AccessControl control : underlying)
         {
-            SecurityToken underlyingToken = compoundToken == null ? null : compoundToken.get(underlying);
+            SecurityToken underlyingToken = compoundToken == null ? null : compoundToken.get(control);
             final Result result = control.authorise(underlyingToken, operation, configuredObject, arguments);
             if(result.isFinal())
             {

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Fri Oct 21 09:32:07 2016
@@ -35,21 +35,20 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ConsumerOption;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.message.internal.InternalMessageHeader;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.TrustStore;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.virtualhost.AbstractSystemMessageSource;
 
-public class TrustStoreMessageSource extends AbstractSystemMessageSource implements MessageSource
+public class TrustStoreMessageSource extends AbstractSystemMessageSource
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(TrustStoreMessageSource.class);
 
@@ -63,7 +62,7 @@ public class TrustStoreMessageSource ext
         super(getSourceNameFromTrustStore(trustStore), virtualHost);
         _virtualHost = virtualHost;
         _trustStore = trustStore;
-        _trustStore.addChangeListener(new ConfigurationChangeListener()
+        _trustStore.addChangeListener(new AbstractConfigurationChangeListener()
         {
             @Override
             public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
@@ -75,18 +74,6 @@ public class TrustStoreMessageSource ext
             }
 
             @Override
-            public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-            {
-
-            }
-
-            @Override
-            public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-            {
-
-            }
-
-            @Override
             public void attributeSet(final ConfiguredObject<?> object,
                                      final String attributeName,
                                      final Object oldAttributeValue,
@@ -95,17 +82,7 @@ public class TrustStoreMessageSource ext
                 updateCertCache();
             }
 
-            @Override
-            public void bulkChangeStart(final ConfiguredObject<?> object)
-            {
-
-            }
 
-            @Override
-            public void bulkChangeEnd(final ConfiguredObject<?> object)
-            {
-
-            }
         });
         if(_trustStore.getState() == State.ACTIVE)
         {
@@ -114,15 +91,16 @@ public class TrustStoreMessageSource ext
     }
 
     @Override
-    public Consumer addConsumer(final ConsumerTarget target,
-                                final FilterManager filters,
-                                final Class<? extends ServerMessage> messageClass,
-                                final String consumerName,
-                                final EnumSet<ConsumerImpl.Option> options, final Integer priority)
+    public SystemMessageSourceConsumer addConsumer(final ConsumerTarget target,
+                                                   final FilterManager filters,
+                                                   final Class<? extends ServerMessage> messageClass,
+                                                   final String consumerName,
+                                                   final EnumSet<ConsumerOption> options, final Integer priority)
             throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused
     {
-        final Consumer consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
+        final SystemMessageSourceConsumer
+                consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());
         target.queueEmpty();
         return consumer;
@@ -141,7 +119,7 @@ public class TrustStoreMessageSource ext
     {
         InternalMessage message = createMessage();
 
-        for(Consumer c : new ArrayList<>(getConsumers()))
+        for(SystemMessageSourceConsumer c : new ArrayList<>(getConsumers()))
         {
             c.send(message);
         }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSourceCreator.java Fri Oct 21 09:32:07 2016
@@ -22,8 +22,8 @@ package org.apache.qpid.server.security;
 
 import java.util.Collection;
 
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.TrustStore;
@@ -58,12 +58,8 @@ public class TrustStoreMessageSourceCrea
             updateTrustStoreSourceRegistration(registry, trustStore);
             trustStore.addChangeListener(trustStoreChangeListener);
         }
-        broker.addChangeListener(new ConfigurationChangeListener()
+        broker.addChangeListener(new AbstractConfigurationChangeListener()
         {
-            @Override
-            public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
-            {
-            }
 
             @Override
             public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
@@ -90,26 +86,6 @@ public class TrustStoreMessageSourceCrea
                 }
             }
 
-            @Override
-            public void attributeSet(final ConfiguredObject<?> object,
-                                     final String attributeName,
-                                     final Object oldAttributeValue,
-                                     final Object newAttributeValue)
-            {
-
-            }
-
-            @Override
-            public void bulkChangeStart(final ConfiguredObject<?> object)
-            {
-
-            }
-
-            @Override
-            public void bulkChangeEnd(final ConfiguredObject<?> object)
-            {
-
-            }
         });
     }
 
@@ -141,7 +117,7 @@ public class TrustStoreMessageSourceCrea
         }
     }
 
-    private class TrustStoreChangeListener implements ConfigurationChangeListener
+    private class TrustStoreChangeListener extends AbstractConfigurationChangeListener
     {
 
         private final SystemNodeRegistry _registry;
@@ -159,19 +135,6 @@ public class TrustStoreMessageSourceCrea
             updateTrustStoreSourceRegistration(_registry, (TrustStore<?>)object);
         }
 
-
-        @Override
-        public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-        {
-
-        }
-
-        @Override
-        public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
-        {
-
-        }
-
         @Override
         public void attributeSet(final ConfiguredObject<?> object,
                                  final String attributeName,
@@ -181,16 +144,5 @@ public class TrustStoreMessageSourceCrea
             updateTrustStoreSourceRegistration(_registry, (TrustStore<?>)object);
         }
 
-        @Override
-        public void bulkChangeStart(final ConfiguredObject<?> object)
-        {
-
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-
-        }
     }
 }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Fri Oct 21 09:32:07 2016
@@ -36,13 +36,12 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
 import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Binding;
-import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
@@ -627,13 +626,8 @@ public class VirtualHostStoreUpgraderAnd
                 }
             });
         }
-        _virtualHostNode.addChangeListener(new ConfigurationChangeListener()
+        _virtualHostNode.addChangeListener(new AbstractConfigurationChangeListener()
         {
-            @Override
-            public void stateChanged(final ConfiguredObject<?> object, final State oldState, final State newState)
-            {
-
-            }
 
             @Override
             public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child)
@@ -665,26 +659,6 @@ public class VirtualHostStoreUpgraderAnd
                 }
             }
 
-            @Override
-            public void attributeSet(final ConfiguredObject<?> object,
-                                     final String attributeName,
-                                     final Object oldAttributeValue,
-                                     final Object newAttributeValue)
-            {
-
-            }
-
-            @Override
-            public void bulkChangeStart(final ConfiguredObject<?> object)
-            {
-
-            }
-
-            @Override
-            public void bulkChangeEnd(final ConfiguredObject<?> object)
-            {
-
-            }
         });
         if(isNew)
         {

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
+
+public class OutboundTransferDestination implements MessageDestination
+{
+    private final VirtualHost<?> _virtualHost;
+    private final String _address;
+
+    public OutboundTransferDestination(final VirtualHost<?> virtualHost, final String address)
+    {
+        _virtualHost = virtualHost;
+        _address = address;
+    }
+
+    @Override
+    public String getName()
+    {
+        return "$transfer";
+    }
+
+    @Override
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+                                                                                 final String routingAddress,
+                                                                                 final InstanceProperties instanceProperties,
+                                                                                 final ServerTransaction txn,
+                                                                                 final Action<? super BaseMessageInstance> postEnqueueAction)
+    {
+        if (_virtualHost.getState() != State.ACTIVE)
+        {
+            throw new VirtualHostUnavailableException(this._virtualHost);
+        }
+
+
+        final TransferQueue transferQueue = _virtualHost.getTransferQueue();
+        txn.enqueue(transferQueue, message, new ServerTransaction.EnqueueAction()
+        {
+            MessageReference _reference = message.newReference();
+
+            public void postCommit(MessageEnqueueRecord... records)
+            {
+                try
+                {
+                    for (final MessageEnqueueRecord record : records)
+                    {
+                        transferQueue.enqueue(message, postEnqueueAction, record);
+                    }
+                }
+                finally
+                {
+                    _reference.release();
+                }
+            }
+
+            public void onRollback()
+            {
+                _reference.release();
+            }
+        });
+        return 1;
+
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/OutboundTransferDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java (from r1755476, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/QueueContext.java Fri Oct 21 09:32:07 2016
@@ -19,10 +19,12 @@
  *
  */
 
-package org.apache.qpid.server.queue;
+package org.apache.qpid.server.transfer;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import org.apache.qpid.server.queue.QueueEntry;
+
 final class QueueContext
 {
     private volatile QueueEntry _lastSeenEntry;
@@ -31,11 +33,11 @@ final class QueueContext
     static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
             _lastSeenUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+                (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
     static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
             _releasedUpdater =
         AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_releasedEntry");
+                (QueueContext.class, QueueEntry.class, "_releasedEntry");
 
     public QueueContext(QueueEntry head)
     {

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.queue.RecoverableBaseQueue;
+
+public interface TransferQueue extends RecoverableBaseQueue, MessageDestination
+{
+    TransferQueueConsumer addConsumer(TransferTarget target,
+                                      String consumerName);
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.regex.Pattern;
+
+import org.apache.qpid.server.message.AcquiringMessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.queue.QueueEntry;
+
+public class TransferQueueConsumer implements AcquiringMessageInstanceConsumer<TransferQueueConsumer, TransferTarget>
+{
+    private final TransferQueueImpl _transferQueue;
+    private final TransferTarget _target;
+    private final String _name;
+
+    private final ConcurrentLinkedQueue<TransferQueueEntry> _entries = new ConcurrentLinkedQueue<>();
+    private final Pattern _matchPattern;
+    private volatile QueueContext _queueContext;
+
+
+    private final MessageInstance.StealableConsumerAcquiredState<TransferQueueConsumer>
+            _owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
+    private final Object _identifier = new Object();
+
+
+    TransferQueueConsumer(final TransferQueueImpl transferQueue,
+                          final TransferTarget target,
+                          final String consumerName)
+    {
+        _transferQueue = transferQueue;
+        _target = target;
+        _name = consumerName;
+
+        Collection<String> globalAddressDomains = _target.getGlobalAddressDomains();
+        StringBuilder matchPattern = new StringBuilder();
+        boolean isFirst = true;
+        for(String domain : globalAddressDomains)
+        {
+            if(isFirst)
+            {
+                isFirst = false;
+            }
+            else
+            {
+                matchPattern.append('|');
+            }
+            matchPattern.append('(');
+            matchPattern.append(Pattern.quote(domain.endsWith("/") ? domain : (domain + "/")));
+            matchPattern.append(".*)");
+        }
+        _matchPattern = Pattern.compile(matchPattern.toString());
+    }
+
+    boolean hasInterest(final QueueEntry entry)
+    {
+        String initialRoutingAddress = entry.getMessage().getInitialRoutingAddress();
+        boolean matches = _matchPattern.matcher(initialRoutingAddress).matches();
+        return matches;
+    }
+
+    public boolean processPending()
+    {
+        if(!isSuspended())
+        {
+            TransferQueueEntry entry = _transferQueue.getNextAvailableEntry(this);
+            if(entry != null && !wouldSuspend(entry))
+            {
+                if (!entry.acquire(this))
+                {
+                    // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+                    // to acquire the entry for this consumer
+                    restoreCredit(entry);
+                }
+                else
+                {
+                    _transferQueue.setLastSeenEntry(this, entry);
+
+                    send(entry);
+                    return true;
+                }
+            }
+
+        }
+        return false;
+    }
+
+    void setQueueContext(final QueueContext queueContext)
+    {
+        _queueContext = queueContext;
+    }
+
+    QueueContext getQueueContext()
+    {
+        return _queueContext;
+    }
+
+    @Override
+    public MessageInstance.StealableConsumerAcquiredState<TransferQueueConsumer> getOwningState()
+    {
+        return _owningState;
+    }
+
+    public boolean isSuspended()
+    {
+        return _target.isSuspended();
+    }
+
+    boolean wouldSuspend(final TransferQueueEntry entry)
+    {
+        return _target.wouldSuspend(entry);
+    }
+
+    public TransferTarget getTarget()
+    {
+        return _target;
+    }
+
+    @Override
+    public void acquisitionRemoved(final QueueEntry queueEntry)
+    {
+
+    }
+
+    @Override
+    public long getConsumerNumber()
+    {
+        return 0;
+    }
+
+    @Override
+    public boolean resend(final QueueEntry queueEntry)
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean acquires()
+    {
+        return true;
+    }
+
+    @Override
+    public String getName()
+    {
+        return "$transfer";
+    }
+
+    @Override
+    public void close()
+    {
+
+    }
+
+    @Override
+    public void flush()
+    {
+
+    }
+
+    @Override
+    public void externalStateChange()
+    {
+
+    }
+
+    @Override
+    public Object getIdentifier()
+    {
+        return _identifier;
+    }
+
+    public void send(final TransferQueueEntry entry)
+    {
+        _target.send(entry);
+    }
+
+    public void restoreCredit(final TransferQueueEntry entry)
+    {
+        _target.restoreCredit(entry.getMessage());
+    }
+
+    public void notifyWork()
+    {
+        _target.notifyWork();
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.OrderedQueueEntry;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+
+public class TransferQueueEntry extends OrderedQueueEntry
+{
+
+    protected TransferQueueEntry(final TransferQueueEntryList queueEntryList)
+    {
+        super(queueEntryList);
+    }
+
+    public TransferQueueEntry(final TransferQueueEntryList queueEntryList,
+                              final ServerMessage message,
+                              final MessageEnqueueRecord messageEnqueueRecord)
+    {
+        super(queueEntryList, message, messageEnqueueRecord);
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.OrderedBaseQueueEntryList;
+import org.apache.qpid.server.queue.OrderedQueueEntry;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryList;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+
+public class TransferQueueEntryList extends OrderedBaseQueueEntryList<TransferQueue>
+{
+
+    private static final HeadCreator HEAD_CREATOR = new HeadCreator()
+    {
+        @Override
+        public TransferQueueEntry createHead(final QueueEntryList list)
+        {
+            return new TransferQueueEntry((TransferQueueEntryList) list);
+        }
+    };
+
+    public TransferQueueEntryList(final TransferQueue queue)
+    {
+        super(queue, HEAD_CREATOR);
+    }
+
+
+    @Override
+    protected OrderedQueueEntry createQueueEntry(final ServerMessage<?> message,
+                                                 final MessageEnqueueRecord enqueueRecord)
+    {
+        return new TransferQueueEntry(this, message, enqueueRecord);
+    }
+
+    @Override
+    public void onAcquiredByConsumer(final QueueEntry queueEntry, final MessageInstanceConsumer consumer)
+    {
+
+    }
+
+    @Override
+    public void onNoLongerAcquiredByConsumer(final QueueEntry queueEntry)
+    {
+
+    }
+
+    @Override
+    public void requeue(final QueueEntry queueEntry)
+    {
+
+    }
+
+    @Override
+    public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+    {
+        return false;
+    }
+
+    @Override
+    public void dequeue(final QueueEntry queueEntry)
+    {
+
+    }
+
+    @Override
+    public int routeToAlternate(final QueueEntry queueEntry,
+                                final Action<? super BaseMessageInstance> action,
+                                final ServerTransaction txn)
+    {
+        return 0;
+    }
+
+    @Override
+    public int getMaximumDeliveryAttempts()
+    {
+        return 0;
+    }
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueEntryList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java?rev=1765973&view=auto
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java (added)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java Fri Oct 21 09:32:07 2016
@@ -0,0 +1,413 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transfer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.message.BaseMessageInstance;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
+
+public class TransferQueueImpl implements TransferQueue
+{
+    private static final UUID TRANSFER_QUEUE_ID = UUID.nameUUIDFromBytes("$transfer".getBytes(StandardCharsets.UTF_8));
+
+    private static final int RECOVERING = 1;
+    private static final int COMPLETING_RECOVERY = 2;
+    private static final int RECOVERED = 3;
+
+    private final AtomicInteger _recovering = new AtomicInteger(RECOVERING);
+    private final AtomicInteger _enqueuingWhileRecovering = new AtomicInteger(0);
+
+    private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
+
+    private final VirtualHost<?> _virtualHost;
+
+    private final TransferQueueEntryList _queueEntryList;
+    private Collection<TransferQueueConsumer> _consumers = new CopyOnWriteArrayList<>();
+
+
+    public TransferQueueImpl(final VirtualHost<?> virtualHost)
+    {
+        _virtualHost = virtualHost;
+        _queueEntryList = new TransferQueueEntryList(this);
+    }
+
+    @Override
+    public void enqueue(final ServerMessage message,
+                        final Action<? super BaseMessageInstance> action,
+                        final MessageEnqueueRecord enqueueRecord)
+    {
+        if(_recovering.get() != RECOVERED)
+        {
+            _enqueuingWhileRecovering.incrementAndGet();
+
+            boolean addedToRecoveryQueue;
+            try
+            {
+                if(addedToRecoveryQueue = (_recovering.get() == RECOVERING))
+                {
+                    _postRecoveryQueue.add(new EnqueueRequest(message, action, enqueueRecord));
+                }
+            }
+            finally
+            {
+                _enqueuingWhileRecovering.decrementAndGet();
+            }
+
+            if(!addedToRecoveryQueue)
+            {
+                while(_recovering.get() != RECOVERED)
+                {
+                    Thread.yield();
+                }
+                doEnqueue(message, action, enqueueRecord);
+            }
+        }
+        else
+        {
+            doEnqueue(message, action, enqueueRecord);
+        }
+
+
+    }
+
+    @Override
+    public void recover(final ServerMessage<?> message, final MessageEnqueueRecord enqueueRecord)
+    {
+        doEnqueue(message, null, enqueueRecord);
+    }
+
+    @Override
+    public final void completeRecovery()
+    {
+        if(_recovering.compareAndSet(RECOVERING, COMPLETING_RECOVERY))
+        {
+            while(_enqueuingWhileRecovering.get() != 0)
+            {
+                Thread.yield();
+            }
+
+            // at this point we can assert that any new enqueue to the queue will not try to put into the post recovery
+            // queue (because the state is no longer RECOVERING, but also no threads are currently trying to enqueue
+            // because the _enqueuingWhileRecovering count is 0.
+
+            enqueueFromPostRecoveryQueue();
+
+            _recovering.set(RECOVERED);
+
+        }
+    }
+
+    @Override
+    public VirtualHost<?> getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    private void enqueueFromPostRecoveryQueue()
+    {
+        while(!_postRecoveryQueue.isEmpty())
+        {
+            EnqueueRequest request = _postRecoveryQueue.poll();
+            MessageReference<?> messageReference = request.getMessage();
+            doEnqueue(messageReference.getMessage(), request.getAction(), request.getEnqueueRecord());
+            messageReference.release();
+        }
+    }
+
+
+
+    protected void doEnqueue(final ServerMessage message, final Action<? super BaseMessageInstance> action, MessageEnqueueRecord enqueueRecord)
+    {
+        final TransferQueueEntry entry = (TransferQueueEntry) _queueEntryList.add(message, enqueueRecord);
+        for (TransferQueueConsumer consumer : getConsumers())
+        {
+            if (consumer.hasInterest(entry))
+            {
+                consumer.notifyWork();
+            }
+        }
+    }
+
+    private Collection<TransferQueueConsumer> getConsumers()
+    {
+        return _consumers;
+    }
+
+    @Override
+    public TransferQueueConsumer addConsumer(final TransferTarget target,
+                                             final String consumerName)
+    {
+
+
+
+        TransferQueueConsumer consumer = new TransferQueueConsumer(this,
+                                                                   target,
+                                                                   consumerName);
+
+        QueueContext queueContext = new QueueContext(_queueEntryList.getHead());
+        consumer.setQueueContext(queueContext);
+        _consumers.add(consumer);
+        consumer.notifyWork();
+
+
+  /*      consumer.setStateListener(this);
+        QueueContext queueContext;
+        if(filters == null || !filters.startAtTail())
+        {
+            queueContext = new QueueContext(getEntries().getHead());
+        }
+        else
+        {
+            queueContext = new QueueContext(getEntries().getTail());
+        }
+        consumer.setQueueContext(queueContext);
+
+        if (!isDeleted())
+        {
+            _consumerList.add(consumer);
+
+            if (isDeleted())
+            {
+                consumer.queueDeleted();
+            }
+        }
+        else
+        {
+            // TODO
+        }
+
+        consumer.addChangeListener(_deletedChildListener);
+
+        deliverAsync();
+*/
+        return consumer;
+    }
+
+
+    void setLastSeenEntry(final TransferQueueConsumer sub, final TransferQueueEntry entry)
+    {
+        QueueContext subContext = sub.getQueueContext();
+        if (subContext != null)
+        {
+            QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+            QueueContext._lastSeenUpdater.set(subContext, entry);
+            if(releasedEntry == entry)
+            {
+                QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+            }
+        }
+    }
+
+    TransferQueueEntry getNextAvailableEntry(final TransferQueueConsumer sub)
+    {
+        QueueContext context = sub.getQueueContext();
+        if(context != null)
+        {
+            QueueEntry lastSeen = context.getLastSeenEntry();
+            QueueEntry releasedNode = context.getReleasedEntry();
+
+            TransferQueueEntry node =
+                    (TransferQueueEntry) ((releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
+                                        ? releasedNode : _queueEntryList.next(lastSeen));
+
+            boolean expired = false;
+            while (node != null
+                   && (!node.isAvailable()
+                       || (expired = node.expired())
+                       || !sub.hasInterest(node)))
+            {
+                if (expired)
+                {
+                    expired = false;
+                    if (node.acquire())
+                    {
+                        dequeueEntry(node);
+                    }
+                }
+
+                if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
+                {
+                    QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
+                }
+
+                lastSeen = context.getLastSeenEntry();
+                releasedNode = context.getReleasedEntry();
+                node = (TransferQueueEntry) ((releasedNode != null && lastSeen.compareTo(releasedNode) >= 0)
+                                        ? releasedNode
+                                        : _queueEntryList.next(lastSeen));
+            }
+            return node;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    private void dequeueEntry(final QueueEntry node)
+    {
+        ServerTransaction txn = new AutoCommitTransaction(_virtualHost.getMessageStore());
+        dequeueEntry(node, txn);
+    }
+
+    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+    {
+        txn.dequeue(node.getEnqueueRecord(),
+                    new ServerTransaction.Action()
+                    {
+
+                        public void postCommit()
+                        {
+                            node.delete();
+                        }
+
+                        public void onRollback()
+                        {
+
+                        }
+                    });
+    }
+
+    @Override
+    public boolean isDurable()
+    {
+        return true;
+    }
+
+    @Override
+    public boolean isDeleted()
+    {
+        return false;
+    }
+
+    @Override
+    public String getName()
+    {
+        return "$transfer";
+    }
+
+    @Override
+    public <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
+                                                                                 final String routingAddress,
+                                                                                 final InstanceProperties instanceProperties,
+                                                                                 final ServerTransaction txn,
+                                                                                 final Action<? super BaseMessageInstance> postEnqueueAction)
+    {
+        if (_virtualHost.getState() != State.ACTIVE)
+        {
+            throw new VirtualHostUnavailableException(this._virtualHost);
+        }
+
+        if(!message.isReferenced(this))
+        {
+            txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
+            {
+                MessageReference _reference = message.newReference();
+
+                public void postCommit(MessageEnqueueRecord... records)
+                {
+                    try
+                    {
+                        TransferQueueImpl.this.enqueue(message, postEnqueueAction, records[0]);
+                    }
+                    finally
+                    {
+                        _reference.release();
+                    }
+                }
+
+                public void onRollback()
+                {
+                    _reference.release();
+                }
+            });
+            return 1;
+        }
+        else
+        {
+            return 0;
+        }
+
+    }
+
+    @Override
+    public UUID getId()
+    {
+        return TRANSFER_QUEUE_ID;
+    }
+
+    @Override
+    public MessageDurability getMessageDurability()
+    {
+        return MessageDurability.DEFAULT;
+    }
+
+    private static class EnqueueRequest
+    {
+        private final MessageReference<?> _message;
+        private final Action<? super BaseMessageInstance> _action;
+        private final MessageEnqueueRecord _enqueueRecord;
+
+        public EnqueueRequest(final ServerMessage message,
+                              final Action<? super BaseMessageInstance> action,
+                              final MessageEnqueueRecord enqueueRecord)
+        {
+            _enqueueRecord = enqueueRecord;
+            _message = message.newReference();
+            _action = action;
+        }
+
+        public MessageReference<?> getMessage()
+        {
+            return _message;
+        }
+
+        public Action<? super BaseMessageInstance> getAction()
+        {
+            return _action;
+        }
+
+        public MessageEnqueueRecord getEnqueueRecord()
+        {
+            return _enqueueRecord;
+        }
+    }
+
+}

Propchange: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferQueueImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java (from r1755476, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java)
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java?p2=qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java&r1=1755476&r2=1765973&rev=1765973&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transfer/TransferTarget.java Fri Oct 21 09:32:07 2016
@@ -18,74 +18,24 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.consumer;
+package org.apache.qpid.server.transfer;
+
+import java.util.Collection;
 
-import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.util.StateChangeListener;
 
-public interface ConsumerTarget
+public interface TransferTarget
 {
 
+    void notifyWork();
 
-    void acquisitionRemoved(MessageInstance node);
-
-    void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
-
-    boolean processPending();
-
-    boolean hasPendingWork();
-
-    String getTargetAddress();
-
-    boolean hasCredit();
-
-    enum State
-    {
-        ACTIVE, SUSPENDED, CLOSED
-    }
-
-    State getState();
-
-    void consumerAdded(ConsumerImpl sub);
-
-    void consumerRemoved(ConsumerImpl sub);
-
-    void notifyCurrentState();
-
-    void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
-
-    long getUnacknowledgedBytes();
+    Collection<String> getGlobalAddressDomains();
 
-    long getUnacknowledgedMessages();
+    void send(TransferQueueEntry entry);
 
-    AMQSessionModel getSessionModel();
+    void restoreCredit(ServerMessage message);
 
-    long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
-
-    boolean hasMessagesToSend();
-
-    void sendNextMessage();
-
-    void flushBatched();
-
-    void queueDeleted();
-
-    void queueEmpty();
-
-    boolean allocateCredit(ServerMessage msg);
-
-    void restoreCredit(ServerMessage queueEntry);
+    boolean wouldSuspend(TransferQueueEntry entry);
 
     boolean isSuspended();
-
-    boolean close();
-
-    boolean trySendLock();
-
-    void getSendLock();
-
-    void releaseSendLock();
-
 }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Oct 21 09:32:07 2016
@@ -331,18 +331,18 @@ public abstract class AbstractAMQPConnec
     @Override
     public void pushScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
     {
-        if(_network instanceof NonBlockingConnection)
+        if(_network instanceof NonBlockingInboundConnection)
         {
-            ((NonBlockingConnection) _network).pushScheduler(networkConnectionScheduler);
+            ((NonBlockingInboundConnection) _network).pushScheduler(networkConnectionScheduler);
         }
     }
 
     @Override
     public NetworkConnectionScheduler popScheduler()
     {
-        if(_network instanceof NonBlockingConnection)
+        if(_network instanceof NonBlockingInboundConnection)
         {
-            return ((NonBlockingConnection) _network).popScheduler();
+            return ((NonBlockingInboundConnection) _network).popScheduler();
         }
         return null;
     }

Modified: qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1765973&r1=1765972&r2=1765973&view=diff
==============================================================================
--- qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/branches/transfer-queue/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Oct 21 09:32:07 2016
@@ -113,7 +113,7 @@ public class NetworkConnectionScheduler
         }
     }
 
-    void processConnection(final NonBlockingConnection connection)
+    void processConnection(final SchedulableConnection connection)
     {
         Thread.currentThread().setName(connection.getThreadName());
         connection.doPreWork();
@@ -200,12 +200,12 @@ public class NetworkConnectionScheduler
         _selectorThread.cancelAcceptingSocket(serverSocket);
     }
 
-    public void addConnection(final NonBlockingConnection connection)
+    public void addConnection(final SchedulableConnection connection)
     {
         _selectorThread.addConnection(connection);
     }
 
-    public void removeConnection(final NonBlockingConnection connection)
+    public void removeConnection(final SchedulableConnection connection)
     {
         _selectorThread.removeConnection(connection);
     }
@@ -215,7 +215,7 @@ public class NetworkConnectionScheduler
         return _poolSize;
     }
 
-    public void schedule(final NonBlockingConnection connection)
+    public void schedule(final SchedulableConnection connection)
     {
         _selectorThread.addToWork(connection);
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org