You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/09 15:44:01 UTC

qpid-broker-j git commit: QPID-7782: [AMQP1.0] Add supporting AMQP 1.0 SASL protocol tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 061fb1b03 -> 32733fc22


QPID-7782: [AMQP1.0] Add supporting AMQP 1.0 SASL protocol tests

Also:
* prevented NPE if client selected a SASL mechanism which was not one of those offered by SaslMechanisms.
* grouped sasl methods together within the Connection_1_0 implementation.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/32733fc2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/32733fc2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/32733fc2

Branch: refs/heads/master
Commit: 32733fc224a5f374bfb34cbdfe04f715ef9e59af
Parents: 061fb1b
Author: Keith Wall <kw...@apache.org>
Authored: Fri Jun 9 16:38:49 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Fri Jun 9 16:43:26 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   | 256 ++++++++--------
 .../protocol/v1_0/framing/FrameHandler.java     |   2 +-
 .../tests/protocol/v1_0/FrameTransport.java     | 124 +++++++-
 .../qpid/tests/protocol/v1_0/InputHandler.java  |  49 +++-
 .../qpid/tests/protocol/v1_0/OutputHandler.java |   8 +-
 .../protocol/v1_0/SaslPerformativeResponse.java |  55 ++++
 .../main/resources/config-protocol-tests.json   |   1 +
 .../v1_0/transport/ProtocolHeaderTest.java      |  20 ++
 .../v1_0/transport/security/sasl/SaslTest.java  | 289 +++++++++++++++++++
 9 files changed, 652 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 977dc8f..870a961 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -274,6 +274,138 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender());
     }
 
+    @Override
+    public void receiveSaslInit(final SaslInit saslInit)
+    {
+        assertState(ConnectionState.AWAIT_SASL_INIT);
+        if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim()))
+        {
+            _localHostname = saslInit.getHostname();
+        }
+        else if(getNetwork().getSelectedHost() != null)
+        {
+            _localHostname = getNetwork().getSelectedHost();
+        }
+        String mechanism = saslInit.getMechanism().toString();
+        final Binary initialResponse = saslInit.getInitialResponse();
+        byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
+
+        List<String> availableMechanisms =
+                _subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
+        if (!availableMechanisms.contains(mechanism))
+        {
+            handleSaslError();
+        }
+        else
+        {
+            _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
+            processSaslResponse(response);
+        }
+    }
+
+    @Override
+    public void receiveSaslResponse(final SaslResponse saslResponse)
+    {
+        assertState(ConnectionState.AWAIT_SASL_RESPONSE);
+        final Binary responseBinary = saslResponse.getResponse();
+        byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
+
+        processSaslResponse(response);
+    }
+
+    @Override
+    public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+    {
+        LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
+        closeSaslWithFailure();
+    }
+
+    @Override
+    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+    {
+        LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
+        closeSaslWithFailure();
+    }
+
+    @Override
+    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+    {
+        LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
+        closeSaslWithFailure();
+    }
+
+    private void processSaslResponse(final byte[] response)
+    {
+        byte[] challenge = null;
+        SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
+        if (authenticationResult == null)
+        {
+            authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
+            challenge = authenticationResult.getChallenge();
+        }
+
+        if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
+        {
+            _successfulAuthenticationResult = authenticationResult;
+            if (challenge == null || challenge.length == 0)
+            {
+                setSubject(_successfulAuthenticationResult.getSubject());
+                SaslOutcome outcome = new SaslOutcome();
+                outcome.setCode(SaslCode.OK);
+                send(new SASLFrame(outcome), null);
+                _saslComplete = true;
+                _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
+                disposeSaslNegotiator();
+            }
+            else
+            {
+                continueSaslNegotiation(challenge);
+            }
+        }
+        else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
+        {
+            continueSaslNegotiation(challenge);
+        }
+        else
+        {
+            handleSaslError();
+        }
+    }
+
+    private void continueSaslNegotiation(final byte[] challenge)
+    {
+        SaslChallenge challengeBody = new SaslChallenge();
+        challengeBody.setChallenge(new Binary(challenge));
+        send(new SASLFrame(challengeBody), null);
+
+        _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
+    }
+
+    private void handleSaslError()
+    {
+        SaslOutcome outcome = new SaslOutcome();
+        outcome.setCode(SaslCode.AUTH);
+        send(new SASLFrame(outcome), null);
+        _saslComplete = true;
+        closeSaslWithFailure();
+    }
+
+    private void closeSaslWithFailure()
+    {
+        _saslComplete = true;
+        disposeSaslNegotiator();
+        _connectionState = ConnectionState.CLOSED;
+        addCloseTicker();
+    }
+
+    private void disposeSaslNegotiator()
+    {
+        if (_saslNegotiator != null)
+        {
+            _saslNegotiator.dispose();
+        }
+        _saslNegotiator = null;
+    }
 
     private void setUserPrincipal(final Principal user)
     {
@@ -359,26 +491,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         }
     }
 
