You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/03/24 18:55:01 UTC
svn commit: r1736478 - in /qpid/java/trunk:
bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/transport/ broker-...
Author: kwall
Date: Thu Mar 24 17:55:00 2016
New Revision: 1736478
URL: http://svn.apache.org/viewvc?rev=1736478&view=rev
Log:
QPID-7156: [Java Broker] Stop new connections from being established while virtual host is stopping.
Work by Lorenz Quack and me
Modified:
qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java Thu Mar 24 17:55:00 2016
@@ -114,8 +114,10 @@ public class BDBHAVirtualHostNodeRestTes
assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
// verify that remote nodes for node1 are created and their state is set to ACTIVE
- waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "ACTIVE");
- waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "ACTIVE");
mutateDesiredState(node1Url, "STOPPED");
@@ -124,8 +126,10 @@ public class BDBHAVirtualHostNodeRestTes
assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
// verify that remote node state for node1 is changed to UNAVAILABLE
- waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
- waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2);
assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size());
@@ -218,7 +222,7 @@ public class BDBHAVirtualHostNodeRestTes
getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE", HttpServletResponse.SC_OK);
// wait for new master
- waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
// delete remote node
getRestTestHelper().submitRequest("remotereplicationnode/" + NODE2 + "/" + NODE1, "DELETE", HttpServletResponse.SC_OK);
@@ -315,8 +319,8 @@ public class BDBHAVirtualHostNodeRestTes
}
}
- waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
- waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
}
private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception
@@ -325,7 +329,7 @@ public class BDBHAVirtualHostNodeRestTes
getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData, HttpServletResponse.SC_CREATED);
String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name();
- waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
+ _restTestHelper.waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
}
private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception
@@ -356,7 +360,7 @@ public class BDBHAVirtualHostNodeRestTes
{
boolean isMaster = nodeName.equals(masterNode);
String expectedRole = isMaster? "MASTER" : "REPLICA";
- waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0");
assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
@@ -376,7 +380,7 @@ public class BDBHAVirtualHostNodeRestTes
if (isMaster)
{
- waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
+ _restTestHelper.waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
}
}
@@ -393,7 +397,7 @@ public class BDBHAVirtualHostNodeRestTes
for (String remote : remotes)
{
String remoteUrl = "remotereplicationnode/" + clusterNodeName + "/" + remote;
- Map<String, Object> nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
+ Map<String, Object> nodeData = _restTestHelper.waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
assertRemoteNodeData(remote, nodeData);
}
}
Modified: qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java (original)
+++ qpid/java/trunk/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java Thu Mar 24 17:55:00 2016
@@ -117,7 +117,7 @@ public class BDBHAVirtualHostRestTest ex
public void testSetLocalTransactionSynchronizationPolicy() throws Exception
{
- Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
@@ -129,7 +129,7 @@ public class BDBHAVirtualHostRestTest ex
public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
{
- Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
@@ -141,19 +141,19 @@ public class BDBHAVirtualHostRestTest ex
public void testMutateState() throws Exception
{
- waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
+ _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Thu Mar 24 17:55:00 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.model;
import java.util.Collection;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AMQPConnection;
@ManagedObject( creatable = false )
public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@@ -122,5 +122,5 @@ public interface Connection<X extends Co
Collection<Session> getSessions();
- AbstractAMQPConnection<?> getUnderlyingConnection();
+ AMQPConnection<?> getUnderlyingConnection();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Thu Mar 24 17:55:00 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.transport
import java.net.SocketAddress;
import java.security.Principal;
+import java.util.List;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Connection;
@@ -60,7 +61,9 @@ public interface AMQPConnection<C extend
void unblock();
- void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+ void pushScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+
+ NetworkConnectionScheduler popScheduler();
boolean hasSessionWithName(byte[] name);
@@ -71,4 +74,10 @@ public interface AMQPConnection<C extend
boolean isIOThread();
boolean isAuthorizedMessagePrincipal(String messageUserId);
+
+ void stopConnection();
+
+ List<? extends AMQSessionModel<?>> getSessionModels();
+
+ void resetStatistics();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Thu Mar 24 17:55:00 2016
@@ -53,7 +53,6 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Session;
@@ -101,6 +100,7 @@ public abstract class AbstractAMQPConnec
private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
private final AtomicBoolean _modelClosing = new AtomicBoolean();
+ private volatile VirtualHost<?> _virtualHost;
private volatile long _lastReadTime;
private volatile long _lastWriteTime;
private volatile AccessControlContext _accessControllerContext;
@@ -264,6 +264,7 @@ public abstract class AbstractAMQPConnec
return String.valueOf(_network.getRemoteAddress());
}
+ @Override
public final void stopConnection()
{
_stopped = true;
@@ -299,15 +300,25 @@ public abstract class AbstractAMQPConnec
}
@Override
- public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ public void pushScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
if(_network instanceof NonBlockingConnection)
{
- ((NonBlockingConnection) _network).changeScheduler(networkConnectionScheduler);
+ ((NonBlockingConnection) _network).pushScheduler(networkConnectionScheduler);
}
}
@Override
+ public NetworkConnectionScheduler popScheduler()
+ {
+ if(_network instanceof NonBlockingConnection)
+ {
+ return ((NonBlockingConnection) _network).popScheduler();
+ }
+ return null;
+ }
+
+ @Override
public String getClientProduct()
{
return _clientProduct;
@@ -414,6 +425,7 @@ public abstract class AbstractAMQPConnec
getVirtualHost().registerMessageReceived(messageSize, timestamp);
}
+ @Override
public final void resetStatistics()
{
_messagesDelivered.reset();
@@ -531,9 +543,10 @@ public abstract class AbstractAMQPConnec
{
}
- final public void virtualHostAssociated()
+ public final void associateVirtualHost(final VirtualHost<?> virtualHost)
{
- getVirtualHost().registerConnection(this);
+ virtualHost.registerConnection(this);
+ _virtualHost = virtualHost;
updateMaxMessageSize();
_messageAuthorizationRequired = getVirtualHost().getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
}
@@ -693,8 +706,6 @@ public abstract class AbstractAMQPConnec
});
}
- public abstract List<? extends AMQSessionModel<?>> getSessionModels();
-
@Override
public int getSessionCount()
{
@@ -702,7 +713,7 @@ public abstract class AbstractAMQPConnec
}
@Override
- public AbstractAMQPConnection<?> getUnderlyingConnection()
+ public AMQPConnection<?> getUnderlyingConnection()
{
return this;
}
@@ -731,6 +742,12 @@ public abstract class AbstractAMQPConnec
return !_messageAuthorizationRequired || getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
}
+ @Override
+ public VirtualHost<?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
{
private final long _allowedTime;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Thu Mar 24 17:55:00 2016
@@ -64,6 +64,21 @@ public class NetworkConnectionScheduler
});
}
+ @Override
+ public String toString()
+ {
+ return "NetworkConnectionScheduler{" +
+ "_factory=" + _factory +
+ ", _executor=" + _executor +
+ ", _running=" + _running +
+ ", _poolSize=" + _poolSize +
+ ", _threadKeepAliveTimeout=" + _threadKeepAliveTimeout +
+ ", _name='" + _name + '\'' +
+ ", _numberOfSelectors=" + _numberOfSelectors +
+ ", _selectorThread=" + _selectorThread +
+ '}';
+ }
+
public NetworkConnectionScheduler(String name,
final int numberOfSelectors, int threadPoolSize,
long threadKeepAliveTimeout,
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Thu Mar 24 17:55:00 2016
@@ -26,9 +26,11 @@ import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,7 +54,7 @@ public class NonBlockingConnection imple
private final SocketChannel _socketChannel;
private NonBlockingConnectionDelegate _delegate;
- private NetworkConnectionScheduler _scheduler;
+ private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final String _remoteSocketAddress;
@@ -76,6 +78,7 @@ public class NonBlockingConnection imple
private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
private final AtomicLong _maxReadIdleMillis = new AtomicLong();
private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _hasShutdown = new AtomicBoolean();
public NonBlockingConnection(SocketChannel socketChannel,
ProtocolEngine protocolEngine,
@@ -85,7 +88,7 @@ public class NonBlockingConnection imple
final AmqpPort port)
{
_socketChannel = socketChannel;
- _scheduler = scheduler;
+ pushScheduler(scheduler);
_protocolEngine = protocolEngine;
_onTransportEncryptionAction = onTransportEncryptionAction;
@@ -102,7 +105,7 @@ public class NonBlockingConnection imple
@Override
public void performAction(final ProtocolEngine object)
{
- _scheduler.schedule(NonBlockingConnection.this);
+ getScheduler().schedule(NonBlockingConnection.this);
}
});
@@ -390,6 +393,11 @@ public class NonBlockingConnection imple
private void shutdown()
{
+ if (_hasShutdown.getAndSet(true))
+ {
+ return;
+ }
+
try
{
shutdownInput();
@@ -581,20 +589,25 @@ public class NonBlockingConnection imple
{
}
- public void changeScheduler(NetworkConnectionScheduler scheduler)
+ public final void pushScheduler(NetworkConnectionScheduler scheduler)
{
- _scheduler = scheduler;
+ _schedulerDeque.addFirst(scheduler);
}
- @Override
- public String toString()
+ public final NetworkConnectionScheduler popScheduler()
{
- return "[NonBlockingConnection " + _remoteSocketAddress + "]";
+ return _schedulerDeque.removeFirst();
}
- public NetworkConnectionScheduler getScheduler()
+ public final NetworkConnectionScheduler getScheduler()
{
- return _scheduler;
+ return _schedulerDeque.peekFirst();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[NonBlockingConnection " + _remoteSocketAddress + "]";
}
public void processAmqpData(QpidByteBuffer applicationData)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Thu Mar 24 17:55:00 2016
@@ -634,6 +634,10 @@ class SelectorThread extends Thread
public void addToWork(final NonBlockingConnection connection)
{
+ if (_closed.get())
+ {
+ throw new IllegalStateException("Adding connection work " + connection + " to closed selector thread " + _scheduler);
+ }
if(connection.setScheduled())
{
_workQueue.add(new ConnectionProcessor(_scheduler, connection));
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Thu Mar 24 17:55:00 2016
@@ -69,6 +69,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -108,6 +109,7 @@ public abstract class AbstractVirtualHos
private final Set<AMQPConnection<?>> _connections = newSetFromMap(new ConcurrentHashMap<AMQPConnection<?>, Boolean>());
private final AccessControlContext _housekeepingJobContext;
private final AccessControlContext _fileSystemSpaceCheckerJobContext;
+ private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
private static enum BlockingType { STORE, FILESYSTEM };
@@ -955,6 +957,7 @@ public abstract class AbstractVirtualHos
{
_logger.debug("Closing connection registry :" + _connections.size() + " connections.");
}
+ _acceptsConnections.set(false);
for(Connection conn : _connections)
{
conn.getUnderlyingConnection().stopConnection();
@@ -1751,16 +1754,26 @@ public abstract class AbstractVirtualHos
@Override
public ListenableFuture<Void> execute()
{
- _connections.add(connection);
-
- if(_blocked.get())
+ if (_acceptsConnections.get())
{
- connection.block();
- }
+ _connections.add(connection);
- connection.setScheduler(_networkConnectionScheduler);
+ if (_blocked.get())
+ {
+ connection.block();
+ }
- return Futures.immediateFuture(null);
+ connection.pushScheduler(_networkConnectionScheduler);
+ return Futures.immediateFuture(null);
+ }
+ else
+ {
+ final VirtualHostUnavailableException exception =
+ new VirtualHostUnavailableException(String.format(
+ "VirtualHost '%s' not accepting connections",
+ getName()));
+ return Futures.immediateFailedFuture(exception);
+ }
}
@Override
@@ -1797,6 +1810,7 @@ public abstract class AbstractVirtualHos
@Override
public ListenableFuture<Void> execute()
{
+ connection.popScheduler();
_connections.remove(connection);
return Futures.immediateFuture(null);
@@ -1885,7 +1899,6 @@ public abstract class AbstractVirtualHos
postCreateDefaultExchangeTasks();
return Futures.immediateFuture(null);
}
-
}
private void postCreateDefaultExchangeTasks()
@@ -1915,6 +1928,7 @@ public abstract class AbstractVirtualHos
{
initialiseHouseKeeping(getHousekeepingCheckPeriod());
finalState = State.ACTIVE;
+ _acceptsConnections.set(true);
}
finally
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java Thu Mar 24 17:55:00 2016
@@ -25,10 +25,15 @@ import org.apache.qpid.server.util.Conne
public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
{
+ public VirtualHostUnavailableException(String message)
+ {
+ super(message);
+ }
+
public VirtualHostUnavailableException(VirtualHost<?> host)
{
- super("Virtualhost state "
- + host.getState()
- + " prevents the message from being sent");
+ this("Virtualhost state "
+ + host.getState()
+ + " prevents the message from being sent");
}
}
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Thu Mar 24 17:55:00 2016
@@ -31,10 +31,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.security.AccessControlException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -43,6 +45,7 @@ import java.util.Map;
import java.util.UUID;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -65,6 +68,7 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.test.utils.QpidTestCase;
public class VirtualHostTest extends QpidTestCase
@@ -465,6 +469,60 @@ public class VirtualHostTest extends Qpi
}
}
+ public void testRegisterConnection() throws Exception
+ {
+ VirtualHost<?> vhost = createVirtualHost("sdf");
+ AMQPConnection<?> connection = getMockConnection();
+
+ assertEquals("unexpected number of connections before test", 0, vhost.getConnectionCount());
+ vhost.registerConnection(connection);
+ assertEquals("unexpected number of connections after registerConnection", 1, vhost.getConnectionCount());
+ assertEquals("unexpected connection object", Collections.singleton(connection), vhost.getConnections());
+ }
+
+ public void testStopVirtualhostClosesConnections() throws Exception
+ {
+ VirtualHost<?> vhost = createVirtualHost("sdf");
+ AMQPConnection<?> connection = getMockConnection();
+
+ vhost.registerConnection(connection);
+ assertEquals("unexpected number of connections after registerConnection", 1, vhost.getConnectionCount());
+ assertEquals("unexpected connection object", Collections.singleton(connection), vhost.getConnections());
+ vhost.stop();
+ verify(connection).stopConnection();
+ verify(connection).closeAsync();
+ }
+
+ public void testRegisterConnectionOnStoppedVirtualhost() throws Exception
+ {
+ VirtualHost<?> vhost = createVirtualHost("sdf");
+ AMQPConnection<?> connection = getMockConnection();
+
+ vhost.stop();
+ try
+ {
+ vhost.registerConnection(connection);
+ fail("exception not thrown");
+ }
+ catch (VirtualHostUnavailableException e)
+ {
+ // pass
+ }
+ assertEquals("unexpected number of connections", 0, vhost.getConnectionCount());
+ vhost.start();
+ vhost.registerConnection(connection);
+ assertEquals("unexpected number of connections", 1, vhost.getConnectionCount());
+ }
+
+ private AMQPConnection<?> getMockConnection()
+ {
+ AMQPConnection<?> connection = mock(AMQPConnection.class);
+ final ListenableFuture<Void> listenableFuture = Futures.immediateFuture(null);
+ when(connection.closeAsync()).thenReturn(listenableFuture);
+ when(connection.getUnderlyingConnection()).thenReturn((AMQPConnection) connection);
+ return connection;
+ }
+
private VirtualHost<?> createVirtualHost(final String virtualHostName)
{
return createVirtualHost(virtualHostName, Collections.<String, Object>emptyMap());
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Thu Mar 24 17:55:00 2016
@@ -41,7 +41,6 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.store.StoreException;
@@ -84,8 +83,7 @@ public class AMQPConnection_0_10 extends
{
super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
- _connection = new ServerConnection(id, broker, port, transport);
- _connection.setAmqpConnection(this);
+ _connection = new ServerConnection(id, broker, port, transport, this);
SocketAddress address = network.getLocalAddress();
String fqdn = null;
@@ -333,11 +331,6 @@ public class AMQPConnection_0_10 extends
return _connection.getRemoteContainerName();
}
- public VirtualHost<?> getVirtualHost()
- {
- return _connection.getVirtualHost();
- }
-
public List<ServerSession> getSessionModels()
{
return _connection.getSessionModels();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Thu Mar 24 17:55:00 2016
@@ -73,7 +73,6 @@ public class ServerConnection extends Co
private Principal _authorizedPrincipal = null;
private final long _connectionId;
private final Object _reference = new Object();
- private volatile VirtualHost<?> _virtualHost;
private final AmqpPort<?> _port;
private final AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
@@ -84,22 +83,22 @@ public class ServerConnection extends Co
private int _messageCompressionThreshold;
- private AMQPConnection_0_10 _amqpConnection;
+ private final AMQPConnection_0_10 _amqpConnection;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
public ServerConnection(final long connectionId,
Broker<?> broker,
final AmqpPort<?> port,
- final Transport transport)
+ final Transport transport,
+ final AMQPConnection_0_10 serverProtocolEngine)
{
_connectionId = connectionId;
_broker = broker;
_port = port;
_transport = transport;
-
-
+ _amqpConnection = serverProtocolEngine;
}
public Object getReference()
@@ -124,7 +123,8 @@ public class ServerConnection extends Co
EventLogger getEventLogger()
{
- return _virtualHost == null ? _broker.getEventLogger() : _virtualHost.getEventLogger();
+ VirtualHost<?> virtualHost = getVirtualHost();
+ return virtualHost == null ? _broker.getEventLogger() : virtualHost.getEventLogger();
}
@Override
@@ -135,7 +135,6 @@ public class ServerConnection extends Co
if (state == State.OPEN)
{
_amqpConnection.logConnectionOpen();
- _amqpConnection.virtualHostAssociated();
}
if(state == State.CLOSING)
@@ -155,19 +154,14 @@ public class ServerConnection extends Co
return _amqpConnection;
}
- public void setAmqpConnection(final AMQPConnection_0_10 serverProtocolEngine)
- {
- _amqpConnection = serverProtocolEngine;
- }
-
public VirtualHost<?> getVirtualHost()
{
- return _virtualHost;
+ return _amqpConnection.getVirtualHost();
}
public void setVirtualHost(VirtualHost<?> virtualHost)
{
- _virtualHost = virtualHost;
+ _amqpConnection.associateVirtualHost(virtualHost);
_messageCompressionThreshold =
virtualHost.getContextValue(Integer.class,
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
@@ -176,7 +170,7 @@ public class ServerConnection extends Co
{
_messageCompressionThreshold = Integer.MAX_VALUE;
}
- _amqpConnection.getSubject().getPrincipals().add(_virtualHost.getPrincipal());
+ _amqpConnection.getSubject().getPrincipals().add(virtualHost.getPrincipal());
_amqpConnection.updateAccessControllerContext();
}
@@ -423,9 +417,10 @@ public class ServerConnection extends Co
}
finally
{
- if(_virtualHost != null)
+ VirtualHost<?> virtualHost = getVirtualHost();
+ if(virtualHost != null)
{
- _virtualHost.deregisterConnection(_amqpConnection);
+ virtualHost.deregisterConnection(_amqpConnection);
}
getEventLogger().message(isConnectionLost() ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Thu Mar 24 17:55:00 2016
@@ -51,6 +51,7 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -249,9 +250,9 @@ public class ServerConnectionDelegate ex
return;
}
- sconn.setVirtualHost(vhost);
try
{
+ sconn.setVirtualHost(vhost);
if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection()))
{
sconn.setState(Connection.State.CLOSING);
@@ -259,7 +260,7 @@ public class ServerConnectionDelegate ex
return;
}
}
- catch (AccessControlException e)
+ catch (AccessControlException | VirtualHostUnavailableException e)
{
sconn.setState(Connection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Thu Mar 24 17:55:00 2016
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.Port
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
@@ -73,33 +74,6 @@ public class ServerSessionTest extends Q
}
}
- public void testCompareTo() throws Exception
- {
- final Broker<?> broker = mock(Broker.class);
- when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
-
- AmqpPort amqpPort = createMockPort();
-
- ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
- final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
- Subject subject = new Subject();
- when(protocolEngine.getSubject()).thenReturn(subject);
- connection.setAmqpConnection(protocolEngine);
- connection.setVirtualHost(_virtualHost);
- ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
- new Binary(getName().getBytes()), 0);
-
- // create a session with the same name but on a different connection
- ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
- connection2.setAmqpConnection(protocolEngine);
- connection2.setVirtualHost(_virtualHost);
- ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
- new Binary(getName().getBytes()), 0);
-
- assertFalse("Unexpected compare result", session1.compareTo(session2) == 0);
- assertEquals("Unexpected compare result", 0, session1.compareTo(session1));
- }
-
public void testOverlargeMessageTest() throws Exception
{
final Broker<?> broker = mock(Broker.class);
@@ -107,12 +81,13 @@ public class ServerSessionTest extends Q
AmqpPort port = createMockPort();
- ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+ when(protocolEngine.getVirtualHost()).thenReturn((VirtualHost) _virtualHost);
+ when(protocolEngine.getUnderlyingConnection()).thenReturn((AMQPConnection) protocolEngine);
Subject subject = new Subject();
when(protocolEngine.getSubject()).thenReturn(subject);
when(protocolEngine.getMaxMessageSize()).thenReturn(1024l);
- connection.setAmqpConnection(protocolEngine);
+ ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP, protocolEngine);
connection.setVirtualHost(_virtualHost);
final List<Method> invokedMethods = new ArrayList<>();
ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Thu Mar 24 17:55:00 2016
@@ -42,7 +42,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@@ -86,6 +85,7 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -115,8 +115,6 @@ public class AMQPConnection_0_8
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
- private volatile VirtualHost<?> _virtualHost;
-
private final Object _channelAddRemoveLock = new Object();
private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<>();
@@ -270,7 +268,7 @@ public class AMQPConnection_0_8
}
catch (StoreException e)
{
- if (_virtualHost.getState() == State.ACTIVE)
+ if (getVirtualHost().getState() == State.ACTIVE)
{
throw new ServerScopedRuntimeException(e);
}
@@ -710,15 +708,9 @@ public class AMQPConnection_0_8
return getMethodRegistry();
}
- public VirtualHost<?> getVirtualHost()
- {
- return _virtualHost;
- }
-
public void setVirtualHost(VirtualHost<?> virtualHost)
{
- _virtualHost = virtualHost;
- virtualHostAssociated();
+ associateVirtualHost(virtualHost);
_messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
@@ -789,9 +781,10 @@ public class AMQPConnection_0_8
{
performDeleteTasks();
- if (_virtualHost != null)
+ final VirtualHost<?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
{
- _virtualHost.deregisterConnection(this);
+ virtualHost.deregisterConnection(this);
}
}
@@ -964,7 +957,8 @@ public class AMQPConnection_0_8
assertState(ConnectionState.OPEN);
// Protect the broker against out of order frame request.
- if (_virtualHost == null)
+ final VirtualHost<?> virtualHost = getVirtualHost();
+ if (virtualHost == null)
{
sendConnectionClose(AMQConstant.COMMAND_INVALID,
"Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
@@ -982,9 +976,9 @@ public class AMQPConnection_0_8
}
else
{
- _logger.debug("Connecting to: {}", _virtualHost.getName());
+ _logger.debug("Connecting to: {}", virtualHost.getName());
- final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore());
+ final AMQChannel channel = new AMQChannel(this, channelId, virtualHost.getMessageStore());
addChannel(channel);
@@ -1050,9 +1044,9 @@ public class AMQPConnection_0_8
}
else
{
- setVirtualHost(virtualHost);
try
{
+ setVirtualHost(virtualHost);
if(virtualHost.authoriseCreateConnection(this))
{
@@ -1067,7 +1061,7 @@ public class AMQPConnection_0_8
sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Connection refused", 0);
}
}
- catch (AccessControlException e)
+ catch (AccessControlException | VirtualHostUnavailableException e)
{
sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), 0);
}
@@ -1396,9 +1390,10 @@ public class AMQPConnection_0_8
@Override
public EventLogger getEventLogger()
{
- if(_virtualHost != null)
+ final VirtualHost<?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
{
- return _virtualHost.getEventLogger();
+ return virtualHost.getEventLogger();
}
else
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Thu Mar 24 17:55:00 2016
@@ -37,15 +37,12 @@ import javax.security.auth.Subject;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MessagePublishInfo;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
@@ -184,7 +181,7 @@ public class AMQChannelTest extends Qpid
authenticatedUser,
Collections.<Principal>emptySet(),
Collections.<Principal>emptySet()));
- _amqConnection.virtualHostAssociated();
+ _amqConnection.associateVirtualHost(_virtualHost);
int channelId = 1;
AMQChannel channel = new AMQChannel(_amqConnection, channelId, _virtualHost.getMessageStore());
@@ -214,7 +211,7 @@ public class AMQChannelTest extends Qpid
Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
_amqConnection.setAuthorizedSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(), Collections.<Principal>emptySet()));
- _amqConnection.virtualHostAssociated();
+ _amqConnection.associateVirtualHost(_virtualHost);
AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Thu Mar 24 17:55:00 2016
@@ -59,22 +59,22 @@ import org.apache.qpid.configuration.Com
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.transport.NonBlockingConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -154,8 +154,6 @@ public class AMQPConnection_1_0 extends
super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
_broker = broker;
_connection = createConnection(broker, network, port, transport, id, useSASL);
-
- _connection.setAmqpConnection(this);
_endpoint = _connection.getConnectionEndpoint();
_endpoint.setConnectionEventListener(_connection);
_endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
@@ -176,12 +174,12 @@ public class AMQPConnection_1_0 extends
}
- public static Connection_1_0 createConnection(final Broker<?> broker,
- final ServerNetworkConnection network,
- final AmqpPort<?> port,
- final Transport transport,
- final long id,
- final boolean useSASL)
+ private Connection_1_0 createConnection(final Broker<?> broker,
+ final ServerNetworkConnection network,
+ final AmqpPort<?> port,
+ final Transport transport,
+ final long id,
+ final boolean useSASL)
{
Container container = new Container(broker.getId().toString());
@@ -211,7 +209,7 @@ public class AMQPConnection_1_0 extends
endpoint.setProperties(serverProperties);
endpoint.setRemoteAddress(network.getRemoteAddress());
- return new Connection_1_0(endpoint, id, port, transport, subjectCreator);
+ return new Connection_1_0(endpoint, id, port, transport, subjectCreator, this);
}
@@ -485,7 +483,7 @@ public class AMQPConnection_1_0 extends
void changeScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
- ((NonBlockingConnection) getNetwork()).changeScheduler(networkConnectionScheduler);
+ ((NonBlockingConnection) getNetwork()).pushScheduler(networkConnectionScheduler);
}
@@ -662,11 +660,6 @@ public class AMQPConnection_1_0 extends
return _connection.getRemoteContainerName();
}
- public VirtualHost<?> getVirtualHost()
- {
- return _connection.getVirtualHost();
- }
-
public List<Session_1_0> getSessionModels()
{
return _connection.getSessionModels();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Mar 24 17:55:00 2016
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-import java.net.InetAddress;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -63,9 +62,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionRedirect;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class Connection_1_0 implements ConnectionEventListener
{
@@ -73,8 +70,7 @@ public class Connection_1_0 implements C
private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
private final AmqpPort<?> _port;
private final SubjectCreator _subjectCreator;
- private AMQPConnection_1_0 _amqpConnection;
- private VirtualHost<?> _vhost;
+ private final AMQPConnection_1_0 _amqpConnection;
private final Transport _transport;
private final ConnectionEndpoint _connectionEndpoint;
private final long _connectionId;
@@ -91,7 +87,7 @@ public class Connection_1_0 implements C
getConnectionId(),
getClientId(),
getRemoteAddressString(),
- _vhost.getName())
+ getVirtualHost().getName())
+ "] ";
}
@@ -108,17 +104,14 @@ public class Connection_1_0 implements C
long connectionId,
AmqpPort<?> port,
Transport transport,
- final SubjectCreator subjectCreator)
+ final SubjectCreator subjectCreator,
+ final AMQPConnection_1_0 amqpConnection)
{
_port = port;
_transport = transport;
_connectionEndpoint = connectionEndpoint;
_connectionId = connectionId;
_subjectCreator = subjectCreator;
- }
-
- void setAmqpConnection(final AMQPConnection_1_0 amqpConnection)
- {
_amqpConnection = amqpConnection;
}
@@ -168,13 +161,7 @@ public class Connection_1_0 implements C
final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(host);
if (vhost == null)
{
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_FOUND);
- err.setDescription("Unknown hostname in connection open: '" + host + "'");
- _connectionEndpoint.close(err);
- _amqpConnection.close();
-
- _closedOnOpen = true;
+ closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + host + "'");
}
else
{
@@ -255,23 +242,34 @@ public class Connection_1_0 implements C
_amqpConnection.updateAccessControllerContext();
if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) == null)
{
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_ALLOWED);
- err.setDescription("Connection has not been authenticated");
- _connectionEndpoint.close(err);
- _amqpConnection.close();
- _closedOnOpen = true;
+ closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
}
else
{
- _vhost = vhost;
- _amqpConnection.virtualHostAssociated();
+ try
+ {
+ _amqpConnection.associateVirtualHost(vhost);
+ }
+ catch (VirtualHostUnavailableException e)
+ {
+ closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
+ }
}
}
}
}
}
+ /**
+  * Closes the connection endpoint with the given AMQP error, closes the owning AMQP
+  * connection and records that the connection was closed on open.
+  *
+  * @param amqpError        error condition placed on the close
+  * @param errorDescription description text placed on the close
+  */
+ private void closeWithError(final AmqpError amqpError, final String errorDescription)
+ {
+     final Error closeError = new Error();
+     closeError.setCondition(amqpError);
+     closeError.setDescription(errorDescription);
+
+     // The endpoint is closed with the error before the AMQP connection itself is closed.
+     _connectionEndpoint.close(closeError);
+     _amqpConnection.close();
+     _closedOnOpen = true;
+ }
+
void setUserPrincipal(final Principal user)
{
Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
@@ -361,9 +359,10 @@ public class Connection_1_0 implements C
}
finally
{
- if (_vhost != null)
+ VirtualHost<?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
{
- _vhost.deregisterConnection(_amqpConnection);
+ virtualHost.deregisterConnection(_amqpConnection);
}
getAmqpConnection().getEventLogger().message(ConnectionMessages.CLOSE());
}
@@ -469,10 +468,9 @@ public class Connection_1_0 implements C
public VirtualHost<?> getVirtualHost()
{
- return _vhost;
+ return _amqpConnection.getVirtualHost();
}
-
public void transportStateChanged()
{
for (Session_1_0 session : _sessions)
@@ -557,11 +555,12 @@ public class Connection_1_0 implements C
@Override
public String toString()
{
+ VirtualHost<?> virtualHost = getVirtualHost();
return "Connection_1_0["
+ _connectionId
+ " "
+ _amqpConnection.getAddress()
- + (_vhost == null ? "" : (" vh : " + _vhost.getName()))
+ + (virtualHost == null ? "" : (" vh : " + virtualHost.getName()))
+ ']';
}
}
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java Thu Mar 24 17:55:00 2016
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.systest.rest;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -88,33 +82,4 @@ public class QpidRestTestCase extends Qp
{
return _restTestHelper;
}
-
- public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
- {
- List<Map<String, Object>> nodeAttributes = getAttributesIgnoringNotFound(url);
- int timeout = 30000;
- long limit = System.currentTimeMillis() + timeout;
- while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
- {
- Thread.sleep(100l);
- nodeAttributes = getAttributesIgnoringNotFound(url);
- }
- Map<String, Object> nodeData = nodeAttributes.get(0);
- assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName));
- return nodeData;
- }
-
- private List<Map<String, Object>> getAttributesIgnoringNotFound(String url) throws IOException
- {
- List<Map<String, Object>> nodeAttributes;
- try
- {
- nodeAttributes = getRestTestHelper().getJsonAsList(url);
- }
- catch(FileNotFoundException e)
- {
- nodeAttributes = Collections.emptyList();
- }
- return nodeAttributes;
- }
}
Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Thu Mar 24 17:55:00 2016
@@ -20,6 +20,7 @@ package org.apache.qpid.systest.rest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -622,4 +623,35 @@ public class RestTestHelper
throw new RuntimeException("Unsupported encoding UTF8", e);
}
}
+
+ /**
+  * Polls the given REST url until the first returned object carries the expected attribute
+  * value, or until a fixed 30 second timeout expires.
+  *
+  * @param url           REST url to poll; a url that is not (yet) found is treated as "no data"
+  * @param attributeName name of the attribute to inspect on the first returned object
+  * @param newValue      value the attribute is expected to reach; must not be {@code null}
+  * @return the attributes of the first returned object once the expected value was observed
+  * @throws Exception if polling fails or the thread is interrupted while sleeping
+  */
+ public Map<String, Object> waitForAttributeChanged(String url,
+                                                    String attributeName,
+                                                    Object newValue) throws Exception
+ {
+     List<Map<String, Object>> nodeAttributes = getAttributesIgnoringNotFound(url);
+     final int timeout = 30000;
+     final long limit = System.currentTimeMillis() + timeout;
+     while (System.currentTimeMillis() < limit
+            && (nodeAttributes.isEmpty() || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
+     {
+         Thread.sleep(100L);
+         nodeAttributes = getAttributesIgnoringNotFound(url);
+     }
+     // If the url never produced any data, fail with a readable message instead of letting
+     // get(0) throw IndexOutOfBoundsException on the empty list.
+     Assert.assertFalse("No data was returned for " + url + " within permitted timeout " + timeout + "ms.",
+                        nodeAttributes.isEmpty());
+     final Map<String, Object> nodeData = nodeAttributes.get(0);
+     Assert.assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName));
+     return nodeData;
+ }
+
+ /**
+  * Fetches the JSON list at the given url, mapping a {@link FileNotFoundException} to an
+  * empty list.
+  * NOTE(review): presumably FileNotFoundException signals an HTTP "not found" response from
+  * the REST layer - confirm against getJsonAsList.
+  *
+  * @param url REST url to query
+  * @return the objects returned for the url, or an empty list if the url was not found
+  * @throws IOException on any other I/O failure
+  */
+ private List<Map<String, Object>> getAttributesIgnoringNotFound(String url) throws IOException
+ {
+     try
+     {
+         return getJsonAsList(url);
+     }
+     catch (FileNotFoundException notFound)
+     {
+         return Collections.emptyList();
+     }
+ }
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Thu Mar 24 17:55:00 2016
@@ -206,19 +206,19 @@ public class VirtualHostRestTest extends
{
String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
}
@@ -229,7 +229,7 @@ public class VirtualHostRestTest extends
String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
String restQueueUrl = "queue/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST + "/" + testQueueName;
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
Connection connection = getConnection();
@@ -245,13 +245,13 @@ public class VirtualHostRestTest extends
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1736478&r1=1736477&r2=1736478&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Thu Mar 24 17:55:00 2016
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.test.unit.client.connection;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
@@ -32,14 +36,20 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.naming.NamingException;
import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.client.BasicMessageProducer;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.systest.rest.Asserts;
+import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.transport.ConnectionException;
/**
@@ -58,6 +68,7 @@ public class BrokerClosesClientConnectio
@Override
protected void setUp() throws Exception
{
+ getDefaultBrokerConfiguration().addHttpManagementConfiguration();
super.setUp();
_connection = getConnection();
@@ -86,6 +97,67 @@ public class BrokerClosesClientConnectio
ensureCanCloseWithoutException();
}
+ /** Stops the virtual host over REST while a thread keeps opening connections; expects the port's connection count to reach 0 and a connection attempt to fail. */
+ public void testClientCloseOnVirtualHostStop() throws Exception
+ {
+     final String virtualHostName = TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST;
+     final String restHostUrl = "virtualhost/" + virtualHostName + "/" + virtualHostName;
+     final RestTestHelper restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+     final CountDownLatch connectionCreatorStarted = new CountDownLatch(1);
+     final AtomicBoolean shutdown = new AtomicBoolean(false);
+     final AtomicReference<Exception> clientException = new AtomicReference<>();
+     // Opens connections back to back until asked to stop; the first failure is recorded and ends the loop.
+     Thread connectionCreator = new Thread(new Runnable()
+     {
+         @Override
+         public void run()
+         {
+             while (!shutdown.get())
+             {
+                 try
+                 {
+                     getConnection();
+                 }
+                 catch (Exception e)
+                 {
+                     clientException.set(e);
+                     shutdown.set(true);
+                 }
+                 connectionCreatorStarted.countDown();
+             }
+         }
+     });
+     try
+     {
+         connectionCreator.start();
+         assertTrue("connection creation thread did not start in time", connectionCreatorStarted.await(20, TimeUnit.SECONDS));
+         restTestHelper.submitRequest(restHostUrl, "PUT", Collections.singletonMap(VirtualHost.DESIRED_STATE, (Object) "STOPPED"), 200);
+         restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+         int connectionCount = 0;
+         for (int i = 0; i < 20; ++i)
+         {
+             Map<String, Object> portObject = restTestHelper.getJsonAsSingletonList("port/" + TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT);
+             Map<String, Object> portStatistics = (Map<String, Object>) portObject.get("statistics");
+             // Read through Number so the check works whichever boxed numeric type the JSON parser produced.
+             connectionCount = ((Number) portStatistics.get("connectionCount")).intValue();
+             if (connectionCount == 0)
+             {
+                 break;
+             }
+             Thread.sleep(250);
+         }
+         assertEquals("unexpected number of connections after virtual host stopped", 0, connectionCount);
+         // The creator thread exits after its first failed attempt; wait (bounded) so the exception is recorded before it is checked.
+         connectionCreator.join(10000);
+         assertConnectionCloseWasReported(clientException.get(), AMQConnectionFailureException.class);
+     }
+     finally
+     {
+         shutdown.set(true);
+         connectionCreator.join(10000);
+     }
+ }
+
public void testClientCloseOnBrokerKill() throws Exception
{
final Class<? extends Exception> expectedLinkedException = isBroker010() ? ConnectionException.class : AMQDisconnectedException.class;
@@ -118,12 +190,11 @@ public class BrokerClosesClientConnectio
}
}
- private void assertConnectionCloseWasReported(JMSException exception, Class<? extends Exception> linkedExceptionClass)
+ private void assertConnectionCloseWasReported(Exception exception, Class<? extends Exception> linkedExceptionClass) // takes any Exception; the expected class is checked against getCause()
{
- assertNotNull("Broker shutdown should be reported to the client via the ExceptionListener", exception);
- assertNotNull("JMSXException should have linked exception", exception.getLinkedException());
-
- assertEquals("Unexpected linked exception", linkedExceptionClass, exception.getLinkedException().getClass());
+ assertNotNull("Did not receive exception", exception);
+ assertNotNull("Exception should have a cause", exception.getCause());
+ assertEquals("Unexpected exception cause", linkedExceptionClass, exception.getCause().getClass()); // exact class equality: a subclass of the expected type fails
}
private void assertJmsObjectsClosed()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org