You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/18 01:42:19 UTC

[1/3] qpid-broker-j git commit: [Broker-J][System Tests] Add protocol system test suites for AMQP 0-8, 0-9 and 0-9-1

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master a3c00bbfc -> c6d80d80e


[Broker-J][System Tests] Add protocol system test suites for AMQP 0-8,0-9 and 0-9-1


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c6d80d80
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c6d80d80
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c6d80d80

Branch: refs/heads/master
Commit: c6d80d80e685f70653f609951868a9e8f0ffb5a6
Parents: 06e53d7
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:32:22 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Sat Nov 18 01:38:53 2017 +0000

----------------------------------------------------------------------
 .../qpid/server/protocol/v0_8/AMQDecoder.java   |   4 +-
 .../server/protocol/v0_8/ServerDecoder.java     |   2 +-
 .../server/protocol/v0_8/ClientDecoder.java     |   2 +-
 pom.xml                                         |   7 +
 systests/protocol-tests-amqp-0-8/pom.xml        | 109 +++++++
 .../tests/protocol/v0_8/BasicInteraction.java   | 214 ++++++++++++
 .../tests/protocol/v0_8/ChannelInteraction.java |  51 +++
 .../qpid/tests/protocol/v0_8/ClientDecoder.java | 323 +++++++++++++++++++
 .../protocol/v0_8/ConnectionInteraction.java    | 103 ++++++
 .../qpid/tests/protocol/v0_8/FrameDecoder.java  | 111 +++++++
 .../qpid/tests/protocol/v0_8/FrameEncoder.java  |  83 +++++
 .../tests/protocol/v0_8/FrameTransport.java     | 101 ++++++
 .../qpid/tests/protocol/v0_8/Interaction.java   | 109 +++++++
 .../protocol/v0_8/PerformativeResponse.java     |  54 ++++
 .../tests/protocol/v0_8/QueueInteraction.java   |  63 ++++
 .../resources/config-protocol-tests-0-8.json    |  78 +++++
 .../qpid/tests/protocol/v0_8/BasicTest.java     | 156 +++++++++
 .../qpid/tests/protocol/v0_8/ChannelTest.java   |  54 ++++
 .../tests/protocol/v0_8/ConnectionTest.java     | 183 +++++++++++
 .../qpid/tests/protocol/v0_8/QueueTest.java     |  67 ++++
 20 files changed, 1870 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
index 706f1ae..add9fb1 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
@@ -217,11 +217,11 @@ public abstract class AMQDecoder<T extends MethodProcessor>
     }
 
 
