You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/03/24 18:55:01 UTC

svn commit: r1736478 - in /qpid/java/trunk: bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-...

Author: kwall
Date: Thu Mar 24 17:55:00 2016
New Revision: 1736478

URL: http://svn.apache.org/viewvc?rev=1736478&view=rev
Log:
QPID-7156: [Java Broker] Stop new connections from being established while virtual host is stopping.

Work by Lorenz Quack and me

Modified:
    qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
    qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java

Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java Thu Mar 24 17:55:00 2016
@@ -114,8 +114,10 @@ public class BDBHAVirtualHostNodeRestTes
         assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
 
         // verify that remote nodes for node1 are created and their state is set to ACTIVE
-        waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "ACTIVE");
-        waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
+                                                BDBHARemoteReplicationNode.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
+                                                BDBHARemoteReplicationNode.STATE, "ACTIVE");
 
         mutateDesiredState(node1Url, "STOPPED");
 
@@ -124,8 +126,10 @@ public class BDBHAVirtualHostNodeRestTes
         assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
 
         // verify that remote node state fro node1 is changed to UNAVAILABLE
-        waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
-        waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
+        _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
+                                                BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
+        _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
+                                                BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
 
         List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2);
         assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size());
@@ -218,7 +222,7 @@ public class BDBHAVirtualHostNodeRestTes
         getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE", HttpServletResponse.SC_OK);
 
         // wait for new master
-        waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
+        _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
 
         // delete remote node
         getRestTestHelper().submitRequest("remotereplicationnode/" + NODE2 + "/" + NODE1, "DELETE", HttpServletResponse.SC_OK);
@@ -315,8 +319,8 @@ public class BDBHAVirtualHostNodeRestTes
             }
         }
 
-        waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
-        waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
+        _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
+        _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
     }
 
     private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception
@@ -325,7 +329,7 @@ public class BDBHAVirtualHostNodeRestTes
 
         getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData, HttpServletResponse.SC_CREATED);
         String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name();
-        waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
+        _restTestHelper.waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
     }
 
     private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception
@@ -356,7 +360,7 @@ public class BDBHAVirtualHostNodeRestTes
     {
         boolean isMaster = nodeName.equals(masterNode);
         String expectedRole = isMaster? "MASTER" : "REPLICA";
-        waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
+        _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
 
         Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0");
         assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
@@ -376,7 +380,7 @@ public class BDBHAVirtualHostNodeRestTes
 
         if (isMaster)
         {
-            waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
+            _restTestHelper.waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
         }
 
     }
@@ -393,7 +397,7 @@ public class BDBHAVirtualHostNodeRestTes
             for (String remote : remotes)
             {
                 String remoteUrl = "remotereplicationnode/" + clusterNodeName + "/" + remote;
-                Map<String, Object> nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
+                Map<String, Object> nodeData = _restTestHelper.waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
                 assertRemoteNodeData(remote, nodeData);
             }
         }

Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java Thu Mar 24 17:55:00 2016
@@ -117,7 +117,7 @@ public class BDBHAVirtualHostRestTest ex
 
     public void testSetLocalTransactionSynchronizationPolicy() throws Exception
     {
-        Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+        Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
         assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
 
         Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
@@ -129,7 +129,7 @@ public class BDBHAVirtualHostRestTest ex
 
     public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
     {
-        Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+        Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
         assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
 
         Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
@@ -141,19 +141,19 @@ public class BDBHAVirtualHostRestTest ex
 
     public void testMutateState() throws Exception
     {
-        waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
         assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
 
         Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
         getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
-        waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
+        _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
         assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
 
         newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
         getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
-        waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
         assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Thu Mar 24 17:55:00 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.model;
 
 import java.util.Collection;
 
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AMQPConnection;
 
 @ManagedObject( creatable = false )
 public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@@ -122,5 +122,5 @@ public interface Connection<X extends Co
     Collection<Session> getSessions();
 
 
-    AbstractAMQPConnection<?> getUnderlyingConnection();
+    AMQPConnection<?> getUnderlyingConnection();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Thu Mar 24 17:55:00 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.transport
 
 import java.net.SocketAddress;
 import java.security.Principal;
+import java.util.List;
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.model.Connection;
@@ -60,7 +61,9 @@ public interface AMQPConnection<C extend
 
     void unblock();
 
-    void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+    void pushScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+
+    NetworkConnectionScheduler popScheduler();
 
     boolean hasSessionWithName(byte[] name);
 
@@ -71,4 +74,10 @@ public interface AMQPConnection<C extend
     boolean isIOThread();
 
     boolean isAuthorizedMessagePrincipal(String messageUserId);
+
+    void stopConnection();
+
+    List<? extends AMQSessionModel<?>> getSessionModels();
+
+    void resetStatistics();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Thu Mar 24 17:55:00 2016
@@ -53,7 +53,6 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Session;
@@ -101,6 +100,7 @@ public abstract class AbstractAMQPConnec
     private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
     private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
     private final AtomicBoolean _modelClosing = new AtomicBoolean();
+    private volatile VirtualHost<?> _virtualHost;
     private volatile long _lastReadTime;
     private volatile long _lastWriteTime;
     private volatile AccessControlContext _accessControllerContext;
@@ -264,6 +264,7 @@ public abstract class AbstractAMQPConnec
         return String.valueOf(_network.getRemoteAddress());
     }
 
+    @Override
     public final void stopConnection()
     {
         _stopped = true;
@@ -299,15 +300,25 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+    public void pushScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
     {
         if(_network instanceof NonBlockingConnection)
         {
-            ((NonBlockingConnection) _network).changeScheduler(networkConnectionScheduler);
+            ((NonBlockingConnection) _network).pushScheduler(networkConnectionScheduler);
         }
     }
 
     @Override
+    public NetworkConnectionScheduler popScheduler()
+    {
+        if(_network instanceof NonBlockingConnection)
+        {
+            return ((NonBlockingConnection) _network).popScheduler();
+        }
+        return null;
+    }
+
+    @Override
     public String getClientProduct()
     {
         return _clientProduct;
@@ -414,6 +425,7 @@ public abstract class AbstractAMQPConnec
         getVirtualHost().registerMessageReceived(messageSize, timestamp);
     }
 
+    @Override
     public final void resetStatistics()
     {
         _messagesDelivered.reset();
@@ -531,9 +543,10 @@ public abstract class AbstractAMQPConnec
     {
     }
 
-    final public void virtualHostAssociated()
+    public final void associateVirtualHost(final VirtualHost<?> virtualHost)
     {
-        getVirtualHost().registerConnection(this);
+        virtualHost.registerConnection(this);
+        _virtualHost = virtualHost;
         updateMaxMessageSize();
         _messageAuthorizationRequired = getVirtualHost().getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
     }
@@ -693,8 +706,6 @@ public abstract class AbstractAMQPConnec
         });
     }
 
-    public abstract List<? extends AMQSessionModel<?>> getSessionModels();
-
     @Override
     public int getSessionCount()
     {
@@ -702,7 +713,7 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    public AbstractAMQPConnection<?> getUnderlyingConnection()
+    public AMQPConnection<?> getUnderlyingConnection()
     {
         return this;
     }
@@ -731,6 +742,12 @@ public abstract class AbstractAMQPConnec
         return !_messageAuthorizationRequired || getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
     }
 
+    @Override
+    public VirtualHost<?> getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
     private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
     {
         private final long _allowedTime;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Thu Mar 24 17:55:00 2016
@@ -64,6 +64,21 @@ public class NetworkConnectionScheduler
                                     });
     }
 
