You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/02/20 20:35:47 UTC

svn commit: r1731444 - /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java

Author: rgodfrey
Date: Sat Feb 20 19:35:47 2016
New Revision: 1731444

URL: http://svn.apache.org/viewvc?rev=1731444&view=rev
Log:
QPID-6424 : Implement broker side redirection for AMQP 1-0

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1731444&r1=1731443&r2=1731444&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Sat Feb 20 19:35:47 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 
+import java.net.InetAddress;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -29,6 +30,7 @@ import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -45,12 +47,14 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
 import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
 import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
 import org.apache.qpid.amqp_1_0.type.transport.End;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
@@ -58,6 +62,9 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionRedirect;
 
 public class Connection_1_0 implements ConnectionEventListener
 {
@@ -170,26 +177,95 @@ public class Connection_1_0 implements C
             }
             else
             {
-                final Principal user = _connectionEndpoint.getUser();
-                if (user != null)
-                {
-                    setUserPrincipal(user);
-                }
-                _amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
-                _amqpConnection.updateAccessControllerContext();
-                if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
-                    == null)
+                if (_vhost.getState() != State.ACTIVE)
                 {
                     final Error err = new Error();
-                    err.setCondition(AmqpError.NOT_ALLOWED);
-                    err.setDescription("Connection has not been authenticated");
+                    err.setCondition(AmqpError.NOT_FOUND);
                     _connectionEndpoint.close(err);
                     _amqpConnection.close();
+
                     _closedOnOpen = true;
+                    final String redirectHost = _vhost.getRedirectHost(((AmqpPort) _port));
+                    if(redirectHost == null)
+                    {
+                        err.setDescription("Virtual host '" + host + "' is not active");
+                    }
+                    else
+                    {
+                        String newtworkHost;
+                        int port;
+                        if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
+                        {
+                            // IPv6 case
+                            newtworkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
+                            if(redirectHost.contains("]:"))
+                            {
+                                port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
+                            }
+                            else
+                            {
+                                port = -1;
+                            }
+                        }
+                        else
+                        {
+                            if(redirectHost.contains(":"))
+                            {
+                                newtworkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
+                                try
+                                {
+                                    String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
+                                    port = Integer.parseInt(portString);
+                                }
+                                catch (NumberFormatException e)
+                                {
+                                    port = -1;
+                                }
+                            }
+                            else
+                            {
+                                newtworkHost = redirectHost;
+                                port = -1;
+                            }
+                        }
+                        final Map<Symbol, Object> infoMap = new HashMap<>();
+                        infoMap.put(Symbol.valueOf("network-host"), newtworkHost);
+                        if(port > 0)
+                        {
+                            infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
+                        }
+                        err.setInfo(infoMap);
+                    }
+
+                    _connectionEndpoint.close(err);
+                    _amqpConnection.close();
+
+                    _closedOnOpen = true;
+
                 }
                 else
                 {
-                    _amqpConnection.virtualHostAssociated();
+                    final Principal user = _connectionEndpoint.getUser();
+                    if (user != null)
+                    {
+                        setUserPrincipal(user);
+                    }
+                    _amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
+                    _amqpConnection.updateAccessControllerContext();
+                    if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
+                        == null)
+                    {
+                        final Error err = new Error();
+                        err.setCondition(AmqpError.NOT_ALLOWED);
+                        err.setDescription("Connection has not been authenticated");
+                        _connectionEndpoint.close(err);
+                        _amqpConnection.close();
+                        _closedOnOpen = true;
+                    }
+                    else
+                    {
+                        _amqpConnection.virtualHostAssociated();
+                    }
                 }
             }
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org