-    abstract void processMethod(int channelId,
+    protected abstract void processMethod(int channelId,
                                QpidByteBuffer in)
             throws AMQFrameDecodingException;
 
-    AMQFrameDecodingException newUnknownMethodException(final int classId,
+    protected AMQFrameDecodingException newUnknownMethodException(final int classId,
                                                         final int methodId,
                                                         ProtocolVersion protocolVersion)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
index 08ccda9..59d4985 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
@@ -46,7 +46,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
 
 
     @Override
-    void processMethod(int channelId,
+    protected void processMethod(int channelId,
                        QpidByteBuffer in)
             throws AMQFrameDecodingException
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
index de8afb9..8e02bbb 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
@@ -86,7 +86,7 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
     }
 
     @Override
-    void processMethod(int channelId,
+    protected void processMethod(int channelId,
                        QpidByteBuffer in)
             throws AMQFrameDecodingException
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 62e7033..e7d58bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
     <module>systests/systests-utils</module>
     <module>systests/qpid-systests-jms_2.0</module>
     <module>systests/protocol-tests-core</module>
+    <module>systests/protocol-tests-amqp-0-8</module>
     <module>systests/protocol-tests-amqp-1-0</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
@@ -420,6 +421,12 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-amqp-0-8</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/pom.xml b/systests/protocol-tests-amqp-0-8/pom.xml
new file mode 100644
index 0000000..3f788db
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-parent</artifactId>
+        <version>7.1.0-SNAPSHOT</version>
+        <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>protocol-tests-amqp-0-8</artifactId>
+    <name>Apache Qpid Protocol Tests for AMQP 0-8,0-9,0-9-1</name>
+    <description>Tests for AMQP 0-8,0-9,0-9-1</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-test-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>protocol-tests-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-systests-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-logging-logback</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-memory-store</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-derby-store</artifactId>
+            <optional>true</optional>
+            <scope>test</scope>
+        </dependency>
+
+       <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-bdbstore</artifactId>
+            <scope>test</scope>
+       </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemPropertyVariables>
+                        <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-8.json</qpid.initialConfigurationLocation>
+                    </systemPropertyVariables>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
new file mode 100644
index 0000000..9ef66ec
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosBody;
+import org.apache.qpid.server.protocol.v0_8.transport.CompositeAMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+
+public class BasicInteraction
+{
+    private final Interaction _interaction;
+    private String _publishExchange;
+    private String _publishRoutingKey;
+    private boolean _publishMandatory;
+    private boolean _publishImmediate;
+    private byte[] _content;
+    private Map<String, Object> _contentHeaderPropertiesHeaders = new HashMap<>();
+    private String _contentHeaderPropertiesContentType;
+    private byte _contentHeaderPropertiesDeliveryMode;
+    private byte _contentHeaderPropertiesPriority;
+    private int _qosPrefetchCount;
+    private long _qosPrefetchSize;
+    private boolean _qosGlobal;
+    private String _consumeQueueName;
+    private String _consumeConsumerTag;
+    private boolean _consumeNoLocal;
+    private boolean _consumeNoAck;
+    private boolean _consumeExclusive;
+    private boolean _consumeNoWait;
+    private Map<String, Object> _consumeArguments = new HashMap<>();
+    private long _ackDeliveryTag;
+    private boolean _ackMultiple;
+
+    public BasicInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    public Interaction publish() throws Exception
+    {
+        return _interaction.sendPerformative(new BasicPublishBody(0,
+                                                                  AMQShortString.valueOf(_publishExchange),
+                                                                  AMQShortString.valueOf(_publishRoutingKey),
+                                                                  _publishMandatory,
+                                                                  _publishImmediate));
+    }
+
+    public BasicInteraction content(final String content)
+    {
+        _content = content.getBytes(StandardCharsets.UTF_8);
+        return this;
+    }
+
+    public BasicInteraction content(final byte[] content)
+    {
+        _content = content;
+        return this;
+    }
+
+    public BasicInteraction contentHeaderPropertiesHeaders(final Map<String, Object> messageHeaders)
+    {
+        _contentHeaderPropertiesHeaders = messageHeaders;
+        return this;
+    }
+
+    public BasicInteraction contentHeaderPropertiesContentType(final String messageContentType)
+    {
+        _contentHeaderPropertiesContentType = messageContentType;
+        return this;
+    }
+
+    public BasicInteraction contentHeaderPropertiesPriority(final byte priority)
+    {
+        _contentHeaderPropertiesPriority = priority;
+        return this;
+    }
+
+    public BasicInteraction contentHeaderPropertiesDeliveryMode(final byte deliveryMode)
+    {
+        _contentHeaderPropertiesDeliveryMode = deliveryMode;
+        return this;
+    }
+
+
+    public Interaction publishMessage() throws Exception
+    {
+        List<AMQFrame> frames = new ArrayList<>();
+        BasicPublishBody publishFrame = new BasicPublishBody(0,
+                                                             AMQShortString.valueOf(_publishExchange),
+                                                             AMQShortString.valueOf(_publishRoutingKey),
+                                                             _publishMandatory,
+                                                             _publishImmediate);
+        frames.add(new AMQFrame(_interaction.getChannelId(), publishFrame));
+        final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+        basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+        basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
+        basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
+        basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
+        final int contentSize = _content == null ? 0 : _content.length;
+        ContentHeaderBody contentHeaderBody = new ContentHeaderBody(basicContentHeaderProperties, contentSize);
+        frames.add(new AMQFrame(_interaction.getChannelId(), contentHeaderBody));
+        if (contentSize > 0)
+        {
+            final int framePayloadMax = _interaction.getMaximumFrameSize() - 8;
+            int offset = 0;
+            do
+            {
+                int contentToCopyLength = Math.min(framePayloadMax, contentSize - offset);
+                ContentBody contentBody = new ContentBody(ByteBuffer.wrap(_content, offset,
+                                                                          contentToCopyLength));
+                frames.add(new AMQFrame(_interaction.getChannelId(), contentBody));
+                offset += contentToCopyLength;
+            }
+            while (offset < contentSize);
+        }
+
+        CompositeAMQDataBlock frame = new CompositeAMQDataBlock(frames.toArray(new AMQFrame[frames.size()]));
+
+        return _interaction.sendPerformative(frame);
+    }
+
+    public BasicInteraction publishExchange(final String exchangeName)
+    {
+        _publishExchange = exchangeName;
+        return this;
+    }
+
+    public BasicInteraction publishRoutingKey(final String queueName)
+    {
+        _publishRoutingKey = queueName;
+        return this;
+    }
+
+    public BasicInteraction qosPrefetchCount(final int prefetchCount)
+    {
+        _qosPrefetchCount = prefetchCount;
+        return this;
+    }
+
+    public Interaction qos() throws Exception
+    {
+        return _interaction.sendPerformative(new BasicQosBody(_qosPrefetchSize,
+                                                              _qosPrefetchCount,
+                                                              _qosGlobal));
+    }
+
+    public Interaction consume() throws Exception
+    {
+        return _interaction.sendPerformative(new BasicConsumeBody(0,
+                                                                  AMQShortString.valueOf(_consumeQueueName),
+                                                                  AMQShortString.valueOf(_consumeConsumerTag),
+                                                                  _consumeNoLocal,
+                                                                  _consumeNoAck,
+                                                                  _consumeExclusive,
+                                                                  _consumeNoWait,
+                                                                  FieldTable.convertToFieldTable(_consumeArguments)));
+    }
+
+    public BasicInteraction consumeConsumerTag(final String consumerTag)
+    {
+        _consumeConsumerTag = consumerTag;
+        return this;
+    }
+
+    public BasicInteraction consumeQueue(final String queueName)
+    {
+        _consumeQueueName = queueName;
+        return this;
+    }
+
+    public Interaction ack() throws Exception
+    {
+        return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, _ackMultiple));
+    }
+
+    public BasicInteraction ackDeliveryTag(final long deliveryTag)
+    {
+        _ackDeliveryTag = deliveryTag;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
new file mode 100644
index 0000000..51d4426
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenBody;
+
+public class ChannelInteraction
+{
+    private Interaction _interaction;
+
+    public ChannelInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    public Interaction open() throws Exception
+    {
+        return _interaction.sendPerformative(new ChannelOpenBody());
+    }
+
+    public Interaction close() throws Exception
+    {
+        return _interaction.sendPerformative(new ChannelCloseBody(200, AMQShortString.valueOf(""), 0, 0));
+    }
+
+    public Interaction flow(final boolean active) throws Exception
+    {
+        return _interaction.sendPerformative(new ChannelFlowBody(active));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java
new file mode 100644
index 0000000..2bd9d27
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java
@@ -0,0 +1,323 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.AMQDecoder;
+import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException;
+import org.apache.qpid.server.protocol.v0_8.transport.*;
+
+public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
+{
+    private QpidByteBuffer _incompleteBuffer;
+
+    /**
+     * Creates a new AMQP decoder.
+     *
+     * @param methodProcessor          method processor
+     */
+    public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor)
+    {
+        super(false, methodProcessor);
+    }
+
+    public void decodeBuffer(ByteBuffer incomingBuffer) throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+        if (_incompleteBuffer == null)
+        {
+            QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(incomingBuffer);
+            final int required = decode(qpidByteBuffer);
+            if (required != 0)
+            {
+                _incompleteBuffer = QpidByteBuffer.allocate(qpidByteBuffer.remaining() + required);
+                _incompleteBuffer.put(qpidByteBuffer);
+            }
+            qpidByteBuffer.dispose();
+        }
+        else
+        {
+            if (incomingBuffer.remaining() < _incompleteBuffer.remaining())
+            {
+                _incompleteBuffer.put(incomingBuffer);
+            }
+            else
+            {
+                _incompleteBuffer.flip();
+                final QpidByteBuffer aggregatedBuffer =
+                        QpidByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining());
+                aggregatedBuffer.put(_incompleteBuffer);
+                aggregatedBuffer.put(incomingBuffer);
+                aggregatedBuffer.flip();
+                final int required = decode(aggregatedBuffer);
+
+                _incompleteBuffer.dispose();
+                if (required != 0)
+                {
+                    _incompleteBuffer = QpidByteBuffer.allocate(aggregatedBuffer.remaining() + required);
+                    _incompleteBuffer.put(aggregatedBuffer);
+                }
+                else
+                {
+                    _incompleteBuffer = null;
+                }
+                aggregatedBuffer.dispose();
+            }
+        }
+        // post-condition: assert(!incomingBuffer.hasRemaining());
+    }
+
+    @Override
+    protected void processMethod(int channelId,
+                       QpidByteBuffer in)
+            throws AMQFrameDecodingException
+    {
+        ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor();
+        ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
+        final int classAndMethod = in.getInt();
+        int classId = classAndMethod >> 16;
+        int methodId = classAndMethod & 0xFFFF;
+        methodProcessor.setCurrentMethod(classId, methodId);
+        try
+        {
+            switch (classAndMethod)
+            {
+                //CONNECTION_CLASS:
+                case 0x000a000a:
+                    ConnectionStartBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0014:
+                    ConnectionSecureBody.process(in, methodProcessor);
+                    break;
+                case 0x000a001e:
+                    ConnectionTuneBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0029:
+                    ConnectionOpenOkBody.process(in, methodProcessor);
+                    break;
+                case 0x000a002a:
+                    ConnectionRedirectBody.process(in, methodProcessor);
+                    break;
+                case 0x000a0032:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+                    {
+                        ConnectionRedirectBody.process(in, methodProcessor);
+                    }
+                    else
+                    {
+                        ConnectionCloseBody.process(in, methodProcessor);
+                    }
+                    break;
+                case 0x000a0033:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    else
+                    {
+                        methodProcessor.receiveConnectionCloseOk();
+                    }
+                    break;
+                case 0x000a003c:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+                    {
+                        ConnectionCloseBody.process(in, methodProcessor);
+                    }
+                    else
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    break;
+                case 0x000a003d:
+                    if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+                    {
+                        methodProcessor.receiveConnectionCloseOk();
+                    }
+                    else
+                    {
+                        throw newUnknownMethodException(classId, methodId,
+                                                        methodProcessor.getProtocolVersion());
+                    }
+                    break;
+
+                // CHANNEL_CLASS:
+
+                case 0x0014000b:
+                    ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+                    break;
+                case 0x00140014:
+                    ChannelFlowBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140015:
+                    ChannelFlowOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x0014001e:
+                    ChannelAlertBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140028:
+                    ChannelCloseBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00140029:
+                    channelMethodProcessor.receiveChannelCloseOk();
+                    break;
+
+                // ACCESS_CLASS:
+
+                case 0x001e000b:
+                    AccessRequestOkBody.process(in, channelMethodProcessor);
+                    break;
+
+                // EXCHANGE_CLASS:
+
+                case 0x0028000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveExchangeDeclareOk();
+                    }
+                    break;
+                case 0x00280015:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveExchangeDeleteOk();
+                    }
+                    break;
+                case 0x00280017:
+                    ExchangeBoundOkBody.process(in, channelMethodProcessor);
+                    break;
+
+
+                // QUEUE_CLASS:
+
+                case 0x0032000b:
+                    QueueDeclareOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320015:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveQueueBindOk();
+                    }
+                    break;
+                case 0x0032001f:
+                    QueuePurgeOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320029:
+                    QueueDeleteOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x00320033:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveQueueUnbindOk();
+                    }
+                    break;
+
+
+                // BASIC_CLASS:
+
+                case 0x003c000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveBasicQosOk();
+                    }
+                    break;
+                case 0x003c0015:
+                    BasicConsumeOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c001f:
+                    BasicCancelOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0032:
+                    BasicReturnBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c003c:
+                    BasicDeliverBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0047:
+                    BasicGetOkBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0048:
+                    BasicGetEmptyBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0050:
+                    BasicAckBody.process(in, channelMethodProcessor);
+                    break;
+                case 0x003c0065:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveBasicRecoverSyncOk();
+                    }
+                    break;
+                case 0x003c006f:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveBasicRecoverSyncOk();
+                    }
+                    break;
+                case 0x003c0078:
+                    BasicNackBody.process(in, channelMethodProcessor);
+                    break;
+
+                // CONFIRM CLASS:
+
+                case 0x0055000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveConfirmSelectOk();
+                    }
+                    break;
+
+                // TX_CLASS:
+
+                case 0x005a000b:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxSelectOk();
+                    }
+                    break;
+                case 0x005a0015:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxCommitOk();
+                    }
+                    break;
+                case 0x005a001f:
+                    if(!channelMethodProcessor.ignoreAllButCloseOk())
+                    {
+                        channelMethodProcessor.receiveTxRollbackOk();
+                    }
+                    break;
+
+                default:
+                    throw newUnknownMethodException(classId, methodId,
+                                                    methodProcessor.getProtocolVersion());
+
+            }
+        }
+        finally
+        {
+            methodProcessor.setCurrentMethod(0, 0);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
new file mode 100644
index 0000000..023e7fc
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody;
+
+public class ConnectionInteraction
+{
+    private final Interaction _interaction;
+
+    private final Map<String, Object> _startOkClientProperties = new HashMap<>();
+    private String _startOkMechanism;
+    private byte[] _startOkResponse;
+    private String _startOkLocale;
+    private int _tuneOkChannelMax;
+    private long _tuneOkFrameMax;
+    private int _tuneOkHeartbeat;
+    private String _openVirtualHost;
+
+    public ConnectionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+
+    public ConnectionInteraction startOkMechanism(final String startOkMechanism)
+    {
+        _startOkMechanism = startOkMechanism;
+        return this;
+    }
+
+
+    public Interaction startOk() throws Exception
+    {
+        return _interaction.sendPerformative(new ConnectionStartOkBody(FieldTable.convertToFieldTable(_startOkClientProperties),
+                                                                       AMQShortString.valueOf(_startOkMechanism),
+                                                                       _startOkResponse,
+                                                                       AMQShortString.valueOf(_startOkLocale)));
+    }
+
+    public ConnectionInteraction tuneOkChannelMax(final int channelMax)
+    {
+        _tuneOkChannelMax = channelMax;
+        return this;
+    }
+
+    public ConnectionInteraction tuneOkFrameMax(final long frameMax)
+    {
+        _tuneOkFrameMax = frameMax;
+        return this;
+    }
+
+    public ConnectionInteraction tuneOkHeartbeat(final int heartbeat)
+    {
+        _tuneOkHeartbeat = heartbeat;
+        return this;
+    }
+
+    public Interaction tuneOk() throws Exception
+    {
+        return _interaction.sendPerformative(new ConnectionTuneOkBody(_tuneOkChannelMax,
+                                                                      _tuneOkFrameMax,
+                                                                      _tuneOkHeartbeat));
+    }
+
+    public ConnectionInteraction openVirtualHost(String virtualHost)
+    {
+        _openVirtualHost = virtualHost;
+        return this;
+    }
+
+    public Interaction open() throws Exception
+    {
+        return _interaction.sendPerformative(new ConnectionOpenBody(AMQShortString.valueOf(_openVirtualHost),
+                                                                    null,
+                                                                    false));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java
new file mode 100644
index 0000000..499fe72
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.server.protocol.v0_8.transport.FrameCreatingMethodProcessor;
+import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+    private final ClientDecoder _clientDecoder;
+    private final FrameCreatingMethodProcessor _methodProcessor;
+
+    FrameDecoder(ProtocolVersion protocolVersion)
+    {
+        _methodProcessor = new FrameCreatingMethodProcessor(protocolVersion);
+        _clientDecoder = new ClientDecoder(_methodProcessor);
+    }
+
+    @Override
+    public Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception
+    {
+        _clientDecoder.decodeBuffer(inputBuffer);
+
+        List<AMQDataBlock> receivedFrames = new ArrayList<>(_methodProcessor.getProcessedMethods());
+        List<Response<?>> result = new ArrayList<>();
+
+        for (AMQDataBlock frame : receivedFrames)
+        {
+            if (frame instanceof AMQFrame)
+            {
+                AMQFrame amqFrame = (AMQFrame) frame;
+                result.add(new PerformativeResponse(amqFrame.getChannel(), amqFrame.getSize(), amqFrame.getBodyFrame()));
+            }
+            else if (frame instanceof ProtocolInitiation)
+            {
+                byte[] data =  new byte[(int) frame.getSize()];
+                frame.writePayload(new ByteBufferSender()
+                {
+                    @Override
+                    public boolean isDirectBufferPreferred()
+                    {
+                        return false;
+                    }
+
+                    @Override
+                    public void send(final QpidByteBuffer msg)
+                    {
+                        msg.copyTo(data);
+                    }
+
+                    @Override
+                    public void flush()
+                    {
+
+                    }
+
+                    @Override
+                    public void close()
+                    {
+
+                    }
+                });
+
+                result.add(new HeaderResponse(data));
+            }
+            else
+            {
+                throw new IllegalArgumentException(String.format("Unexpected data block received %s", frame));
+            }
+        }
+        _methodProcessor.getProcessedMethods().removeAll(receivedFrames);
+        return result;
+    }
+
+    ProtocolVersion getVersion()
+    {
+        return _methodProcessor.getProtocolVersion();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java
new file mode 100644
index 0000000..9b471fc
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+
+    @Override
+    public ByteBuffer encode(final Object msg)
+    {
+        if (msg instanceof AMQDataBlock)
+        {
+            final List<ByteBuffer>buffers = new ArrayList<>();
+            ((AMQDataBlock)msg).writePayload(new ByteBufferSender()
+            {
+                @Override
+                public boolean isDirectBufferPreferred()
+                {
+                    return false;
+                }
+
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+                    byte[] data = new byte[msg.remaining()];
+                    msg.get(data);
+                    buffers.add(ByteBuffer.wrap(data));
+                }
+
+                @Override
+                public void flush()
+                {
+                }
+
+                @Override
+                public void close()
+                {
+
+                }
+            });
+            int remaining = 0;
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                remaining += byteBuffer.remaining();
+            }
+            ByteBuffer result = ByteBuffer.allocate(remaining);
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                result.put(byteBuffer);
+            }
+            result.flip();
+            return result;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
new file mode 100644
index 0000000..52cd7a0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.net.InetSocketAddress;
+
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+
+
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
+{
+    private final byte[] _protocolHeader;
+    private ProtocolVersion _protocolVersion;
+
+    public FrameTransport(final InetSocketAddress brokerAddress)
+    {
+        this(brokerAddress, Protocol.AMQP_0_9_1);
+    }
+    public FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
+    {
+        super(brokerAddress, new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder());
+        _protocolVersion = getProtocolVersion(protocol);
+        byte[] protocolHeader = null;
+        for(ProtocolEngineCreator installedEngine : (new QpidServiceLoader()).instancesOf(ProtocolEngineCreator.class))
+        {
+            if (installedEngine.getVersion() == protocol)
+            {
+                protocolHeader = installedEngine.getHeaderIdentifier();
+            }
+        }
+
+        if (protocolHeader == null)
+        {
+            throw new IllegalArgumentException(String.format("Unsupported protocol %s", protocol));
+        }
+        _protocolHeader = protocolHeader;
+    }
+
+    @Override
+    public FrameTransport connect()
+    {
+        super.connect();
+        return this;
+    }
+
+    public Interaction newInteraction()
+    {
+        return new Interaction(this);
+    }
+
+    public byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
+
+    public static ProtocolVersion getProtocolVersion(Protocol protocol)
+    {
+        final ProtocolVersion protocolVersion;
+        switch (protocol)
+        {
+            case AMQP_0_8:
+                protocolVersion = ProtocolVersion.v0_8;
+                break;
+            case AMQP_0_9_1:
+                protocolVersion = ProtocolVersion.v0_91;
+                break;
+            case AMQP_0_9:
+                protocolVersion = ProtocolVersion.v0_9;
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported protocol %s", protocol));
+        }
+        return protocolVersion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
new file mode 100644
index 0000000..0b62770
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
+{
+
+    private int _channelId;
+    private int _maximumPayloadSize = 512;
+
+    Interaction(final FrameTransport transport)
+    {
+        super(transport);
+    }
+
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return getTransport().getProtocolHeader();
+    }
+
+    @Override
+    protected Interaction getInteraction()
+    {
+        return this;
+    }
+
+    public Interaction sendPerformative(final AMQBody amqBody) throws Exception
+    {
+        return sendPerformative(getChannelId(), amqBody);
+    }
+
+    public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception
+    {
+        final AMQFrame frameBody = new AMQFrame(channel, amqBody);
+        sendPerformativeAndChainFuture(frameBody, false);
+        return this;
+    }
+
+    public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception
+    {
+        sendPerformativeAndChainFuture(dataBlock, false);
+        return this;
+    }
+
+    public Interaction openAnonymousConnection() throws Exception
+    {
+        return this.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+                   .connection().startOkMechanism("ANONYMOUS").startOk().consumeResponse(ConnectionTuneBody.class)
+                   .connection().tuneOk()
+                   .connection().open().consumeResponse(ConnectionOpenOkBody.class);
+
+    }
+
+    public ConnectionInteraction connection()
+    {
+        return new ConnectionInteraction(this);
+    }
+
+    public ChannelInteraction channel()
+    {
+        return new ChannelInteraction(this);
+    }
+
+    public QueueInteraction queue()
+    {
+        return new QueueInteraction(this);
+    }
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    public int getMaximumFrameSize()
+    {
+        return _maximumPayloadSize;
+    }
+
+    public BasicInteraction basic()
+    {
+        return new BasicInteraction(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java
new file mode 100644
index 0000000..66871e9
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.tests.protocol.Response;
+
+public class PerformativeResponse implements Response<AMQBody>
+{
+    private final int _channel;
+    private final long _size;
+    private final AMQBody _body;
+
+    public PerformativeResponse(int channel, long size, final AMQBody body)
+    {
+        _channel = channel;
+        _size = size;
+        _body = body;
+    }
+
+    @Override
+    public AMQBody getBody()
+    {
+        return _body;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PerformativeResponse{" +
+               "_channel=" + _channel +
+               ", _size=" + _size +
+               ", _body=" + _body +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
new file mode 100644
index 0000000..6e86385
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody;
+
+public class QueueInteraction
+{
+    private Interaction _interaction;
+    private String _declareName;
+    private boolean _declarePassive;
+    private boolean _declareDurable;
+    private boolean _declareExclusive;
+    private boolean _declareAutoDelete;
+    private boolean _declareNowait;
+    private Map<String, Object> _declareArguments = new HashMap<>();
+
+    public QueueInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    public QueueInteraction declareName(String name)
+    {
+        _declareName = name;
+        return this;
+    }
+
+    public Interaction declare() throws Exception
+    {
+        return _interaction.sendPerformative(new QueueDeclareBody(0,
+                                                                  AMQShortString.valueOf(_declareName),
+                                                                  _declarePassive,
+                                                                  _declareDurable,
+                                                                  _declareExclusive,
+                                                                  _declareAutoDelete,
+                                                                  _declareNowait,
+                                                                  FieldTable.convertToFieldTable(_declareArguments)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json b/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json
new file mode 100644
index 0000000..d3738c9
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+  "name" : "${broker.name}",
+  "modelVersion" : "7.0",
+  "authenticationproviders" : [ {
+    "name" : "anon",
+    "type" : "Anonymous"
+  }, {
+    "name" : "plain",
+    "type" : "Plain",
+    "secureOnlyMechanisms" : [],
+    "users" : [ {
+      "name" : "admin",
+      "type" : "managed",
+      "password" : "admin"
+    }, {
+      "name" : "guest",
+      "type" : "managed",
+      "password" : "guest"
+    } ]
+  } ],
+  "ports" : [ {
+    "name" : "AMQP",
+    "type" : "AMQP",
+    "authenticationProvider" : "plain",
+    "port" : "0",
+    "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ],
+    "virtualhostaliases" : [ {
+      "name" : "defaultAlias",
+      "type" : "defaultAlias"
+    }, {
+      "name" : "hostnameAlias",
+      "type" : "hostnameAlias"
+    }, {
+      "name" : "nameAlias",
+      "type" : "nameAlias"
+    } ]
+  }, {
+    "name" : "ANONYMOUS_AMQP",
+    "type" : "AMQP",
+    "authenticationProvider" : "anon",
+    "port" : "0",
+    "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ],
+    "virtualhostaliases" : [ {
+      "name" : "defaultAlias",
+      "type" : "defaultAlias",
+      "durable" : true
+    }, {
+      "name" : "hostnameAlias",
+      "type" : "hostnameAlias",
+      "durable" : true
+    }, {
+      "name" : "nameAlias",
+      "type" : "nameAlias",
+      "durable" : true
+    } ]
+  } ],
+  "virtualhostnodes" : []
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
new file mode 100644
index 0000000..eb51925
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class BasicTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    @SpecificationTest(section = "1.8.3.7", description = "publish a message")
+    public void publishMessage() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .basic().contentHeaderPropertiesContentType("text/plain")
+                               .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+                               .contentHeaderPropertiesDeliveryMode((byte)1)
+                               .contentHeaderPropertiesPriority((byte)1)
+                               .publishExchange("")
+                               .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                               .content("Test")
+                               .publishMessage()
+                       .channel().close()
+                       .consumeResponse(ChannelCloseOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "1.8.3.3", description = " start a queue consumer")
+    public void consumeMessage() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            String messageContent = "Test";
+            String consumerTag = "A";
+            String queueName = BrokerAdmin.TEST_QUEUE_NAME;
+            Map<String, Object> messageHeaders = Collections.singletonMap("test", "testValue");
+            String messageContentType = "text/plain";
+            byte deliveryMode = (byte) 1;
+            byte priority = (byte) 2;
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .basic().qosPrefetchCount(1).qos().consumeResponse(BasicQosOkBody.class)
+                       .basic().consumeConsumerTag(consumerTag)
+                       .consumeQueue(queueName)
+                       .consume().consumeResponse(BasicConsumeOkBody.class)
+                       .channel().flow(true).consumeResponse(ChannelFlowOkBody.class)
+                       .basic().contentHeaderPropertiesContentType(messageContentType)
+                       .contentHeaderPropertiesHeaders(messageHeaders)
+                       .contentHeaderPropertiesDeliveryMode(deliveryMode)
+                       .contentHeaderPropertiesPriority(priority)
+                       .publishExchange("")
+                       .publishRoutingKey(queueName)
+                       .content(messageContent)
+                       .publishMessage()
+                       .consumeResponse(BasicDeliverBody.class);
+
+            BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
+            assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag))));
+            assertThat(delivery.getConsumerTag(), is(notNullValue()));
+            assertThat(delivery.getRedelivered(), is(equalTo(false)));
+            assertThat(delivery.getExchange(), is(nullValue()));
+            assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(queueName))));
+
+            ContentHeaderBody header =
+                    interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+            assertThat(header.getBodySize(), is(equalTo((long)messageContent.length())));
+            BasicContentHeaderProperties properties = header.getProperties();
+            Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+            assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders))));
+            assertThat(properties.getContentTypeAsString(), is(equalTo(messageContentType)));
+            assertThat(properties.getPriority(), is(equalTo(priority)));
+            assertThat(properties.getDeliveryMode(), is(equalTo(deliveryMode)));
+
+            ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+
+            QpidByteBuffer payload = content.getPayload();
+            byte[] contentData = new byte[payload.remaining()];
+            payload.get(contentData);
+            payload.dispose();
+            String receivedContent = new String(contentData, StandardCharsets.UTF_8);
+
+            assertThat(receivedContent, is(equalTo(messageContent)));
+            assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1)));
+
+            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+                              .ack()
+                       .channel().close().consumeResponse(ChannelCloseOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(0)));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
new file mode 100644
index 0000000..4ca4ae8
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ChannelTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
+    public void channelOpen() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
new file mode 100644
index 0000000..1fbbf2a
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ConnectionTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
+    public void connectionStart() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionStartBody response =
+                    interaction.negotiateProtocol().consumeResponse().getLatestResponse(ConnectionStartBody.class);
+
+            assertThat(response.getVersionMajor(), is(equalTo((short)transport.getProtocolVersion().getMajorVersion())));
+            assertThat(response.getVersionMinor(), is(equalTo((short)transport.getProtocolVersion().getActualMinorVersion())));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.2", description = "select security mechanism and locale")
+    public void connectionStartOk() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse(ConnectionStartBody.class)
+                       .connection().startOkMechanism("ANONYMOUS")
+                                    .startOk()
+                       .consumeResponse();
+
+            interaction.getLatestResponse(ConnectionTuneBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.5", description = "select security mechanism and locale")
+    public void connectionTuneOkAndOpen() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     .consumeResponse(ConnectionStartBody.class)
+                                                     .connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(response.getFrameMax())
+                       .tuneOkHeartbeat(response.getHeartbeat())
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionOpenOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.5", description = "[...] the minimum negotiated value for frame-max is also"
+                                                          + " frame-min-size [4096].")
+    public void tooSmallFrameSize() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     .consumeResponse(ConnectionStartBody.class)
+                                                     .connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(1024)
+                       .tuneOkHeartbeat(response.getHeartbeat())
+                       .tuneOk()
+                       .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.5.2.", description = "If the client specifies a frame max that is higher than"
+                                                             + " the value provided by the server, the server MUST"
+                                                             + " close the connection without attempting a negotiated"
+                                                             + " close. The server may report the error in some fashion"
+                                                             + " to assist implementors.")
+    public void tooLargeFrameSize() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     .consumeResponse(ConnectionStartBody.class)
+                                                     .connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(Long.MAX_VALUE)
+                       .tuneOkHeartbeat(response.getHeartbeat())
+                       .tuneOk()
+                       .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK"
+                                                       + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
+    public void authenticationBypassBySendingTuneOk() throws Exception
+    {
+        final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+                       .connection().tuneOk()
+                       .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK"
+                                                       + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
+    public void authenticationBypassBySendingOpen() throws Exception
+    {
+        final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
new file mode 100644
index 0000000..920ea73
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
+    public void queueDeclare() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final String queueName = "testQueue";
+            QueueDeclareOkBody response = interaction.openAnonymousConnection()
+                                                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                     .queue().declareName(queueName).declare()
+                                                     .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(queueName))));
+            assertThat(response.getMessageCount(), is(equalTo(0L)));
+            assertThat(response.getConsumerCount(), is(equalTo(0L)));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