+    @Override
+    public String toString()
+    {
+        return "NetworkConnectionScheduler{" +
+               "_factory=" + _factory +
+               ", _executor=" + _executor +
+               ", _running=" + _running +
+               ", _poolSize=" + _poolSize +
+               ", _threadKeepAliveTimeout=" + _threadKeepAliveTimeout +
+               ", _name='" + _name + '\'' +
+               ", _numberOfSelectors=" + _numberOfSelectors +
+               ", _selectorThread=" + _selectorThread +
+               '}';
+    }
+
     public NetworkConnectionScheduler(String name,
                                       final int numberOfSelectors, int threadPoolSize,
                                       long threadKeepAliveTimeout,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Mar 24 17:55:00 2016
@@ -26,9 +26,11 @@ import java.nio.channels.SocketChannel;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,7 +54,7 @@ public class NonBlockingConnection imple
 
     private final SocketChannel _socketChannel;
     private NonBlockingConnectionDelegate _delegate;
-    private NetworkConnectionScheduler _scheduler;
+    private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
     private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
 
     private final String _remoteSocketAddress;
@@ -76,6 +78,7 @@ public class NonBlockingConnection imple
     private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
     private final AtomicLong _maxReadIdleMillis = new AtomicLong();
     private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
+    private final AtomicBoolean _hasShutdown = new AtomicBoolean();
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ProtocolEngine protocolEngine,
@@ -85,7 +88,7 @@ public class NonBlockingConnection imple
                                  final AmqpPort port)
     {
         _socketChannel = socketChannel;
-        _scheduler = scheduler;
+        pushScheduler(scheduler);
 
         _protocolEngine = protocolEngine;
         _onTransportEncryptionAction = onTransportEncryptionAction;
@@ -102,7 +105,7 @@ public class NonBlockingConnection imple
             @Override
             public void performAction(final ProtocolEngine object)
             {
-                _scheduler.schedule(NonBlockingConnection.this);
+                getScheduler().schedule(NonBlockingConnection.this);
             }
         });
 
