You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/20 16:50:09 UTC

svn commit: r1570242 - in /qpid/trunk/qpid/java: amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: rgodfrey
Date: Thu Feb 20 15:50:09 2014
New Revision: 1570242

URL: http://svn.apache.org/r1570242
Log:
QPID-5563 : Ensure vhost is set as early as possible, on receipt of connection open

Modified:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Thu Feb 20 15:50:09 2014
@@ -343,6 +343,8 @@ public class ConnectionEndpoint implemen
             _idleTimeout = open.getIdleTimeOut().longValue();
         }
 
+        _connectionEventListener.openReceived();
+
         switch (_state)
         {
             case UNOPENED:

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEventListener.java Thu Feb 20 15:50:09 2014
@@ -21,8 +21,16 @@ package org.apache.qpid.amqp_1_0.transpo
 
 public interface ConnectionEventListener
 {
+    void openReceived();
+
     class DefaultConnectionEventListener implements ConnectionEventListener
     {
+        @Override
+        public void openReceived()
+        {
+
+        }
+
         public void remoteSessionCreation(final SessionEndpoint endpoint)
         {
             endpoint.end();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Feb 20 15:50:09 2014
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.net.InetAddress;
 import java.net.SocketAddress;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -121,9 +120,32 @@ public class Connection_1_0 implements C
         return _reference;
     }
 
+    @Override
+    public void openReceived()
+    {
+        String host = _conn.getLocalHostname();
+        if(host == null || host.trim().equals(""))
+        {
+            host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
+        }
+        _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
+        if(_vhost == null)
+        {
+            final Error err = new Error();
+            err.setCondition(AmqpError.NOT_FOUND);
+            err.setDescription("Unknown hostname " + _conn.getLocalHostname());
+            _conn.close(err);
+        }
+        Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser());
+        _subject.getPrincipals().addAll(authSubject.getPrincipals());
+        _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
+        _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
+
+    }
+
     public void remoteSessionCreation(SessionEndpoint endpoint)
     {
-        final Session_1_0 session = new Session_1_0(_vhost, this, endpoint);
+        final Session_1_0 session = new Session_1_0(this, endpoint);
         _sessions.add(session);
         endpoint.setSessionEventListener(new SessionEventListener()
         {
@@ -397,33 +419,13 @@ public class Connection_1_0 implements C
     }
 
 
-    public void frameReceived()
+    Subject getSubject()
     {
-        if(_vhost == null && _conn.isOpen())
-        {
-            String host = _conn.getLocalHostname();
-            if(host == null || host.trim().equals(""))
-            {
-                host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
-            }
-            _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
-            if(_vhost == null)
-            {
-                final Error err = new Error();
-                err.setCondition(AmqpError.NOT_FOUND);
-                err.setDescription("Unknown hostname " + _conn.getLocalHostname());
-                _conn.close(err);
-            }
-            Subject authSubject = _subjectCreator.createSubjectWithGroups(_conn.getUser());
-            _subject.getPrincipals().addAll(authSubject.getPrincipals());
-            _subject.getPublicCredentials().addAll(authSubject.getPublicCredentials());
-            _subject.getPrivateCredentials().addAll(authSubject.getPrivateCredentials());
-
-        }
+        return _subject;
     }
 
-    Subject getSubject()
+    VirtualHost getVirtualHost()
     {
-        return _subject;
+        return _vhost;
     }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java Thu Feb 20 15:50:09 2014
@@ -365,7 +365,6 @@ public class ProtocolEngine_1_0_0_SASL i
                             public Void run()
                             {
                                 _frameHandler = _frameHandler.parse(msg);
-                                _connection.frameReceived();
                                 return null;
                             }
                         });

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1570242&r1=1570241&r2=1570242&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Feb 20 15:50:09 2014
@@ -87,11 +87,9 @@ public class Session_1_0 implements Sess
 
 
 
-    public Session_1_0(VirtualHost vhost, final Connection_1_0 connection, final SessionEndpoint endpoint)
+    public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
     {
-        _vhost = vhost;
         _endpoint = endpoint;
-        _transaction = new AutoCommitTransaction(vhost.getMessageStore());
         _connection = connection;
         _subject.getPrincipals().addAll(connection.getSubject().getPrincipals());
         _subject.getPrincipals().add(new SessionPrincipal(this));
@@ -106,7 +104,7 @@ public class Session_1_0 implements Sess
 
         final
         LinkRegistry
-                linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+                linkRegistry = getVirtualHost().getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
 
 
         if(endpoint.getRole() == Role.SENDER)
@@ -129,7 +127,7 @@ public class Session_1_0 implements Sess
                         source.setAddress(tempQueue.getName());
                     }
                     String addr = source.getAddress();
-                    MessageSource queue = _vhost.getMessageSource(addr);
+                    MessageSource queue = getVirtualHost().getMessageSource(addr);
                     if(queue != null)
                     {
 
@@ -140,7 +138,7 @@ public class Session_1_0 implements Sess
                     }
                     else
                     {
-                        Exchange exchg = _vhost.getExchange(addr);
+                        Exchange exchg = getVirtualHost().getExchange(addr);
                         if(exchg != null)
                         {
                             destination = new ExchangeDestination(exchg, source.getDurable(), source.getExpiryPolicy());
@@ -165,7 +163,7 @@ public class Session_1_0 implements Sess
                     try
                     {
                         final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
-                                                                                _vhost,
+                                                                                getVirtualHost(),
                                                                                 (SendingDestination) destination
                         );
 
@@ -245,7 +243,7 @@ public class Session_1_0 implements Sess
 
                 final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
                 final TxnCoordinatorLink_1_0 coordinatorLink =
-                        new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
+                        new TxnCoordinatorLink_1_0(getVirtualHost(), this, receivingLinkEndpoint, _openTransactions);
                 receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(coordinatorLink));
                 link = coordinatorLink;
 
@@ -272,7 +270,7 @@ public class Session_1_0 implements Sess
                         }
 
                         String addr = target.getAddress();
-                        MessageDestination messageDestination = _vhost.getMessageDestination(addr);
+                        MessageDestination messageDestination = getVirtualHost().getMessageDestination(addr);
                         if(messageDestination != null)
                         {
                             destination = new NodeReceivingDestination(messageDestination, target.getDurable(),
@@ -280,7 +278,7 @@ public class Session_1_0 implements Sess
                         }
                         else
                         {
-                            AMQQueue queue = _vhost.getQueue(addr);
+                            AMQQueue queue = getVirtualHost().getQueue(addr);
                             if(queue != null)
                             {
 
@@ -303,7 +301,8 @@ public class Session_1_0 implements Sess
                     if(destination != null)
                     {
                         final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
-                        final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
+                        final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
+                                                                                      getVirtualHost(),
                                 (ReceivingDestination) destination);
 
                         receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(receivingLink));
@@ -355,7 +354,7 @@ public class Session_1_0 implements Sess
                                             ? null
                                             : (LifetimePolicy) properties.get(LIFETIME_POLICY);
             Map<String,Object> attributes = new HashMap<String,Object>();
-            attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()));
+            attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()));
             attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
             attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
 
