You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/09 15:44:01 UTC
qpid-broker-j git commit: QPID-7782: [AMQP1.0] Add supporting AMQP
1.0 SASL protocol tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 061fb1b03 -> 32733fc22
QPID-7782: [AMQP1.0] Add supporting AMQP 1.0 SASL protocol tests
Also:
* prevented NPE if client selected a SASL mechanism which was not one of those offered by SaslMechanisms.
* grouped sasl methods together within the Connection_1_0 implementation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/32733fc2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/32733fc2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/32733fc2
Branch: refs/heads/master
Commit: 32733fc224a5f374bfb34cbdfe04f715ef9e59af
Parents: 061fb1b
Author: Keith Wall <kw...@apache.org>
Authored: Fri Jun 9 16:38:49 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Fri Jun 9 16:43:26 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 256 ++++++++--------
.../protocol/v1_0/framing/FrameHandler.java | 2 +-
.../tests/protocol/v1_0/FrameTransport.java | 124 +++++++-
.../qpid/tests/protocol/v1_0/InputHandler.java | 49 +++-
.../qpid/tests/protocol/v1_0/OutputHandler.java | 8 +-
.../protocol/v1_0/SaslPerformativeResponse.java | 55 ++++
.../main/resources/config-protocol-tests.json | 1 +
.../v1_0/transport/ProtocolHeaderTest.java | 20 ++
.../v1_0/transport/security/sasl/SaslTest.java | 289 +++++++++++++++++++
9 files changed, 652 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 977dc8f..870a961 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -274,6 +274,138 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
_frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
}
+ @Override
+ public void receiveSaslInit(final SaslInit saslInit)
+ {
+ assertState(ConnectionState.AWAIT_SASL_INIT);
+ if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim()))
+ {
+ _localHostname = saslInit.getHostname();
+ }
+ else if(getNetwork().getSelectedHost() != null)
+ {
+ _localHostname = getNetwork().getSelectedHost();
+ }
+ String mechanism = saslInit.getMechanism().toString();
+ final Binary initialResponse = saslInit.getInitialResponse();
+ byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
+
+ List<String> availableMechanisms =
+ _subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
+ if (!availableMechanisms.contains(mechanism))
+ {
+ handleSaslError();
+ }
+ else
+ {
+ _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
+ processSaslResponse(response);
+ }
+ }
+
+ @Override
+ public void receiveSaslResponse(final SaslResponse saslResponse)
+ {
+ assertState(ConnectionState.AWAIT_SASL_RESPONSE);
+ final Binary responseBinary = saslResponse.getResponse();
+ byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
+
+ processSaslResponse(response);
+ }
+
+ @Override
+ public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+ {
+ LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
+ closeSaslWithFailure();
+ }
+
+ @Override
+ public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+ {
+ LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
+ closeSaslWithFailure();
+ }
+
+ @Override
+ public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+ {
+ LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
+ closeSaslWithFailure();
+ }
+
+ private void processSaslResponse(final byte[] response)
+ {
+ byte[] challenge = null;
+ SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
+ if (authenticationResult == null)
+ {
+ authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
+ challenge = authenticationResult.getChallenge();
+ }
+
+ if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
+ {
+ _successfulAuthenticationResult = authenticationResult;
+ if (challenge == null || challenge.length == 0)
+ {
+ setSubject(_successfulAuthenticationResult.getSubject());
+ SaslOutcome outcome = new SaslOutcome();
+ outcome.setCode(SaslCode.OK);
+ send(new SASLFrame(outcome), null);
+ _saslComplete = true;
+ _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
+ disposeSaslNegotiator();
+ }
+ else
+ {
+ continueSaslNegotiation(challenge);
+ }
+ }
+ else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
+ {
+ continueSaslNegotiation(challenge);
+ }
+ else
+ {
+ handleSaslError();
+ }
+ }
+
+ private void continueSaslNegotiation(final byte[] challenge)
+ {
+ SaslChallenge challengeBody = new SaslChallenge();
+ challengeBody.setChallenge(new Binary(challenge));
+ send(new SASLFrame(challengeBody), null);
+
+ _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
+ }
+
+ private void handleSaslError()
+ {
+ SaslOutcome outcome = new SaslOutcome();
+ outcome.setCode(SaslCode.AUTH);
+ send(new SASLFrame(outcome), null);
+ _saslComplete = true;
+ closeSaslWithFailure();
+ }
+
+ private void closeSaslWithFailure()
+ {
+ _saslComplete = true;
+ disposeSaslNegotiator();
+ _connectionState = ConnectionState.CLOSED;
+ addCloseTicker();
+ }
+
+ private void disposeSaslNegotiator()
+ {
+ if (_saslNegotiator != null)
+ {
+ _saslNegotiator.dispose();
+ }
+ _saslNegotiator = null;
+ }
private void setUserPrincipal(final Principal user)
{
@@ -359,26 +491,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
}
- private void closeSaslWithFailure()
- {
- _saslComplete = true;
- disposeSaslNegotiator();
- _connectionState = ConnectionState.CLOSED;
- addCloseTicker();
- }
-
- private void disposeSaslNegotiator()
- {
- _saslNegotiator.dispose();
- _saslNegotiator = null;
- }
-
- @Override
- public void receiveSaslChallenge(final SaslChallenge saslChallenge)
- {
- LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
- closeSaslWithFailure();
- }
@Override
public void receiveClose(final int channel, final Close close)
@@ -438,23 +550,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
}
- @Override
- public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
- {
- LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
- closeSaslWithFailure();
- }
-
- @Override
- public void receiveSaslResponse(final SaslResponse saslResponse)
- {
- final Binary responseBinary = saslResponse.getResponse();
- byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
-
- assertState(ConnectionState.AWAIT_SASL_RESPONSE);
-
- processSaslResponse(response);
- }
@Override
public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
@@ -538,13 +633,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public void receiveSaslOutcome(final SaslOutcome saslOutcome)
- {
- LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
- closeSaslWithFailure();
- }
-
- @Override
public void receiveEnd(final int channel, final End end)
{
assertState(ConnectionState.OPENED);
@@ -877,7 +965,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
// already sent our close - probably due to an error
break;
default:
- throw new ConnectionScopedRuntimeException("Connection Open failed under mysterious circumstances.");
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Unexpected state %s during connection open.", _connectionState));
}
}
}
@@ -992,87 +1081,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
@Override
- public void receiveSaslInit(final SaslInit saslInit)
- {
- assertState(ConnectionState.AWAIT_SASL_INIT);
- if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim()))
- {
- _localHostname = saslInit.getHostname();
- }
- else if(getNetwork().getSelectedHost() != null)
- {
- _localHostname = getNetwork().getSelectedHost();
- }
- String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
- final Binary initialResponse = saslInit.getInitialResponse();
- byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
-
- _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
- processSaslResponse(response);
- }
-
- @Override
public String getLocalFQDN()
{
return _localHostname != null ? _localHostname : super.getLocalFQDN();
}
- private void processSaslResponse(final byte[] response)
- {
- byte[] challenge = null;
- SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
- if (authenticationResult == null)
- {
- authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
- challenge = authenticationResult.getChallenge();
- }
-
- if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
- {
- _successfulAuthenticationResult = authenticationResult;
- if (challenge == null || challenge.length == 0)
- {
- setSubject(_successfulAuthenticationResult.getSubject());
- SaslOutcome outcome = new SaslOutcome();
- outcome.setCode(SaslCode.OK);
- send(new SASLFrame(outcome), null);
- _saslComplete = true;
- _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
- disposeSaslNegotiator();
- }
- else
- {
- continueSaslNegotiation(challenge);
- }
- }
- else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
- {
- continueSaslNegotiation(challenge);
- }
- else
- {
- handleSaslError();
- }
- }
-
- private void continueSaslNegotiation(final byte[] challenge)
- {
- SaslChallenge challengeBody = new SaslChallenge();
- challengeBody.setChallenge(new Binary(challenge));
- send(new SASLFrame(challengeBody), null);
-
- _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
- }
-
- private void handleSaslError()
- {
- SaslOutcome outcome = new SaslOutcome();
- outcome.setCode(SaslCode.AUTH);
- send(new SASLFrame(outcome), null);
- _saslComplete = true;
- closeSaslWithFailure();
- }
-
@Override
public int getMaxFrameSize()
{
@@ -1419,7 +1432,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
}
-
@Override
public void send(final AMQFrame amqFrame, ByteBuffer buf)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index a4dc4af..c5e0334 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -41,7 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class FrameHandler implements ProtocolHandler
{
- public static Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class);
+ public static final Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class);
private final boolean _isSasl;
private final ConnectionHandler _connectionHandler;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 47872f0..eda903a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -20,12 +20,14 @@
package org.apache.qpid.tests.protocol.v1_0;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertNull;
+import static org.hamcrest.Matchers.nullValue;
import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@@ -54,8 +56,10 @@ import org.hamcrest.CoreMatchers;
import org.hamcrest.core.Is;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -70,18 +74,25 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class FrameTransport implements AutoCloseable
{
- private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
public static final long RESPONSE_TIMEOUT = 6000;
+ private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
private final Channel _channel;
private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100);
private final EventLoopGroup _workerGroup;
+ private volatile boolean _channelClosedSeen = false;
private int _amqpConnectionId;
private short _amqpChannelId;
public FrameTransport(final InetSocketAddress brokerAddress)
{
+ this(brokerAddress, false);
+ }
+
+ public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
+ {
_workerGroup = new NioEventLoopGroup();
try
@@ -95,12 +106,16 @@ public class FrameTransport implements AutoCloseable
@Override
public void initChannel(SocketChannel ch) throws Exception
{
- ch.pipeline().addLast(new InputHandler(_queue))
- .addLast(new OutputHandler());
+ ch.pipeline().addLast(new InputHandler(_queue, isSasl)).addLast(new OutputHandler());
}
});
_channel = b.connect(brokerAddress).sync().channel();
+ _channel.closeFuture().addListener(future ->
+ {
+ _channelClosedSeen = true;
+ _queue.add(CHANNEL_CLOSED_RESPONSE);
+ });
}
catch (InterruptedException e)
{
@@ -134,13 +149,21 @@ public class FrameTransport implements AutoCloseable
public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
{
- final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer)frameBody).getPayload() : null;
+ final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
channelFuture.sync();
return JdkFutureAdapters.listenInPoolThread(channelFuture);
}
+ public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
+ {
+ SASLFrame transportFrame = new SASLFrame(frameBody);
+ ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
+ channelFuture.sync();
+ return JdkFutureAdapters.listenInPoolThread(channelFuture);
+ }
+
public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
{
return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId));
@@ -174,6 +197,52 @@ public class FrameTransport implements AutoCloseable
return _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
}
+ public <R extends Response> R getNextResponse(Class<? extends Response> expectedResponseClass) throws Exception
+ {
+ R actualResponse = (R) getNextResponse();
+ if (actualResponse == null)
+ {
+ throw new IllegalStateException(String.format("No response received within timeout %d - expecting %s",
+ RESPONSE_TIMEOUT, expectedResponseClass.getName()));
+ }
+ else if (!expectedResponseClass.isAssignableFrom(actualResponse.getClass()))
+ {
+ throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
+ expectedResponseClass.getName(),
+ actualResponse.getClass().getName()));
+ }
+
+ return actualResponse;
+ }
+
+ public <P> P getNextPerformativeResponse(Class<?> expectedFrameBodyClass) throws Exception
+ {
+ final P actualFrameBody;
+ if (SaslFrameBody.class.isAssignableFrom(expectedFrameBodyClass))
+ {
+ SaslPerformativeResponse response = getNextResponse(SaslPerformativeResponse.class);
+ actualFrameBody = (P) response.getFrameBody();
+ }
+ else if (FrameBody.class.isAssignableFrom(expectedFrameBodyClass))
+ {
+ PerformativeResponse response = getNextResponse(PerformativeResponse.class);
+ actualFrameBody = (P) response.getFrameBody();
+ }
+ else
+ {
+ throw new IllegalArgumentException(String.format("Unexpected class %s", expectedFrameBodyClass.getName()));
+ }
+
+ if (!expectedFrameBodyClass.isAssignableFrom(actualFrameBody.getClass()))
+ {
+ throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
+ expectedFrameBodyClass.getName(),
+ actualFrameBody.getClass().getName()));
+ }
+
+ return actualFrameBody;
+ }
+
public void doProtocolNegotiation() throws Exception
{
byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
@@ -286,7 +355,6 @@ public class FrameTransport implements AutoCloseable
Attach responseAttach = (Attach) response.getFrameBody();
assertThat(responseAttach.getTarget(), is(notNullValue()));
-
PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse();
assertThat(flowResponse, Is.is(CoreMatchers.notNullValue()));
assertThat(flowResponse.getFrameBody(), Is.is(CoreMatchers.instanceOf(Flow.class)));
@@ -295,9 +363,16 @@ public class FrameTransport implements AutoCloseable
public void assertNoMoreResponses() throws Exception
{
Response response = getNextResponse();
- assertNull("Unexpected response.", response);
+ assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+ }
+
+ public void assertNoMoreResponsesAndChannelClosed() throws Exception
+ {
+ assertNoMoreResponses();
+ assertThat(_channelClosedSeen, is(true));
}
+
private int getConnectionId()
{
if (_amqpConnectionId == 0)
@@ -310,4 +385,39 @@ public class FrameTransport implements AutoCloseable
}
return _amqpConnectionId;
}
+
+ public void assertChannelClosed()
+ {
+ try
+ {
+ ChannelFuture channelFuture = _channel.write(new byte[]{0});
+ channelFuture.sync();
+ throw new IllegalStateException(
+ "Expecting the channel to be already closed by, but it was able to take more input.");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e)
+ {
+ if (e instanceof ClosedChannelException)
+ {
+ // PASS
+ }
+ else
+ {
+ throw new IllegalStateException("Unexpected exception", e);
+ }
+ }
+ }
+
+ private static class ChannelClosedResponse implements Response
+ {
+ @Override
+ public String toString()
+ {
+ return "ChannelClosed";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
index 8468f5a..10fde24 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
@@ -56,12 +57,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class InputHandler extends ChannelInboundHandlerAdapter
{
- private enum ParsingState
- {
- HEADER,
- PERFORMATIVES
- };
-
private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
@@ -69,18 +64,27 @@ public class InputHandler extends ChannelInboundHandlerAdapter
.registerTransactionLayer()
.registerSecurityLayer()
.registerExtensionSoleconnLayer();
+
+ private enum ParsingState
+ {
+ HEADER,
+ PERFORMATIVES
+ };
+
+ private final MyConnectionHandler _connectionHandler;
private final ValueHandler _valueHandler;
- private final FrameHandler _frameHandler;
+ private final BlockingQueue<Response> _responseQueue;
private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
- private BlockingQueue<Response> _responseQueue;
- private ParsingState _state = ParsingState.HEADER;
+ private volatile FrameHandler _frameHandler;
+ private volatile ParsingState _state = ParsingState.HEADER;
- public InputHandler(final BlockingQueue<Response> queue)
+ public InputHandler(final BlockingQueue<Response> queue, final boolean isSasl)
{
_valueHandler = new ValueHandler(TYPE_REGISTRY);
- _frameHandler = new FrameHandler(_valueHandler, new MyConnectionHandler(), false);
+ _connectionHandler = new MyConnectionHandler();
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
_responseQueue = queue;
}
@@ -140,12 +144,17 @@ public class InputHandler extends ChannelInboundHandlerAdapter
}
}
+ private void resetInputHandlerAfterSaslOutcome()
+ {
+ _state = ParsingState.HEADER;
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+ }
+
private class MyConnectionHandler implements ConnectionHandler
{
@Override
public void receiveOpen(final int channel, final Open close)
{
- System.out.println();
}
@Override
@@ -211,7 +220,7 @@ public class InputHandler extends ChannelInboundHandlerAdapter
@Override
public void handleError(final Error parsingError)
{
-
+ LOGGER.error("Unexpected error {}", parsingError);
}
@Override
@@ -230,16 +239,24 @@ public class InputHandler extends ChannelInboundHandlerAdapter
int channel = channelFrameBody.getChannel();
if (val instanceof FrameBody)
{
- response = new PerformativeResponse((short) channel, (FrameBody) val);
+ FrameBody frameBody = (FrameBody) val;
+ response = new PerformativeResponse((short) channel, frameBody);
}
else if (val instanceof SaslFrameBody)
{
- throw new UnsupportedOperationException("TODO: ");
+ SaslFrameBody frameBody = (SaslFrameBody) val;
+ response = new SaslPerformativeResponse((short) channel, frameBody);
+
+ if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+ {
+ resetInputHandlerAfterSaslOutcome();
+ }
}
else
{
- throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+ throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
}
+
_responseQueue.add(response);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
index 414ad90..dbd4cd6 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
@@ -28,11 +28,7 @@ import io.netty.channel.ChannelPromise;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.transport.ByteBufferSender;
@@ -50,7 +46,7 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
{
- if (msg instanceof TransportFrame)
+ if (msg instanceof AMQFrame)
{
FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
{
@@ -86,7 +82,7 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
}
});
- _frameWriter.send((TransportFrame) msg);
+ _frameWriter.send(((AMQFrame) msg));
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
new file mode 100644
index 0000000..cd2da99
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+
+public class SaslPerformativeResponse implements Response
+{
+ private final short _channelId;
+ private final SaslFrameBody _frameBody;
+
+ public SaslPerformativeResponse(final short channelId,
+ final SaslFrameBody frameBody)
+ {
+ _channelId = channelId;
+ _frameBody = frameBody;
+ }
+
+ public SaslFrameBody getFrameBody()
+ {
+ return _frameBody;
+ }
+
+ public short getChannelId()
+ {
+ return _channelId;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SaslPerformativeResponse{" +
+ "_channelId=" + _channelId +
+ ", _frameBody=" + _frameBody +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
index 7a5e20d..1aaa210 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
+++ b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
@@ -27,6 +27,7 @@
}, {
"name" : "plain",
"type" : "Plain",
+ "secureOnlyMechanisms" : [],
"users" : [ {
"name" : "admin",
"type" : "managed",
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 91d2c48..ed9c10f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -66,6 +66,26 @@ public class ProtocolHeaderTest extends ProtocolTestBase
HeaderResponse response = (HeaderResponse) transport.getNextResponse();
assertArrayEquals("Unexpected protocol header response", bytes, response.getHeader());
}
+ }
+
+ @Test
+ @SpecificationTest(section = "2.2",
+ description = " A client might request use of a protocol id that is unacceptable to a server. [...]"
+ + "In this case, the server MUST send a protocol header with an acceptable protocol id"
+ + "(and version) and then close the socket.")
+ public void unacceptableProtocolIdSent_SaslAcceptable() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ byte[] rawHeaderBytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+ byte[] expectedSaslHeaderBytes = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8);
+ transport.sendProtocolHeader(rawHeaderBytes);
+ HeaderResponse response = (HeaderResponse) transport.getNextResponse();
+ assertArrayEquals("Unexpected protocol header response", expectedSaslHeaderBytes, response.getHeader());
+
+ transport.assertNoMoreResponses();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
new file mode 100644
index 0000000..ede03a2
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.security.sasl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.bind.DatatypeConverter;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+/**
+ * Protocol-level tests of AMQP 1.0 SASL negotiation (specification section 5.3.2), driven
+ * frame-by-frame against the broker's AMQP port through a {@link FrameTransport}.
+ */
+public class SaslTest extends ProtocolTestBase
+{
+    private static final Symbol CRAM_MD5 = Symbol.getSymbol("CRAM-MD5");
+    private static final Symbol PLAIN = Symbol.getSymbol("PLAIN");
+
+    /** Protocol header with protocol id 3 (SASL), version 1.0.0. */
+    private static final byte[] SASL_AMQP_HEADER_BYTES = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8);
+    /** Protocol header with protocol id 0 (plain AMQP), version 1.0.0. */
+    private static final byte[] AMQP_HEADER_BYTES = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+
+    /**
+     * Authenticates with PLAIN using valid credentials and verifies that the broker reports
+     * {@link SaslCode#OK} and then echoes the AMQP protocol header.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "SASL Negotiation [...] challenge/response step occurs zero times")
+    public void saslSuccessfulAuthentication() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            final SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN));
+
+            final SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(PLAIN);
+            saslInit.setInitialResponse(new Binary("\0guest\0guest".getBytes(StandardCharsets.US_ASCII)));
+            transport.sendPerformative(saslInit);
+
+            final SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK));
+
+            exchangeAmqpHeader(transport);
+
+            transport.assertNoMoreResponses();
+        }
+    }
+
+    /**
+     * Authenticates with CRAM-MD5: answers the broker's single challenge and verifies that the
+     * broker reports {@link SaslCode#OK} and then echoes the AMQP protocol header.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "SASL Negotiation [...] challenge/response step occurs once")
+    public void saslSuccessfulAuthenticationWithChallengeResponse() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            final SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(CRAM_MD5));
+
+            final SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(CRAM_MD5);
+            transport.sendPerformative(saslInit);
+
+            final SaslChallenge challenge = transport.getNextPerformativeResponse(SaslChallenge.class);
+            assertThat(challenge.getChallenge(), is(notNullValue()));
+
+            final byte[] response =
+                    generateCramMD5ClientResponse("guest", "guest", challenge.getChallenge().getArray());
+
+            final SaslResponse saslResponse = new SaslResponse();
+            saslResponse.setResponse(new Binary(response));
+            transport.sendPerformative(saslResponse);
+
+            final SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK));
+
+            exchangeAmqpHeader(transport);
+
+            transport.assertNoMoreResponses();
+        }
+    }
+
+    /**
+     * Attempts PLAIN authentication with a wrong password and verifies that the broker reports
+     * {@link SaslCode#AUTH} and closes the channel.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void saslUnsuccessfulAuthentication() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            final SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN));
+
+            final SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(PLAIN);
+            saslInit.setInitialResponse(new Binary("\0guest\0badpassword".getBytes(StandardCharsets.US_ASCII)));
+            transport.sendPerformative(saslInit);
+
+            final SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.AUTH));
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /**
+     * Selects a mechanism name the broker does not offer and verifies that the broker reports
+     * {@link SaslCode#AUTH} without additional data and closes the channel.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "The partner MUST then choose one of the supported mechanisms and initiate a sasl exchange. "
+                          + "If the selected mechanism is not supported by the receiving peer, it MUST close the connection "
+                          + "with the authentication-failure close-code.")
+    public void unsupportedSaslMechanism() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            // The offered mechanisms are irrelevant here; just consume the frame.
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            final SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(Symbol.getSymbol("NOT-A-MECHANISM"));
+            transport.sendPerformative(saslInit);
+
+            final SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.AUTH));
+            assertThat(saslOutcome.getAdditionalData(), is(nullValue()));
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /**
+     * Sends an {@link Open} performative instead of completing SASL negotiation and verifies
+     * that the broker sends nothing further and closes the channel.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void authenticationBypassDisallowed() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            final Open open = new Open();
+            open.setContainerId("testContainerId");
+            transport.sendPerformative(open);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /**
+     * Sends a {@link SaslMechanisms} frame from the client side and verifies that the broker
+     * sends nothing further and closes the channel.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "The peer acting as the SASL server MUST announce supported authentication mechanisms using "
+                          + "the sasl-mechanisms frame.")
+    public void clientSendsSaslMechanisms() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            final SaslMechanisms clientMechs = new SaslMechanisms();
+            clientMechs.setSaslServerMechanisms(new Symbol[] {Symbol.getSymbol("CLIENT-MECH")});
+            transport.sendPerformative(clientMechs);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /**
+     * Sends a {@link SaslChallenge} frame from the client side and verifies that the broker
+     * sends nothing further and closes the channel.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void clientSendsSaslChallenge() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            final SaslChallenge saslChallenge = new SaslChallenge();
+            saslChallenge.setChallenge(new Binary(new byte[] {}));
+            transport.sendPerformative(saslChallenge);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /**
+     * Sends a {@link SaslOutcome} frame from the client side and verifies that the broker
+     * sends nothing further and closes the channel.
+     *
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void clientSendsSaslOutcome() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            exchangeSaslHeader(transport);
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            final SaslOutcome saslOutcome = new SaslOutcome();
+            saslOutcome.setCode(SaslCode.OK);
+            transport.sendPerformative(saslOutcome);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /**
+     * Sends the SASL protocol header and asserts that the broker echoes it back.
+     *
+     * @param transport the open transport to the broker
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    private static void exchangeSaslHeader(final FrameTransport transport) throws Exception
+    {
+        transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+        final HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+        assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+    }
+
+    /**
+     * Sends the plain AMQP protocol header and asserts that the broker echoes it back.
+     *
+     * @param transport the open transport to the broker
+     * @throws Exception if the transport fails or an unexpected response is received
+     */
+    private static void exchangeAmqpHeader(final FrameTransport transport) throws Exception
+    {
+        transport.sendProtocolHeader(AMQP_HEADER_BYTES);
+        final HeaderResponse headerResponse = transport.getNextResponse(HeaderResponse.class);
+        assertThat(headerResponse.getHeader(), is(equalTo(AMQP_HEADER_BYTES)));
+    }
+
+    /**
+     * Builds a CRAM-MD5 client response: the user name, a single space, then the lower-case
+     * hexadecimal HMAC-MD5 of the challenge keyed with the password.
+     *
+     * @param userName       the user name to place at the start of the response
+     * @param userPassword   the password used (as UTF-8 bytes) as the HMAC key
+     * @param challengeBytes the challenge received from the server
+     * @return the encoded response bytes
+     * @throws Exception if the HmacMD5 algorithm is unavailable or the key is rejected
+     */
+    private static byte[] generateCramMD5ClientResponse(String userName, String userPassword, byte[] challengeBytes)
+            throws Exception
+    {
+        final String macAlgorithm = "HmacMD5";
+        final Mac mac = Mac.getInstance(macAlgorithm);
+        mac.init(new SecretKeySpec(userPassword.getBytes(StandardCharsets.UTF_8), macAlgorithm));
+        final byte[] messageAuthenticationCode = mac.doFinal(challengeBytes);
+        final String responseAsString =
+                userName + " " + DatatypeConverter.printHexBinary(messageAuthenticationCode).toLowerCase();
+        // Encode with an explicit charset so the bytes do not depend on the platform default.
+        return responseAsString.getBytes(StandardCharsets.UTF_8);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org