@@ -390,6 +393,11 @@ public class NonBlockingConnection imple
 
     private void shutdown()
     {
+        if (_hasShutdown.getAndSet(true))
+        {
+            return;
+        }
+
         try
         {
             shutdownInput();
@@ -581,20 +589,25 @@ public class NonBlockingConnection imple
     {
     }
 
-    public void changeScheduler(NetworkConnectionScheduler scheduler)
+    public final void pushScheduler(NetworkConnectionScheduler scheduler)
     {
-        _scheduler = scheduler;
+        _schedulerDeque.addFirst(scheduler);
     }
 
-    @Override
-    public String toString()
+    public final NetworkConnectionScheduler popScheduler()
     {
-        return "[NonBlockingConnection " + _remoteSocketAddress + "]";
+        return _schedulerDeque.removeFirst();
     }
 
-    public NetworkConnectionScheduler getScheduler()
+    public final NetworkConnectionScheduler getScheduler()
     {
-        return _scheduler;
+        return _schedulerDeque.peekFirst();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "[NonBlockingConnection " + _remoteSocketAddress + "]";
     }
 
     public void processAmqpData(QpidByteBuffer applicationData)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Mar 24 17:55:00 2016
@@ -634,6 +634,10 @@ class SelectorThread extends Thread
 
      public void addToWork(final NonBlockingConnection connection)
      {
+         if (_closed.get())
+         {
+             throw new IllegalStateException("Adding connection work " + connection + " to closed selector thread " + _scheduler);
+         }
          if(connection.setScheduled())
          {
              _workQueue.add(new ConnectionProcessor(_scheduler, connection));

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Mar 24 17:55:00 2016
@@ -69,6 +69,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ConnectionValidator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -108,6 +109,7 @@ public abstract class AbstractVirtualHos
     private final Set<AMQPConnection<?>> _connections = newSetFromMap(new ConcurrentHashMap<AMQPConnection<?>, Boolean>());
     private final AccessControlContext _housekeepingJobContext;
     private final AccessControlContext _fileSystemSpaceCheckerJobContext;
+    private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
 
     private static enum BlockingType { STORE, FILESYSTEM };
 
@@ -955,6 +957,7 @@ public abstract class AbstractVirtualHos
         {
             _logger.debug("Closing connection registry :" + _connections.size() + " connections.");
         }
+        _acceptsConnections.set(false);
         for(Connection conn : _connections)
         {
             conn.getUnderlyingConnection().stopConnection();
@@ -1751,16 +1754,26 @@ public abstract class AbstractVirtualHos
             @Override
             public ListenableFuture<Void> execute()
             {
-                _connections.add(connection);
-
-                if(_blocked.get())
+                if (_acceptsConnections.get())
                 {
-                    connection.block();
-                }
+                    _connections.add(connection);
 
-                connection.setScheduler(_networkConnectionScheduler);
+                    if (_blocked.get())
+                    {
+                        connection.block();
+                    }
 
-                return Futures.immediateFuture(null);
+                    connection.pushScheduler(_networkConnectionScheduler);
+                    return Futures.immediateFuture(null);
+                }
+                else
+                {
+                    final VirtualHostUnavailableException exception =
+                            new VirtualHostUnavailableException(String.format(
+                                    "VirtualHost '%s' not accepting connections",
+                                    getName()));
+                    return Futures.immediateFailedFuture(exception);
+                }
             }
 
             @Override
@@ -1797,6 +1810,7 @@ public abstract class AbstractVirtualHos
             @Override
             public ListenableFuture<Void> execute()
             {
+                connection.popScheduler();
                 _connections.remove(connection);
 
                 return Futures.immediateFuture(null);
@@ -1885,7 +1899,6 @@ public abstract class AbstractVirtualHos
             postCreateDefaultExchangeTasks();
             return Futures.immediateFuture(null);
         }
-
     }
 
     private void postCreateDefaultExchangeTasks()
@@ -1915,6 +1928,7 @@ public abstract class AbstractVirtualHos
         {
             initialiseHouseKeeping(getHousekeepingCheckPeriod());
             finalState = State.ACTIVE;
+            _acceptsConnections.set(true);
         }
         finally
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java Thu Mar 24 17:55:00 2016
@@ -25,10 +25,15 @@ import org.apache.qpid.server.util.Conne
 
 public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
 {
+    public VirtualHostUnavailableException(String message)
+    {
+        super(message);
+    }
+
     public VirtualHostUnavailableException(VirtualHost<?> host)
     {
-        super("Virtualhost state "
-              + host.getState()
-              + " prevents the message from being sent");
+        this("Virtualhost state "
+             + host.getState()
+             + " prevents the message from being sent");
     }
 }

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Thu Mar 24 17:55:00 2016
@@ -31,10 +31,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.security.AccessControlException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -43,6 +45,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
@@ -65,6 +68,7 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class VirtualHostTest extends QpidTestCase
@@ -465,6 +469,60 @@ public class VirtualHostTest extends Qpi
         }
     }
 
+    public void testRegisterConnection() throws Exception
+    {
+        VirtualHost<?> vhost = createVirtualHost("sdf");
+        AMQPConnection<?> connection = getMockConnection();
+
+        assertEquals("unexpected number of connections before test", 0, vhost.getConnectionCount());
+        vhost.registerConnection(connection);
+        assertEquals("unexpected number of connections after registerConnection", 1, vhost.getConnectionCount());
+        assertEquals("unexpected connection object", Collections.singleton(connection), vhost.getConnections());
+    }
+
+    public void testStopVirtualhostClosesConnections() throws Exception
+    {
+        VirtualHost<?> vhost = createVirtualHost("sdf");
+        AMQPConnection<?> connection = getMockConnection();
+
+        vhost.registerConnection(connection);
+        assertEquals("unexpected number of connections after registerConnection", 1, vhost.getConnectionCount());
+        assertEquals("unexpected connection object", Collections.singleton(connection), vhost.getConnections());
+        vhost.stop();
+        verify(connection).stopConnection();
+        verify(connection).closeAsync();
+    }
+
+    public void testRegisterConnectionOnStoppedVirtualhost() throws Exception
+    {
+        VirtualHost<?> vhost = createVirtualHost("sdf");
+        AMQPConnection<?> connection = getMockConnection();
+
+        vhost.stop();
+        try
+        {
+            vhost.registerConnection(connection);
+            fail("exception not thrown");
+        }
+        catch (VirtualHostUnavailableException e)
+        {
+            // pass
+        }
+        assertEquals("unexpected number of connections", 0, vhost.getConnectionCount());
+        vhost.start();
+        vhost.registerConnection(connection);
+        assertEquals("unexpected number of connections", 1, vhost.getConnectionCount());
+    }
+
+    private AMQPConnection<?> getMockConnection()
+    {
+        AMQPConnection<?> connection = mock(AMQPConnection.class);
+        final ListenableFuture<Void> listenableFuture = Futures.immediateFuture(null);
+        when(connection.closeAsync()).thenReturn(listenableFuture);
+        when(connection.getUnderlyingConnection()).thenReturn((AMQPConnection) connection);
+        return connection;
+    }
+
     private VirtualHost<?> createVirtualHost(final String virtualHostName)
     {
         return createVirtualHost(virtualHostName, Collections.<String, Object>emptyMap());

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Thu Mar 24 17:55:00 2016
@@ -41,7 +41,6 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.store.StoreException;
@@ -84,8 +83,7 @@ public class AMQPConnection_0_10 extends
     {
         super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
 
-        _connection = new ServerConnection(id, broker, port, transport);
-        _connection.setAmqpConnection(this);
+        _connection = new ServerConnection(id, broker, port, transport, this);
         SocketAddress address = network.getLocalAddress();
         String fqdn = null;
 
@@ -333,11 +331,6 @@ public class AMQPConnection_0_10 extends
         return _connection.getRemoteContainerName();
     }
 
-    public VirtualHost<?> getVirtualHost()
-    {
-        return _connection.getVirtualHost();
-    }
-
     public List<ServerSession> getSessionModels()
     {
         return _connection.getSessionModels();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Mar 24 17:55:00 2016
@@ -73,7 +73,6 @@ public class ServerConnection extends Co
     private Principal _authorizedPrincipal = null;
     private final long _connectionId;
     private final Object _reference = new Object();
-    private volatile VirtualHost<?> _virtualHost;
     private final AmqpPort<?> _port;
     private final AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
@@ -84,22 +83,22 @@ public class ServerConnection extends Co
 
     private int _messageCompressionThreshold;
 
-    private AMQPConnection_0_10 _amqpConnection;
+    private final AMQPConnection_0_10 _amqpConnection;
     private boolean _ignoreFutureInput;
     private boolean _ignoreAllButConnectionCloseOk;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
                             final AmqpPort<?> port,
-                            final Transport transport)
+                            final Transport transport,
+                            final AMQPConnection_0_10 serverProtocolEngine)
     {
         _connectionId = connectionId;
         _broker = broker;
 
         _port = port;
         _transport = transport;
-
-
+        _amqpConnection = serverProtocolEngine;
     }
 
     public Object getReference()
@@ -124,7 +123,8 @@ public class ServerConnection extends Co
 
     EventLogger getEventLogger()
     {
-        return _virtualHost == null ? _broker.getEventLogger() : _virtualHost.getEventLogger();
+        VirtualHost<?> virtualHost = getVirtualHost();
+        return virtualHost == null ? _broker.getEventLogger() : virtualHost.getEventLogger();
     }
 
     @Override
@@ -135,7 +135,6 @@ public class ServerConnection extends Co
         if (state == State.OPEN)
         {
             _amqpConnection.logConnectionOpen();
-            _amqpConnection.virtualHostAssociated();
         }
 
         if(state == State.CLOSING)
@@ -155,19 +154,14 @@ public class ServerConnection extends Co
         return _amqpConnection;
     }
 