-    private void closeSaslWithFailure()
-    {
-        _saslComplete = true;
-        disposeSaslNegotiator();
-        _connectionState = ConnectionState.CLOSED;
-        addCloseTicker();
-    }
-
-    private void disposeSaslNegotiator()
-    {
-        _saslNegotiator.dispose();
-        _saslNegotiator = null;
-    }
-
-    @Override
-    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
-    {
-        LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject());
-        closeSaslWithFailure();
-    }
 
     @Override
     public void receiveClose(final int channel, final Close close)
@@ -438,23 +550,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
         }
     }
 
-    @Override
-    public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
-    {
-        LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject());
-        closeSaslWithFailure();
-    }
-
-    @Override
-    public void receiveSaslResponse(final SaslResponse saslResponse)
-    {
-        final Binary responseBinary = saslResponse.getResponse();
-        byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray();
-
-        assertState(ConnectionState.AWAIT_SASL_RESPONSE);
-
-        processSaslResponse(response);
-    }
 
     @Override
     public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
@@ -538,13 +633,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
-    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
-    {
-        LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject());
-        closeSaslWithFailure();
-    }
-
-    @Override
     public void receiveEnd(final int channel, final End end)
     {
         assertState(ConnectionState.OPENED);
@@ -877,7 +965,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                                     // already sent our close - probably due to an error
                                     break;
                                 default:
-                                    throw new ConnectionScopedRuntimeException("Connection Open failed under mysterious circumstances.");
+                                    throw new ConnectionScopedRuntimeException(String.format(
+                                            "Unexpected state %s during connection open.", _connectionState));
                             }
                         }
                     }
@@ -992,87 +1081,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
-    public void receiveSaslInit(final SaslInit saslInit)
-    {
-        assertState(ConnectionState.AWAIT_SASL_INIT);
-        if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim()))
-        {
-            _localHostname = saslInit.getHostname();
-        }
-        else if(getNetwork().getSelectedHost() != null)
-        {
-            _localHostname = getNetwork().getSelectedHost();
-        }
-        String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
-        final Binary initialResponse = saslInit.getInitialResponse();
-        byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
-
-        _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this);
-        processSaslResponse(response);
-    }
-
-    @Override
     public String getLocalFQDN()
     {
         return _localHostname != null ? _localHostname : super.getLocalFQDN();
     }
 
-    private void processSaslResponse(final byte[] response)
-    {
-        byte[] challenge = null;
-        SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult;
-        if (authenticationResult == null)
-        {
-            authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]);
-            challenge = authenticationResult.getChallenge();
-        }
-
-        if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS)
-        {
-            _successfulAuthenticationResult = authenticationResult;
-            if (challenge == null || challenge.length == 0)
-            {
-            setSubject(_successfulAuthenticationResult.getSubject());
-            SaslOutcome outcome = new SaslOutcome();
-            outcome.setCode(SaslCode.OK);
-            send(new SASLFrame(outcome), null);
-            _saslComplete = true;
-            _connectionState = ConnectionState.AWAIT_AMQP_HEADER;
-            disposeSaslNegotiator();
-            }
-            else
-            {
-                continueSaslNegotiation(challenge);
-            }
-        }
-        else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE)
-        {
-            continueSaslNegotiation(challenge);
-        }
-        else
-        {
-            handleSaslError();
-        }
-    }
-
-    private void continueSaslNegotiation(final byte[] challenge)
-    {
-        SaslChallenge challengeBody = new SaslChallenge();
-        challengeBody.setChallenge(new Binary(challenge));
-        send(new SASLFrame(challengeBody), null);
-
-        _connectionState = ConnectionState.AWAIT_SASL_RESPONSE;
-    }
-
-    private void handleSaslError()
-    {
-        SaslOutcome outcome = new SaslOutcome();
-        outcome.setCode(SaslCode.AUTH);
-        send(new SASLFrame(outcome), null);
-        _saslComplete = true;
-        closeSaslWithFailure();
-    }
-
     @Override
     public int getMaxFrameSize()
     {
@@ -1419,7 +1432,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
     }
 
 
