You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/22 15:02:47 UTC

[1/2] qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 612c2cb6c -> ff2980e2d


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
new file mode 100644
index 0000000..9a477dc
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.Range;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class MessageTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp() // Caches the broker's ANONYMOUS_AMQP address and creates the test queue.
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); // queue referenced by every test in this class
+    }
+
+    @Test // Sends one transfer with destination TEST_QUEUE_NAME, then checks completion and queue depth.
+    @SpecificationTest(section = "10.message.transfer",
+            description = "This command transfers a message between two peers.")
+    public void sendTransfer() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) // transport is closed on exit
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(UTF_8);
+            SessionCompleted completed = interaction.openAnonymousConnection()
+                                                    .channelId(1)
+                                                    .attachSession(sessionName)
+                                                    .message()
+                                                    .transferDesitnation(BrokerAdmin.TEST_QUEUE_NAME) // sic: spelling of the builder method
+                                                    .transferId(0) // command id checked in the assertion below
+                                                    .transfer()
+                                                    .session()
+                                                    .flushCompleted() // presumably requests a session.completed reply - TODO confirm
+                                                    .flush()
+                                                    .consumeResponse()
+                                                    .getLatestResponse(SessionCompleted.class);
+
+            assertThat(completed.getCommands().includes(0), is(equalTo(true))); // command 0 (the transfer) is reported completed
+            int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(queueDepthMessages, is(equalTo(1))); // exactly one message is on the test queue
+        }
+    }
+
+    @Test // Subscribes to the test queue and checks that the subscribe command (id 0) is reported completed.
+    @SpecificationTest(section = "10.message.subscribe",
+            description = "This command asks the server to start a \"subscription\","
+                          + " which is a request for messages from a specific queue.")
+    public void subscribe() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) // transport is closed on exit
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .message()
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0) // command id checked in the assertion below
+                       .subscribe()
+                       .session()
+                       .flushCompleted() // presumably requests a session.completed reply - TODO confirm
+                       .flush();
+
+            SessionCompleted completed = consumeResponse(interaction, // helper returns the first SessionCompleted body
+                                                         SessionCompleted.class,
+                                                         SessionCommandPoint.class, // passed as an 'ignore' type
+                                                         SessionConfirmed.class); // passed as an 'ignore' type
+
+            assertThat(completed.getCommands(), is(notNullValue()));
+            assertThat(completed.getCommands().includes(0), is(equalTo(true))); // command 0 (the subscribe) is reported completed
+        }
+    }
+
+    @Test // Puts a message on the test queue, subscribes, issues credit and checks the received transfer body.
+    @SpecificationTest(section = "10.message.transfer",
+            description = "The client may request a broker to transfer messages to it, from a particular queue,"
+                          + " by issuing a subscribe command. The subscribe command specifies the destination"
+                          + " that the broker should use for any resulting transfers.")
+    public void receiveTransfer() throws Exception
+    {
+        String testMessageBody = "testMessage";
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody); // message is enqueued before connecting
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) // transport is closed on exit
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .message()
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName) // flow applies to the subscription created above
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1) // message credit of one
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1) // presumably 0xFFFFFFFF, i.e. unlimited byte credit - TODO confirm
+                       .flow();
+
+            MessageTransfer transfer = consumeResponse(interaction, // helper returns the first MessageTransfer body
+                                                       MessageTransfer.class,
+                                                       SessionCompleted.class, // passed as an 'ignore' type
+                                                       SessionCommandPoint.class, // passed as an 'ignore' type
+                                                       SessionConfirmed.class); // passed as an 'ignore' type
+
+            try (QpidByteBuffer buffer = transfer.getBody()) // buffer is closed on exit
+            {
+                final byte[] dst = new byte[buffer.remaining()];
+                buffer.get(dst); // copy the remaining body bytes out of the buffer
+                assertThat(new String(dst, UTF_8), is(equalTo(testMessageBody))); // body matches what was enqueued
+            }
+        }
+    }
+
+    @Test // Receives a message with explicit accept mode, accepts it, and checks the queue depth drops from 1 to 0.
+    @SpecificationTest(section = "10.message.accept",
+            description = "Accepts the message.")
+    public void acceptTransfer() throws Exception
+    {
+        String testMessageBody = "testMessage";
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody); // message is enqueued before connecting
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) // transport is closed on exit
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .message()
+                       .subscribeAcceptMode(MessageAcceptMode.EXPLICIT) // the test sends message.accept itself further down
+                       .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName) // flow applies to the subscription created above
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1) // message credit of one
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1) // presumably 0xFFFFFFFF, i.e. unlimited byte credit - TODO confirm
+                       .flow();
+
+            MessageTransfer transfer = consumeResponse(interaction, // helper returns the first MessageTransfer body
+                                                       MessageTransfer.class,
+                                                       SessionCompleted.class, // passed as an 'ignore' type
+                                                       SessionCommandPoint.class, // passed as an 'ignore' type
+                                                       SessionConfirmed.class); // passed as an 'ignore' type
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); // still counted before accept
+
+            RangeSet transfers = Range.newInstance(transfer.getId()); // range built from the received transfer's id
+            interaction.message().acceptId(3).acceptTransfers(transfers).accept() // accept is sent as command id 3
+                       .session()
+                       .flushCompleted() // presumably requests a session.completed reply - TODO confirm
+                       .flush();
+
+            SessionCompleted completed = consumeResponse(interaction, // helper returns the first SessionCompleted body
+                                                         SessionCompleted.class,
+                                                         SessionCommandPoint.class, // passed as an 'ignore' type
+                                                         SessionConfirmed.class, // passed as an 'ignore' type
+                                                         SessionFlush.class); // passed as an 'ignore' type
+
+            assertThat(completed.getCommands(), is(notNullValue()));
+            assertThat(completed.getCommands().includes(3), is(equalTo(true))); // command 3 (the accept) is reported completed
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); // queue is empty after accept
+        }
+    }
+
+    private <T extends Method> T consumeResponse(final Interaction interaction,
+                                                 final Class<T> expected,
+                                                 final Class<? extends Method>... ignore)
+            throws Exception
+    {
+        List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
+        possibleResponses.add(expected);
+
+        T completed = null;
+        do
+        {
+            interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
+            Response<?> response = interaction.getLatestResponse();
+            if (expected.isAssignableFrom(response.getBody().getClass()))
+            {
+                completed = (T) response.getBody();
+            }
+        }
+        while (completed == null);
+        return completed;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
new file mode 100644
index 0000000..51de2c2
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class SessionTest extends BrokerAdminUsingTestBase // Protocol tests for session attach/detach (package v0_10).
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp() // Caches the broker's ANONYMOUS_AMQP address.
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test // Attaches a session named "test" and checks the SessionAttached reply carries the same name.
+    @SpecificationTest(section = "9.session.attach",
+            description = "Requests that the current transport be attached to the named session.")
+    public void attach() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            SessionAttached sessionAttached = interaction.openAnonymousConnection()
+                                                         .channelId(1)
+                                                         .session()
+                                                         .attachName(sessionName)
+                                                         .attach()
+                                                         .consumeResponse()
+                                                         .getLatestResponse(SessionAttached.class);
+            assertThat(sessionAttached.getName(), IsEqual.equalTo(sessionName)); // byte[] name is echoed back
+        }
+    }
+
+
+    @Test // Attaches then detaches a session named "test" and checks the SessionDetached reply carries the same name.
+    @SpecificationTest(section = "9.session.detach",
+            description = "Detaches the current transport from the named session.")
+    public void detach() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            SessionDetached sessionDetached = interaction.openAnonymousConnection()
+                                                         .channelId(1)
+                                                         .session()
+                                                         .attachName(sessionName)
+                                                         .attach()
+                                                         .consumeResponse(SessionAttached.class) // read the attach reply before detaching
+                                                         .session()
+                                                         .detachName(sessionName)
+                                                         .detach()
+                                                         .consumeResponse()
+                                                         .getLatestResponse(SessionDetached.class);
+
+            assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName)); // byte[] name is echoed back
+        }
+    }
+
+
+    @Ignore("QPID-8047") // test is disabled; the string references the tracking issue
+    @Test // Sends detach without a prior attach. NOTE(review): only the name is asserted, not the "not-attached" code.
+    @SpecificationTest(section = "9.session",
+            description = "The transport MUST be attached in order to use any control other than"
+                          + " \"attach\", \"attached\", \"detach\", or \"detached\"."
+                          + " A peer receiving any other control on a detached transport MUST discard it and send a"
+                          + " session.detached with the \"not-attached\" reason code.")
+    public void detachUnknownSession() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+            SessionDetached sessionDetached = interaction.openAnonymousConnection()
+                                                         .channelId(1)
+                                                         .session()
+                                                         .detachName(sessionName) // no attach was sent for this name
+                                                         .detach()
+                                                         .consumeResponse()
+                                                         .getLatestResponse(SessionDetached.class);
+
+            assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName)); // byte[] name is echoed back
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
new file mode 100644
index 0000000..337ada3
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class TransactionTest extends BrokerAdminUsingTestBase // Protocol test for tx.select / tx.commit (package v0_10).
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp() // Caches the broker's ANONYMOUS_AMQP address and creates the test queue.
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test // Transfers a message after tx.select and checks it only counts towards queue depth after tx.commit.
+    @SpecificationTest(section = "10.tx.commit",
+            description = "This command commits all messages published and accepted in the current transaction.")
+    public void messageSendCommit() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) // transport is closed on exit
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(UTF_8);
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .tx().selectId(0).select() // tx.select is sent as command id 0
+                       .message()
+                       .transferDesitnation(BrokerAdmin.TEST_QUEUE_NAME) // sic: spelling of the builder method
+                       .transferId(1) // transfer is sent as command id 1
+                       .transfer()
+                       .session()
+                       .flushCompleted() // presumably requests a session.completed reply - TODO confirm
+                       .flush();
+
+            SessionCompleted completed;
+            do // keep reading SessionCompleted responses until one includes command id 1 (the transfer)
+            {
+                completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+            }
+            while (!completed.getCommands().includes(1));
+
+            int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(queueDepthMessages, is(equalTo(0))); // depth is still zero before the commit
+
+            interaction.tx().commitId(2).commit() // tx.commit is sent as command id 2
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+            assertThat(completed.getCommands().includes(2), is(equalTo(true))); // commit is reported completed
+
+            queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(queueDepthMessages, is(equalTo(1))); // depth is one after the commit
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
new file mode 100644
index 0000000..f4ed0a2
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareBody;
+
+public class ExchangeInteraction // Builds an ExchangeDeclareBody from the defaults below and sends it via an Interaction.
+{
+    private final Interaction _interaction; // assigned once in the constructor
+    private String _declareExchange = "amq.direct";
+    private String _declareType = "direct";
+    private boolean _declarePassive = true;
+    private boolean _declareDurable = true;
+    private boolean _declareAutoDelete = false;
+    private boolean _declareNoWait = false;
+    private final Map<String, Object> _declareArguments = new HashMap<>(); // empty by default
+
+    public ExchangeInteraction(final Interaction interaction) // interaction: the owner used to send the frame
+    {
+        _interaction = interaction;
+    }
+
+    public Interaction declare() throws Exception // returns whatever Interaction.sendPerformative returns
+    {
+        return _interaction.sendPerformative(new ExchangeDeclareBody(0, // presumably the ticket - TODO confirm
+                                                                     AMQShortString.valueOf(_declareExchange),
+                                                                     AMQShortString.valueOf(_declareType),
+                                                                     _declarePassive,
+                                                                     _declareDurable,
+                                                                     _declareAutoDelete,
+                                                                     false, // presumably the 'internal' flag - TODO confirm
+                                                                     _declareNoWait,
+                                                                     FieldTable.convertToFieldTable(_declareArguments)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
index 52cd7a0..0b5c4e4 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -26,18 +26,20 @@ import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
 import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
 
 
-public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
+public class FrameTransport extends AbstractFrameTransport<Interaction>
 {
     private final byte[] _protocolHeader;
     private ProtocolVersion _protocolVersion;
 
-    public FrameTransport(final InetSocketAddress brokerAddress)
+    FrameTransport(final InetSocketAddress brokerAddress)
     {
         this(brokerAddress, Protocol.AMQP_0_9_1);
     }
-    public FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
+
+    FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
     {
         super(brokerAddress, new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder());
         _protocolVersion = getProtocolVersion(protocol);
@@ -74,12 +76,12 @@ public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTranspor
         return _protocolHeader;
     }
 
-    public ProtocolVersion getProtocolVersion()
+    ProtocolVersion getProtocolVersion()
     {
         return _protocolVersion;
     }
 
-    public static ProtocolVersion getProtocolVersion(Protocol protocol)
+    private static ProtocolVersion getProtocolVersion(Protocol protocol)
     {
         final ProtocolVersion protocolVersion;
         switch (protocol)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 35f4bf5..b990eae 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -26,8 +26,9 @@ import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
 
-public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
+public class Interaction extends AbstractInteraction<Interaction>
 {
 
     private int _channelId;
@@ -36,6 +37,8 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
     private ChannelInteraction _channelInteraction;
     private QueueInteraction _queueInteraction;
     private BasicInteraction _basicInteraction;
+    private TxInteraction _txInteraction;
+    private ExchangeInteraction _exchangeInteraction;
 
     Interaction(final FrameTransport transport)
     {
@@ -44,6 +47,8 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
         _channelInteraction = new ChannelInteraction(this);
         _queueInteraction = new QueueInteraction(this);
         _basicInteraction = new BasicInteraction(this);
+        _txInteraction = new TxInteraction(this);
+        _exchangeInteraction = new ExchangeInteraction(this);
     }
 
     @Override
@@ -52,12 +57,6 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
         return getTransport().getProtocolHeader();
     }
 
-    @Override
-    protected Interaction getInteraction()
-    {
-        return this;
-    }
-
     public Interaction sendPerformative(final AMQBody amqBody) throws Exception
     {
         return sendPerformative(_channelId, amqBody);
@@ -120,4 +119,14 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
     {
         return _basicInteraction;
     }
+
+    public TxInteraction tx()
+    {
+        return _txInteraction;
+    }
+
+    public ExchangeInteraction exchange()
+    {
+        return _exchangeInteraction;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java
new file mode 100644
index 0000000..63a078a
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.transport.TxCommitBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectBody;
+
+public class TxInteraction
+{
+    private final Interaction _interaction;
+    /** Binds this helper to the interaction through which the tx methods are sent. */
+    public TxInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+    /** Sends {@code TxSelectBody.INSTANCE} through the owning interaction and returns what that send returns. */
+    public Interaction select() throws Exception
+    {
+        return _interaction.sendPerformative(TxSelectBody.INSTANCE);
+    }
+    /** Sends {@code TxCommitBody.INSTANCE} through the owning interaction and returns what that send returns. */
+    public Interaction commit() throws Exception
+    {
+        return _interaction.sendPerformative(TxCommitBody.INSTANCE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
index b5cf60d..40d1918 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
@@ -58,7 +58,7 @@ public class ChannelTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "1.5.2.5", description = "request a channel close")
-    public void channelClose() throws Exception
+    public void noFrameCanBeSentOnClosedChannel() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
index b4909c6..fa4a692 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -31,8 +31,10 @@ import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -119,7 +121,8 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
                        .tuneOkFrameMax(1024)
                        .tuneOkHeartbeat(response.getHeartbeat())
                        .tuneOk()
-                       .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+                       .connection().open()
+                       .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
         }
     }
 
@@ -144,7 +147,8 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
                        .tuneOkFrameMax(Long.MAX_VALUE)
                        .tuneOkHeartbeat(response.getHeartbeat())
                        .tuneOk()
-                       .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+                       .connection().open()
+                       .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
         }
     }
 
@@ -180,4 +184,22 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
     }
 
 
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassAfterSendingStartOk() throws Exception
+    {
+        // After the PLAIN challenge, skip secure-ok and send tune-ok/open; expect connection.close or a closed socket.
+        final InetSocketAddress amqpAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport frameTransport = new FrameTransport(amqpAddress).connect())
+        {
+            frameTransport.newInteraction().negotiateProtocol()
+                          .consumeResponse(ConnectionStartBody.class)
+                          .connection().startOkMechanism("PLAIN").startOk().consumeResponse(ConnectionSecureBody.class)
+                          .connection().tuneOk()
+                          .connection().open()
+                          .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
new file mode 100644
index 0000000..df677e8
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxCommitOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+/** Protocol test for AMQP 0-8 transactions: a message published after tx.select is enqueued only on tx.commit. */
+public class TransactionTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    @SpecificationTest(section = "1.9.2.3", description = "commit the current transaction")
+    public void publishMessage() throws Exception
+    {
+        try (FrameTransport frameTransport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction txInteraction = frameTransport.newInteraction();
+            // Select a transaction, publish into it, then do an exchange.declare round trip before checking the queue.
+            txInteraction.openAnonymousConnection()
+                         .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                         .tx().select().consumeResponse(TxSelectOkBody.class)
+                         .basic().contentHeaderPropertiesContentType("text/plain")
+                         .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+                         .contentHeaderPropertiesDeliveryMode((byte)1)
+                         .contentHeaderPropertiesPriority((byte)1)
+                         .publishExchange("")
+                         .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                         .content("Test")
+                         .publishMessage()
+                         .exchange().declare().consumeResponse(ExchangeDeclareOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+            // Only the commit is expected to make the message count on the queue.
+            txInteraction.tx().commit().consumeResponse(TxCommitOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index dd59757..219c423 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -23,7 +23,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.net.InetSocketAddress;
 
-public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+
+public class FrameTransport extends AbstractFrameTransport<Interaction>
 {
     public FrameTransport(final InetSocketAddress brokerAddress)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 4aad6ee..b2f8147 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -77,9 +77,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
 import org.apache.qpid.tests.protocol.Response;
 
-public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
+public class Interaction extends AbstractInteraction<Interaction>
 {
     private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Begin _begin;
@@ -177,12 +178,6 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
         return _protocolHeader;
     }
 
-    @Override
-    protected Interaction getInteraction()
-    {
-        return this;
-    }
-
     //////////
     // SASL //
     //////////

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
new file mode 100644
index 0000000..cad8415
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public abstract class AbstractFrameTransport<I extends AbstractInteraction<I>> implements AutoCloseable
+{
+    static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+    private static final Response<?> CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+    private final EventLoopGroup _workerGroup;
+    private final InetSocketAddress _brokerAddress;
+    private final InputHandler _inputHandler;
+    private final OutputHandler _outputHandler;
+
+    private volatile Channel _channel;
+    private volatile boolean _channelClosedSeen = false;
+    /** Builds the input/output handlers and the event loop group; the socket is only opened by {@link #connect()}. */
+    public AbstractFrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+    {
+        _brokerAddress = brokerAddress;
+        _inputHandler = new InputHandler(_queue, inputDecoder);
+        _outputHandler = new OutputHandler(outputEncoder);
+        _workerGroup = new NioEventLoopGroup();
+    }
+    /** Returns the broker address passed to the constructor. */
+    public InetSocketAddress getBrokerAddress()
+    {
+        return _brokerAddress;
+    }
+    /** Connects to the broker, blocking until connected, and queues a closed-marker response when the channel closes. */
+    public AbstractFrameTransport<I> connect()
+    {
+        try
+        {
+            Bootstrap b = new Bootstrap();
+            b.group(_workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new ChannelInitializer<SocketChannel>()
+            {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception
+                {
+                    buildInputOutputPipeline(ch.pipeline());
+                }
+            });
+
+            _channel = b.connect(_brokerAddress).sync().channel();
+            _channel.closeFuture().addListener(future ->
+                                               {
+                                                   _channelClosedSeen = true;
+                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
+                                               });
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt(); // keep the interruption visible to callers
+            throw new RuntimeException(e);
+        }
+        return this;
+    }
+    /** Adds the input handler followed by the output handler to the channel pipeline. */
+    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+    {
+        pipeline.addLast(_inputHandler).addLast(_outputHandler);
+    }
+    /** Disconnects and closes the channel when connected, and always shuts the event loop group down. */
+    @Override
+    public void close() throws Exception
+    {
+        try
+        {
+            if (_channel != null)
+            {
+                _channel.disconnect().sync();
+                _channel.close().sync();
+                _channel = null;
+            }
+        }
+        finally
+        {
+            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+        }
+    }
+    /** Writes the raw header bytes to the channel (no flush); the returned future tracks that write. */
+    ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(bytes);
+        _channel.write(buffer, promise);
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+    /** Writes {@code data} to the channel (no flush); the returned future tracks that write. */
+    public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
+        _channel.write(data, promise);
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+    /** Waits up to {@link #RESPONSE_TIMEOUT} milliseconds for the next queued response; {@code null} on timeout. */
+    <T extends Response<?>> T getNextResponse() throws Exception
+    {
+        return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+    /** Asserts that the next response is absent (timeout) or the closed-channel marker. */
+    public void assertNoMoreResponses() throws Exception
+    {
+        Response<?> response = getNextResponse();
+        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+    }
+    /** As {@link #assertNoMoreResponses()}, and also asserts that the channel close has been observed. */
+    public void assertNoMoreResponsesAndChannelClosed() throws Exception
+    {
+        assertNoMoreResponses();
+        assertThat(_channelClosedSeen, is(true));
+    }
+    /** Flushes pending writes; checks the connection first so misuse fails with "Not connected", not an NPE. */
+    public void flush()
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        _channel.flush();
+    }
+    /** Returns the protocol header bytes supplied by the protocol-specific subclass. */
+    public abstract byte[] getProtocolHeader();
+    /** Creates a protocol-specific interaction for this transport. */
+    public abstract I newInteraction();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
new file mode 100644
index 0000000..4b41ca9
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class AbstractInteraction<I extends AbstractInteraction<I>>
+{
+    private final AbstractFrameTransport<I> _transport;
+    private ListenableFuture<?> _latestFuture;
+    private Response<?> _latestResponse;
+
+    public AbstractInteraction(final AbstractFrameTransport<I> frameTransport)
+    {
+        _transport = frameTransport;
+    }
+
+    /**
+     * Waits for pending writes, takes the next response and checks its body against {@code responseTypes}.
+     * No types accepts any non-null response; a {@code null} entry accepts the absence of a response.
+     *
+     * @throws IllegalStateException if the response matches none of the acceptable types
+     */
+    public I consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return getInteraction();
+        }
+        acceptableResponseClasses.remove(null);
+        if (_latestResponse != null)
+        {
+            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            {
+                // isInstance is null-safe: a response without a body is reported below instead of causing an NPE.
+                if (acceptableResponseClass.isInstance(_latestResponse.getBody()))
+                {
+                    return getInteraction();
+                }
+            }
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses,
+                                                      _latestResponse == null ? null : _latestResponse.getBody()));
+    }
+
+    /** Takes the next response from the transport, waiting up to the transport's response timeout. */
+    protected Response<?> getNextResponse() throws Exception
+    {
+        return _transport.getNextResponse();
+    }
+
+    /** Flushes the transport and waits, up to the response timeout, for the chained writes to complete. */
+    public I sync() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        _transport.flush();
+        if (_latestFuture != null)
+        {
+            _latestFuture.get(AbstractFrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+            _latestFuture = null;
+        }
+        return getInteraction();
+    }
+
+    /** Returns the response taken by the most recent {@code consumeResponse} call; may be {@code null}. */
+    public Response<?> getLatestResponse() throws Exception
+    {
+        return _latestResponse;
+    }
+
+    /** Returns the latest response's body as {@code type}; throws IllegalStateException if absent or mismatched. */
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        // Guard the response itself as well: it is null before any consume and after an accepted timeout.
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (!type.isInstance(body))
+        {
+            final Object actual = (body != null || _latestResponse == null) ? body : _latestResponse.getClass();
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(), actual));
+        }
+        return type.cast(body);
+    }
+
+    /** Sends the frame body through the transport and chains its write future for the next {@link #sync()}. */
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
+        chainFuture(future);
+        return future;
+    }
+
+    /** Sends the protocol header and chains its write future for the next {@link #sync()}. */
+    public I negotiateProtocol() throws Exception
+    {
+        chainFuture(_transport.sendProtocolHeader(getProtocolHeader()));
+        return getInteraction();
+    }
+
+    protected AbstractFrameTransport getTransport()
+    {
+        return _transport;
+    }
+
+    protected abstract byte[] getProtocolHeader();
+
+    /** Adds a write future to the ones {@link #sync()} waits for. */
+    private void chainFuture(final ListenableFuture<Void> future)
+    {
+        _latestFuture = _latestFuture == null ? future : allAsList(_latestFuture, future);
+    }
+
+    // Unchecked by design: relies on subclasses binding I to their own type (class X extends AbstractInteraction<X>).
+    @SuppressWarnings("unchecked")
+    private I getInteraction()
+    {
+        return (I) this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java
new file mode 100644
index 0000000..b701023
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+public class ChannelClosedResponse implements Response<ChannelClosedResponse>
+{
+    @Override
+    public String toString()
+    {
+        return "ChannelClosed";
+    }
+    /** Returns this marker itself as the body; AbstractFrameTransport queues it when its channel closes. */
+    @Override
+    public ChannelClosedResponse getBody()
+    {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
deleted file mode 100644
index 28dc02e..0000000
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-public abstract class FrameTransport implements AutoCloseable
-{
-    public static final long RESPONSE_TIMEOUT =
-            Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
-    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
-    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-    private final EventLoopGroup _workerGroup;
-    private final InetSocketAddress _brokerAddress;
-    private final InputHandler _inputHandler;
-    private final OutputHandler _outputHandler;
-
-    private volatile Channel _channel;
-    private volatile boolean _channelClosedSeen = false;
-
-    public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
-    {
-        _brokerAddress = brokerAddress;
-        _inputHandler = new InputHandler(_queue, inputDecoder);
-        _outputHandler = new OutputHandler(outputEncoder);
-        _workerGroup = new NioEventLoopGroup();
-    }
-
-    public InetSocketAddress getBrokerAddress()
-    {
-        return _brokerAddress;
-    }
-
-    public FrameTransport connect()
-    {
-        try
-        {
-            Bootstrap b = new Bootstrap();
-            b.group(_workerGroup);
-            b.channel(NioSocketChannel.class);
-            b.option(ChannelOption.SO_KEEPALIVE, true);
-            b.handler(new ChannelInitializer<SocketChannel>()
-            {
-                @Override
-                public void initChannel(SocketChannel ch) throws Exception
-                {
-                    ChannelPipeline pipeline = ch.pipeline();
-                    buildInputOutputPipeline(pipeline);
-                }
-            });
-
-            _channel = b.connect(_brokerAddress).sync().channel();
-            _channel.closeFuture().addListener(future ->
-                                               {
-                                                   _channelClosedSeen = true;
-                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
-                                               });
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return this;
-    }
-
-    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
-    {
-        pipeline.addLast(_inputHandler).addLast(_outputHandler);
-    }
-
-    @Override
-    public void close() throws Exception
-    {
-        try
-        {
-            if (_channel != null)
-            {
-                _channel.disconnect().sync();
-                _channel.close().sync();
-                _channel = null;
-            }
-        }
-        finally
-        {
-            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
-        }
-    }
-
-    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
-    {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-        buffer.writeBytes(bytes);
-        _channel.write(buffer, promise);
-        return JdkFutureAdapters.listenInPoolThread(promise);
-    }
-
-    public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
-    {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        _channel.write(data, promise);
-        return JdkFutureAdapters.listenInPoolThread(promise);
-    }
-
-    public <T extends Response<?>> T getNextResponse() throws Exception
-    {
-        return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-    }
-
-    public void assertNoMoreResponses() throws Exception
-    {
-        Response response = getNextResponse();
-        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
-    }
-
-    public void assertNoMoreResponsesAndChannelClosed() throws Exception
-    {
-        assertNoMoreResponses();
-        assertThat(_channelClosedSeen, is(true));
-    }
-
-    public void flush()
-    {
-        _channel.flush();
-    }
-
-    private static class ChannelClosedResponse implements Response<Void>
-    {
-        @Override
-        public String toString()
-        {
-            return "ChannelClosed";
-        }
-
-        @Override
-        public Void getBody()
-        {
-            return null;
-        }
-    }
-
-    public abstract byte[] getProtocolHeader();
-
-    protected abstract Interaction newInteraction();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
deleted file mode 100644
index b6e631d..0000000
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tests.protocol;
-
-import static com.google.common.util.concurrent.Futures.allAsList;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-public abstract class Interaction<I extends Interaction>
-{
-    private final FrameTransport _transport;
-    private ListenableFuture<?> _latestFuture;
-    private Response<?> _latestResponse;
-
-    public Interaction(final FrameTransport frameTransport)
-    {
-        _transport = frameTransport;
-    }
-
-    public I consumeResponse(final Class<?>... responseTypes) throws Exception
-    {
-        sync();
-        _latestResponse = getNextResponse();
-        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
-        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
-            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
-        {
-            return getInteraction();
-        }
-        acceptableResponseClasses.remove(null);
-        if (_latestResponse != null)
-        {
-            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
-            {
-                if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
-                {
-                    return getInteraction();
-                }
-            }
-        }
-        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
-                                                      acceptableResponseClasses,
-                                                      _latestResponse == null ? null : _latestResponse.getBody()));
-    }
-
-    protected Response<?> getNextResponse() throws Exception
-    {
-        return _transport.getNextResponse();
-    }
-
-    public I sync() throws InterruptedException, ExecutionException, TimeoutException
-    {
-        _transport.flush();
-        if (_latestFuture != null)
-        {
-            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-            _latestFuture = null;
-        }
-        return getInteraction();
-    }
-
-    public Response<?> getLatestResponse() throws Exception
-    {
-        return _latestResponse;
-    }
-
-    public <T> T getLatestResponse(Class<T> type) throws Exception
-    {
-        if (_latestResponse.getBody() == null)
-        {
-            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
-                                                          type.getSimpleName(),
-                                                          _latestResponse.getClass()));
-        }
-
-        if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
-        {
-            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
-                                                          type.getSimpleName(),
-                                                          _latestResponse.getBody()));
-        }
-
-        return (T) _latestResponse.getBody();
-    }
-
-    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
-    {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
-        return future;
-    }
-
-    public I negotiateProtocol() throws Exception
-    {
-        final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
-        return getInteraction();
-    }
-
-    protected FrameTransport getTransport()
-    {
-        return _transport;
-    }
-
-    protected abstract byte[] getProtocolHeader();
-
-    protected abstract I getInteraction();
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10

Posted by or...@apache.org.
QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ff2980e2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ff2980e2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ff2980e2

Branch: refs/heads/master
Commit: ff2980e2d6e9520ba204acd41a78e9ee412a2c11
Parents: 612c2cb
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Nov 21 17:16:42 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 22 15:02:23 2017 +0000

----------------------------------------------------------------------
 pom.xml                                         |  47 ++--
 systests/protocol-tests-amqp-0-10/pom.xml       | 109 +++++++
 .../qpid/tests/protocol/v0_10/Assembler.java    | 264 +++++++++++++++++
 .../protocol/v0_10/ConnectionInteraction.java   |  83 ++++++
 .../qpid/tests/protocol/v0_10/Disassembler.java | 281 +++++++++++++++++++
 .../tests/protocol/v0_10/ErrorResponse.java     |  40 +++
 .../protocol/v0_10/ExecutionInteraction.java    |  47 ++++
 .../qpid/tests/protocol/v0_10/FrameDecoder.java | 190 +++++++++++++
 .../qpid/tests/protocol/v0_10/FrameEncoder.java |  87 ++++++
 .../tests/protocol/v0_10/FrameTransport.java    |  58 ++++
 .../qpid/tests/protocol/v0_10/Interaction.java  | 138 +++++++++
 .../protocol/v0_10/MessageInteraction.java      | 147 ++++++++++
 .../protocol/v0_10/PerformativeResponse.java    |  48 ++++
 .../protocol/v0_10/ProtocolEventReceiver.java   |  67 +++++
 .../protocol/v0_10/SessionInteraction.java      |  89 ++++++
 .../tests/protocol/v0_10/TxInteraction.java     |  60 ++++
 .../resources/config-protocol-tests-0-10.json   |  78 +++++
 .../tests/protocol/v0_10/ConnectionTest.java    | 214 ++++++++++++++
 .../qpid/tests/protocol/v0_10/MessageTest.java  | 260 +++++++++++++++++
 .../qpid/tests/protocol/v0_10/SessionTest.java  | 123 ++++++++
 .../tests/protocol/v0_10/TransactionTest.java   |  92 ++++++
 .../protocol/v0_8/ExchangeInteraction.java      |  58 ++++
 .../tests/protocol/v0_8/FrameTransport.java     |  12 +-
 .../qpid/tests/protocol/v0_8/Interaction.java   |  23 +-
 .../qpid/tests/protocol/v0_8/TxInteraction.java |  44 +++
 .../qpid/tests/protocol/v0_8/ChannelTest.java   |   2 +-
 .../tests/protocol/v0_8/ConnectionTest.java     |  26 +-
 .../tests/protocol/v0_8/TransactionTest.java    |  79 ++++++
 .../tests/protocol/v1_0/FrameTransport.java     |   4 +-
 .../qpid/tests/protocol/v1_0/Interaction.java   |   9 +-
 .../tests/protocol/AbstractFrameTransport.java  | 175 ++++++++++++
 .../tests/protocol/AbstractInteraction.java     | 150 ++++++++++
 .../tests/protocol/ChannelClosedResponse.java   |  36 +++
 .../qpid/tests/protocol/FrameTransport.java     | 190 -------------
 .../apache/qpid/tests/protocol/Interaction.java | 147 ----------
 35 files changed, 3097 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e7d58bb..0c0c445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@
     <module>systests/qpid-systests-jms_2.0</module>
     <module>systests/protocol-tests-core</module>
     <module>systests/protocol-tests-amqp-0-8</module>
+    <module>systests/protocol-tests-amqp-0-10</module>
     <module>systests/protocol-tests-amqp-1-0</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
@@ -427,6 +428,12 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-amqp-0-10</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>
@@ -1332,26 +1339,26 @@
       </properties>
     </profile>
 
-      <profile>
-          <id>java-json.0-10</id>
-          <activation>
-              <property>
-                  <name>profile</name>
-                  <value>java-json.0-10</value>
-              </property>
-          </activation>
-          <properties>
-              <profile>java-json.0-10</profile>
-              <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
-              <profile.broker.version>v0_10</profile.broker.version>
-              <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
-              <profile.broker.persistent>true</profile.broker.persistent>
-              <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
-              <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
-          </properties>
-      </profile>
-
-      <profile>
+    <profile>
+      <id>java-json.0-10</id>
+      <activation>
+        <property>
+          <name>profile</name>
+          <value>java-json.0-10</value>
+        </property>
+      </activation>
+      <properties>
+        <profile>java-json.0-10</profile>
+        <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
+        <profile.broker.version>v0_10</profile.broker.version>
+        <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
+        <profile.broker.persistent>true</profile.broker.persistent>
+        <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
+        <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+      </properties>
+    </profile>
+
+    <profile>
       <id>cpp</id>
       <activation>
         <property>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/pom.xml b/systests/protocol-tests-amqp-0-10/pom.xml
new file mode 100644
index 0000000..3acf129
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-parent</artifactId>
+        <version>7.1.0-SNAPSHOT</version>
+        <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>protocol-tests-amqp-0-10</artifactId>
+    <name>Apache Qpid Protocol Tests for AMQP 0-10</name>
+    <description>Tests for AMQP 0-10</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-test-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>protocol-tests-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-systests-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-logging-logback</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-memory-store</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-derby-store</artifactId>
+            <optional>true</optional>
+            <scope>test</scope>
+        </dependency>
+
+       <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-bdbstore</artifactId>
+            <scope>test</scope>
+       </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemPropertyVariables>
+                        <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-10.json</qpid.initialConfigurationLocation>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
new file mode 100644
index 0000000..aac39b6
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+
+public class Assembler implements NetworkDelegate
+{
+
+    private static final int ARRAY_SIZE = 0xFF;
+    private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+    private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
+
+    private final ProtocolEventReceiver receiver;
+    private final Map<Integer, List<Frame>> segments;
+    private static final ThreadLocal<BBDecoder> _decoder = ThreadLocal.withInitial(BBDecoder::new);
+
+    Assembler(ProtocolEventReceiver receiver)
+    {
+        this.receiver = receiver;
+        segments = new HashMap<>();
+    }
+
+    private int segmentKey(Frame frame)
+    {
+        return (frame.getTrack() + 1) * frame.getChannel();
+    }
+
+    private List<Frame> getSegment(Frame frame)
+    {
+        return segments.get(segmentKey(frame));
+    }
+
+    private void setSegment(Frame frame, List<Frame> segment)
+    {
+        int key = segmentKey(frame);
+        if (segments.containsKey(key))
+        {
+            error(new ProtocolError(Frame.L2, "segment in progress: %s",
+                                    frame));
+        }
+        segments.put(segmentKey(frame), segment);
+    }
+
+    private void clearSegment(Frame frame)
+    {
+        segments.remove(segmentKey(frame));
+    }
+
+    private void emit(int channel, ProtocolEvent event)
+    {
+        event.setChannel(channel);
+        receiver.received(event);
+    }
+
+    void received(NetworkEvent event)
+    {
+        event.delegate(this);
+    }
+
+    public void init(ProtocolHeader header)
+    {
+        emit(0, header);
+    }
+
+    public void error(ProtocolError error)
+    {
+        emit(0, error);
+    }
+
+    public void frame(Frame frame)
+    {
+        ByteBuffer segment;
+        if (frame.isFirstFrame() && frame.isLastFrame())
+        {
+            segment = frame.getBody();
+            assemble(frame, segment);
+        }
+        else
+        {
+            List<Frame> frames;
+            if (frame.isFirstFrame())
+            {
+                frames = new ArrayList<>();
+                setSegment(frame, frames);
+            }
+            else
+            {
+                frames = getSegment(frame);
+            }
+
+            frames.add(frame);
+
+            if (frame.isLastFrame())
+            {
+                clearSegment(frame);
+
+                int size = 0;
+                for (Frame f : frames)
+                {
+                    size += f.getSize();
+                }
+                segment = allocateByteBuffer(size);
+                for (Frame f : frames)
+                {
+                    segment.put(f.getBody());
+                }
+                segment.flip();
+                assemble(frame, segment);
+            }
+        }
+    }
+
+    private ByteBuffer allocateByteBuffer(final int size)
+    {
+        return ByteBuffer.allocate(size);
+    }
+
+    private void assemble(Frame frame, ByteBuffer segment)
+    {
+        BBDecoder dec = _decoder.get();
+        dec.init(segment);
+
+        int channel = frame.getChannel();
+        Method command;
+
+        switch (frame.getType())
+        {
+            case CONTROL:
+                int controlType = dec.readUint16();
+                Method control = Method.create(controlType);
+                control.read(dec);
+                emit(channel, control);
+                break;
+            case COMMAND:
+                int commandType = dec.readUint16();
+                // read in the session header, right now we don't use it
+                int hdr = dec.readUint16();
+                command = Method.create(commandType);
+                command.setSync((0x0001 & hdr) != 0);
+                command.read(dec);
+                if (command.hasPayload() && !frame.isLastSegment())
+                {
+                    setIncompleteCommand(channel, command);
+                }
+                else
+                {
+                    emit(channel, command);
+                }
+                break;
+            case HEADER:
+                command = getIncompleteCommand(channel);
+                List<Struct> structs = null;
+                DeliveryProperties deliveryProps = null;
+                MessageProperties messageProps = null;
+
+                while (dec.hasRemaining())
+                {
+                    Struct struct = dec.readStruct32();
+                    if (struct instanceof DeliveryProperties && deliveryProps == null)
+                    {
+                        deliveryProps = (DeliveryProperties) struct;
+                    }
+                    else if (struct instanceof MessageProperties && messageProps == null)
+                    {
+                        messageProps = (MessageProperties) struct;
+                    }
+                    else
+                    {
+                        if (structs == null)
+                        {
+                            structs = new ArrayList<>(2);
+                        }
+                        structs.add(struct);
+                    }
+                }
+                command.setHeader(new Header(deliveryProps, messageProps, structs));
+
+                if (frame.isLastSegment())
+                {
+                    setIncompleteCommand(channel, null);
+                    emit(channel, command);
+                }
+                break;
+            case BODY:
+                command = getIncompleteCommand(channel);
+                command.setBody(QpidByteBuffer.wrap(segment));
+                setIncompleteCommand(channel, null);
+                emit(channel, command);
+                break;
+            default:
+                throw new IllegalStateException("unknown frame type: " + frame.getType());
+        }
+
+        dec.releaseBuffer();
+    }
+
+    private void setIncompleteCommand(int channelId, Method incomplete)
+    {
+        if ((channelId & ARRAY_SIZE) == channelId)
+        {
+            _incompleteMethodArray[channelId] = incomplete;
+        }
+        else
+        {
+            if (incomplete != null)
+            {
+                _incompleteMethodMap.put(channelId, incomplete);
+            }
+            else
+            {
+                _incompleteMethodMap.remove(channelId);
+            }
+        }
+    }
+
+    private Method getIncompleteCommand(int channelId)
+    {
+        if ((channelId & ARRAY_SIZE) == channelId)
+        {
+            return _incompleteMethodArray[channelId];
+        }
+        else
+        {
+            return _incompleteMethodMap.get(channelId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
new file mode 100644
index 0000000..d7b54b0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
+
+/**
+ * Fluent helper that holds one instance of each client-side connection
+ * performative (start-ok, tune-ok, open), lets a test adjust their fields and
+ * sends them through the owning {@link Interaction}.
+ */
+public class ConnectionInteraction
+{
+    public static final String SASL_MECHANISM_ANONYMOUS = "ANONYMOUS";
+    public static final String SASL_MECHANISM_PLAIN = "PLAIN";
+
+    private final Interaction _interaction;
+    private final ConnectionStartOk _startOk = new ConnectionStartOk();
+    private final ConnectionTuneOk _tuneOk = new ConnectionTuneOk();
+    private final ConnectionOpen _open = new ConnectionOpen();
+
+    public ConnectionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    // ---- start-ok ---------------------------------------------------------
+
+    /** Sets the mechanism on the pending start-ok; returns this helper for chaining. */
+    public ConnectionInteraction startOkMechanism(final String mechanism)
+    {
+        _startOk.setMechanism(mechanism);
+        return this;
+    }
+
+    /** Sets the response bytes on the pending start-ok; returns this helper for chaining. */
+    public ConnectionInteraction startOkResponse(final byte[] response)
+    {
+        _startOk.setResponse(response);
+        return this;
+    }
+
+    /** Sends the pending start-ok and returns whatever the interaction's send returns. */
+    public Interaction startOk() throws Exception
+    {
+        return _interaction.sendPerformative(_startOk);
+    }
+
+    // ---- tune-ok ----------------------------------------------------------
+
+    /** Sets the channel maximum on the pending tune-ok; returns this helper for chaining. */
+    public ConnectionInteraction tuneOkChannelMax(final int channelMax)
+    {
+        _tuneOk.setChannelMax(channelMax);
+        return this;
+    }
+
+    /** Sets the maximum frame size on the pending tune-ok; returns this helper for chaining. */
+    public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize)
+    {
+        _tuneOk.setMaxFrameSize(maxFrameSize);
+        return this;
+    }
+
+    /** Sends the pending tune-ok and returns whatever the interaction's send returns. */
+    public Interaction tuneOk() throws Exception
+    {
+        return _interaction.sendPerformative(_tuneOk);
+    }
+
+    // ---- open -------------------------------------------------------------
+
+    /** Sends the pending open and returns whatever the interaction's send returns. */
+    public Interaction open() throws Exception
+    {
+        return _interaction.sendPerformative(_open);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
new file mode 100644
index 0000000..e60049e
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.lang.Math.min;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_SEG;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.HEADER_SIZE;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.FrameSizeObserver;
+import org.apache.qpid.server.protocol.v0_10.ProtocolEventSender;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.transport.ByteBufferSender;
+
+/**
+ * Disassembler
+ */
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(Disassembler.class);
+    private final ByteBufferSender _sender;
+    private final Object _sendlock = new Object();
+    private volatile int _maxPayload;
+    private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+    {
+        public BBEncoder initialValue()
+        {
+            return new BBEncoder(4 * 1024);
+        }
+    };
+
+    public Disassembler(ByteBufferSender sender, int maxFrame)
+    {
+        _sender = sender;
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        {
+            throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+        }
+        _maxPayload = maxFrame - HEADER_SIZE;
+    }
+
+    public void send(ProtocolEvent event)
+    {
+        event.delegate(null, this);
+    }
+
+    public void flush()
+    {
+        synchronized (_sendlock)
+        {
+            _sender.flush();
+        }
+    }
+
+    public void close()
+    {
+        synchronized (_sendlock)
+        {
+            _sender.close();
+        }
+    }
+
+    public void init(Void v, ProtocolHeader header)
+    {
+        synchronized (_sendlock)
+        {
+            _sender.send(header.toByteBuffer());
+            _sender.flush();
+        }
+    }
+
+    public void control(Void v, Method method)
+    {
+        method(method, SegmentType.CONTROL);
+    }
+
+    public void command(Void v, Method method)
+    {
+        method(method, SegmentType.COMMAND);
+    }
+
+    private void method(Method method, SegmentType type)
+    {
+        BBEncoder enc = _encoder.get();
+        enc.init();
+        enc.writeUint16(method.getEncodedType());
+        if (type == SegmentType.COMMAND)
+        {
+            if (method.isSync())
+            {
+                enc.writeUint16(0x0101);
+            }
+            else
+            {
+                enc.writeUint16(0x0100);
+            }
+        }
+        method.write(enc);
+        int methodLimit = enc.position();
+
+        byte flags = FIRST_SEG;
+
+        boolean payload = method.hasPayload();
+        if (!payload)
+        {
+            flags |= LAST_SEG;
+        }
+
+        int headerLimit = -1;
+        if (payload)
+        {
+            final Header hdr = method.getHeader();
+            if (hdr != null)
+            {
+                if(hdr.getDeliveryProperties() != null)
+                {
+                    enc.writeStruct32(hdr.getDeliveryProperties());
+                }
+                if(hdr.getMessageProperties() != null)
+                {
+                    enc.writeStruct32(hdr.getMessageProperties());
+                }
+                if(hdr.getNonStandardProperties() != null)
+                {
+                    for (Struct st : hdr.getNonStandardProperties())
+                    {
+                        enc.writeStruct32(st);
+                    }
+                }
+            }
+            headerLimit = enc.position();
+        }
+
+        synchronized (_sendlock)
+        {
+            ByteBuffer buf = enc.underlyingBuffer();
+            buf.flip();
+            ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+            copy.put(buf.duplicate());
+            copy.flip();
+
+            final ByteBuffer methodBuf = view(copy,0, methodLimit);
+            fragment(flags, type, method, methodBuf);
+            if (payload)
+            {
+                QpidByteBuffer qpidByteBuffer = method.getBody();
+                ByteBuffer body = null;
+                if (qpidByteBuffer != null)
+                {
+                    body = ByteBuffer.allocate(qpidByteBuffer.remaining());
+                    qpidByteBuffer.copyTo(body);
+                }
+                ByteBuffer headerBuf = view(copy, methodLimit, headerLimit);
+                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerBuf);
+                if (body != null)
+                {
+                    fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+                }
+            }
+        }
+    }
+
+    private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buffer)
+    {
+        byte typeb = (byte) type.getValue();
+        byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+        int remaining = buffer.remaining();
+        boolean first = true;
+        while (true)
+        {
+            int size = min(_maxPayload, remaining);
+            remaining -= size;
+
+            byte newflags = flags;
+            if (first)
+            {
+                newflags |= FIRST_FRAME;
+                first = false;
+            }
+            if (remaining == 0)
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            frame(newflags, typeb, track, event.getChannel(), size, buffer);
+
+            if (remaining == 0)
+            {
+                break;
+            }
+        }
+    }
+
+    private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buffer)
+    {
+        ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+
+        data.put(0, flags);
+        data.put(1, type);
+        data.putShort(2, (short) (size + HEADER_SIZE));
+        data.put(4, (byte) 0);
+        data.put(5, track);
+        data.putShort(6, (short) channel);
+
+        try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(data))
+        {
+            _sender.send(qpidByteBuffer);
+        }
+
+        if(size > 0)
+        {
+            final ByteBuffer view = view(buffer, 0, size);
+            try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(view))
+            {
+                _sender.send(qpidByteBuffer);
+            }
+            buffer.position(buffer.position() + size);
+        }
+    }
+
+    public void error(Void v, ProtocolError error)
+    {
+        throw new IllegalArgumentException(String.valueOf(error));
+    }
+
+    @Override
+    public void setMaxFrameSize(final int maxFrame)
+    {
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        {
+            throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+        }
+        _maxPayload = maxFrame - HEADER_SIZE;
+
+    }
+
+    private static ByteBuffer view(ByteBuffer buffer, int offset, int length)
+    {
+        ByteBuffer view = buffer.slice();
+        view.position(offset);
+        int newLimit = Math.min(view.position() + length, view.capacity());
+        view.limit(newLimit);
+        return view.slice();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
new file mode 100644
index 0000000..fa79489
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * {@link Response} wrapper whose body is a {@link ProtocolError}.
+ */
+public class ErrorResponse implements Response<ProtocolError>
+{
+    private final ProtocolError _protocolError;
+
+    /**
+     * @param protocolError the error to expose as this response's body
+     */
+    public ErrorResponse(final ProtocolError protocolError)
+    {
+        this._protocolError = protocolError;
+    }
+
+    /**
+     * @return the error supplied at construction
+     */
+    @Override
+    public ProtocolError getBody()
+    {
+        return this._protocolError;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
new file mode 100644
index 0000000..2e6817b
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionSync;
+
+/**
+ * Fluent helper that holds a single {@link ExecutionSync} command, lets a test
+ * set its id and sends it through the owning {@link Interaction}.
+ */
+public class ExecutionInteraction
+{
+    private final Interaction _interaction;
+    private final ExecutionSync _sync = new ExecutionSync();
+
+    public ExecutionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    /** Sets the id on the pending sync command; returns this helper for chaining. */
+    public ExecutionInteraction syncId(final int id)
+    {
+        _sync.setId(id);
+        return this;
+    }
+
+    /** Sends the pending sync command and returns the owning interaction. */
+    public Interaction sync() throws Exception
+    {
+        final Interaction owner = _interaction;
+        owner.sendPerformative(_sync);
+        return owner;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
new file mode 100644
index 0000000..fff894b
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.apache.qpid.server.transport.util.Functions.str;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * Incremental decoder for the inbound byte stream: first the 8-byte protocol
+ * header, then a sequence of frames (fixed-size header followed by an optional
+ * body). Decoded headers, frames and errors are passed to an
+ * {@link Assembler}; the events it produces are collected by a
+ * {@link ProtocolEventReceiver} and returned from {@link #decode(ByteBuffer)}.
+ */
+public class FrameDecoder implements InputDecoder
+{
+
+    private final ProtocolEventReceiver _receiver;
+
+    /** Which unit of input the decoder expects next. */
+    public enum State
+    {
+        PROTO_HDR,
+        FRAME_HDR,
+        FRAME_BODY,
+        ERROR
+    }
+
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+    private final Assembler _assembler;
+
+    private int _maxFrameSize = 4096;
+    private State _state;
+    // Accumulates a unit that arrived split across decode() calls; null when
+    // no partial unit is pending.
+    private ByteBuffer input = null;
+    // Bytes still required to complete the unit currently being read.
+    private int _needed;
+
+    // Fields of the most recently parsed frame header, kept for the body.
+    private byte _flags;
+    private SegmentType _type;
+    private byte _track;
+    private int _channel;
+
+    FrameDecoder(final byte[] headerBytes)
+    {
+        _receiver = new ProtocolEventReceiver(headerBytes);
+        this._assembler = new Assembler(_receiver);
+        this._state = State.PROTO_HDR;
+        _needed = 8;
+
+    }
+
+    /**
+     * Consumes all remaining bytes of {@code buf}. Complete units are parsed
+     * straight from {@code buf}; an incomplete trailing unit is copied into an
+     * internal buffer and finished on a later call.
+     *
+     * @param buf newly received bytes
+     * @return the receiver's collection of events decoded so far
+     */
+    @Override
+    public Collection<Response<?>> decode(final ByteBuffer buf) throws Exception
+    {
+        int limit = buf.limit();
+        int remaining = buf.remaining();
+        while (remaining > 0)
+        {
+            if (remaining >= _needed)
+            {
+                int consumed = _needed;
+                int pos = buf.position();
+                if (input == null)
+                {
+                    // Whole unit available: parse in place, temporarily
+                    // narrowing the limit to the unit's end.
+                    buf.limit(pos + _needed);
+                    input = buf;
+                    _state = next(pos);
+                    buf.limit(limit);
+                    buf.position(pos + consumed);
+                }
+                else
+                {
+                    // Complete a unit that was started on an earlier call.
+                    buf.limit(pos + _needed);
+                    input.put(buf);
+                    buf.limit(limit);
+                    input.flip();
+                    _state = next(0);
+                }
+
+                remaining -= consumed;
+                input = null;
+            }
+            else
+            {
+                // Not enough bytes for the unit: stash what is there.
+                if (input == null)
+                {
+                    input = ByteBuffer.allocate(_needed);
+                }
+                input.put(buf);
+                _needed -= remaining;
+                remaining = 0;
+            }
+        }
+        return _receiver.getReceivedEvents();
+    }
+
+    /**
+     * Parses the unit for the current state from {@code input}, starting at
+     * {@code pos}, sets {@code _needed} for the following unit and returns the
+     * state to move to.
+     *
+     * @throws IllegalStateException if called in a state with no parser
+     *                               (including {@code ERROR})
+     */
+    private State next(int pos)
+    {
+        input.order(ByteOrder.BIG_ENDIAN);
+
+        switch (_state) {
+            case PROTO_HDR:
+                // The header is invalid as soon as ANY of the four magic bytes
+                // is wrong, so the comparisons must be joined with ||.
+                if (input.get(pos) != 'A' ||
+                    input.get(pos + 1) != 'M' ||
+                    input.get(pos + 2) != 'Q' ||
+                    input.get(pos + 3) != 'P')
+                {
+                    error("bad protocol header: %s", str(input));
+                    return State.ERROR;
+                }
+
+                byte protoClass = input.get(pos + 4);
+                byte instance = input.get(pos + 5);
+                byte major = input.get(pos + 6);
+                byte minor = input.get(pos + 7);
+                _assembler.received(new ProtocolHeader(protoClass, instance, major, minor));
+                _needed = Frame.HEADER_SIZE;
+                return State.FRAME_HDR;
+            case FRAME_HDR:
+                _flags = input.get(pos);
+                _type = SegmentType.get(input.get(pos + 1));
+                // The size field counts the header too; reduce it to the body length.
+                int size = (0xFFFF & input.getShort(pos + 2));
+                size -= Frame.HEADER_SIZE;
+                // NOTE(review): this assignment overwrites any value passed to
+                // setMaxFrameSize() before every size check, so the setter has
+                // no effect on validation -- confirm whether that is intended.
+                _maxFrameSize = 64 * 1024;
+                if (size < 0 || size > (_maxFrameSize - 12))
+                {
+                    error("bad frame size: %d", size);
+                    return State.ERROR;
+                }
+                byte b = input.get(pos + 5);
+                if ((b & 0xF0) != 0) {
+                    error("non-zero reserved bits in upper nibble of " +
+                          "frame header byte 5: '%x'", b);
+                    return State.ERROR;
+                } else {
+                    _track = (byte) (b & 0xF);
+                }
+                _channel = (0xFFFF & input.getShort(pos + 6));
+                if (size == 0)
+                {
+                    // No body follows: emit the frame now and expect another header.
+                    Frame frame = new Frame(_flags, _type, _track, _channel, EMPTY_BYTE_BUFFER);
+                    _assembler.received(frame);
+                    _needed = Frame.HEADER_SIZE;
+                    return State.FRAME_HDR;
+                }
+                else
+                {
+                    _needed = size;
+                    return State.FRAME_BODY;
+                }
+            case FRAME_BODY:
+                Frame frame = new Frame(_flags, _type, _track, _channel, input.slice());
+                _assembler.received(frame);
+                _needed = Frame.HEADER_SIZE;
+                return State.FRAME_HDR;
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
+    /** Reports a framing-level error to the assembler. */
+    private void error(String fmt, Object ... args)
+    {
+        _assembler.received(new ProtocolError(Frame.L1, fmt, args));
+    }
+
+    public void setMaxFrameSize(final int maxFrameSize)
+    {
+        _maxFrameSize = maxFrameSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
new file mode 100644
index 0000000..dfec4f4
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+    @Override
+    public ByteBuffer encode(final Object msg)
+    {
+        if (msg instanceof ProtocolEvent)
+        {
+            final List<ByteBuffer> buffers = new ArrayList<>();
+            final AtomicInteger totalSize = new AtomicInteger();
+            Disassembler disassembler = new Disassembler(new ByteBufferSender()
+            {
+                @Override
+                public boolean isDirectBufferPreferred()
+                {
+                    return false;
+                }
+
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+                    int remaining = msg.remaining();
+                    byte[] data = new byte[remaining];
+                    ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+                    msg.get(data);
+                    buffers.add(byteBuffer);
+                    totalSize.addAndGet(remaining);
+                }
+
+                @Override
+                public void flush()
+                {
+
+                }
+
+                @Override
+                public void close()
+                {
+
+                }
+            }, 512);
+
+            disassembler.send((ProtocolEvent) msg);
+            ByteBuffer data = ByteBuffer.allocate(totalSize.get());
+            for (ByteBuffer buffer : buffers)
+            {
+                data.put(buffer);
+            }
+            data.flip();
+            return data;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
new file mode 100644
index 0000000..3b7849c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+
+import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+
+
+/**
+ * Frame transport wired with the {@link FrameDecoder} and {@link FrameEncoder}
+ * of this package; its protocol header is the identifier reported by
+ * {@link ProtocolEngineCreator_0_10}.
+ */
+public class FrameTransport extends AbstractFrameTransport<Interaction>
+{
+    private final byte[] _protocolHeader;
+
+    public FrameTransport(final InetSocketAddress brokerAddress)
+    {
+        super(brokerAddress, new FrameDecoder(headerIdentifier()), new FrameEncoder());
+        _protocolHeader = headerIdentifier();
+    }
+
+    /** Asks a fresh engine creator for its header identifier. */
+    private static byte[] headerIdentifier()
+    {
+        return new ProtocolEngineCreator_0_10().getHeaderIdentifier();
+    }
+
+    @Override
+    public byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    /** Creates a new interaction bound to this transport. */
+    @Override
+    public Interaction newInteraction()
+    {
+        return new Interaction(this);
+    }
+
+    /** Connects via the superclass and returns this transport for chaining. */
+    @Override
+    public FrameTransport connect()
+    {
+        super.connect();
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
new file mode 100644
index 0000000..5d53a89
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
+
+public class Interaction extends AbstractInteraction<Interaction>
+{
+    private ConnectionInteraction _connectionInteraction;
+    private SessionInteraction _sessionInteraction;
+    private MessageInteraction _messageInteraction;
+    private ExecutionInteraction _executionInteraction;
+    private int _channelId;
+    private TxInteraction _txInteraction;
+
+    public Interaction(final AbstractFrameTransport frameTransport)
+    {
+        super(frameTransport);
+        _connectionInteraction = new ConnectionInteraction(this);
+        _sessionInteraction = new SessionInteraction(this);
+        _messageInteraction = new MessageInteraction(this);
+        _executionInteraction = new ExecutionInteraction(this);
+        _txInteraction = new TxInteraction(this);
+    }
+
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return getTransport().getProtocolHeader();
+    }
+
+    public <T extends Method> Interaction sendPerformative(final T performative) throws Exception
+    {
+        performative.setChannel(_channelId);
+        sendPerformativeAndChainFuture(copyPerformative(performative));
+        return this;
+    }
+
+    public ConnectionInteraction connection()
+    {
+        return _connectionInteraction;
+    }
+
+    private <T extends Method> T copyPerformative(final T src)
+    {
+        T dst = (T) Method.create(src.getStructType());
+        final BBEncoder encoder = new BBEncoder(4096);
+        encoder.init();
+        src.write(encoder);
+        ByteBuffer buffer = encoder.buffer();
+
+        final BBDecoder decoder = new BBDecoder();
+        decoder.init(buffer);
+        dst.read(decoder);
+        return dst;
+    }
+
+    public Interaction openAnonymousConnection() throws Exception
+    {
+        this.negotiateProtocol().consumeResponse()
+            .consumeResponse(ConnectionStart.class)
+            .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+            .consumeResponse(ConnectionTune.class)
+            .connection().tuneOk()
+            .connection().open()
+            .consumeResponse(ConnectionOpenOk.class);
+        return this;
+    }
+
+    public SessionInteraction session()
+    {
+        return _sessionInteraction;
+    }
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    public Interaction channelId(final int channelId)
+    {
+        _channelId = channelId;
+        return this;
+    }
+
+    public Interaction attachSession(final byte[] sessionName) throws Exception
+    {
+        this.session()
+            .attachName(sessionName)
+            .attach()
+            .consumeResponse(SessionAttached.class)
+            .session().commandPointCommandId(0).commandPoint();
+        return this;
+    }
+
+    public MessageInteraction message()
+    {
+        return _messageInteraction;
+    }
+
+    public ExecutionInteraction execution()
+    {
+        return _executionInteraction;
+    }
+
+    public TxInteraction tx()
+    {
+        return _txInteraction;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
new file mode 100644
index 0000000..4660c86
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageFlow;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageSubscribe;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+
+public class MessageInteraction
+{
+    private final Interaction _interaction;
+    private MessageTransfer _transfer;
+    private MessageSubscribe _subscribe;
+    private MessageFlow _flow;
+    private MessageAccept _accept;
+
+    public MessageInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _transfer = new MessageTransfer();
+        _subscribe = new MessageSubscribe();
+        _flow = new MessageFlow();
+        _accept = new MessageAccept();
+    }
+
+    public MessageInteraction transferId(final int id)
+    {
+        _transfer.setId(id);
+        return this;
+    }
+
+    public MessageInteraction transferDesitnation(final String destination)
+    {
+        _transfer.setDestination(destination);
+        return this;
+    }
+
+    public Interaction transfer() throws Exception
+    {
+        _interaction.sendPerformative(_transfer);
+        return _interaction;
+    }
+
+    public MessageInteraction subscribeQueue(final String queueName)
+    {
+        _subscribe.setQueue(queueName);
+        return this;
+    }
+
+    public MessageInteraction subscribeId(final int id)
+    {
+        _subscribe.setId(id);
+        return this;
+    }
+
+    public Interaction subscribe() throws Exception
+    {
+        return _interaction.sendPerformative(_subscribe);
+    }
+
+    public MessageInteraction subscribeDestination(final String destination)
+    {
+        _subscribe.setDestination(destination);
+        return this;
+    }
+
+    public Interaction flow() throws Exception
+    {
+        return _interaction.sendPerformative(_flow);
+    }
+
+    public MessageInteraction flowId(final int id)
+    {
+        _flow.setId(id);
+        return this;
+    }
+
+    public MessageInteraction flowDestination(final String destination)
+    {
+        _flow.setDestination(destination);
+        return this;
+    }
+
+    public MessageInteraction flowUnit(final MessageCreditUnit unit)
+    {
+        _flow.setUnit(unit);
+        return this;
+    }
+
+    public MessageInteraction flowValue(final long value)
+    {
+        _flow.setValue(value);
+        return this;
+    }
+
+    public MessageInteraction subscribeAcceptMode(final MessageAcceptMode acceptMode)
+    {
+        _subscribe.setAcceptMode(acceptMode);
+        return this;
+    }
+
+    public MessageInteraction subscribeAcquireMode(final MessageAcquireMode acquireMode)
+    {
+        _subscribe.setAcquireMode(acquireMode);
+        return this;
+    }
+
+    public Interaction accept() throws Exception
+    {
+        return _interaction.sendPerformative(_accept);
+    }
+
+    public MessageInteraction acceptId(final int id)
+    {
+        _accept.setId(id);
+        return this;
+    }
+
+    public MessageInteraction acceptTransfers(final RangeSet transfers)
+    {
+        _accept.setTransfers(transfers);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
new file mode 100644
index 0000000..701e92c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * {@link Response} whose body is a received AMQP 0-10 {@link Method}.
+ */
+public class PerformativeResponse implements Response<Method>
+{
+    private final Method _method;
+
+    /**
+     * @param method the method this response wraps
+     */
+    public PerformativeResponse(final Method method)
+    {
+        this._method = method;
+    }
+
+    /** @return the wrapped method */
+    @Override
+    public Method getBody()
+    {
+        return this._method;
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder text = new StringBuilder("PerformativeResponse{");
+        text.append("_method=").append(_method);
+        text.append('}');
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
new file mode 100644
index 0000000..37eb66e
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * Collects incoming AMQP 0-10 protocol events and exposes them as {@link Response} objects.
+ * <p>
+ * Events are buffered in a concurrent queue, so {@link #received(ProtocolEvent)} and
+ * {@link #getReceivedEvents()} may be invoked from different threads.
+ */
+public class ProtocolEventReceiver
+{
+    private final Queue<Response<?>> _events = new ConcurrentLinkedQueue<>();
+    private final byte[] _headerBytes;
+
+    /**
+     * @param headerBytes bytes reported in the {@link HeaderResponse} queued when a protocol header
+     *                    is received
+     */
+    public ProtocolEventReceiver(final byte[] headerBytes)
+    {
+        _headerBytes = headerBytes;
+    }
+
+    /**
+     * Wraps the event in the matching response type and queues it. A protocol header is reported
+     * using the header bytes given at construction, not the content of {@code msg}. Events that are
+     * neither a header, a method nor a protocol error are ignored.
+     *
+     * @param msg the received event
+     */
+    void received(ProtocolEvent msg)
+    {
+        if (msg instanceof ProtocolHeader)
+        {
+            _events.add(new HeaderResponse(_headerBytes));
+        }
+        else if (msg instanceof Method)
+        {
+            _events.add(new PerformativeResponse((Method) msg));
+        }
+        else if (msg instanceof ProtocolError)
+        {
+            _events.add(new ErrorResponse((ProtocolError) msg));
+        }
+    }
+
+    /**
+     * Removes and returns the events queued so far, in arrival order.
+     * <p>
+     * The queue is drained one element at a time with {@code poll()}, so every event ends up in
+     * exactly one returned collection. Copying the queue and then calling {@code removeAll} would be
+     * quadratic and equality-based, and could discard a later event that merely equals a copied one.
+     *
+     * @return the drained events; empty when nothing has been received
+     */
+    public Collection<Response<?>> getReceivedEvents()
+    {
+        final Collection<Response<?>> results = new ArrayList<>();
+        Response<?> event;
+        while ((event = _events.poll()) != null)
+        {
+            results.add(event);
+        }
+        return results;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
new file mode 100644
index 0000000..fce711d
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+
+/**
+ * Builder for the session class performatives (attach, detach, command-point, flush) of an
+ * {@link Interaction}. Setters return this builder; verb methods send the configured performative
+ * through the owning interaction and return that interaction.
+ */
+public class SessionInteraction
+{
+    private final Interaction _interaction;
+    private final SessionAttach _attach = new SessionAttach();
+    private final SessionDetach _detach = new SessionDetach();
+    private final SessionCommandPoint _commandPoint = new SessionCommandPoint();
+    private final SessionFlush _flush = new SessionFlush();
+
+    /**
+     * @param interaction the interaction through which performatives are sent
+     */
+    public SessionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    // ---- attach ----
+
+    /** Sets the session name of the attach performative. */
+    public SessionInteraction attachName(final byte[] name)
+    {
+        _attach.setName(name);
+        return this;
+    }
+
+    /** Sends the attach performative. */
+    public Interaction attach() throws Exception
+    {
+        return _interaction.sendPerformative(_attach);
+    }
+
+    // ---- detach ----
+
+    /** Sets the session name of the detach performative. */
+    public SessionInteraction detachName(final byte[] sessionName)
+    {
+        _detach.setName(sessionName);
+        return this;
+    }
+
+    /** Sends the detach performative. */
+    public Interaction detach() throws Exception
+    {
+        return _interaction.sendPerformative(_detach);
+    }
+
+    // ---- command-point ----
+
+    /** Sets the command id of the command-point performative. */
+    public SessionInteraction commandPointCommandId(final int commandId)
+    {
+        _commandPoint.setCommandId(commandId);
+        return this;
+    }
+
+    /** Sends the command-point performative. */
+    public Interaction commandPoint() throws Exception
+    {
+        return _interaction.sendPerformative(_commandPoint);
+    }
+
+    // ---- flush ----
+
+    /** Sets the completed flag of the flush performative to {@code true}. */
+    public SessionInteraction flushCompleted()
+    {
+        _flush.setCompleted(true);
+        return this;
+    }
+
+    /** Sends the flush performative. */
+    public Interaction flush() throws Exception
+    {
+        return _interaction.sendPerformative(_flush);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
new file mode 100644
index 0000000..14c9912
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.TxCommit;
+import org.apache.qpid.server.protocol.v0_10.transport.TxSelect;
+
+/**
+ * Builder for the tx class performatives (select, commit) of an {@link Interaction}. Setters return
+ * this builder; verb methods send the configured performative through the owning interaction and
+ * return that interaction.
+ */
+public class TxInteraction
+{
+    private final Interaction _interaction;
+    private final TxSelect _select = new TxSelect();
+    private final TxCommit _commit = new TxCommit();
+
+    /**
+     * @param interaction the interaction through which performatives are sent
+     */
+    public TxInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    /** Sets the id of the select performative. */
+    public TxInteraction selectId(final int id)
+    {
+        _select.setId(id);
+        return this;
+    }
+
+    /** Sends the select performative. */
+    public Interaction select() throws Exception
+    {
+        return _interaction.sendPerformative(_select);
+    }
+
+    /** Sets the id of the commit performative. */
+    public TxInteraction commitId(final int id)
+    {
+        _commit.setId(id);
+        return this;
+    }
+
+    /** Sends the commit performative. */
+    public Interaction commit() throws Exception
+    {
+        return _interaction.sendPerformative(_commit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
new file mode 100644
index 0000000..69387fb
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+  "name" : "${broker.name}",
+  "modelVersion" : "7.0",
+  "authenticationproviders" : [ {
+    "name" : "anon",
+    "type" : "Anonymous"
+  }, {
+    "name" : "plain",
+    "type" : "Plain",
+    "secureOnlyMechanisms" : [],
+    "users" : [ {
+      "name" : "admin",
+      "type" : "managed",
+      "password" : "admin"
+    }, {
+      "name" : "guest",
+      "type" : "managed",
+      "password" : "guest"
+    } ]
+  } ],
+  "ports" : [ {
+    "name" : "AMQP",
+    "type" : "AMQP",
+    "authenticationProvider" : "plain",
+    "port" : "0",
+    "protocols" : [ "AMQP_0_10" ],
+    "virtualhostaliases" : [ {
+      "name" : "defaultAlias",
+      "type" : "defaultAlias"
+    }, {
+      "name" : "hostnameAlias",
+      "type" : "hostnameAlias"
+    }, {
+      "name" : "nameAlias",
+      "type" : "nameAlias"
+    } ]
+  }, {
+    "name" : "ANONYMOUS_AMQP",
+    "type" : "AMQP",
+    "authenticationProvider" : "anon",
+    "port" : "0",
+    "protocols" : [ "AMQP_0_10" ],
+    "virtualhostaliases" : [ {
+      "name" : "defaultAlias",
+      "type" : "defaultAlias",
+      "durable" : true
+    }, {
+      "name" : "hostnameAlias",
+      "type" : "hostnameAlias",
+      "durable" : true
+    }, {
+      "name" : "nameAlias",
+      "type" : "nameAlias",
+      "durable" : true
+    } ]
+  } ],
+  "virtualhostnodes" : []
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
new file mode 100644
index 0000000..1072f7c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ConnectionTest extends BrokerAdminUsingTestBase  // Tests of the AMQP 0-10 connection handshake: protocol header, start/start-ok, tune/tune-ok and open.
+{
+    private static final String DEFAULT_LOCALE = "en_US";  // locale asserted to be present in the broker's ConnectionStart (see versionNegotiation)
+    private InetSocketAddress _brokerAddress;  // address of the ANONYMOUS_AMQP port; assigned in setUp()
+
+    @Before
+    public void setUp()  // looks up the ANONYMOUS_AMQP port address used by the tests that do not select a port themselves
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "4.3. Version Negotiation",
+            description = "When the client opens a new socket connection to an AMQP server,"
+                          + " it MUST send a protocol header with the client's preferred protocol version."  // NOTE(review): the next fragment starts with "If", so the joined text reads "version.If" - confirm whether a space is intended
+                          + "If the requested protocol version is supported, the server MUST send its own protocol"
+                          + " header with the requested version to the socket, and then implement the protocol accordingly")
+    public void versionNegotiation() throws Exception  // checks the header reply, then the mechanisms and locales of the following ConnectionStart
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse();  // first response after sending the protocol header
+            assertThat(response, is(instanceOf(HeaderResponse.class)));
+            assertThat(response.getBody(), is(IsEqual.equalTo(transport.getProtocolHeader())));  // reply body must equal the header held by the transport
+
+            ConnectionStart connectionStart = interaction.consumeResponse().getLatestResponse(ConnectionStart.class);  // second response is read as ConnectionStart
+            assertThat(connectionStart.getMechanisms(), is(notNullValue()));
+            assertThat(connectionStart.getMechanisms(), contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS));  // contains(...) matches the whole collection: exactly this one mechanism
+            assertThat(connectionStart.getLocales(), is(notNullValue()));
+            assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE));  // likewise: exactly this one locale
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9.connection.start-ok",
+            description = "An AMQP client MUST handle incoming connection.start controls.")
+    public void startOk() throws Exception  // sends start-ok with the ANONYMOUS mechanism and reads the next response as ConnectionTune
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                       .consumeResponse().getLatestResponse(ConnectionTune.class);  // no explicit assert; presumably getLatestResponse(Class) fails on a different response type - TODO confirm
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok",
+            description = "This control sends the client's connection tuning parameters to the server."
+                          + " Certain fields are negotiated, others provide capability information.")
+    public void tuneOkAndOpen() throws Exception  // full handshake on the anonymous port: start-ok, tune-ok, open, then ConnectionOpenOk is read
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                       .consumeResponse(ConnectionTune.class)
+                       .connection().tuneOk()  // tune-ok is sent without setting any tuning values in this test
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionOpenOk.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassBySendingTuneOk() throws Exception  // skips start-ok: tune-ok and open are sent straight after ConnectionStart
+    {
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);  // AMQP port, not the anonymous port held in _brokerAddress
+        try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().tuneOk()  // no start-ok precedes this
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionClose.class);  // the broker's reply is read as ConnectionClose
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassBySendingOpen() throws Exception  // skips both start-ok and tune-ok: open is sent straight after ConnectionStart
+    {
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);  // AMQP port, not the anonymous port held in _brokerAddress
+        try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse().consumeResponse(ConnectionStart.class)
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionClose.class);  // the broker's reply is read as ConnectionClose
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassAfterSendingStartOk() throws Exception  // sends start-ok (PLAIN), consumes ConnectionSecure, then goes straight to tune-ok and open
+    {
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);  // AMQP port, not the anonymous port held in _brokerAddress
+        try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk().consumeResponse(ConnectionSecure.class)
+                       .connection().tuneOk()  // nothing is sent in answer to ConnectionSecure before this tune-ok
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);  // presumably accepts either response type - TODO confirm against Interaction.consumeResponse
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok.minimum",
+            description = "[...] the minimum negotiated value for max-frame-size is also MIN-MAX-FRAME-SIZE [4096]")
+    public void tooSmallFrameSize() throws Exception  // replies to ConnectionTune with a max frame size of 1024, then sends open
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+                                                 .consumeResponse(ConnectionStart.class)
+                                                 .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())  // channel max is echoed back unchanged from the broker's tune
+                                    .tuneOkMaxFrameSize(1024)  // below the 4096 minimum quoted in the description above
+                                    .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);  // presumably accepts either response type - TODO confirm against Interaction.consumeResponse
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok.max-frame-size",
+            description = "If the client specifies a channel max that is higher than the value provided by the server,"  // NOTE(review): says "channel max" although this test varies max-frame-size; possibly quoted verbatim from the spec - confirm before changing
+                          + " the server MUST close the connection without attempting a negotiated close."
+                          + " The server may report the error in some fashion to assist implementers.")
+    public void tooLargeFrameSize() throws Exception  // replies to ConnectionTune with the broker's max frame size plus one, then sends open
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+                                                 .consumeResponse(ConnectionStart.class)
+                                                 .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            assumeThat(response.hasMaxFrameSize(), is(true));  // JUnit assumption: the test is skipped, not failed, when the broker's tune carries no max frame size
+            assumeThat(response.getMaxFrameSize(), is(lessThan(0xFFFF)));  // skipped unless the advertised size is below 0xFFFF; presumably so that size + 1 still fits the field - TODO confirm
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())  // channel max is echoed back unchanged from the broker's tune
+                                    .tuneOkMaxFrameSize(response.getMaxFrameSize() + 1)  // one more than the value the broker advertised
+                                    .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);  // presumably accepts either response type - TODO confirm against Interaction.consumeResponse
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org