-    public void setAmqpConnection(final AMQPConnection_0_10 serverProtocolEngine)
-    {
-        _amqpConnection = serverProtocolEngine;
-    }
-
     public VirtualHost<?> getVirtualHost()
     {
-        return _virtualHost;
+        return _amqpConnection.getVirtualHost();
     }
 
     public void setVirtualHost(VirtualHost<?> virtualHost)
     {
-        _virtualHost = virtualHost;
+        _amqpConnection.associateVirtualHost(virtualHost);
         _messageCompressionThreshold =
                 virtualHost.getContextValue(Integer.class,
                                             Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
@@ -176,7 +170,7 @@ public class ServerConnection extends Co
         {
             _messageCompressionThreshold = Integer.MAX_VALUE;
         }
-        _amqpConnection.getSubject().getPrincipals().add(_virtualHost.getPrincipal());
+        _amqpConnection.getSubject().getPrincipals().add(virtualHost.getPrincipal());
         _amqpConnection.updateAccessControllerContext();
     }
 
@@ -423,9 +417,10 @@ public class ServerConnection extends Co
         }
         finally
         {
-            if(_virtualHost != null)
+            VirtualHost<?> virtualHost = getVirtualHost();
+            if(virtualHost != null)
             {
-                _virtualHost.deregisterConnection(_amqpConnection);
+                virtualHost.deregisterConnection(_amqpConnection);
             }
             getEventLogger().message(isConnectionLost() ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
         }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Thu Mar 24 17:55:00 2016
@@ -51,6 +51,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.network.NetworkConnection;
 
@@ -249,9 +250,9 @@ public class ServerConnectionDelegate ex
                 return;
             }
 
