You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/20 16:50:09 UTC
svn commit: r1570242 - in /qpid/trunk/qpid/java:
amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Author: rgodfrey
Date: Thu Feb 20 15:50:09 2014
New Revision: 1570242
URL: http://svn.apache.org/r1570242
Log:
QPID-5563 : Ensure vhost is set as early as possible, on receipt of connection open
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Thu Feb 20 15:50:09 2014
@@ -343,6 +343,8 @@ public class ConnectionEndpoint implemen
_idleTimeout = open.getIdleTimeOut().longValue();
}
+ _connectionEventListener.openReceived();
+
switch (_state)
{
case UNOPENED:
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java Thu Feb 20 15:50:09 2014
@@ -21,8 +21,16 @@ package org.apache.qpid.amqp_1_0.transpo
public interface ConnectionEventListener
{
+ void openReceived();
+
class DefaultConnectionEventListener implements ConnectionEventListener
{
+ @Override
+ public void openReceived()
+ {
+
+ }
+
public void remoteSessionCreation(final SessionEndpoint endpoint)
{
endpoint.end();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Feb 20 15:50:09 2014
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.net.InetAddress;
import java.net.SocketAddress;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -121,9 +120,32 @@ public class Connection_1_0 implements C
return _reference;
}
+ @Override
+ public void openReceived()
+ {
+ String host = _conn.getLocalHostname();
+ if(host == null || host.trim().equals(""))
+ {
+ host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
+ }
+ _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
+ if(_vhost == null)
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_FOUND);
+ err.setDescription("Unknown hostname " + _conn.getLocalHostname());
+ _conn.close(err);
+ }
+ Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser());
+ _subject.getPrincipals().addAll(authSubject.getPrincipals());
+ _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
+ _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
+
+ }
+
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- final Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
+ final Session_1_0 session = new Session_1_0(this, endpoint);
_sessions.add(session);
endpoint.setSessionEventListener(new SessionEventListener()
{
@@ -397,33 +419,13 @@ public class Connection_1_0 implements C
}
- public void frameReceived()
+ Subject getSubject()
{
- if(_vhost == null && _conn.isOpen())
- {
- String host = _conn.getLocalHostname();
- if(host == null || host.trim().equals(""))
- {
- host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
- }
- _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
- if(_vhost == null)
- {
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_FOUND);
- err.setDescription("Unknown hostname " + _conn.getLocalHostname());
- _conn.close(err);
- }
- Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser());
- _subject.getPrincipals().addAll(authSubject.getPrincipals());
- _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
- _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
-
- }
+ return _subject;
}
- Subject getSubject()
+ VirtualHost getVirtualHost()
{
- return _subject;
+ return _vhost;
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Thu Feb 20 15:50:09 2014
@@ -365,7 +365,6 @@ public class ProtocolEngine_1_0_0_SASL i
public Void run()
{
_frameHandler = _frameHandler.parse(msg);
- _connection.frameReceived();
return null;
}
});
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Feb 20 15:50:09 2014
@@ -87,11 +87,9 @@ public class Session_1_0 implements Sess
- public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
+ public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
{
- _vhost = vhost;
_endpoint = endpoint;
- _transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
_subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
_subject.getPrincipals().add(new SessionPrincipal(this));
@@ -106,7 +104,7 @@ public class Session_1_0 implements Sess
final
LinkRegistry
- linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+ linkRegistry = getVirtualHost().getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
if(endpoint.getRole() == Role.SENDER)
@@ -129,7 +127,7 @@ public class Session_1_0 implements Sess
source.setAddress(tempQueue.getName());
}
String addr = source.getAddress();
- MessageSource queue = _vhost.getMessageSource(addr);
+ MessageSource queue = getVirtualHost().getMessageSource(addr);
if(queue != null)
{
@@ -140,7 +138,7 @@ public class Session_1_0 implements Sess
}
else
{
- Exchange exchg = _vhost.getExchange(addr);
+ Exchange exchg = getVirtualHost().getExchange(addr);
if(exchg != null)
{
destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
@@ -165,7 +163,7 @@ public class Session_1_0 implements Sess
try
{
final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
- _vhost,
+ getVirtualHost(),
(SendingDestination) destination
);
@@ -245,7 +243,7 @@ public class Session_1_0 implements Sess
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorLink_1_0 coordinatorLink =
- new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
+ new TxnCoordinatorLink_1_0(getVirtualHost(), this, receivingLinkEndpoint, _openTransactions);
receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink));
link = coordinatorLink;
@@ -272,7 +270,7 @@ public class Session_1_0 implements Sess
}
String addr = target.getAddress();
- MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+ MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr);
if(messageDestination != null)
{
destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
@@ -280,7 +278,7 @@ public class Session_1_0 implements Sess
}
else
{
- AMQQueue queue = _vhost.getQueue(addr);
+ AMQQueue queue = getVirtualHost().getQueue(addr);
if(queue != null)
{
@@ -303,7 +301,8 @@ public class Session_1_0 implements Sess
if(destination != null)
{
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
+ final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+ getVirtualHost(),
(ReceivingDestination) destination);
receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
@@ -355,7 +354,7 @@ public class Session_1_0 implements Sess
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
Map<String,Object> attributes = new HashMap<String,Object>();
- attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()));
+ attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()));
attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
@@ -388,7 +387,7 @@ public class Session_1_0 implements Sess
// TODO convert AMQP 1-0 node properties to queue attributes
- final AMQQueue tempQueue = queue = _vhost.createQueue(attributes );
+ final AMQQueue tempQueue = queue = getVirtualHost().createQueue(attributes);
}
catch (AccessControlException e)
{
@@ -409,7 +408,15 @@ public class Session_1_0 implements Sess
{
// TODO should treat invalid id differently to null
ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
- return transaction == null ? _transaction : transaction;
+ if(transaction == null)
+ {
+ if(_transaction == null)
+ {
+ _transaction = new AutoCommitTransaction(_connection.getVirtualHost().getMessageStore());
+ }
+ transaction = _transaction;
+ }
+ return transaction;
}
public void remoteEnd(End end)
@@ -622,7 +629,7 @@ public class Session_1_0 implements Sess
connectionId,
getClientID(),
remoteAddress,
- _vhost.getName(),
+ getVirtualHost().getName(),
_endpoint.getSendingChannel()) + "] ";
}
@@ -657,6 +664,11 @@ public class Session_1_0 implements Sess
return _subject;
}
+ VirtualHost getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
private class SubjectSpecificReceivingLinkListener implements ReceivingLinkListener
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org