@@ -388,7 +387,7 @@ public class Session_1_0 implements Sess
 
             // TODO convert AMQP 1-0 node properties to queue attributes
 
-            final AMQQueue tempQueue = queue = _vhost.createQueue(attributes );
+            final AMQQueue tempQueue = queue = getVirtualHost().createQueue(attributes);
         }
         catch (AccessControlException e)
         {
@@ -409,7 +408,15 @@ public class Session_1_0 implements Sess
     {
         // TODO should treat invalid id differently to null
         ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
-        return transaction == null ? _transaction : transaction;
+        if(transaction == null)
+        {
+            if(_transaction == null)
+            {
+                _transaction = new AutoCommitTransaction(_connection.getVirtualHost().getMessageStore());
+            }
+            transaction = _transaction;
+        }
+        return transaction;
     }
 
     public void remoteEnd(End end)
@@ -622,7 +629,7 @@ public class Session_1_0 implements Sess
                                     connectionId,
                                     getClientID(),
                                     remoteAddress,
-                                    _vhost.getName(),
+                                    getVirtualHost().getName(),
                                     _endpoint.getSendingChannel())  + "] ";
     }
 
@@ -657,6 +664,11 @@ public class Session_1_0 implements Sess
         return _subject;
     }
 
+    VirtualHost getVirtualHost()
+    {
+        return _connection.getVirtualHost();
+    }
+
 
     private class SubjectSpecificReceivingLinkListener implements ReceivingLinkListener
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org