-            sconn.setVirtualHost(vhost);
             try
             {
+                sconn.setVirtualHost(vhost);
                 if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection()))
                 {
                     sconn.setState(Connection.State.CLOSING);
@@ -259,7 +260,7 @@ public class ServerConnectionDelegate ex
                     return;
                 }
             }
-            catch (AccessControlException e)
+            catch (AccessControlException | VirtualHostUnavailableException e)
             {
                 sconn.setState(Connection.State.CLOSING);
                 sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Thu Mar 24 17:55:00 2016
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Port
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.Binary;
@@ -73,33 +74,6 @@ public class ServerSessionTest extends Q
         }
     }
 
-    public void testCompareTo() throws Exception
-    {
-        final Broker<?> broker = mock(Broker.class);
-        when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
-
-        AmqpPort amqpPort = createMockPort();
-
-        ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
-        final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
-        Subject subject = new Subject();
-        when(protocolEngine.getSubject()).thenReturn(subject);
-        connection.setAmqpConnection(protocolEngine);
-        connection.setVirtualHost(_virtualHost);
-        ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
-                new Binary(getName().getBytes()), 0);
-
-        // create a session with the same name but on a different connection
-        ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
-        connection2.setAmqpConnection(protocolEngine);
-        connection2.setVirtualHost(_virtualHost);
-        ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
-                new Binary(getName().getBytes()), 0);
-
-        assertFalse("Unexpected compare result", session1.compareTo(session2) == 0);
-        assertEquals("Unexpected compare result", 0, session1.compareTo(session1));
-    }
-
     public void testOverlargeMessageTest() throws Exception
     {
         final Broker<?> broker = mock(Broker.class);
@@ -107,12 +81,13 @@ public class ServerSessionTest extends Q
 
         AmqpPort port = createMockPort();
 
-        ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
         final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+        when(protocolEngine.getVirtualHost()).thenReturn((VirtualHost) _virtualHost);
+        when(protocolEngine.getUnderlyingConnection()).thenReturn((AMQPConnection) protocolEngine);
         Subject subject = new Subject();
         when(protocolEngine.getSubject()).thenReturn(subject);
         when(protocolEngine.getMaxMessageSize()).thenReturn(1024l);
-        connection.setAmqpConnection(protocolEngine);
+        ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP, protocolEngine);
         connection.setVirtualHost(_virtualHost);
         final List<Method> invokedMethods = new ArrayList<>();
         ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Thu Mar 24 17:55:00 2016
@@ -42,7 +42,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
@@ -86,6 +85,7 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.AggregateTicker;
@@ -115,8 +115,6 @@ public class AMQPConnection_0_8
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
 
-    private volatile VirtualHost<?> _virtualHost;
-
     private final Object _channelAddRemoveLock = new Object();
     private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<>();
 
@@ -270,7 +268,7 @@ public class AMQPConnection_0_8
                 }
                 catch (StoreException e)
                 {
-                    if (_virtualHost.getState() == State.ACTIVE)
+                    if (getVirtualHost().getState() == State.ACTIVE)
                     {
                         throw new ServerScopedRuntimeException(e);
                     }
@@ -710,15 +708,9 @@ public class AMQPConnection_0_8
         return getMethodRegistry();
     }
 
-    public VirtualHost<?> getVirtualHost()
-    {
-        return _virtualHost;
-    }
-
     public void setVirtualHost(VirtualHost<?> virtualHost)
     {
-        _virtualHost = virtualHost;
-        virtualHostAssociated();
+        associateVirtualHost(virtualHost);
 
         _messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
                                                                    Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
@@ -789,9 +781,10 @@ public class AMQPConnection_0_8
             {
                 performDeleteTasks();
 
-                if (_virtualHost != null)
+                final VirtualHost<?> virtualHost = getVirtualHost();
+                if (virtualHost != null)
                 {
-                    _virtualHost.deregisterConnection(this);
+                    virtualHost.deregisterConnection(this);
                 }
 
             }
@@ -964,7 +957,8 @@ public class AMQPConnection_0_8
         assertState(ConnectionState.OPEN);
 
         // Protect the broker against out of order frame request.
-        if (_virtualHost == null)
+        final VirtualHost<?> virtualHost = getVirtualHost();
+        if (virtualHost == null)
         {
             sendConnectionClose(AMQConstant.COMMAND_INVALID,
                     "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
@@ -982,9 +976,9 @@ public class AMQPConnection_0_8
         }
         else
         {
-            _logger.debug("Connecting to: {}", _virtualHost.getName());
+            _logger.debug("Connecting to: {}", virtualHost.getName());
 
-            final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore());
+            final AMQChannel channel = new AMQChannel(this, channelId, virtualHost.getMessageStore());
 
             addChannel(channel);
 
@@ -1050,9 +1044,9 @@ public class AMQPConnection_0_8
             }
             else
             {
-                setVirtualHost(virtualHost);
                 try
                 {
+                    setVirtualHost(virtualHost);
 
                     if(virtualHost.authoriseCreateConnection(this))
                     {
@@ -1067,7 +1061,7 @@ public class AMQPConnection_0_8
                         sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Connection refused", 0);
                     }
                 }
-                catch (AccessControlException e)
+                catch (AccessControlException | VirtualHostUnavailableException e)
                 {
                     sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), 0);
                 }