-
     @Override
     public void send(final AMQFrame amqFrame, ByteBuffer buf)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index a4dc4af..c5e0334 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -41,7 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class FrameHandler implements ProtocolHandler
 {
-    public static Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class);
+    public static final Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class);
     private final boolean _isSasl;
 
     private final ConnectionHandler _connectionHandler;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 47872f0..eda903a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -20,12 +20,14 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
-import static org.junit.Assert.assertNull;
+import static org.hamcrest.Matchers.nullValue;
 
 import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,8 +56,10 @@ import org.hamcrest.CoreMatchers;
 import org.hamcrest.core.Is;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -70,18 +74,25 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class FrameTransport implements AutoCloseable
 {
-    private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     public static final long RESPONSE_TIMEOUT = 6000;
+    private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
 
     private final Channel _channel;
     private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100);
     private final EventLoopGroup _workerGroup;
 
+    private volatile boolean _channelClosedSeen = false;
     private int _amqpConnectionId;
     private short _amqpChannelId;
 
     public FrameTransport(final InetSocketAddress brokerAddress)
     {
+        this(brokerAddress, false);
+    }
+
+    public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
+    {
         _workerGroup = new NioEventLoopGroup();
 
         try
@@ -95,12 +106,16 @@ public class FrameTransport implements AutoCloseable
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception
                 {
-                    ch.pipeline().addLast(new InputHandler(_queue))
-                                 .addLast(new OutputHandler());
+                    ch.pipeline().addLast(new InputHandler(_queue, isSasl)).addLast(new OutputHandler());
                 }
             });
 
             _channel = b.connect(brokerAddress).sync().channel();
+            _channel.closeFuture().addListener(future ->
+                                               {
+                                                   _channelClosedSeen = true;
+                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
+                                               });
         }
         catch (InterruptedException e)
         {
@@ -134,13 +149,21 @@ public class FrameTransport implements AutoCloseable
 
     public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
     {
-        final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer)frameBody).getPayload() : null;
+        final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
         TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
         ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
         channelFuture.sync();
         return JdkFutureAdapters.listenInPoolThread(channelFuture);
     }
 
+    public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
+    {
+        SASLFrame transportFrame = new SASLFrame(frameBody);
+        ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
+        channelFuture.sync();
+        return JdkFutureAdapters.listenInPoolThread(channelFuture);
+    }
+
     public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
     {
         return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId));
@@ -174,6 +197,52 @@ public class FrameTransport implements AutoCloseable
         return _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
     }
 