Posted by or...@apache.org.
QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/06e53d7a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/06e53d7a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/06e53d7a

Branch: refs/heads/master
Commit: 06e53d7a5f6cb942a2706dffe340018b659aa077
Parents: a3c00bb
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:31:29 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Sat Nov 18 01:38:53 2017 +0000

----------------------------------------------------------------------
 pom.xml                                         |  13 +
 systests/protocol-tests-amqp-1-0/pom.xml        |  35 +--
 .../qpid/tests/protocol/v1_0/FrameDecoder.java  | 276 +++++++++++++++++
 .../qpid/tests/protocol/v1_0/FrameEncoder.java  |  93 ++++++
 .../tests/protocol/v1_0/FrameTransport.java     | 205 +------------
 .../tests/protocol/v1_0/HeaderResponse.java     |  46 ---
 .../qpid/tests/protocol/v1_0/InputHandler.java  | 305 -------------------
 .../qpid/tests/protocol/v1_0/Interaction.java   | 141 +++------
 .../qpid/tests/protocol/v1_0/Matchers.java      |  60 ----
 .../qpid/tests/protocol/v1_0/OutputHandler.java |  96 ------
 .../protocol/v1_0/PerformativeResponse.java     |   1 +
 .../qpid/tests/protocol/v1_0/Response.java      |  25 --
 .../protocol/v1_0/SaslPerformativeResponse.java |   1 +
 .../tests/protocol/v1_0/SpecificationTest.java  |  34 ---
 .../tests/protocol/v1_0/DecodeErrorTest.java    |   2 +
 .../bindmapjms/TemporaryDestinationTest.java    |   4 +-
 .../extensions/websocket/WebSocketTest.java     |  13 +-
 .../v1_0/messaging/DeleteOnCloseTest.java       |   2 +-
 .../protocol/v1_0/messaging/MessageFormat.java  |   4 +-
 .../v1_0/messaging/MultiTransferTest.java       |   4 +-
 .../protocol/v1_0/messaging/OutcomeTest.java    |   4 +-
 .../protocol/v1_0/messaging/TransferTest.java   |  11 +-
 .../v1_0/transaction/DischargeTest.java         |   2 +-
 .../transaction/TransactionalTransferTest.java  |   4 +-
 .../v1_0/transport/ProtocolHeaderTest.java      |   2 +-
 .../v1_0/transport/connection/OpenTest.java     |   7 +-
 .../v1_0/transport/link/AttachTest.java         |   2 +-
 .../protocol/v1_0/transport/link/FlowTest.java  |   2 +-
 .../transport/link/ResumeDeliveriesTest.java    |  12 +-
 .../v1_0/transport/security/sasl/SaslTest.java  |   2 +-
 .../v1_0/transport/session/BeginTest.java       |   8 +-
 systests/protocol-tests-core/pom.xml            |  75 +++++
 .../qpid/tests/protocol/FrameTransport.java     | 198 ++++++++++++
 .../qpid/tests/protocol/HeaderResponse.java     |  46 +++
 .../qpid/tests/protocol/InputDecoder.java       |  30 ++
 .../qpid/tests/protocol/InputHandler.java       |  81 +++++
 .../apache/qpid/tests/protocol/Interaction.java | 141 +++++++++
 .../apache/qpid/tests/protocol/Matchers.java    |  60 ++++
 .../qpid/tests/protocol/OutputEncoder.java      |  29 ++
 .../qpid/tests/protocol/OutputHandler.java      |  69 +++++
 .../apache/qpid/tests/protocol/Response.java    |  25 ++
 .../qpid/tests/protocol/SpecificationTest.java  |  34 +++
 42 files changed, 1280 insertions(+), 924 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c6bbfcb..62e7033 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
     <module>systests</module>
     <module>systests/systests-utils</module>
     <module>systests/qpid-systests-jms_2.0</module>