@@ -1396,9 +1390,10 @@ public class AMQPConnection_0_8
     @Override
     public EventLogger getEventLogger()
     {
-        if(_virtualHost != null)
+        final VirtualHost<?> virtualHost = getVirtualHost();
+        if (virtualHost != null)
         {
-            return _virtualHost.getEventLogger();
+            return virtualHost.getEventLogger();
         }
         else
         {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Thu Mar 24 17:55:00 2016
@@ -37,15 +37,12 @@ import javax.security.auth.Subject;
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
@@ -184,7 +181,7 @@ public class AMQChannelTest extends Qpid
                                                         authenticatedUser,
                                                         Collections.<Principal>emptySet(),
                                                         Collections.<Principal>emptySet()));
-        _amqConnection.virtualHostAssociated();
+        _amqConnection.associateVirtualHost(_virtualHost);
 
         int channelId = 1;
         AMQChannel channel = new AMQChannel(_amqConnection, channelId, _virtualHost.getMessageStore());
@@ -214,7 +211,7 @@ public class AMQChannelTest extends Qpid
 
         Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
         _amqConnection.setAuthorizedSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(),  Collections.<Principal>emptySet()));
-        _amqConnection.virtualHostAssociated();
+        _amqConnection.associateVirtualHost(_virtualHost);
 
         AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Thu Mar 24 17:55:00 2016
@@ -59,22 +59,22 @@ import org.apache.qpid.configuration.Com
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.transport.NonBlockingConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -154,8 +154,6 @@ public class AMQPConnection_1_0 extends
         super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
         _broker = broker;
         _connection = createConnection(broker, network, port, transport, id, useSASL);
-
-        _connection.setAmqpConnection(this);
         _endpoint = _connection.getConnectionEndpoint();
         _endpoint.setConnectionEventListener(_connection);
         _endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
@@ -176,12 +174,12 @@ public class AMQPConnection_1_0 extends
 
     }
 
-    public static Connection_1_0 createConnection(final Broker<?> broker,
-                                                  final ServerNetworkConnection network,
-                                                  final AmqpPort<?> port,
-                                                  final Transport transport,
-                                                  final long id,
-                                                  final boolean useSASL)
+    private Connection_1_0 createConnection(final Broker<?> broker,
+                                            final ServerNetworkConnection network,
+                                            final AmqpPort<?> port,
+                                            final Transport transport,
+                                            final long id,
+                                            final boolean useSASL)
     {
         Container container = new Container(broker.getId().toString());
 
@@ -211,7 +209,7 @@ public class AMQPConnection_1_0 extends
         endpoint.setProperties(serverProperties);
 
         endpoint.setRemoteAddress(network.getRemoteAddress());
-        return new Connection_1_0(endpoint, id, port, transport, subjectCreator);
+        return new Connection_1_0(endpoint, id, port, transport, subjectCreator, this);
     }
 
 
@@ -485,7 +483,7 @@ public class AMQPConnection_1_0 extends
 
     void changeScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
     {
-        ((NonBlockingConnection) getNetwork()).changeScheduler(networkConnectionScheduler);
+        ((NonBlockingConnection) getNetwork()).pushScheduler(networkConnectionScheduler);
     }
 
 
@@ -662,11 +660,6 @@ public class AMQPConnection_1_0 extends
         return _connection.getRemoteContainerName();
     }
 
-    public VirtualHost<?> getVirtualHost()
-    {
-        return _connection.getVirtualHost();
-    }
-
     public List<Session_1_0> getSessionModels()
     {
         return _connection.getSessionModels();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Mar 24 17:55:00 2016
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 
-import java.net.InetAddress;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -63,9 +62,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionRedirect;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class Connection_1_0 implements ConnectionEventListener
 {
@@ -73,8 +70,7 @@ public class Connection_1_0 implements C
     private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
     private final AmqpPort<?> _port;
     private final SubjectCreator _subjectCreator;
-    private AMQPConnection_1_0 _amqpConnection;
-    private VirtualHost<?> _vhost;
+    private final AMQPConnection_1_0 _amqpConnection;
     private final Transport _transport;
     private final ConnectionEndpoint _connectionEndpoint;
     private final long _connectionId;
@@ -91,7 +87,7 @@ public class Connection_1_0 implements C
                                         getConnectionId(),
                                         getClientId(),
                                         getRemoteAddressString(),
-                                        _vhost.getName())
+                                        getVirtualHost().getName())
                    + "] ";
 
         }
@@ -108,17 +104,14 @@ public class Connection_1_0 implements C
                           long connectionId,
                           AmqpPort<?> port,
                           Transport transport,