+    public <R extends Response> R getNextResponse(Class<? extends Response> expectedResponseClass) throws Exception
+    {
+        R actualResponse = (R) getNextResponse();
+        if (actualResponse == null)
+        {
+            throw new IllegalStateException(String.format("No response received within timeout %d - expecting %s",
+                                                          RESPONSE_TIMEOUT, expectedResponseClass.getName()));
+        }
+        else if (!expectedResponseClass.isAssignableFrom(actualResponse.getClass()))
+        {
+            throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
+                                                          expectedResponseClass.getName(),
+                                                          actualResponse.getClass().getName()));
+        }
+
+        return actualResponse;
+    }
+
+    public <P> P getNextPerformativeResponse(Class<?> expectedFrameBodyClass) throws Exception
+    {
+        final P actualFrameBody;
+        if (SaslFrameBody.class.isAssignableFrom(expectedFrameBodyClass))
+        {
+            SaslPerformativeResponse response = getNextResponse(SaslPerformativeResponse.class);
+            actualFrameBody = (P) response.getFrameBody();
+        }
+        else if (FrameBody.class.isAssignableFrom(expectedFrameBodyClass))
+        {
+            PerformativeResponse response = getNextResponse(PerformativeResponse.class);
+            actualFrameBody = (P) response.getFrameBody();
+        }
+        else
+        {
+            throw new IllegalArgumentException(String.format("Unexpected class %s", expectedFrameBodyClass.getName()));
+        }
+
+        if (!expectedFrameBodyClass.isAssignableFrom(actualFrameBody.getClass()))
+        {
+            throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
+                                                          expectedFrameBodyClass.getName(),
+                                                          actualFrameBody.getClass().getName()));
+        }
+
+        return actualFrameBody;
+    }
+
     public void doProtocolNegotiation() throws Exception
     {
         byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
@@ -286,7 +355,6 @@ public class FrameTransport implements AutoCloseable
         Attach responseAttach = (Attach) response.getFrameBody();
         assertThat(responseAttach.getTarget(), is(notNullValue()));
 
-
         PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse();
         assertThat(flowResponse, Is.is(CoreMatchers.notNullValue()));
         assertThat(flowResponse.getFrameBody(), Is.is(CoreMatchers.instanceOf(Flow.class)));
@@ -295,9 +363,16 @@ public class FrameTransport implements AutoCloseable
     public void assertNoMoreResponses() throws Exception
     {
         Response response = getNextResponse();
-        assertNull("Unexpected response.", response);
+        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+    }
+
+    public void assertNoMoreResponsesAndChannelClosed() throws Exception
+    {
+        assertNoMoreResponses();
+        assertThat(_channelClosedSeen, is(true));
     }
 
+
     private int getConnectionId()
     {
         if (_amqpConnectionId == 0)
@@ -310,4 +385,39 @@ public class FrameTransport implements AutoCloseable
         }
         return _amqpConnectionId;
     }