+    <module>systests/protocol-tests-core</module>
     <module>systests/protocol-tests-amqp-1-0</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
@@ -407,6 +408,18 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-core</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-amqp-1-0</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index 5200e2d..aa25351 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -53,6 +53,11 @@
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
+            <artifactId>protocol-tests-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-systests-utils</artifactId>
         </dependency>
 
@@ -91,38 +96,9 @@
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-bdbstore</artifactId>
             <scope>test</scope>
-            <optional>true</optional>
         </dependency>
 
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-buffer</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-codec-http</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-handler</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-transport</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-library</artifactId>
         </dependency>
@@ -130,6 +106,7 @@
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-integration</artifactId>
         </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
new file mode 100644
index 0000000..4dc06cb
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FrameDecoder.class);
+    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                            .registerTransportLayer()
+                                                                                            .registerMessagingLayer()
+                                                                                            .registerTransactionLayer()
+                                                                                            .registerSecurityLayer()
+                                                                                            .registerExtensionSoleconnLayer();
+    private final MyConnectionHandler _connectionHandler;
+    private volatile FrameHandler _frameHandler;
+
+    private enum ParsingState
+    {
+        HEADER,
+        PERFORMATIVES;
+    }
+
+    private final ValueHandler _valueHandler;
+
+    private volatile ParsingState _state = ParsingState.HEADER;
+
+    public FrameDecoder(final boolean isSasl)
+    {
+        _valueHandler = new ValueHandler(TYPE_REGISTRY);
+        _connectionHandler = new MyConnectionHandler();
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
+    }
+
+    @Override
+    public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
+    {
+        List<Response<?>> responses = new ArrayList<>();
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
+        switch(_state)
+        {
+            case HEADER:
+                if (inputBuffer.remaining() >= 8)
+                {
+                    byte[] header = new byte[8];
+                    inputBuffer.get(header);
+                    responses.add(new HeaderResponse(header));
+                    _state = ParsingState.PERFORMATIVES;
+                    _frameHandler.parse(qpidByteBuffer);
+                }
+                break;
+            case PERFORMATIVES:
+                _frameHandler.parse(qpidByteBuffer);
+                break;
+            default:
+                throw new IllegalStateException("Unexpected state : " + _state);
+        }
+
+        Response<?> r;
+        while((r = _connectionHandler._responseQueue.poll())!=null)
+        {
+            responses.add(r);
+        }
+        return responses;
+    }
+
+    private void resetInputHandlerAfterSaslOutcome()
+    {
+        _state = ParsingState.HEADER;
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+    }
+
+    private class MyConnectionHandler implements ConnectionHandler
+    {
+        private volatile int _frameSize = 512;
+        private Queue<Response<?>> _responseQueue = new ConcurrentLinkedQueue<>();
+
+        @Override
+        public void receiveOpen(final int channel, final Open close)
+        {
+        }
+
+        @Override
+        public void receiveClose(final int channel, final Close close)
+        {
+
+        }
+
+        @Override
+        public void receiveBegin(final int channel, final Begin begin)
+        {
+
+        }
+
+        @Override
+        public void receiveEnd(final int channel, final End end)
+        {
+
+        }
+
+        @Override
+        public void receiveAttach(final int channel, final Attach attach)
+        {
+
+        }
+
+        @Override
+        public void receiveDetach(final int channel, final Detach detach)
+        {
+
+        }
+
+        @Override
+        public void receiveTransfer(final int channel, final Transfer transfer)
+        {
+
+        }
+
+        @Override
+        public void receiveDisposition(final int channel, final Disposition disposition)
+        {
+
+        }
+
+        @Override
+        public void receiveFlow(final int channel, final Flow flow)
+        {
+
+        }
+
+        @Override
+        public int getMaxFrameSize()
+        {
+            return _frameSize;
+        }
+
+        @Override
+        public int getChannelMax()
+        {
+            return UnsignedShort.MAX_VALUE.intValue();
+        }
+
+        @Override
+        public void handleError(final Error parsingError)
+        {
+            LOGGER.error("Unexpected error {}", parsingError);
+        }
+
+        @Override
+        public boolean closedForInput()
+        {
+            return false;
+        }
+
+        @Override
+        public void receive(final List<ChannelFrameBody> channelFrameBodies)
+        {
+            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
+            {
+                Response response;
+                Object val = channelFrameBody.getFrameBody();
+                int channel = channelFrameBody.getChannel();
+                if (val instanceof FrameBody)
+                {
+                    FrameBody frameBody = (FrameBody) val;
+                    if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
+                    {
+                        _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
+                    }
+                    response = new PerformativeResponse((short) channel, frameBody);
+                }
+                else if (val instanceof SaslFrameBody)
+                {
+                    SaslFrameBody frameBody = (SaslFrameBody) val;
+                    response = new SaslPerformativeResponse((short) channel, frameBody);
+
+                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+                    {
+                        resetInputHandlerAfterSaslOutcome();
+                    }
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
+                }
+
+                _responseQueue.add(response);
+            }
+        }
+
+        @Override
+        public void receiveSaslInit(final SaslInit saslInit)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslResponse(final SaslResponse saslResponse)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+        {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
new file mode 100644
index 0000000..56d6e6f
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                            .registerTransportLayer()
+                                                                                            .registerMessagingLayer()
+                                                                                            .registerTransactionLayer()
+                                                                                            .registerSecurityLayer()
+                                                                                            .registerExtensionSoleconnLayer();
+
+    @Override
+    public ByteBuffer encode(final Object msg)
+    {
+        if (msg instanceof AMQFrame)
+        {
+            List<ByteBuffer> buffers = new ArrayList<>();
+            FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+            {
+                @Override
+                public boolean isDirectBufferPreferred()
+                {
+                    return false;
+                }
+
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+                    byte[] data = new byte[msg.remaining()];
+                    msg.get(data);
+                    buffers.add(ByteBuffer.wrap(data));
+                }
+
+                @Override
+                public void flush()
+                {
+                }
+
+                @Override
+                public void close()
+                {
+
+                }
+            });
+            _frameWriter.send(((AMQFrame) msg));
+
+            int remaining = 0;
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                remaining += byteBuffer.remaining();
+            }
+            ByteBuffer result = ByteBuffer.allocate(remaining);
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                result.put(byteBuffer);
+            }
+            result.flip();
+            return result;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4d53751..dd59757 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -19,58 +19,12 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class FrameTransport implements AutoCloseable
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
 {
-    public static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout",6000);
-    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
-    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-
-    private final EventLoopGroup _workerGroup;
-    private final InetSocketAddress _brokerAddress;
-    private final boolean _isSasl;
-
-    private Channel _channel;
-    private volatile boolean _channelClosedSeen = false;
-
     public FrameTransport(final InetSocketAddress brokerAddress)
     {
         this(brokerAddress, false);
@@ -78,165 +32,20 @@ public class FrameTransport implements AutoCloseable
 
     public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
     {
-        _brokerAddress = brokerAddress;
-        _isSasl = isSasl;
-        _workerGroup = new NioEventLoopGroup();
-    }
-
-    public InetSocketAddress getBrokerAddress()
-    {
-        return _brokerAddress;
+        super(brokerAddress, new FrameDecoder(isSasl), new FrameEncoder());
     }
 
+    @Override
     public FrameTransport connect()
     {
-        try
-        {
-            Bootstrap b = new Bootstrap();
-            b.group(_workerGroup);
-            b.channel(NioSocketChannel.class);
-            b.option(ChannelOption.SO_KEEPALIVE, true);
-            b.handler(new ChannelInitializer<SocketChannel>()
-            {
-                @Override
-                public void initChannel(SocketChannel ch) throws Exception
-                {
-                    ChannelPipeline pipeline = ch.pipeline();
-                    buildInputOutputPipeline(pipeline);
-                }
-            });
-
-            _channel = b.connect(_brokerAddress).sync().channel();
-            _channel.closeFuture().addListener(future ->
-                                               {
-                                                   _channelClosedSeen = true;
-                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
-                                               });
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
+        super.connect();
         return this;
     }
 
-    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
-    {
-        pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
-    }
-
     @Override
-    public void close() throws Exception
-    {
-        try
-        {
-            if (_channel != null)
-            {
-                _channel.disconnect().sync();
-                _channel.close().sync();
-                _channel = null;
-            }
-        }
-        finally
-        {
-            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
-        }
-    }
-
-    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
-    {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-        buffer.writeBytes(bytes);
-        _channel.write(buffer, promise);
-        _channel.flush();
-        return JdkFutureAdapters.listenInPoolThread(promise);
-    }
-
-    public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
+    public byte[] getProtocolHeader()
     {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        final TransportFrame transportFrame;
-        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
-        {
-            final QpidByteBuffer duplicate;
-            if (payload == null)
-            {
-                duplicate = null;
-            }
-            else
-            {
-                duplicate = payload.duplicate();
-            }
-            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
-            _channel.write(transportFrame, promise);
-            _channel.flush();
-            final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
-            if (frameBody instanceof Transfer)
-            {
-                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
-            }
-            if (duplicate != null)
-            {
-                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
-            }
-            return listenableFuture;
-        }
-    }
-
-    public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
-    {
-        SASLFrame transportFrame = new SASLFrame(frameBody);
-        ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
-        channelFuture.sync();
-        return JdkFutureAdapters.listenInPoolThread(channelFuture);
-    }
-
-    public <T extends Response<?>> T getNextResponse() throws Exception
-    {
-        return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-    }
-
-    public void doCloseConnection() throws Exception
-    {
-        Close close = new Close();
-
-        sendPerformative(close, UnsignedShort.valueOf((short) 0));
-        PerformativeResponse response = getNextResponse();
-        if (!(response.getBody() instanceof Close))
-        {
-            throw new IllegalStateException(String.format(
-                    "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
-        }
-    }
-
-    public void assertNoMoreResponses() throws Exception
-    {
-        Response response = getNextResponse();
-        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
-    }
-
-    public void assertNoMoreResponsesAndChannelClosed() throws Exception
-    {
-        assertNoMoreResponses();
-        assertThat(_channelClosedSeen, is(true));
-    }
-
-    private static class ChannelClosedResponse implements Response<Void>
-    {
-        @Override
-        public String toString()
-        {
-            return "ChannelClosed";
-        }
-
-        @Override
-        public Void getBody()
-        {
-            return null;
-        }
+        return "AMQP\0\1\0\0".getBytes(UTF_8);
     }
 
     public Interaction newInteraction()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
deleted file mode 100644
index 9503113..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-public class HeaderResponse implements Response<byte[]>
-{
-    private final byte[] _header;
-
-    public HeaderResponse(final byte[] header)
-    {
-        _header = header;
-    }
-
-    @Override
-    public byte[] getBody()
-    {
-        return _header;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "HeaderResponse{" +
-               "_header=" + Arrays.toString(_header) +
-               '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
deleted file mode 100644
index e3acd24..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.ReferenceCountUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class InputHandler extends ChannelInboundHandlerAdapter
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
-    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                                            .registerTransportLayer()
-                                                                                            .registerMessagingLayer()
-                                                                                            .registerTransactionLayer()
-                                                                                            .registerSecurityLayer()
-                                                                                            .registerExtensionSoleconnLayer();
-
-    private enum ParsingState
-    {
-        HEADER,
-        PERFORMATIVES
-    }
-
-    private final MyConnectionHandler _connectionHandler;
-    private final ValueHandler _valueHandler;
-    private final BlockingQueue<Response<?>> _responseQueue;
-
-    private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
-    private volatile FrameHandler _frameHandler;
-    private volatile ParsingState _state = ParsingState.HEADER;
-
-    public InputHandler(final BlockingQueue<Response<?>> queue, final boolean isSasl)
-    {
-
-        _valueHandler = new ValueHandler(TYPE_REGISTRY);
-        _connectionHandler = new MyConnectionHandler();
-        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
-
-        _responseQueue = queue;
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
-    {
-        ByteBuf buf = (ByteBuf) msg;
-        QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
-        qpidBuf.put(buf.nioBuffer());
-        qpidBuf.flip();
-        LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
-
-        if (_inputBuffer.hasRemaining())
-        {
-            QpidByteBuffer old = _inputBuffer;
-            _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
-            _inputBuffer.put(old);
-            _inputBuffer.put(qpidBuf);
-            old.dispose();
-            qpidBuf.dispose();
-            _inputBuffer.flip();
-        }
-        else
-        {
-            _inputBuffer.dispose();
-            _inputBuffer = qpidBuf;
-        }
-
-        doParsing();
-
-        LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
-
-        if (_inputBuffer.hasRemaining())
-        {
-            _inputBuffer.compact();
-            _inputBuffer.flip();
-        }
-
-        ReferenceCountUtil.release(msg);
-    }
-
-    private void doParsing()
-    {
-        switch(_state)
-        {
-            case HEADER:
-                if (_inputBuffer.remaining() >= 8)
-                {
-                    byte[] header = new byte[8];
-                    _inputBuffer.get(header);
-                    _responseQueue.add(new HeaderResponse(header));
-                    _state = ParsingState.PERFORMATIVES;
-                    doParsing();
-                }
-                break;
-            case PERFORMATIVES:
-                _frameHandler.parse(_inputBuffer);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected state : " + _state);
-        }
-    }
-
-    private void resetInputHandlerAfterSaslOutcome()
-    {
-        _state = ParsingState.HEADER;
-        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
-    }
-
-    private class MyConnectionHandler implements ConnectionHandler
-    {
-        private volatile int _frameSize = 512;
-
-        @Override
-        public void receiveOpen(final int channel, final Open close)
-        {
-        }
-
-        @Override
-        public void receiveClose(final int channel, final Close close)
-        {
-
-        }
-
-        @Override
-        public void receiveBegin(final int channel, final Begin begin)
-        {
-
-        }
-
-        @Override
-        public void receiveEnd(final int channel, final End end)
-        {
-
-        }
-
-        @Override
-        public void receiveAttach(final int channel, final Attach attach)
-        {
-
-        }
-
-        @Override
-        public void receiveDetach(final int channel, final Detach detach)
-        {
-
-        }
-
-        @Override
-        public void receiveTransfer(final int channel, final Transfer transfer)
-        {
-
-        }
-
-        @Override
-        public void receiveDisposition(final int channel, final Disposition disposition)
-        {
-
-        }
-
-        @Override
-        public void receiveFlow(final int channel, final Flow flow)
-        {
-
-        }
-
-        @Override
-        public int getMaxFrameSize()
-        {
-            return _frameSize;
-        }
-
-        @Override
-        public int getChannelMax()
-        {
-            return UnsignedShort.MAX_VALUE.intValue();
-        }
-
-        @Override
-        public void handleError(final Error parsingError)
-        {
-            LOGGER.error("Unexpected error {}", parsingError);
-        }
-
-        @Override
-        public boolean closedForInput()
-        {
-            return false;
-        }
-
-        @Override
-        public void receive(final List<ChannelFrameBody> channelFrameBodies)
-        {
-            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
-            {
-                Response response;
-                Object val = channelFrameBody.getFrameBody();
-                int channel = channelFrameBody.getChannel();
-                if (val instanceof FrameBody)
-                {
-                    FrameBody frameBody = (FrameBody) val;
-                    if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
-                    {
-                        _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
-                    }
-                    response = new PerformativeResponse((short) channel, frameBody);
-                }
-                else if (val instanceof SaslFrameBody)
-                {
-                    SaslFrameBody frameBody = (SaslFrameBody) val;
-                    response = new SaslPerformativeResponse((short) channel, frameBody);
-
-                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
-                    {
-                        resetInputHandlerAfterSaslOutcome();
-                    }
-                }
-                else
-                {
-                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
-                }
-
-                _responseQueue.add(response);
-            }
-        }
-
-        @Override
-        public void receiveSaslInit(final SaslInit saslInit)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslChallenge(final SaslChallenge saslChallenge)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslResponse(final SaslResponse saslResponse)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslOutcome(final SaslOutcome saslOutcome)
-        {
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 52518ab..7d73ce8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,8 +20,6 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static com.google.common.util.concurrent.Futures.allAsList;
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,21 +28,19 @@ import static org.hamcrest.Matchers.is;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -81,8 +77,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
 
-public class Interaction
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
 {
     private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Begin _begin;
@@ -94,14 +91,11 @@ public class Interaction
     private final Flow _flow;
     private final Transfer _transfer;
     private final Disposition _disposition;
-    private final FrameTransport _transport;
     private final SaslInit _saslInit;
     private final SaslResponse _saslResponse;
     private byte[] _protocolHeader;
     private UnsignedShort _connectionChannel;
     private UnsignedShort _sessionChannel;
-    private Response<?> _latestResponse;
-    private ListenableFuture<?> _latestFuture;
     private int _deliveryIdCounter;
     private List<Transfer> _latestDelivery;
     private Object _decodedLatestDelivery;
@@ -109,10 +103,10 @@ public class Interaction
 
     Interaction(final FrameTransport frameTransport)
     {
+        super(frameTransport);
         final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
-        _transport = frameTransport;
 
-        _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+        _protocolHeader = frameTransport.getProtocolHeader();
 
         _saslInit = new SaslInit();
         _saslResponse = new SaslResponse();
@@ -154,6 +148,19 @@ public class Interaction
         _disposition.setFirst(UnsignedInteger.ZERO);
     }
 
+    public void doCloseConnection() throws Exception
+    {
+        Close close = new Close();
+
+        sendPerformative(close, UnsignedShort.valueOf((short) 0));
+        Response<?> response = getNextResponse();
+        if (!(response.getBody() instanceof Close))
+        {
+            throw new IllegalStateException(String.format(
+                    "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
+        }
+    }
+
     /////////////////////////
     // Protocol Negotiation //
     /////////////////////////
@@ -164,17 +171,15 @@ public class Interaction
         return this;
     }
 
-    public Interaction negotiateProtocol() throws Exception
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    @Override
+    protected Interaction getInteraction()
     {
-        final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
         return this;
     }
 
@@ -977,83 +982,35 @@ public class Interaction
 
     private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
+        SASLFrame transportFrame = new SASLFrame(frameBody);
+        sendPerformativeAndChainFuture(transportFrame, true);
     }
 
     private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
-    }
-
-    public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
-    {
-        sync();
-        _latestResponse = _transport.getNextResponse();
-        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
-        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
-            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        final TransportFrame transportFrame;
+        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
         {
-            return this;
-        }
-        acceptableResponseClasses.remove(null);
-        if (_latestResponse != null)
-        {
-            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            final QpidByteBuffer duplicate;
+            if (payload == null)
             {
-                if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
-                {
-                    return this;
-                }
+                duplicate = null;
+            }
+            else
+            {
+                duplicate = payload.duplicate();
+            }
+            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+            if (frameBody instanceof Transfer)
+            {
+                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+            }
+            if (duplicate != null)
+            {
+                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
             }
         }
-        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
-                                                      acceptableResponseClasses,
-                                                      _latestResponse == null ? null : _latestResponse.getBody()));
-    }
-
-    public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
-    {
-        if (_latestFuture != null)
-        {
-            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-            _latestFuture = null;
-        }
-        return this;
-    }
-
-    public Response<?> getLatestResponse() throws Exception
-    {
-        sync();
-        return _latestResponse;
-    }
-
-    public <T> T getLatestResponse(Class<T> type) throws Exception
-    {
-        sync();
-        if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
-        {
-            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
-                                                          type.getSimpleName(),
-                                                          _latestResponse.getBody()));
-        }
-
-        return (T) _latestResponse.getBody();
     }
 
     public Interaction flowHandleFromLinkHandle()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
deleted file mode 100644
index 029dc70..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-
-public class Matchers
-{
-    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
-    {
-        return new BaseMatcher<Response>()
-        {
-            @Override
-            public void describeTo(final Description description)
-            {
-                description.appendValue(new HeaderResponse(expectedHeader));
-            }
-
-            @Override
-            public boolean matches(final Object o)
-            {
-                if (o == null)
-                {
-                    return false;
-                }
-                if (!(o instanceof HeaderResponse))
-                {
-                    return false;
-                }
-                if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
-                {
-                    return false;
-                }
-                return true;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
deleted file mode 100644
index 68f4322..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class OutputHandler extends ChannelOutboundHandlerAdapter
-{
-    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                                            .registerTransportLayer()
-                                                                                            .registerMessagingLayer()
-                                                                                            .registerTransactionLayer()
-                                                                                            .registerSecurityLayer()
-                                                                                            .registerExtensionSoleconnLayer();
-
-
-    @Override
-    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
-    {
-
-        if (msg instanceof AMQFrame)
-        {
-            FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
-            {
-                @Override
-                public boolean isDirectBufferPreferred()
-                {
-                    return false;
-                }
-
-                @Override
-                public void send(final QpidByteBuffer msg)
-                {
-                    byte[] data = new byte[msg.remaining()];
-                    msg.get(data);
-                    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-                    buffer.writeBytes(data);
-                    try
-                    {
-                        OutputHandler.super.write(ctx, buffer, promise);
-                    }
-                    catch (Exception e)
-                    {
-                        promise.setFailure(e);
-                    }
-                }
-
-                @Override
-                public void flush()
-                {
-                }
-
-                @Override
-                public void close()
-                {
-
-                }
-            });
-            _frameWriter.send(((AMQFrame) msg));
-        }
-        else
-        {
-            super.write(ctx, msg, promise);
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
index 06a64dc..9e03a26 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
@@ -20,6 +20,7 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.tests.protocol.Response;
 
 public class PerformativeResponse implements Response<FrameBody>
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
deleted file mode 100644
index a7e341c..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-public interface Response<T>
-{
-    T getBody();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
index 08893e0..02ab3c9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.tests.protocol.Response;
 
 public class SaslPerformativeResponse implements Response<SaslFrameBody>
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
deleted file mode 100644
index ea3d164..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface SpecificationTest
-{
-    String section();
-    String description();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 8a150fa..1ca4419 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -54,6 +54,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 817be05..a3270a0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
@@ -108,7 +108,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 
             interaction.consumeResponse().getLatestResponse(Flow.class);
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
 
         assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index a39b1b3..e85f356 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -39,10 +39,11 @@ import org.junit.Test;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class WebSocketTest extends BrokerAdminUsingTestBase
 {
@@ -73,7 +74,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
         {
-            final Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            final Open responseOpen = interaction
                                                .negotiateProtocol().consumeResponse()
                                                .open().consumeResponse()
                                                .getLatestResponse(Open.class);
@@ -84,7 +86,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -97,7 +99,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
         {
-            final Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            final Open responseOpen = interaction
                                                .negotiateProtocol().consumeResponse()
                                                .open().consumeResponse()
                                                .getLatestResponse(Open.class);
@@ -108,7 +111,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 6ba9058..36203a4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -48,7 +48,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class DeleteOnCloseTest extends BrokerAdminUsingTestBase

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 97fe3bb..1cf1ff0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index bcd155f..0a45410 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -52,9 +52,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 1853446..cc377cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -99,7 +99,7 @@ public class OutcomeTest extends BrokerAdminUsingTestBase
             assertThat(secondDeliveryPayload, is(equalTo("message2")));
 
             // verify that no unexpected performative is received by closing
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index eb72532..245c624 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -76,8 +76,8 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -439,7 +439,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .disposition();
 
             // verify that no unexpected performative is received by closing
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -688,8 +688,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
             assertThat(isSettled.get(), is(true));
 
             // verify no unexpected performative received by closing the connection
-           transport.doCloseConnection();
-
+            interaction.doCloseConnection();
         }
     }
 
@@ -796,7 +795,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .transfer()
                        .sync();
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 42f6114..bd7c113 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1496d13..fb61974 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -57,9 +57,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 619e5d9..db91db1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class ProtocolHeaderTest extends BrokerAdminUsingTestBase
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index b3f57c1..ab570da 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 
 public class OpenTest extends BrokerAdminUsingTestBase
@@ -77,7 +77,8 @@ public class OpenTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            Open responseOpen = interaction
                                          .negotiateProtocol().consumeResponse()
                                          .openContainerId("testContainerId")
                                          .open().consumeResponse()
@@ -88,7 +89,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index c559399..774cc00 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -43,7 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index ed72ba3..1a38cf0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 35ee4e7..ad56ea9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -69,9 +69,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -459,7 +459,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                 }
             }
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -588,7 +588,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                            .disposition();
             }
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             if (getBrokerAdmin().isQueueDepthSupported())
             {
@@ -672,7 +672,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                        .dispositionRole(Role.RECEIVER)
                        .disposition();
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             if (getBrokerAdmin().isQueueDepthSupported())
             {
@@ -771,7 +771,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
             }
             while (!settled);
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index cf12b05..6fea928 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -49,7 +49,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class SaslTest extends BrokerAdminUsingTestBase
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 352fd19..24cb435 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -36,10 +36,11 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class BeginTest extends BrokerAdminUsingTestBase
 {
@@ -74,7 +75,8 @@ public class BeginTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
             final UnsignedShort channel = UnsignedShort.valueOf(37);
-            Begin responseBegin = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            Begin responseBegin = interaction
                                            .negotiateProtocol().consumeResponse()
                                            .open().consumeResponse(Open.class)
                                            .sessionChannel(channel)
@@ -85,7 +87,7 @@ public class BeginTest extends BrokerAdminUsingTestBase
             assertThat(responseBegin.getOutgoingWindow(), is(instanceOf(UnsignedInteger.class)));
             assertThat(responseBegin.getNextOutgoingId(), is(instanceOf(UnsignedInteger.class)));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/pom.xml b/systests/protocol-tests-core/pom.xml
new file mode 100644
index 0000000..fa561ed
--- /dev/null
+++ b/systests/protocol-tests-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  ~
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-parent</artifactId>
+        <version>7.1.0-SNAPSHOT</version>
+        <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>protocol-tests-core</artifactId>
+    <name>Apache Qpid Broker-J Protocol Tests Core</name>
+    <description>Core classes for Apache Qpid protocol tests</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
new file mode 100644
index 0000000..daf500d
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public abstract class FrameTransport implements AutoCloseable
+{
+    public static final long RESPONSE_TIMEOUT =
+            Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+    private final EventLoopGroup _workerGroup;
+    private final InetSocketAddress _brokerAddress;
+    private final InputHandler _inputHandler;
+    private final OutputHandler _outputHandler;
+
+    private volatile Channel _channel;
+    private volatile boolean _channelClosedSeen = false;
+
+    public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+    {
+        _brokerAddress = brokerAddress;
+        _inputHandler = new InputHandler(_queue, inputDecoder);
+        _outputHandler = new OutputHandler(outputEncoder);
+        _workerGroup = new NioEventLoopGroup();
+    }
+
+    public InetSocketAddress getBrokerAddress()
+    {
+        return _brokerAddress;
+    }
+
+    public FrameTransport connect()
+    {
+        try
+        {
+            Bootstrap b = new Bootstrap();
+            b.group(_workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new ChannelInitializer<SocketChannel>()
+            {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception
+                {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    buildInputOutputPipeline(pipeline);
+                }
+            });
+
+            _channel = b.connect(_brokerAddress).sync().channel();
+            _channel.closeFuture().addListener(future ->
+                                               {
+                                                   _channelClosedSeen = true;
+                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
+                                               });
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return this;
+    }
+
+    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+    {
+        pipeline.addLast(_inputHandler).addLast(_outputHandler);
+    }
+
+    @Override
+    public void close() throws Exception
+    {
+        try
+        {
+            if (_channel != null)
+            {
+                _channel.disconnect().sync();
+                _channel.close().sync();
+                _channel = null;
+            }
+        }
+        finally
+        {
+            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+        }
+    }
+
+    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(bytes);
+        _channel.write(buffer, promise);
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+
+    public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        if (!sync)
+        {
+            ChannelPromise promise = _channel.newPromise();
+            _channel.write(data, promise);
+            _channel.flush();
+            return JdkFutureAdapters.listenInPoolThread(promise);
+        }
+        else
+        {
+            ChannelFuture channelFuture = _channel.writeAndFlush(data);
+            channelFuture.sync();
+            return Futures.immediateFuture(null);
+        }
+    }
+
+    public <T extends Response<?>> T getNextResponse() throws Exception
+    {
+        return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    public void assertNoMoreResponses() throws Exception
+    {
+        Response response = getNextResponse();
+        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+    }
+
+    public void assertNoMoreResponsesAndChannelClosed() throws Exception
+    {
+        assertNoMoreResponses();
+        assertThat(_channelClosedSeen, is(true));
+    }
+
+    private static class ChannelClosedResponse implements Response<Void>
+    {
+        @Override
+        public String toString()
+        {
+            return "ChannelClosed";
+        }
+
+        @Override
+        public Void getBody()
+        {
+            return null;
+        }
+    }
+
+    public abstract byte[] getProtocolHeader();
+
+    protected abstract Interaction newInteraction();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
new file mode 100644
index 0000000..9767b40
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+public class HeaderResponse implements Response<byte[]>
+{
+    private final byte[] _header;
+
+    public HeaderResponse(final byte[] header)
+    {
+        _header = header;
+    }
+
+    @Override
+    public byte[] getBody()
+    {
+        return _header;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "HeaderResponse{" +
+               "_header=" + Arrays.toString(_header) +
+               '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
new file mode 100644
index 0000000..369cfd1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+public interface InputDecoder
+{
+    Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
new file mode 100644
index 0000000..2d5fb45
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+
+    private final BlockingQueue<Response<?>> _responseQueue;
+    private final InputDecoder _inputDecoder;
+
+    private ByteBuffer _inputBuffer = ByteBuffer.allocate(0);
+
+    InputHandler(final BlockingQueue<Response<?>> queue, InputDecoder inputDecoder)
+    {
+        _responseQueue = queue;
+        _inputDecoder = inputDecoder;
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+    {
+        ByteBuf buf = (ByteBuf) msg;
+        ByteBuffer byteBuffer = ByteBuffer.allocate(buf.readableBytes());
+        byteBuffer.put(buf.nioBuffer());
+        byteBuffer.flip();
+        LOGGER.debug("Incoming {} byte(s)", byteBuffer.remaining());
+
+        if (_inputBuffer.hasRemaining())
+        {
+            ByteBuffer old = _inputBuffer;
+            _inputBuffer = ByteBuffer.allocate(_inputBuffer.remaining() + byteBuffer.remaining());
+            _inputBuffer.put(old);
+            _inputBuffer.put(byteBuffer);
+            _inputBuffer.flip();
+        }
+        else
+        {
+            _inputBuffer = byteBuffer;
+        }
+
+        _responseQueue.addAll(_inputDecoder.decode(_inputBuffer));
+
+        LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
+
+        if (_inputBuffer.hasRemaining())
+        {
+            _inputBuffer.compact();
+            _inputBuffer.flip();
+        }
+
+        ReferenceCountUtil.release(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
new file mode 100644
index 0000000..238c0a5
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class Interaction<I extends Interaction>
+{
+    private final FrameTransport _transport;
+    private ListenableFuture<?> _latestFuture;
+    private Response<?> _latestResponse;
+
+    public Interaction(final FrameTransport frameTransport)
+    {
+        _transport = frameTransport;
+    }
+
+    public I consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return getInteraction();
+        }
+        acceptableResponseClasses.remove(null);
+        if (_latestResponse != null)
+        {
+            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            {
+                if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+                {
+                    return getInteraction();
+                }
+            }
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses,
+                                                      _latestResponse == null ? null : _latestResponse.getBody()));
+    }
+
+    protected Response<?> getNextResponse() throws Exception
+    {
+        return _transport.getNextResponse();
+    }
+
+    public I sync() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+            _latestFuture = null;
+        }
+        return getInteraction();
+    }
+
+    public Response<?> getLatestResponse() throws Exception
+    {
+        sync();
+        return _latestResponse;
+    }
+
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        sync();
+        if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(),
+                                                          _latestResponse.getBody()));
+        }
+
+        return (T) _latestResponse.getBody();
+    }
+
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+        return future;
+    }
+
+    public I negotiateProtocol() throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+        return getInteraction();
+    }
+
+    protected FrameTransport getTransport()
+    {
+        return _transport;
+    }
+
+    protected abstract byte[] getProtocolHeader();
+
+    protected abstract I getInteraction();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
new file mode 100644
index 0000000..292ae9a
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+public class Matchers
+{
+    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+    {
+        return new BaseMatcher<Response>()
+        {
+            @Override
+            public void describeTo(final Description description)
+            {
+                description.appendValue(new HeaderResponse(expectedHeader));
+            }
+
+            @Override
+            public boolean matches(final Object o)
+            {
+                if (o == null)
+                {
+                    return false;
+                }
+                if (!(o instanceof HeaderResponse))
+                {
+                    return false;
+                }
+                if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
+                {
+                    return false;
+                }
+                return true;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
new file mode 100644
index 0000000..a6a4a47
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+public interface OutputEncoder
+{
+    ByteBuffer encode(Object msg);
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
new file mode 100644
index 0000000..40a2ca7
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+    private final OutputEncoder _outputEncoder;
+
+    OutputHandler(final OutputEncoder outputEncoder)
+    {
+        _outputEncoder = outputEncoder;
+    }
+
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+    {
+        ByteBuffer byteBuffer = _outputEncoder.encode(msg);
+        if (byteBuffer != null)
+        {
+            send(ctx, byteBuffer, promise);
+        }
+        else
+        {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+    {
+        byte[] data = new byte[dataByteBuffer.remaining()];
+        dataByteBuffer.get(data);
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(data);
+        try
+        {
+            OutputHandler.super.write(ctx, buffer, promise);
+        }
+        catch (Exception e)
+        {
+            promise.setFailure(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
new file mode 100644
index 0000000..debc06f
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+public interface Response<T>
+{
+    T getBody();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
new file mode 100644
index 0000000..db6d7a1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface SpecificationTest
+{
+    String section();
+    String description();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org