-                          final SubjectCreator subjectCreator)
+                          final SubjectCreator subjectCreator,
+                          final AMQPConnection_1_0 amqpConnection)
     {
         _port = port;
         _transport = transport;
         _connectionEndpoint = connectionEndpoint;
         _connectionId = connectionId;
         _subjectCreator = subjectCreator;
-    }
-
-    void setAmqpConnection(final AMQPConnection_1_0 amqpConnection)
-    {
         _amqpConnection = amqpConnection;
     }
 
@@ -168,13 +161,7 @@ public class Connection_1_0 implements C
             final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(host);
             if (vhost == null)
             {
-                final Error err = new Error();
-                err.setCondition(AmqpError.NOT_FOUND);
-                err.setDescription("Unknown hostname in connection open: '" + host + "'");
-                _connectionEndpoint.close(err);
-                _amqpConnection.close();
-
-                _closedOnOpen = true;
+                closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + host + "'");
             }
             else
             {
@@ -255,23 +242,34 @@ public class Connection_1_0 implements C
                     _amqpConnection.updateAccessControllerContext();
                     if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) == null)
                     {
-                        final Error err = new Error();
-                        err.setCondition(AmqpError.NOT_ALLOWED);
-                        err.setDescription("Connection has not been authenticated");
-                        _connectionEndpoint.close(err);
-                        _amqpConnection.close();
-                        _closedOnOpen = true;
+                        closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
                     }
                     else
                     {
-                        _vhost = vhost;
-                        _amqpConnection.virtualHostAssociated();
+                        try
+                        {
+                            _amqpConnection.associateVirtualHost(vhost);
+                        }
+                        catch (VirtualHostUnavailableException e)
+                        {
+                            closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
+                        }
                     }
                 }
             }
         }
     }
 
+    private void closeWithError(final AmqpError amqpError, final String errorDescription)
+    {
+        final Error err = new Error();
+        err.setCondition(amqpError);
+        err.setDescription(errorDescription);
+        _connectionEndpoint.close(err);
+        _amqpConnection.close();
+        _closedOnOpen = true;
+    }
+
     void setUserPrincipal(final Principal user)
     {
         Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
@@ -361,9 +359,10 @@ public class Connection_1_0 implements C
         }
         finally
         {
-            if (_vhost != null)
+            VirtualHost<?> virtualHost = getVirtualHost();
+            if (virtualHost != null)
             {
-                _vhost.deregisterConnection(_amqpConnection);
+                virtualHost.deregisterConnection(_amqpConnection);
             }
             getAmqpConnection().getEventLogger().message(ConnectionMessages.CLOSE());
         }
@@ -469,10 +468,9 @@ public class Connection_1_0 implements C
 
     public VirtualHost<?> getVirtualHost()
     {
-        return _vhost;
+        return _amqpConnection.getVirtualHost();
     }
 
-
     public void transportStateChanged()
     {
         for (Session_1_0 session : _sessions)
@@ -557,11 +555,12 @@ public class Connection_1_0 implements C
     @Override
     public String toString()
     {
+        VirtualHost<?> virtualHost = getVirtualHost();
         return "Connection_1_0["
                +  _connectionId
                + " "
                + _amqpConnection.getAddress()
-               + (_vhost == null ? "" : (" vh : " + _vhost.getName()))
+               + (virtualHost == null ? "" : (" vh : " + virtualHost.getName()))
                + ']';
     }
 }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java Thu Mar 24 17:55:00 2016
@@ -20,12 +20,6 @@
  */
 package org.apache.qpid.systest.rest;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -88,33 +82,4 @@ public class QpidRestTestCase extends Qp
     {
         return _restTestHelper;
     }
-
-    public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
-    {
-        List<Map<String, Object>> nodeAttributes = getAttributesIgnoringNotFound(url);
-        int timeout = 30000;
-        long limit = System.currentTimeMillis() + timeout;
-        while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
-        {
-            Thread.sleep(100l);
-            nodeAttributes = getAttributesIgnoringNotFound(url);
-        }
-        Map<String, Object> nodeData = nodeAttributes.get(0);
-        assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout "  + timeout + "ms.", newValue, nodeData.get(attributeName));
-        return nodeData;
-    }
-
-    private List<Map<String, Object>> getAttributesIgnoringNotFound(String url) throws IOException
-    {
-        List<Map<String, Object>> nodeAttributes;
-        try
-        {
-            nodeAttributes = getRestTestHelper().getJsonAsList(url);
-        }
-        catch(FileNotFoundException e)
-        {
-            nodeAttributes = Collections.emptyList();
-        }
-        return nodeAttributes;
-    }
 }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Thu Mar 24 17:55:00 2016
@@ -20,6 +20,7 @@ package org.apache.qpid.systest.rest;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -622,4 +623,35 @@ public class RestTestHelper
             throw new RuntimeException("Unsupported encoding UTF8", e);
         }
     }