+
+    public void assertChannelClosed()
+    {
+        try
+        {
+            ChannelFuture channelFuture = _channel.write(new byte[]{0});
+            channelFuture.sync();
+            throw new IllegalStateException(
+                    "Expecting the channel to be already closed by, but it was able to take more input.");
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+        }
+        catch (Exception e)
+        {
+            if (e instanceof ClosedChannelException)
+            {
+                // PASS
+            }
+            else
+            {
+                throw new IllegalStateException("Unexpected exception", e);
+            }
+        }
+    }
+
+    private static class ChannelClosedResponse implements Response
+    {
+        @Override
+        public String toString()
+        {
+            return "ChannelClosed";
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
index 8468f5a..10fde24 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
@@ -56,12 +57,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class InputHandler extends ChannelInboundHandlerAdapter
 {
-    private enum ParsingState
-    {
-        HEADER,
-        PERFORMATIVES
-    };
-
     private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
     private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
                                                                                             .registerTransportLayer()
@@ -69,18 +64,27 @@ public class InputHandler extends ChannelInboundHandlerAdapter
                                                                                             .registerTransactionLayer()
                                                                                             .registerSecurityLayer()
                                                                                             .registerExtensionSoleconnLayer();
+
+    private enum ParsingState
+    {
+        HEADER,
+        PERFORMATIVES
+    };
+
+    private final MyConnectionHandler _connectionHandler;
     private final ValueHandler _valueHandler;
-    private final FrameHandler _frameHandler;
+    private final BlockingQueue<Response> _responseQueue;
 
     private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
-    private BlockingQueue<Response> _responseQueue;
-    private ParsingState _state = ParsingState.HEADER;
+    private volatile FrameHandler _frameHandler;
+    private volatile ParsingState _state = ParsingState.HEADER;
 
-    public InputHandler(final BlockingQueue<Response> queue)
+    public InputHandler(final BlockingQueue<Response> queue, final boolean isSasl)
     {
 
         _valueHandler = new ValueHandler(TYPE_REGISTRY);
-        _frameHandler = new FrameHandler(_valueHandler, new MyConnectionHandler(), false);
+        _connectionHandler = new MyConnectionHandler();
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
 
         _responseQueue = queue;
     }
@@ -140,12 +144,17 @@ public class InputHandler extends ChannelInboundHandlerAdapter
         }
     }
 
+    private void resetInputHandlerAfterSaslOutcome()
+    {
+        _state = ParsingState.HEADER;
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+    }
+
     private class MyConnectionHandler implements ConnectionHandler
     {
         @Override
         public void receiveOpen(final int channel, final Open close)
         {
-            System.out.println();
         }
 
         @Override
@@ -211,7 +220,7 @@ public class InputHandler extends ChannelInboundHandlerAdapter
         @Override
         public void handleError(final Error parsingError)
         {
-
+            LOGGER.error("Unexpected error {}", parsingError);
         }
 
         @Override
@@ -230,16 +239,24 @@ public class InputHandler extends ChannelInboundHandlerAdapter
                 int channel = channelFrameBody.getChannel();
                 if (val instanceof FrameBody)
                 {
-                    response = new PerformativeResponse((short) channel, (FrameBody) val);
+                    FrameBody frameBody = (FrameBody) val;
+                    response = new PerformativeResponse((short) channel, frameBody);
                 }
                 else if (val instanceof SaslFrameBody)
                 {
-                    throw new UnsupportedOperationException("TODO: ");
+                    SaslFrameBody frameBody = (SaslFrameBody) val;
+                    response = new SaslPerformativeResponse((short) channel, frameBody);
+
+                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+                    {
+                        resetInputHandlerAfterSaslOutcome();
+                    }
                 }
                 else
                 {
-                    throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
                 }
+
                 _responseQueue.add(response);
             }
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
index 414ad90..dbd4cd6 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
@@ -28,11 +28,7 @@ import io.netty.channel.ChannelPromise;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
 import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.transport.ByteBufferSender;
 
@@ -50,7 +46,7 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
     {
 
-        if (msg instanceof TransportFrame)
+        if (msg instanceof AMQFrame)
         {
             FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
             {
@@ -86,7 +82,7 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
 
                 }
             });
-            _frameWriter.send((TransportFrame) msg);
+            _frameWriter.send(((AMQFrame) msg));
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
new file mode 100644
index 0000000..cd2da99
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+
+public class SaslPerformativeResponse implements Response
+{
+    private final short _channelId;
+    private final SaslFrameBody _frameBody;
+
+    public SaslPerformativeResponse(final short channelId,
+                                    final SaslFrameBody frameBody)
+    {
+        _channelId = channelId;
+        _frameBody = frameBody;
+    }
+
+    public SaslFrameBody getFrameBody()
+    {
+        return _frameBody;
+    }
+
+    public short getChannelId()
+    {
+        return _channelId;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SaslPerformativeResponse{" +
+               "_channelId=" + _channelId +
+               ", _frameBody=" + _frameBody +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
index 7a5e20d..1aaa210 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
+++ b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
@@ -27,6 +27,7 @@
   }, {
     "name" : "plain",
     "type" : "Plain",
+    "secureOnlyMechanisms" : [],
     "users" : [ {
       "name" : "admin",
       "type" : "managed",

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 91d2c48..ed9c10f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -66,6 +66,26 @@ public class ProtocolHeaderTest extends ProtocolTestBase
             HeaderResponse response = (HeaderResponse) transport.getNextResponse();
             assertArrayEquals("Unexpected protocol header response", bytes, response.getHeader());
         }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.2",
+            description = "A client might request use of a protocol id that is unacceptable to a server. [...] "
+                          + "In this case, the server MUST send a protocol header with an acceptable protocol id "
+                          + "(and version) and then close the socket.")
+    public void unacceptableProtocolIdSent_SaslAcceptable() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            byte[] rawHeaderBytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+            byte[] expectedSaslHeaderBytes = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8);
+            transport.sendProtocolHeader(rawHeaderBytes);
+            HeaderResponse response = (HeaderResponse) transport.getNextResponse();
 
+            assertArrayEquals("Unexpected protocol header response", expectedSaslHeaderBytes, response.getHeader());
+
+            transport.assertNoMoreResponses();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
new file mode 100644
index 0000000..ede03a2
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.security.sasl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import javax.xml.bind.DatatypeConverter;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class SaslTest extends ProtocolTestBase
+{
+    private static final Symbol CRAM_MD5 = Symbol.getSymbol("CRAM-MD5");
+    private static final Symbol PLAIN = Symbol.getSymbol("PLAIN");
+
+    private static final byte[] SASL_AMQP_HEADER_BYTES = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] AMQP_HEADER_BYTES = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "SASL Negotiation [...] challenge/response step occurs zero times")
+    public void saslSuccessfulAuthentication() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN));
+
+
+            SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(PLAIN);
+            saslInit.setInitialResponse(new Binary("\0guest\0guest".getBytes(StandardCharsets.US_ASCII)));
+            transport.sendPerformative(saslInit);
+
+            SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK));
+
+            transport.sendProtocolHeader(AMQP_HEADER_BYTES);
+            HeaderResponse headerResponse = transport.getNextResponse(HeaderResponse.class);
+            assertThat(headerResponse.getHeader(), is(equalTo(AMQP_HEADER_BYTES)));
+
+            transport.assertNoMoreResponses();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "SASL Negotiation [...] challenge/response step occurs once")
+    public void saslSuccessfulAuthenticationWithChallengeResponse() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(CRAM_MD5));
+
+            SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(CRAM_MD5);
+            transport.sendPerformative(saslInit);
+
+            SaslChallenge challenge = transport.getNextPerformativeResponse(SaslChallenge.class);
+            assertThat(challenge.getChallenge(), is(notNullValue()));
+
+            byte[] response = generateCramMD5ClientResponse("guest", "guest", challenge.getChallenge().getArray());
+
+            SaslResponse saslResponse = new SaslResponse();
+            saslResponse.setResponse(new Binary(response));
+            transport.sendPerformative(saslResponse);
+
+            SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK));
+
+            transport.sendProtocolHeader(AMQP_HEADER_BYTES);
+            HeaderResponse headerResponse = transport.getNextResponse(HeaderResponse.class);
+            assertThat(headerResponse.getHeader(), is(equalTo(AMQP_HEADER_BYTES)));
+
+            transport.assertNoMoreResponses();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void saslUnsuccessfulAuthentication() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN));
+
+            SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(PLAIN);
+            saslInit.setInitialResponse(new Binary("\0guest\0badpassword".getBytes(StandardCharsets.US_ASCII)));
+            transport.sendPerformative(saslInit);
+
+            SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.AUTH));
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "The partner MUST then choose one of the supported mechanisms and initiate a sasl exchange. "
+                          + "If the selected mechanism is not supported by the receiving peer, it MUST close the connection "
+                          + "with the authentication-failure close-code.")
+    public void unsupportedSaslMechanism() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(Symbol.getSymbol("NOT-A-MECHANISM"));
+            transport.sendPerformative(saslInit);
+
+            SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.AUTH));
+            assertThat(saslOutcome.getAdditionalData(), is(nullValue()));
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void authenticationBypassDisallowed() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            Open open = new Open();
+            open.setContainerId("testContainerId");
+            transport.sendPerformative(open);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2",
+            description = "The peer acting as the SASL server MUST announce supported authentication mechanisms using "
+                          + "the sasl-mechanisms frame.")
+    public void clientSendsSaslMechanisms() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            SaslMechanisms clientMechs = new SaslMechanisms();
+            clientMechs.setSaslServerMechanisms(new Symbol[] {Symbol.valueOf("CLIENT-MECH")});
+            transport.sendPerformative(clientMechs);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void clientSendsSaslChallenge() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            SaslChallenge saslChallenge = new SaslChallenge();
+            saslChallenge.setChallenge(new Binary(new byte[] {}));
+            transport.sendPerformative(saslChallenge);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "5.3.2", description = "SASL Negotiation")
+    public void clientSendsSaslOutcome() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true))
+        {
+            transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
+            HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
+            assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            transport.getNextPerformativeResponse(SaslMechanisms.class);
+
+            SaslOutcome saslOutcome = new SaslOutcome();
+            saslOutcome.setCode(SaslCode.OK);
+            transport.sendPerformative(saslOutcome);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    /** Builds a CRAM-MD5 reply: user name, a space, then the lower-case hex HMAC-MD5 of the challenge keyed by the password. */
+    private static byte[] generateCramMD5ClientResponse(String userName, String userPassword, byte[] challengeBytes)
+            throws Exception
+    {
+        String macAlgorithm = "HmacMD5";
+        Mac mac = Mac.getInstance(macAlgorithm);
+        mac.init(new SecretKeySpec(userPassword.getBytes(StandardCharsets.UTF_8), macAlgorithm));
+        String hexDigest = DatatypeConverter.printHexBinary(mac.doFinal(challengeBytes)).toLowerCase();
+        // Encode explicitly: the no-argument getBytes() would use the platform default charset.
+        return (userName + " " + hexDigest).getBytes(StandardCharsets.UTF_8);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org