You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/03/25 14:04:12 UTC
svn commit: r1736566 [1/2] - in /qpid/java/branches/6.0.x: ./
bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/...
Author: kwall
Date: Fri Mar 25 13:04:11 2016
New Revision: 1736566
URL: http://svn.apache.org/viewvc?rev=1736566&view=rev
Log:
QPID-7156: [Java Broker] Stop new connections from being established while virtual host is stopping.
Manually backported from trunk:
svn merge -c 1736478 ^/qpid/java/trunk
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 13:04:11 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478
/qpid/trunk/qpid:796646-796653
Modified: qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java (original)
+++ qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostNodeRestTest.java Fri Mar 25 13:04:11 2016
@@ -114,8 +114,10 @@ public class BDBHAVirtualHostNodeRestTes
assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
// verify that remote nodes for node1 are created and their state is set to ACTIVE
- waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "ACTIVE");
- waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "ACTIVE");
mutateDesiredState(node1Url, "STOPPED");
@@ -124,8 +126,10 @@ public class BDBHAVirtualHostNodeRestTes
assertActualAndDesiredStates(node3Url, "ACTIVE", "ACTIVE");
// verify that remote node state fro node1 is changed to UNAVAILABLE
- waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
- waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1, BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE2 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
+ _restTestHelper.waitForAttributeChanged("remotereplicationnode/" + NODE3 + "/" + NODE1,
+ BDBHARemoteReplicationNode.STATE, "UNAVAILABLE");
List<Map<String, Object>> remoteNodes = getRestTestHelper().getJsonAsList("remotereplicationnode/" + NODE2);
assertEquals("Unexpected number of remote nodes on " + NODE2, 2, remoteNodes.size());
@@ -218,7 +222,7 @@ public class BDBHAVirtualHostNodeRestTes
getRestTestHelper().submitRequest(_baseNodeRestUrl + NODE1, "DELETE", HttpServletResponse.SC_OK);
// wait for new master
- waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE2 + "?depth=0", BDBHAVirtualHostNode.ROLE, "MASTER");
// delete remote node
getRestTestHelper().submitRequest("remotereplicationnode/" + NODE2 + "/" + NODE1, "DELETE", HttpServletResponse.SC_OK);
@@ -315,8 +319,8 @@ public class BDBHAVirtualHostNodeRestTes
}
}
- waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
- waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE1, VirtualHostNode.STATE, State.ERRORED.name());
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + NODE3, VirtualHostNode.STATE, State.ERRORED.name());
}
private void createHANode(String nodeName, int nodePort, int helperPort) throws Exception
@@ -325,7 +329,7 @@ public class BDBHAVirtualHostNodeRestTes
getRestTestHelper().submitRequest(_baseNodeRestUrl + nodeName, "PUT", nodeData, HttpServletResponse.SC_CREATED);
String hostExpectedState = nodePort == helperPort ? State.ACTIVE.name(): State.UNAVAILABLE.name();
- waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
+ _restTestHelper.waitForAttributeChanged("virtualhost/" + nodeName + "/" + _hostName, BDBHAVirtualHost.STATE, hostExpectedState);
}
private Map<String, Object> createNodeAttributeMap(String nodeName, int nodePort, int helperPort) throws Exception
@@ -356,7 +360,7 @@ public class BDBHAVirtualHostNodeRestTes
{
boolean isMaster = nodeName.equals(masterNode);
String expectedRole = isMaster? "MASTER" : "REPLICA";
- waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
+ _restTestHelper.waitForAttributeChanged(_baseNodeRestUrl + nodeName + "?depth=0", BDBHAVirtualHostNode.ROLE, expectedRole);
Map<String, Object> nodeData = getRestTestHelper().getJsonAsSingletonList(_baseNodeRestUrl + nodeName + "?depth=0");
assertEquals("Unexpected name", nodeName, nodeData.get(BDBHAVirtualHostNode.NAME));
@@ -376,7 +380,7 @@ public class BDBHAVirtualHostNodeRestTes
if (isMaster)
{
- waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
+ _restTestHelper.waitForAttributeChanged("virtualhost/" + masterNode + "/" + _hostName + "?depth=0", VirtualHost.STATE, State.ACTIVE.name());
}
}
@@ -393,7 +397,7 @@ public class BDBHAVirtualHostNodeRestTes
for (String remote : remotes)
{
String remoteUrl = "remotereplicationnode/" + clusterNodeName + "/" + remote;
- Map<String, Object> nodeData = waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
+ Map<String, Object> nodeData = _restTestHelper.waitForAttributeChanged(remoteUrl, BDBHARemoteReplicationNode.ROLE, remote.equals(masterNode) ? "MASTER" : "REPLICA");
assertRemoteNodeData(remote, nodeData);
}
}
Modified: qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java (original)
+++ qpid/java/branches/6.0.x/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/BDBHAVirtualHostRestTest.java Fri Mar 25 13:04:11 2016
@@ -113,7 +113,7 @@ public class BDBHAVirtualHostRestTest ex
public void testSetLocalTransactionSynchronizationPolicy() throws Exception
{
- Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
assertEquals("Unexpected synchronization policy before change", "SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
@@ -125,7 +125,7 @@ public class BDBHAVirtualHostRestTest ex
public void testSetRemoteTransactionSynchronizationPolicy() throws Exception
{
- Map<String, Object> hostAttributes = waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
+ Map<String, Object> hostAttributes = _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, State.ACTIVE.name());
assertEquals("Unexpected synchronization policy before change", "NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
Map<String, Object> newPolicy = Collections.<String, Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
@@ -137,19 +137,19 @@ public class BDBHAVirtualHostRestTest ex
public void testMutateState() throws Exception
{
- waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
+ _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(_virtualhostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(_virtualhostUrl, "ACTIVE", "ACTIVE");
}
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java Fri Mar 25 13:04:11 2016
@@ -22,7 +22,7 @@ package org.apache.qpid.server.model;
import java.util.Collection;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
+import org.apache.qpid.server.transport.AMQPConnection;
@ManagedObject( creatable = false )
public interface Connection<X extends Connection<X>> extends ConfiguredObject<X>
@@ -115,5 +115,5 @@ public interface Connection<X extends Co
Collection<Session> getSessions();
- AbstractAMQPConnection<?> getUnderlyingConnection();
+ AMQPConnection<?> getUnderlyingConnection();
}
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java Fri Mar 25 13:04:11 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.transport
import java.net.SocketAddress;
import java.security.Principal;
+import java.util.List;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Connection;
@@ -60,7 +61,9 @@ public interface AMQPConnection<C extend
void unblock();
- void setScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+ void pushScheduler(NetworkConnectionScheduler networkConnectionScheduler);
+
+ NetworkConnectionScheduler popScheduler();
boolean hasSessionWithName(byte[] name);
@@ -69,4 +72,10 @@ public interface AMQPConnection<C extend
void reserveOutboundMessageSpace(long size);
boolean isIOThread();
+
+ void stopConnection();
+
+ List<? extends AMQSessionModel<?>> getSessionModels();
+
+ void resetStatistics();
}
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Mar 25 13:04:11 2016
@@ -100,6 +100,7 @@ public abstract class AbstractAMQPConnec
private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
private final SettableFuture<Void> _modelClosedFuture = SettableFuture.create();
private final AtomicBoolean _modelClosing = new AtomicBoolean();
+ private volatile VirtualHost<?,?,?> _virtualHost;
private volatile long _lastReadTime;
private volatile long _lastWriteTime;
private volatile AccessControlContext _accessControllerContext;
@@ -255,6 +256,7 @@ public abstract class AbstractAMQPConnec
return String.valueOf(_network.getRemoteAddress());
}
+ @Override
public final void stopConnection()
{
_stopped = true;
@@ -285,12 +287,21 @@ public abstract class AbstractAMQPConnec
return _remoteProcessPid;
}
- public void setScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ public void pushScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
+ {
+ if(_network instanceof NonBlockingConnection)
+ {
+ ((NonBlockingConnection) _network).pushScheduler(networkConnectionScheduler);
+ }
+ }
+
+ public NetworkConnectionScheduler popScheduler()
{
if(_network instanceof NonBlockingConnection)
{
- ((NonBlockingConnection) _network).changeScheduler(networkConnectionScheduler);
+ return ((NonBlockingConnection) _network).popScheduler();
}
+ return null;
}
public String getClientProduct()
@@ -366,6 +377,7 @@ public abstract class AbstractAMQPConnec
((VirtualHostImpl<?,?,?>)getVirtualHost()).registerMessageReceived(messageSize, timestamp);
}
+ @Override
public final void resetStatistics()
{
_messagesDelivered.reset();
@@ -483,9 +495,10 @@ public abstract class AbstractAMQPConnec
{
}
- public void virtualHostAssociated()
+ public final void associateVirtualHost(final VirtualHost<?,?,?> virtualHost)
{
- getVirtualHost().registerConnection(this);
+ virtualHost.registerConnection(this);
+ _virtualHost = virtualHost;
}
@Override
@@ -643,8 +656,6 @@ public abstract class AbstractAMQPConnec
});
}
- public abstract List<? extends AMQSessionModel<?>> getSessionModels();
-
@Override
public int getSessionCount()
{
@@ -652,7 +663,7 @@ public abstract class AbstractAMQPConnec
}
@Override
- public AbstractAMQPConnection<?> getUnderlyingConnection()
+ public AMQPConnection<?> getUnderlyingConnection()
{
return this;
}
@@ -675,6 +686,12 @@ public abstract class AbstractAMQPConnec
protected abstract EventLogger getEventLogger();
+ @Override
+ public VirtualHost<?,?,?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
{
private final long _allowedTime;
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Mar 25 13:04:11 2016
@@ -64,6 +64,21 @@ public class NetworkConnectionScheduler
});
}
+ @Override
+ public String toString()
+ {
+ return "NetworkConnectionScheduler{" +
+ "_factory=" + _factory +
+ ", _executor=" + _executor +
+ ", _running=" + _running +
+ ", _poolSize=" + _poolSize +
+ ", _threadKeepAliveTimeout=" + _threadKeepAliveTimeout +
+ ", _name='" + _name + '\'' +
+ ", _numberOfSelectors=" + _numberOfSelectors +
+ ", _selectorThread=" + _selectorThread +
+ '}';
+ }
+
public NetworkConnectionScheduler(String name,
final int numberOfSelectors, int threadPoolSize,
long threadKeepAliveTimeout,
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Fri Mar 25 13:04:11 2016
@@ -26,9 +26,11 @@ import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
+import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -52,7 +54,7 @@ public class NonBlockingConnection imple
private final SocketChannel _socketChannel;
private NonBlockingConnectionDelegate _delegate;
- private NetworkConnectionScheduler _scheduler;
+ private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final String _remoteSocketAddress;
@@ -75,6 +77,7 @@ public class NonBlockingConnection imple
private Iterator<Runnable> _pendingIterator;
private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
private final AtomicLong _maxReadIdleMillis = new AtomicLong();
+ private final AtomicBoolean _hasShutdown = new AtomicBoolean();
private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
public NonBlockingConnection(SocketChannel socketChannel,
@@ -85,7 +88,7 @@ public class NonBlockingConnection imple
final AmqpPort port)
{
_socketChannel = socketChannel;
- _scheduler = scheduler;
+ pushScheduler(scheduler);
_protocolEngine = protocolEngine;
_onTransportEncryptionAction = onTransportEncryptionAction;
@@ -102,7 +105,7 @@ public class NonBlockingConnection imple
@Override
public void performAction(final ProtocolEngine object)
{
- _scheduler.schedule(NonBlockingConnection.this);
+ getScheduler().schedule(NonBlockingConnection.this);
}
});
@@ -390,6 +393,11 @@ public class NonBlockingConnection imple
private void shutdown()
{
+ if (_hasShutdown.getAndSet(true))
+ {
+ return;
+ }
+
try
{
shutdownInput();
@@ -575,20 +583,25 @@ public class NonBlockingConnection imple
{
}
- public void changeScheduler(NetworkConnectionScheduler scheduler)
+ public final void pushScheduler(NetworkConnectionScheduler scheduler)
{
- _scheduler = scheduler;
+ _schedulerDeque.addFirst(scheduler);
}
- @Override
- public String toString()
+ public final NetworkConnectionScheduler popScheduler()
{
- return "[NonBlockingConnection " + _remoteSocketAddress + "]";
+ return _schedulerDeque.removeFirst();
}
- public NetworkConnectionScheduler getScheduler()
+ public final NetworkConnectionScheduler getScheduler()
{
- return _scheduler;
+ return _schedulerDeque.peekFirst();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "[NonBlockingConnection " + _remoteSocketAddress + "]";
}
public void processAmqpData(QpidByteBuffer applicationData)
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Fri Mar 25 13:04:11 2016
@@ -630,6 +630,11 @@ class SelectorThread extends Thread
public void addToWork(final NonBlockingConnection connection)
{
+ if (_closed.get())
+ {
+ LOGGER.warn("Adding connection work {} to closed selector thread {}", connection, _scheduler);
+ return;
+ }
if(connection.setScheduled())
{
_workQueue.add(new ConnectionProcessor(_scheduler, connection));
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Fri Mar 25 13:04:11 2016
@@ -71,6 +71,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
@@ -111,6 +112,7 @@ public abstract class AbstractVirtualHos
private final Set<VirtualHostConnectionListener> _connectionAssociationListeners = new CopyOnWriteArraySet<>();
private final AccessControlContext _housekeepingJobContext;
private final AccessControlContext _fileSystemSpaceCheckerJobContext;
+ private final AtomicBoolean _acceptsConnections = new AtomicBoolean(false);
private static enum BlockingType { STORE, FILESYSTEM };
@@ -941,6 +943,7 @@ public abstract class AbstractVirtualHos
{
_logger.debug("Closing connection registry :" + _connections.size() + " connections.");
}
+ _acceptsConnections.set(false);
for(Connection conn : _connections)
{
conn.getUnderlyingConnection().stopConnection();
@@ -1751,20 +1754,30 @@ public abstract class AbstractVirtualHos
@Override
public ListenableFuture<Void> execute()
{
- _connections.add(connection);
-
- if(_blocked.get())
+ if (_acceptsConnections.get())
{
- connection.block();
- }
+ _connections.add(connection);
- connection.setScheduler(_networkConnectionScheduler);
+ if (_blocked.get())
+ {
+ connection.block();
+ }
- for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+ connection.pushScheduler(_networkConnectionScheduler);
+ for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
+ {
+ listener.connectionAssociated(connection);
+ }
+ return Futures.immediateFuture(null);
+ }
+ else
{
- listener.connectionAssociated(connection);
+ final VirtualHostUnavailableException exception =
+ new VirtualHostUnavailableException(String.format(
+ "VirtualHost '%s' not accepting connections",
+ getName()));
+ return Futures.immediateFailedFuture(exception);
}
- return Futures.immediateFuture(null);
}
@Override
@@ -1801,6 +1814,7 @@ public abstract class AbstractVirtualHos
@Override
public ListenableFuture<Void> execute()
{
+ connection.popScheduler();
if (_connections.remove(connection))
{
for (VirtualHostConnectionListener listener : _connectionAssociationListeners)
@@ -1942,7 +1956,6 @@ public abstract class AbstractVirtualHos
postCreateDefaultExchangeTasks();
return Futures.immediateFuture(null);
}
-
}
private void postCreateDefaultExchangeTasks()
@@ -1972,6 +1985,7 @@ public abstract class AbstractVirtualHos
{
initialiseHouseKeeping(getHousekeepingCheckPeriod());
finalState = State.ACTIVE;
+ _acceptsConnections.set(true);
}
finally
{
Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java Fri Mar 25 13:04:11 2016
@@ -20,14 +20,20 @@
*/
package org.apache.qpid.server.virtualhost;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class VirtualHostUnavailableException extends ConnectionScopedRuntimeException
{
- public VirtualHostUnavailableException(VirtualHostImpl<?, ?, ?> host)
+ public VirtualHostUnavailableException(String message)
{
- super("Virtualhost state "
- + host.getState()
- + " prevents the message from being sent");
+ super(message);
+ }
+
+ public VirtualHostUnavailableException(VirtualHost<?,?,?> host)
+ {
+ this("Virtualhost state "
+ + host.getState()
+ + " prevents the message from being sent");
}
}
Modified: qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Fri Mar 25 13:04:11 2016
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.UUID;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
@@ -65,6 +66,7 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.test.utils.QpidTestCase;
public class VirtualHostTest extends QpidTestCase
@@ -465,6 +467,60 @@ public class VirtualHostTest extends Qpi
}
}
+ public void testRegisterConnection() throws Exception
+ {
+ VirtualHost<?,?,?> vhost = createVirtualHost("sdf");
+ AMQPConnection<?> connection = getMockConnection();
+
+ assertEquals("unexpected number of connections before test", 0, vhost.getConnectionCount());
+ vhost.registerConnection(connection);
+ assertEquals("unexpected number of connections after registerConnection", 1, vhost.getConnectionCount());
+ assertEquals("unexpected connection object", Collections.singleton(connection), vhost.getConnections());
+ }
+
+ public void testStopVirtualhostClosesConnections() throws Exception
+ {
+ VirtualHost<?,?,?> vhost = createVirtualHost("sdf");
+ AMQPConnection<?> connection = getMockConnection();
+
+ vhost.registerConnection(connection);
+ assertEquals("unexpected number of connections after registerConnection", 1, vhost.getConnectionCount());
+ assertEquals("unexpected connection object", Collections.singleton(connection), vhost.getConnections());
+ vhost.stop();
+ verify(connection).stopConnection();
+ verify(connection).closeAsync();
+ }
+
+ public void testRegisterConnectionOnStoppedVirtualhost() throws Exception
+ {
+ VirtualHost<?,?,?> vhost = createVirtualHost("sdf");
+ AMQPConnection<?> connection = getMockConnection();
+
+ vhost.stop();
+ try
+ {
+ vhost.registerConnection(connection);
+ fail("exception not thrown");
+ }
+ catch (VirtualHostUnavailableException e)
+ {
+ // pass
+ }
+ assertEquals("unexpected number of connections", 0, vhost.getConnectionCount());
+ vhost.start();
+ vhost.registerConnection(connection);
+ assertEquals("unexpected number of connections", 1, vhost.getConnectionCount());
+ }
+
+ private AMQPConnection<?> getMockConnection()
+ {
+ AMQPConnection<?> connection = mock(AMQPConnection.class);
+ final ListenableFuture<Void> listenableFuture = Futures.immediateFuture(null);
+ when(connection.closeAsync()).thenReturn(listenableFuture);
+ when(connection.getUnderlyingConnection()).thenReturn((AMQPConnection) connection);
+ return connection;
+ }
+
private VirtualHost<?,?,?> createVirtualHost(final String virtualHostName)
{
return createVirtualHost(virtualHostName, Collections.<String, Object>emptyMap());
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java Fri Mar 25 13:04:11 2016
@@ -41,7 +41,6 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.store.StoreException;
@@ -84,8 +83,7 @@ public class AMQPConnection_0_10 extends
{
super(broker, network, port, transport, Protocol.AMQP_0_10, id, aggregateTicker);
- _connection = new ServerConnection(id, broker, port, transport);
- _connection.setAmqpConnection(this);
+ _connection = new ServerConnection(id, broker, port, transport, this);
SocketAddress address = network.getLocalAddress();
String fqdn = null;
@@ -327,11 +325,6 @@ public class AMQPConnection_0_10 extends
return _connection.getRemoteContainerName();
}
- public VirtualHost<?, ?, ?> getVirtualHost()
- {
- return _connection.getVirtualHost();
- }
-
public List<ServerSession> getSessionModels()
{
return _connection.getSessionModels();
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Mar 25 13:04:11 2016
@@ -32,7 +32,6 @@ import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -46,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
@@ -78,7 +78,6 @@ public class ServerConnection extends Co
private Principal _authorizedPrincipal = null;
private final long _connectionId;
private final Object _reference = new Object();
- private volatile VirtualHostImpl<?,?,?> _virtualHost;
private final AmqpPort<?> _port;
private final AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
@@ -90,14 +89,15 @@ public class ServerConnection extends Co
private int _messageCompressionThreshold;
private final int _maxMessageSize;
- private AMQPConnection_0_10 _amqpConnection;
+ private final AMQPConnection_0_10 _amqpConnection;
private boolean _ignoreFutureInput;
private boolean _ignoreAllButConnectionCloseOk;
public ServerConnection(final long connectionId,
Broker<?> broker,
final AmqpPort<?> port,
- final Transport transport)
+ final Transport transport,
+ final AMQPConnection_0_10 serverProtocolEngine)
{
_connectionId = connectionId;
_broker = broker;
@@ -108,6 +108,7 @@ public class ServerConnection extends Co
int maxMessageSize = port.getContextValue(Integer.class, AmqpPort.PORT_MAX_MESSAGE_SIZE);
_maxMessageSize = (maxMessageSize > 0) ? maxMessageSize : Integer.MAX_VALUE;
+ _amqpConnection = serverProtocolEngine;
}
public Object getReference()
@@ -132,7 +133,8 @@ public class ServerConnection extends Co
EventLogger getEventLogger()
{
- return _virtualHost == null ? _broker.getEventLogger() : _virtualHost.getEventLogger();
+ VirtualHostImpl<?,?,?> virtualHost = getVirtualHost();
+ return virtualHost == null ? _broker.getEventLogger() : virtualHost.getEventLogger();
}
@Override
@@ -143,7 +145,6 @@ public class ServerConnection extends Co
if (state == State.OPEN)
{
_amqpConnection.logConnectionOpen();
- _amqpConnection.virtualHostAssociated();
}
if(state == State.CLOSING)
@@ -163,19 +164,14 @@ public class ServerConnection extends Co
return _amqpConnection;
}
- public void setAmqpConnection(final AMQPConnection_0_10 serverProtocolEngine)
- {
- _amqpConnection = serverProtocolEngine;
- }
-
public VirtualHostImpl<?,?,?> getVirtualHost()
{
- return _virtualHost;
+ return (VirtualHostImpl<?, ?, ?>) _amqpConnection.getVirtualHost();
}
public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
- _virtualHost = virtualHost;
+ _amqpConnection.associateVirtualHost(virtualHost);
_messageCompressionThreshold =
virtualHost.getContextValue(Integer.class,
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
@@ -184,7 +180,7 @@ public class ServerConnection extends Co
{
_messageCompressionThreshold = Integer.MAX_VALUE;
}
- _amqpConnection.getSubject().getPrincipals().add(_virtualHost.getPrincipal());
+ _amqpConnection.getSubject().getPrincipals().add(virtualHost.getPrincipal());
_amqpConnection.updateAccessControllerContext();
}
@@ -466,9 +462,10 @@ public class ServerConnection extends Co
}
finally
{
- if(_virtualHost != null)
+ VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ if(virtualHost != null)
{
- _virtualHost.deregisterConnection(_amqpConnection);
+ virtualHost.deregisterConnection(_amqpConnection);
}
getEventLogger().message(isConnectionLost() ? ConnectionMessages.DROPPED_CONNECTION() : ConnectionMessages.CLOSE());
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Mar 25 13:04:11 2016
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.State;
@@ -51,6 +50,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.*;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -248,9 +248,9 @@ public class ServerConnectionDelegate ex
return;
}
- sconn.setVirtualHost(vhost);
try
{
+ sconn.setVirtualHost(vhost);
if(!vhost.authoriseCreateConnection(sconn.getAmqpConnection()))
{
sconn.setState(Connection.State.CLOSING);
@@ -258,7 +258,7 @@ public class ServerConnectionDelegate ex
return;
}
}
- catch (AccessControlException e)
+ catch (AccessControlException | VirtualHostUnavailableException e)
{
sconn.setState(Connection.State.CLOSING);
sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Fri Mar 25 13:04:11 2016
@@ -34,6 +34,7 @@ import org.apache.qpid.server.model.Brok
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -73,33 +74,6 @@ public class ServerSessionTest extends Q
}
}
- public void testCompareTo() throws Exception
- {
- final Broker<?> broker = mock(Broker.class);
- when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
-
- AmqpPort amqpPort = createMockPort(AmqpPort.DEFAULT_MAX_MESSAGE_SIZE);
-
- ServerConnection connection = new ServerConnection(1, broker, amqpPort, Transport.TCP);
- final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
- Subject subject = new Subject();
- when(protocolEngine.getSubject()).thenReturn(subject);
- connection.setAmqpConnection(protocolEngine);
- connection.setVirtualHost(_virtualHost);
- ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
- new Binary(getName().getBytes()), 0);
-
- // create a session with the same name but on a different connection
- ServerConnection connection2 = new ServerConnection(2, broker, amqpPort, Transport.TCP);
- connection2.setAmqpConnection(protocolEngine);
- connection2.setVirtualHost(_virtualHost);
- ServerSession session2 = new ServerSession(connection2, new ServerSessionDelegate(),
- new Binary(getName().getBytes()), 0);
-
- assertFalse("Unexpected compare result", session1.compareTo(session2) == 0);
- assertEquals("Unexpected compare result", 0, session1.compareTo(session1));
- }
-
public void testOverlargeMessageTest() throws Exception
{
final Broker<?> broker = mock(Broker.class);
@@ -107,12 +81,13 @@ public class ServerSessionTest extends Q
AmqpPort port = createMockPort(1024);
- ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP);
final AMQPConnection_0_10 protocolEngine = mock(AMQPConnection_0_10.class);
+ when(protocolEngine.getVirtualHost()).thenReturn(_virtualHost);
+ when(protocolEngine.getUnderlyingConnection()).thenReturn((AMQPConnection) protocolEngine);
Subject subject = new Subject();
when(protocolEngine.getSubject()).thenReturn(subject);
- connection.setAmqpConnection(protocolEngine);
+ ServerConnection connection = new ServerConnection(1, broker, port, Transport.TCP, protocolEngine);
connection.setVirtualHost(_virtualHost);
final List<Method> invokedMethods = new ArrayList<>();
ServerSession session = new ServerSession(connection, new ServerSessionDelegate(),
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Mar 25 13:04:11 2016
@@ -1760,7 +1760,7 @@ public class AMQChannel
public VirtualHostImpl getVirtualHost()
{
- return getConnection().getVirtualHost();
+ return (VirtualHostImpl) getConnection().getVirtualHost();
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -2119,7 +2119,7 @@ public class AMQChannel
}
AMQShortString consumerTag1 = consumerTag;
- VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
+ VirtualHostImpl<?, ?, ?> vHost = getVirtualHost();
sync();
String queueName = AMQShortString.toString(queue);
@@ -2244,7 +2244,7 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]");
}
- VirtualHostImpl vHost = _connection.getVirtualHost();
+ VirtualHostImpl vHost = getVirtualHost();
sync();
MessageSource queue = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName.toString());
if (queue == null)
@@ -2317,8 +2317,7 @@ public class AMQChannel
- VirtualHostImpl vHost = _connection.getVirtualHost();
-
+ VirtualHostImpl vHost = getVirtualHost();
if(blockingTimeoutExceeded())
{
getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
@@ -2699,7 +2698,7 @@ public class AMQChannel
routingKey + " queue: " + queueName + " ]");
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
sync();
@@ -2888,7 +2887,7 @@ public class AMQChannel
final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody();
ExchangeImpl exchange;
- VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl<?, ?, ?> virtualHost = getVirtualHost();
if (isDefaultExchange(exchangeName))
{
@@ -3043,7 +3042,7 @@ public class AMQChannel
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
sync();
if (isDefaultExchange(exchangeStr))
@@ -3113,7 +3112,7 @@ public class AMQChannel
" nowait: " + nowait + " arguments: " + argumentsTable + " ]");
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
AMQQueue<?> queue;
if (queueName == null)
{
@@ -3222,7 +3221,7 @@ public class AMQChannel
" autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
final AMQShortString queueName;
@@ -3413,7 +3412,7 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]");
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
sync();
AMQQueue queue;
if (queueName == null)
@@ -3483,7 +3482,7 @@ public class AMQChannel
_logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]");
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
AMQQueue queue = null;
if (queueName == null && (queue = getDefaultQueue()) == null)
{
@@ -3536,7 +3535,7 @@ public class AMQChannel
" arguments: " + arguments + " ]");
}
- VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ VirtualHostImpl virtualHost = getVirtualHost();
final boolean useDefaultQueue = queueName == null;
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Fri Mar 25 13:04:11 2016
@@ -63,6 +63,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.security.*;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
@@ -87,6 +88,7 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
@@ -116,8 +118,6 @@ public class AMQPConnection_0_8
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
- private volatile VirtualHostImpl<?,?,?> _virtualHost;
-
private final Object _channelAddRemoveLock = new Object();
private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<>();
@@ -275,7 +275,7 @@ public class AMQPConnection_0_8
}
catch (StoreException e)
{
- if (_virtualHost.getState() == State.ACTIVE)
+ if (getVirtualHost().getState() == State.ACTIVE)
{
throw new ServerScopedRuntimeException(e);
}
@@ -722,15 +722,9 @@ public class AMQPConnection_0_8
return getMethodRegistry();
}
- public VirtualHostImpl<?,?,?> getVirtualHost()
- {
- return _virtualHost;
- }
-
public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
{
- _virtualHost = virtualHost;
- virtualHostAssociated();
+ associateVirtualHost(virtualHost);
_messageCompressionThreshold = virtualHost.getContextValue(Integer.class,
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
@@ -800,9 +794,10 @@ public class AMQPConnection_0_8
{
performDeleteTasks();
- if (_virtualHost != null)
+ final VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
{
- _virtualHost.deregisterConnection(this);
+ virtualHost.deregisterConnection(this);
}
}
@@ -975,7 +970,8 @@ public class AMQPConnection_0_8
assertState(ConnectionState.OPEN);
// Protect the broker against out of order frame request.
- if (_virtualHost == null)
+ final VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ if (virtualHost == null)
{
sendConnectionClose(AMQConstant.COMMAND_INVALID,
"Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
@@ -993,9 +989,9 @@ public class AMQPConnection_0_8
}
else
{
- _logger.debug("Connecting to: {}", _virtualHost.getName());
+ _logger.debug("Connecting to: {}", virtualHost.getName());
- final AMQChannel channel = new AMQChannel(this, channelId, _virtualHost.getMessageStore());
+ final AMQChannel channel = new AMQChannel(this, channelId, virtualHost.getMessageStore());
addChannel(channel);
@@ -1061,9 +1057,9 @@ public class AMQPConnection_0_8
}
else
{
- setVirtualHost(virtualHost);
try
{
+ setVirtualHost(virtualHost);
if(virtualHost.authoriseCreateConnection(this))
{
@@ -1078,7 +1074,7 @@ public class AMQPConnection_0_8
sendConnectionClose(AMQConstant.ACCESS_REFUSED, "Connection refused", 0);
}
}
- catch (AccessControlException e)
+ catch (AccessControlException | VirtualHostUnavailableException e)
{
sendConnectionClose(AMQConstant.ACCESS_REFUSED, e.getMessage(), 0);
}
@@ -1412,9 +1408,10 @@ public class AMQPConnection_0_8
@Override
public EventLogger getEventLogger()
{
- if(_virtualHost != null)
+ final VirtualHostImpl<?,?,?> virtualHost = (VirtualHostImpl<?, ?, ?>) getVirtualHost();
+ if (virtualHost != null)
{
- return _virtualHost.getEventLogger();
+ return virtualHost.getEventLogger();
}
else
{
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Fri Mar 25 13:04:11 2016
@@ -48,6 +48,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.util.ByteBufferUtils;
import org.apache.qpid.util.GZIPUtils;
@@ -85,7 +86,8 @@ public class ProtocolOutputConverterImpl
}
else
{
- return getMessageConverter(serverMessage).convert(serverMessage, _connection.getVirtualHost());
+ return getMessageConverter(serverMessage).convert(serverMessage,
+ (VirtualHostImpl) _connection.getVirtualHost());
}
}
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Mar 25 13:04:11 2016
@@ -186,6 +186,7 @@ public class AMQChannelTest extends Qpid
authenticatedUser,
Collections.<Principal>emptySet(),
Collections.<Principal>emptySet()));
+ _amqConnection.associateVirtualHost(_virtualHost);
int channelId = 1;
AMQChannel channel = new AMQChannel(_amqConnection, channelId, _virtualHost.getMessageStore());
@@ -220,6 +221,7 @@ public class AMQChannelTest extends Qpid
Set<Principal> authenticatedUser = Collections.<Principal>singleton(new AuthenticatedPrincipal("user"));
_amqConnection.setAuthorizedSubject(new Subject(true, authenticatedUser, Collections.<Principal>emptySet(), Collections.<Principal>emptySet()));
+ _amqConnection.associateVirtualHost(_virtualHost);
AMQChannel channel = new AMQChannel(_amqConnection, 1, _virtualHost.getMessageStore());
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Mar 25 13:04:11 2016
@@ -53,30 +53,27 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.protocol.ConnectionClosingTicker;
-import org.apache.qpid.server.transport.AbstractAMQPConnection;
-import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
+import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.NetworkConnectionScheduler;
import org.apache.qpid.server.transport.NonBlockingConnection;
+import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -157,8 +154,6 @@ public class AMQPConnection_1_0 extends
super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
_broker = broker;
_connection = createConnection(broker, network, port, transport, id, useSASL);
-
- _connection.setAmqpConnection(this);
_endpoint = _connection.getConnectionEndpoint();
_endpoint.setConnectionEventListener(_connection);
_endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
@@ -179,12 +174,12 @@ public class AMQPConnection_1_0 extends
}
- public static Connection_1_0 createConnection(final Broker<?> broker,
- final ServerNetworkConnection network,
- final AmqpPort<?> port,
- final Transport transport,
- final long id,
- final boolean useSASL)
+ private Connection_1_0 createConnection(final Broker<?> broker,
+ final ServerNetworkConnection network,
+ final AmqpPort<?> port,
+ final Transport transport,
+ final long id,
+ final boolean useSASL)
{
Container container = new Container(broker.getId().toString());
@@ -214,7 +209,7 @@ public class AMQPConnection_1_0 extends
endpoint.setProperties(serverProperties);
endpoint.setRemoteAddress(network.getRemoteAddress());
- return new Connection_1_0(endpoint, id, port, transport, subjectCreator);
+ return new Connection_1_0(endpoint, id, port, transport, subjectCreator, this);
}
@@ -488,7 +483,7 @@ public class AMQPConnection_1_0 extends
void changeScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
- ((NonBlockingConnection) getNetwork()).changeScheduler(networkConnectionScheduler);
+ ((NonBlockingConnection) getNetwork()).pushScheduler(networkConnectionScheduler);
}
@@ -665,11 +660,6 @@ public class AMQPConnection_1_0 extends
return _connection.getRemoteContainerName();
}
- public VirtualHost<?, ?, ?> getVirtualHost()
- {
- return _connection.getVirtualHost();
- }
-
public List<Session_1_0> getSessionModels()
{
return _connection.getSessionModels();
Modified: qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Mar 25 13:04:11 2016
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-import java.net.InetAddress;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -67,6 +66,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionCloseCode;
import org.apache.qpid.transport.ConnectionRedirect;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
public class Connection_1_0 implements ConnectionEventListener
{
@@ -74,8 +74,7 @@ public class Connection_1_0 implements C
private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
private final AmqpPort<?> _port;
private final SubjectCreator _subjectCreator;
- private AMQPConnection_1_0 _amqpConnection;
- private VirtualHostImpl<?,?,?> _vhost;
+ private final AMQPConnection_1_0 _amqpConnection;
private final Transport _transport;
private final ConnectionEndpoint _connectionEndpoint;
private final long _connectionId;
@@ -92,7 +91,7 @@ public class Connection_1_0 implements C
getConnectionId(),
getClientId(),
getRemoteAddressString(),
- _vhost.getName())
+ getVirtualHost().getName())
+ "] ";
}
@@ -109,17 +108,14 @@ public class Connection_1_0 implements C
long connectionId,
AmqpPort<?> port,
Transport transport,
- final SubjectCreator subjectCreator)
+ final SubjectCreator subjectCreator,
+ final AMQPConnection_1_0 amqpConnection)
{
_port = port;
_transport = transport;
_connectionEndpoint = connectionEndpoint;
_connectionId = connectionId;
_subjectCreator = subjectCreator;
- }
-
- void setAmqpConnection(final AMQPConnection_1_0 amqpConnection)
- {
_amqpConnection = amqpConnection;
}
@@ -169,13 +165,7 @@ public class Connection_1_0 implements C
final VirtualHostImpl<?,?,?> vhost = ((AmqpPort) _port).getVirtualHost(host);
if (vhost == null)
{
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_FOUND);
- err.setDescription("Unknown hostname in connection open: '" + host + "'");
- _connectionEndpoint.close(err);
- _amqpConnection.close();
-
- _closedOnOpen = true;
+ closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + host + "'");
}
else
{
@@ -256,23 +246,34 @@ public class Connection_1_0 implements C
_amqpConnection.updateAccessControllerContext();
if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject()) == null)
{
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_ALLOWED);
- err.setDescription("Connection has not been authenticated");
- _connectionEndpoint.close(err);
- _amqpConnection.close();
- _closedOnOpen = true;
+ closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
}
else
{
- _vhost = vhost;
- _amqpConnection.virtualHostAssociated();
+ try
+ {
+ _amqpConnection.associateVirtualHost(vhost);
+ }
+ catch (VirtualHostUnavailableException e)
+ {
+ closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
+ }
}
}
}
}
}
+ private void closeWithError(final AmqpError amqpError, final String errorDescription)
+ {
+ final Error err = new Error();
+ err.setCondition(amqpError);
+ err.setDescription(errorDescription);
+ _connectionEndpoint.close(err);
+ _amqpConnection.close();
+ _closedOnOpen = true;
+ }
+
void setUserPrincipal(final Principal user)
{
Subject authSubject = _subjectCreator.createSubjectWithGroups(user);
@@ -362,9 +363,10 @@ public class Connection_1_0 implements C
}
finally
{
- if (_vhost != null)
+ VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
{
- _vhost.deregisterConnection(_amqpConnection);
+ virtualHost.deregisterConnection(_amqpConnection);
}
getAmqpConnection().getEventLogger().message(ConnectionMessages.CLOSE());
}
@@ -470,10 +472,9 @@ public class Connection_1_0 implements C
public VirtualHostImpl getVirtualHost()
{
- return _vhost;
+ return (VirtualHostImpl) _amqpConnection.getVirtualHost();
}
-
public void transportStateChanged()
{
for (Session_1_0 session : _sessions)
@@ -558,11 +559,12 @@ public class Connection_1_0 implements C
@Override
public String toString()
{
+ VirtualHost<?,?,?> virtualHost = getVirtualHost();
return "Connection_1_0["
+ _connectionId
+ " "
+ _amqpConnection.getAddress()
- + (_vhost == null ? "" : (" vh : " + _vhost.getName()))
+ + (virtualHost == null ? "" : (" vh : " + virtualHost.getName()))
+ ']';
}
}
Modified: qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java (original)
+++ qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java Fri Mar 25 13:04:11 2016
@@ -20,11 +20,7 @@
*/
package org.apache.qpid.systest.rest;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import org.apache.qpid.server.management.plugin.HttpManagement;
import org.apache.qpid.server.model.AuthenticationProvider;
@@ -45,7 +41,7 @@ public class QpidRestTestCase extends Qp
public static final String[] EXPECTED_VIRTUALHOSTS = { TEST1_VIRTUALHOST, TEST2_VIRTUALHOST, TEST3_VIRTUALHOST};
public static final String[] EXPECTED_EXCHANGES = { "amq.fanout", "amq.match", "amq.direct","amq.topic" };
- private RestTestHelper _restTestHelper = new RestTestHelper(findFreePort());
+ protected RestTestHelper _restTestHelper = new RestTestHelper(findFreePort());
@Override
public void setUp() throws Exception
@@ -102,33 +98,4 @@ public class QpidRestTestCase extends Qp
{
return _restTestHelper;
}
-
- public Map<String, Object> waitForAttributeChanged(String url, String attributeName, Object newValue) throws Exception
- {
- List<Map<String, Object>> nodeAttributes = getAttributesIgnoringNotFound(url);
- int timeout = 5000;
- long limit = System.currentTimeMillis() + timeout;
- while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
- {
- Thread.sleep(100l);
- nodeAttributes = getAttributesIgnoringNotFound(url);
- }
- Map<String, Object> nodeData = nodeAttributes.get(0);
- assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName));
- return nodeData;
- }
-
- private List<Map<String, Object>> getAttributesIgnoringNotFound(String url) throws IOException
- {
- List<Map<String, Object>> nodeAttributes;
- try
- {
- nodeAttributes = getRestTestHelper().getJsonAsList(url);
- }
- catch(FileNotFoundException e)
- {
- nodeAttributes = Collections.emptyList();
- }
- return nodeAttributes;
- }
}
Modified: qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java (original)
+++ qpid/java/branches/6.0.x/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java Fri Mar 25 13:04:11 2016
@@ -28,6 +28,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -646,4 +647,35 @@ public class RestTestHelper
throw new RuntimeException("Unsupported encoding UTF8", e);
}
}
+
+ public Map<String, Object> waitForAttributeChanged(String url,
+ String attributeName,
+ Object newValue) throws Exception
+ {
+ List<Map<String, Object>> nodeAttributes = getAttributesIgnoringNotFound(url);
+ int timeout = 30000;
+ long limit = System.currentTimeMillis() + timeout;
+ while(System.currentTimeMillis() < limit && (nodeAttributes.size() == 0 || !newValue.equals(nodeAttributes.get(0).get(attributeName))))
+ {
+ Thread.sleep(100l);
+ nodeAttributes = getAttributesIgnoringNotFound(url);
+ }
+ Map<String, Object> nodeData = nodeAttributes.get(0);
+ Assert.assertEquals("Attribute " + attributeName + " did not reach expected value within permitted timeout " + timeout + "ms.", newValue, nodeData.get(attributeName));
+ return nodeData;
+ }
+
+ private List<Map<String, Object>> getAttributesIgnoringNotFound(String url) throws IOException
+ {
+ List<Map<String, Object>> nodeAttributes;
+ try
+ {
+ nodeAttributes = getJsonAsList(url);
+ }
+ catch(FileNotFoundException e)
+ {
+ nodeAttributes = Collections.emptyList();
+ }
+ return nodeAttributes;
+ }
}
Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java?rev=1736566&r1=1736565&r2=1736566&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java Fri Mar 25 13:04:11 2016
@@ -209,19 +209,19 @@ public class VirtualHostRestTest extends
{
String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
}
@@ -232,7 +232,7 @@ public class VirtualHostRestTest extends
String restHostUrl = "virtualhost/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST;
String restQueueUrl = "queue/" + TEST1_VIRTUALHOST + "/" + TEST1_VIRTUALHOST + "/" + testQueueName;
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
Connection connection = getConnection();
@@ -248,13 +248,13 @@ public class VirtualHostRestTest extends
Map<String, Object> newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "STOPPED");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "STOPPED");
assertActualAndDesireStates(restHostUrl, "STOPPED", "STOPPED");
newAttributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, "ACTIVE");
getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes, HttpServletResponse.SC_OK);
- waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
+ _restTestHelper.waitForAttributeChanged(restHostUrl, VirtualHost.STATE, "ACTIVE");
assertActualAndDesireStates(restHostUrl, "ACTIVE", "ACTIVE");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org