+
+    public Map<String, Object> waitForAttributeChanged(String url,
+                                                       String attributeName,
+                                                       Object newValue) throws Exception
+    {
+        List<Map<String, Object>> nodeAttributes = getAttributesIgnoringNotFound(url);
+        int timeout = 30000;
+        long limit = System.currentTimeMillis() + timeout;
+        while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
+        {
+            Thread.sleep(100l);
+            nodeAttributes = getAttributesIgnoringNotFound(url);
+        }
+        Map<String, Object> nodeData = nodeAttributes.get(0);
+        Assert.assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName));
+        return nodeData;
+    }
+
+    private List<Map<String, Object>> getAttributesIgnoringNotFound(String url) throws IOException
+    {
+        List<Map<String, Object>> nodeAttributes;
+        try
+        {
+            nodeAttributes = getJsonAsList(url);
+        }
+        catch(FileNotFoundException e)
+        {
+            nodeAttributes = Collections.emptyList();
+        }
+        return nodeAttributes;
+    }
 }

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Thu Mar 24 17:55:00 2016
@@ -206,19 +206,19 @@ public class VirtualHostRestTest extends
     {
         String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
 
-        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
         assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
 
         Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
         getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
-        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+        _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
         assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
 
         newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
         getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
-        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
 
         assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
     }
@@ -229,7 +229,7 @@ public class VirtualHostRestTest extends
         String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
         String restQueueUrl = "queue/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST + "/" + testQueueName;
 
-        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
         assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
 
         Connection connection = getConnection();
@@ -245,13 +245,13 @@ public class VirtualHostRestTest extends
         Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
         getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
-        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+        _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
         assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
 
         newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
         getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
 
-        waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+        _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
 
         assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
 

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Thu Mar 24 17:55:00 2016
@@ -20,8 +20,12 @@
  */
 package org.apache.qpid.test.unit.client.connection;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.ExceptionListener;
@@ -32,14 +36,20 @@ import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.naming.NamingException;
 
 import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.BasicMessageConsumer;
 import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.systest.rest.Asserts;
+import org.apache.qpid.systest.rest.RestTestHelper;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
 import org.apache.qpid.transport.ConnectionException;
 
 /**
@@ -58,6 +68,7 @@ public class BrokerClosesClientConnectio
     @Override
     protected void setUp() throws Exception
     {
+        getDefaultBrokerConfiguration().addHttpManagementConfiguration();
         super.setUp();
 
         _connection = getConnection();
@@ -86,6 +97,67 @@ public class BrokerClosesClientConnectio
         ensureCanCloseWithoutException();
     }
 
+    public void testClientCloseOnVirtualHostStop() throws Exception
+    {
+        final String virtualHostName = TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST;
+        RestTestHelper restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+
+        final CountDownLatch connectionCreatorStarted = new CountDownLatch(1);
+        final AtomicBoolean shutdown = new AtomicBoolean(false);
+        final AtomicReference<Exception> clientException = new AtomicReference<>();
+        Thread connectionCreator = new Thread(new Runnable(){
+
+            @Override
+            public void run()
+            {
+                while (!shutdown.get())
+                {
+                    try
+                    {
+                        getConnection();
+                    }
+                    catch (Exception e)
+                    {
+                        clientException.set(e);
+                        shutdown.set(true);
+                    }
+                    connectionCreatorStarted.countDown();
+                }
+            }
+        });
+
+        try
+        {
+            connectionCreator.start();
+            assertTrue("connection creation thread did not start in time", connectionCreatorStarted.await(20, TimeUnit.SECONDS));
+
+            String restHostUrl = "virtualhost/" + virtualHostName + "/" + virtualHostName;
+            restTestHelper.submitRequest(restHostUrl, "PUT", Collections.singletonMap("desiredState", (Object) "STOPPED"), 200);
+            restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+
+            int connectionCount = 0;
+            for (int i = 0; i < 20; ++i)
+            {
+                Map<String, Object> portObject = restTestHelper.getJsonAsSingletonList("port/" + TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT);
+                Map<String, Object> portStatistics = (Map<String, Object>) portObject.get("statistics");
+                connectionCount = (int) portStatistics.get("connectionCount");
+                if (connectionCount == 0)
+                {
+                    break;
+                }
+                Thread.sleep(250);
+            }
+            assertEquals("unexpected number of connections after virtual host stopped", 0, connectionCount);
+
+            assertConnectionCloseWasReported(clientException.get(), AMQConnectionFailureException.class);
+        }
+        finally
+        {
+            shutdown.set(true);
+            connectionCreator.join(10000);
+        }
+    }
+
     public void testClientCloseOnBrokerKill() throws Exception
     {
         final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQDisconnectedException.class;
@@ -118,12 +190,11 @@ public class BrokerClosesClientConnectio
         }
     }
 
-    private void assertConnectionCloseWasReported(JMSException exception, Class<? extends Exception> linkedExceptionClass)
+    private void assertConnectionCloseWasReported(Exception exception, Class<? extends Exception> linkedExceptionClass)
     {
-        assertNotNull("Broker shutdown should be reported to the client via the ExceptionListener", exception);
-        assertNotNull("JMSXException should have linked exception", exception.getLinkedException());
-
-        assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass());
+        assertNotNull("Did not receive exception", exception);
+        assertNotNull("Exception should have a cause", exception.getCause());
+        assertEquals("Unexpected exception cause", linkedExceptionClass, exception.getCause().getClass());
     }
 
     private void assertJmsObjectsClosed()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org