You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/02/20 20:35:47 UTC
svn commit: r1731444 -
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Author: rgodfrey
Date: Sat Feb 20 19:35:47 2016
New Revision: 1731444
URL: http://svn.apache.org/viewvc?rev=1731444&view=rev
Log:
QPID-6424 : Implement broker-side redirection for AMQP 1-0
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1731444&r1=1731443&r2=1731444&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Sat Feb 20 19:35:47 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import java.net.InetAddress;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -29,6 +30,7 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -45,12 +47,14 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.End;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.port.AmqpPort;
@@ -58,6 +62,9 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionRedirect;
public class Connection_1_0 implements ConnectionEventListener
{
@@ -170,26 +177,95 @@ public class Connection_1_0 implements C
}
else
{
- final Principal user = _connectionEndpoint.getUser();
- if (user != null)
- {
- setUserPrincipal(user);
- }
- _amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
- _amqpConnection.updateAccessControllerContext();
- if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
- == null)
+ if (_vhost.getState() != State.ACTIVE)
{
final Error err = new Error();
- err.setCondition(AmqpError.NOT_ALLOWED);
- err.setDescription("Connection has not been authenticated");
+ err.setCondition(AmqpError.NOT_FOUND);
_connectionEndpoint.close(err);
_amqpConnection.close();
+
_closedOnOpen = true;
+ final String redirectHost = _vhost.getRedirectHost(((AmqpPort) _port));
+ if(redirectHost == null)
+ {
+ err.setDescription("Virtual host '" + host + "' is not active");
+ }
+ else
+ {
+ String newtworkHost;
+ int port;
+ if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?"))
+ {
+ // IPv6 case
+ newtworkHost = redirectHost.substring(1, redirectHost.indexOf("]"));
+ if(redirectHost.contains("]:"))
+ {
+ port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2));
+ }
+ else
+ {
+ port = -1;
+ }
+ }
+ else
+ {
+ if(redirectHost.contains(":"))
+ {
+ newtworkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":"));
+ try
+ {
+ String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1);
+ port = Integer.parseInt(portString);
+ }
+ catch (NumberFormatException e)
+ {
+ port = -1;
+ }
+ }
+ else
+ {
+ newtworkHost = redirectHost;
+ port = -1;
+ }
+ }
+ final Map<Symbol, Object> infoMap = new HashMap<>();
+ infoMap.put(Symbol.valueOf("network-host"), newtworkHost);
+ if(port > 0)
+ {
+ infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
+ }
+ err.setInfo(infoMap);
+ }
+
+ _connectionEndpoint.close(err);
+ _amqpConnection.close();
+
+ _closedOnOpen = true;
+
}
else
{
- _amqpConnection.virtualHostAssociated();
+ final Principal user = _connectionEndpoint.getUser();
+ if (user != null)
+ {
+ setUserPrincipal(user);
+ }
+ _amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
+ _amqpConnection.updateAccessControllerContext();
+ if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
+ == null)
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_ALLOWED);
+ err.setDescription("Connection has not been authenticated");
+ _connectionEndpoint.close(err);
+ _amqpConnection.close();
+ _closedOnOpen = true;
+ }
+ else
+ {
+ _amqpConnection.virtualHostAssociated();
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org