You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/10 14:44:41 UTC

[2/6] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

Cherry picked from 06e53d7 with manual resolutions.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/deab4580
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/deab4580
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/deab4580

Branch: refs/heads/7.0.x
Commit: deab4580b7a876cfe16d35fe40804ac19e277cfc
Parents: 9a76285
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:31:29 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:34:28 2018 +0000

----------------------------------------------------------------------
 pom.xml                                         |  13 +
 systests/protocol-tests-amqp-1-0/pom.xml        |  35 +--
 .../qpid/tests/protocol/v1_0/FrameDecoder.java  | 276 +++++++++++++++++
 .../qpid/tests/protocol/v1_0/FrameEncoder.java  |  93 ++++++
 .../tests/protocol/v1_0/FrameTransport.java     | 205 +------------
 .../tests/protocol/v1_0/HeaderResponse.java     |  46 ---
 .../qpid/tests/protocol/v1_0/InputHandler.java  | 305 -------------------
 .../qpid/tests/protocol/v1_0/Interaction.java   | 141 +++------
 .../qpid/tests/protocol/v1_0/Matchers.java      |  60 ----
 .../qpid/tests/protocol/v1_0/OutputHandler.java |  96 ------
 .../protocol/v1_0/PerformativeResponse.java     |   1 +
 .../qpid/tests/protocol/v1_0/Response.java      |  25 --
 .../protocol/v1_0/SaslPerformativeResponse.java |   1 +
 .../tests/protocol/v1_0/SpecificationTest.java  |  34 ---
 .../tests/protocol/v1_0/DecodeErrorTest.java    |   2 +
 .../bindmapjms/TemporaryDestinationTest.java    |   4 +-
 .../extensions/management/ManagementTest.java   |   5 +-
 .../extensions/websocket/WebSocketTest.java     |  13 +-
 .../v1_0/messaging/DeleteOnCloseTest.java       |   2 +-
 .../protocol/v1_0/messaging/MessageFormat.java  |   4 +-
 .../v1_0/messaging/MultiTransferTest.java       |   4 +-
 .../protocol/v1_0/messaging/OutcomeTest.java    |   4 +-
 .../protocol/v1_0/messaging/TransferTest.java   |  11 +-
 .../v1_0/transaction/DischargeTest.java         |   2 +-
 .../transaction/TransactionalTransferTest.java  |   4 +-
 .../v1_0/transport/ProtocolHeaderTest.java      |   2 +-
 .../v1_0/transport/connection/OpenTest.java     |   7 +-
 .../v1_0/transport/link/AttachTest.java         |   2 +-
 .../protocol/v1_0/transport/link/FlowTest.java  |   2 +-
 .../transport/link/ResumeDeliveriesTest.java    |  12 +-
 .../v1_0/transport/security/sasl/SaslTest.java  |   2 +-
 .../v1_0/transport/session/BeginTest.java       |   8 +-
 systests/protocol-tests-core/pom.xml            |  75 +++++
 .../qpid/tests/protocol/FrameTransport.java     | 198 ++++++++++++
 .../qpid/tests/protocol/HeaderResponse.java     |  46 +++
 .../qpid/tests/protocol/InputDecoder.java       |  30 ++
 .../qpid/tests/protocol/InputHandler.java       |  81 +++++
 .../apache/qpid/tests/protocol/Interaction.java | 141 +++++++++
 .../apache/qpid/tests/protocol/Matchers.java    |  60 ++++
 .../qpid/tests/protocol/OutputEncoder.java      |  29 ++
 .../qpid/tests/protocol/OutputHandler.java      |  69 +++++
 .../apache/qpid/tests/protocol/Response.java    |  25 ++
 .../qpid/tests/protocol/SpecificationTest.java  |  34 +++
 43 files changed, 1282 insertions(+), 927 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1262002..1287501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
     <module>systests</module>
     <module>systests/systests-utils</module>
     <module>systests/qpid-systests-jms_2.0</module>
+    <module>systests/protocol-tests-core</module>
     <module>systests/protocol-tests-amqp-1-0</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
@@ -407,6 +408,18 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-core</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-amqp-1-0</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index 56fc1f7..c392cf7 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -56,6 +56,11 @@
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
+            <artifactId>protocol-tests-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-systests-utils</artifactId>
         </dependency>
 
@@ -94,38 +99,9 @@
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-bdbstore</artifactId>
             <scope>test</scope>
-            <optional>true</optional>
         </dependency>
 
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-buffer</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-codec-http</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-handler</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-transport</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-library</artifactId>
         </dependency>
@@ -133,6 +109,7 @@
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-integration</artifactId>
         </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
new file mode 100644
index 0000000..4dc06cb
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FrameDecoder.class);
+    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                            .registerTransportLayer()
+                                                                                            .registerMessagingLayer()
+                                                                                            .registerTransactionLayer()
+                                                                                            .registerSecurityLayer()
+                                                                                            .registerExtensionSoleconnLayer();
+    private final MyConnectionHandler _connectionHandler;
+    private volatile FrameHandler _frameHandler;
+
+    private enum ParsingState
+    {
+        HEADER,
+        PERFORMATIVES;
+    }
+
+    private final ValueHandler _valueHandler;
+
+    private volatile ParsingState _state = ParsingState.HEADER;
+
+    public FrameDecoder(final boolean isSasl)
+    {
+        _valueHandler = new ValueHandler(TYPE_REGISTRY);
+        _connectionHandler = new MyConnectionHandler();
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
+    }
+
+    @Override
+    public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
+    {
+        List<Response<?>> responses = new ArrayList<>();
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
+        switch(_state)
+        {
+            case HEADER:
+                if (inputBuffer.remaining() >= 8)
+                {
+                    byte[] header = new byte[8];
+                    inputBuffer.get(header);
+                    responses.add(new HeaderResponse(header));
+                    _state = ParsingState.PERFORMATIVES;
+                    _frameHandler.parse(qpidByteBuffer);
+                }
+                break;
+            case PERFORMATIVES:
+                _frameHandler.parse(qpidByteBuffer);
+                break;
+            default:
+                throw new IllegalStateException("Unexpected state : " + _state);
+        }
+
+        Response<?> r;
+        while((r = _connectionHandler._responseQueue.poll())!=null)
+        {
+            responses.add(r);
+        }
+        return responses;
+    }
+
+    private void resetInputHandlerAfterSaslOutcome()
+    {
+        _state = ParsingState.HEADER;
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+    }
+
+    private class MyConnectionHandler implements ConnectionHandler
+    {
+        private volatile int _frameSize = 512;
+        private Queue<Response<?>> _responseQueue = new ConcurrentLinkedQueue<>();
+
+        @Override
+        public void receiveOpen(final int channel, final Open close)
+        {
+        }
+
+        @Override
+        public void receiveClose(final int channel, final Close close)
+        {
+
+        }
+
+        @Override
+        public void receiveBegin(final int channel, final Begin begin)
+        {
+
+        }
+
+        @Override
+        public void receiveEnd(final int channel, final End end)
+        {
+
+        }
+
+        @Override
+        public void receiveAttach(final int channel, final Attach attach)
+        {
+
+        }
+
+        @Override
+        public void receiveDetach(final int channel, final Detach detach)
+        {
+
+        }
+
+        @Override
+        public void receiveTransfer(final int channel, final Transfer transfer)
+        {
+
+        }
+
+        @Override
+        public void receiveDisposition(final int channel, final Disposition disposition)
+        {
+
+        }
+
+        @Override
+        public void receiveFlow(final int channel, final Flow flow)
+        {
+
+        }
+
+        @Override
+        public int getMaxFrameSize()
+        {
+            return _frameSize;
+        }
+
+        @Override
+        public int getChannelMax()
+        {
+            return UnsignedShort.MAX_VALUE.intValue();
+        }
+
+        @Override
+        public void handleError(final Error parsingError)
+        {
+            LOGGER.error("Unexpected error {}", parsingError);
+        }
+
+        @Override
+        public boolean closedForInput()
+        {
+            return false;
+        }
+
+        @Override
+        public void receive(final List<ChannelFrameBody> channelFrameBodies)
+        {
+            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
+            {
+                Response response;
+                Object val = channelFrameBody.getFrameBody();
+                int channel = channelFrameBody.getChannel();
+                if (val instanceof FrameBody)
+                {
+                    FrameBody frameBody = (FrameBody) val;
+                    if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
+                    {
+                        _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
+                    }
+                    response = new PerformativeResponse((short) channel, frameBody);
+                }
+                else if (val instanceof SaslFrameBody)
+                {
+                    SaslFrameBody frameBody = (SaslFrameBody) val;
+                    response = new SaslPerformativeResponse((short) channel, frameBody);
+
+                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+                    {
+                        resetInputHandlerAfterSaslOutcome();
+                    }
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
+                }
+
+                _responseQueue.add(response);
+            }
+        }
+
+        @Override
+        public void receiveSaslInit(final SaslInit saslInit)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslResponse(final SaslResponse saslResponse)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+        {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
new file mode 100644
index 0000000..56d6e6f
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                            .registerTransportLayer()
+                                                                                            .registerMessagingLayer()
+                                                                                            .registerTransactionLayer()
+                                                                                            .registerSecurityLayer()
+                                                                                            .registerExtensionSoleconnLayer();
+
+    @Override
+    public ByteBuffer encode(final Object msg)
+    {
+        if (msg instanceof AMQFrame)
+        {
+            List<ByteBuffer> buffers = new ArrayList<>();
+            FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+            {
+                @Override
+                public boolean isDirectBufferPreferred()
+                {
+                    return false;
+                }
+
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+                    byte[] data = new byte[msg.remaining()];
+                    msg.get(data);
+                    buffers.add(ByteBuffer.wrap(data));
+                }
+
+                @Override
+                public void flush()
+                {
+                }
+
+                @Override
+                public void close()
+                {
+
+                }
+            });
+            _frameWriter.send(((AMQFrame) msg));
+
+            int remaining = 0;
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                remaining += byteBuffer.remaining();
+            }
+            ByteBuffer result = ByteBuffer.allocate(remaining);
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                result.put(byteBuffer);
+            }
+            result.flip();
+            return result;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4d53751..dd59757 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -19,58 +19,12 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class FrameTransport implements AutoCloseable
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
 {
-    public static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout",6000);
-    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
-    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-
-    private final EventLoopGroup _workerGroup;
-    private final InetSocketAddress _brokerAddress;
-    private final boolean _isSasl;
-
-    private Channel _channel;
-    private volatile boolean _channelClosedSeen = false;
-
     public FrameTransport(final InetSocketAddress brokerAddress)
     {
         this(brokerAddress, false);
@@ -78,165 +32,20 @@ public class FrameTransport implements AutoCloseable
 
     public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
     {
-        _brokerAddress = brokerAddress;
-        _isSasl = isSasl;
-        _workerGroup = new NioEventLoopGroup();
-    }
-
-    public InetSocketAddress getBrokerAddress()
-    {
-        return _brokerAddress;
+        super(brokerAddress, new FrameDecoder(isSasl), new FrameEncoder());
     }
 
+    @Override
     public FrameTransport connect()
     {
-        try
-        {
-            Bootstrap b = new Bootstrap();
-            b.group(_workerGroup);
-            b.channel(NioSocketChannel.class);
-            b.option(ChannelOption.SO_KEEPALIVE, true);
-            b.handler(new ChannelInitializer<SocketChannel>()
-            {
-                @Override
-                public void initChannel(SocketChannel ch) throws Exception
-                {
-                    ChannelPipeline pipeline = ch.pipeline();
-                    buildInputOutputPipeline(pipeline);
-                }
-            });
-
-            _channel = b.connect(_brokerAddress).sync().channel();
-            _channel.closeFuture().addListener(future ->
-                                               {
-                                                   _channelClosedSeen = true;
-                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
-                                               });
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
+        super.connect();
         return this;
     }
 
-    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
-    {
-        pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
-    }
-
     @Override
-    public void close() throws Exception
-    {
-        try
-        {
-            if (_channel != null)
-            {
-                _channel.disconnect().sync();
-                _channel.close().sync();
-                _channel = null;
-            }
-        }
-        finally
-        {
-            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
-        }
-    }
-
-    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
-    {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-        buffer.writeBytes(bytes);
-        _channel.write(buffer, promise);
-        _channel.flush();
-        return JdkFutureAdapters.listenInPoolThread(promise);
-    }
-
-    public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
+    public byte[] getProtocolHeader()
     {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        final TransportFrame transportFrame;
-        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
-        {
-            final QpidByteBuffer duplicate;
-            if (payload == null)
-            {
-                duplicate = null;
-            }
-            else
-            {
-                duplicate = payload.duplicate();
-            }
-            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
-            _channel.write(transportFrame, promise);
-            _channel.flush();
-            final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
-            if (frameBody instanceof Transfer)
-            {
-                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
-            }
-            if (duplicate != null)
-            {
-                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
-            }
-            return listenableFuture;
-        }
-    }
-
-    public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
-    {
-        SASLFrame transportFrame = new SASLFrame(frameBody);
-        ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
-        channelFuture.sync();
-        return JdkFutureAdapters.listenInPoolThread(channelFuture);
-    }
-
-    public <T extends Response<?>> T getNextResponse() throws Exception
-    {
-        return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-    }
-
-    public void doCloseConnection() throws Exception
-    {
-        Close close = new Close();
-
-        sendPerformative(close, UnsignedShort.valueOf((short) 0));
-        PerformativeResponse response = getNextResponse();
-        if (!(response.getBody() instanceof Close))
-        {
-            throw new IllegalStateException(String.format(
-                    "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
-        }
-    }
-
-    public void assertNoMoreResponses() throws Exception
-    {
-        Response response = getNextResponse();
-        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
-    }
-
-    public void assertNoMoreResponsesAndChannelClosed() throws Exception
-    {
-        assertNoMoreResponses();
-        assertThat(_channelClosedSeen, is(true));
-    }
-
-    private static class ChannelClosedResponse implements Response<Void>
-    {
-        @Override
-        public String toString()
-        {
-            return "ChannelClosed";
-        }
-
-        @Override
-        public Void getBody()
-        {
-            return null;
-        }
+        return "AMQP\0\1\0\0".getBytes(UTF_8);
     }
 
     public Interaction newInteraction()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
deleted file mode 100644
index 9503113..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-public class HeaderResponse implements Response<byte[]>
-{
-    private final byte[] _header;
-
-    public HeaderResponse(final byte[] header)
-    {
-        _header = header;
-    }
-
-    @Override
-    public byte[] getBody()
-    {
-        return _header;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "HeaderResponse{" +
-               "_header=" + Arrays.toString(_header) +
-               '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
deleted file mode 100644
index e3acd24..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.ReferenceCountUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class InputHandler extends ChannelInboundHandlerAdapter
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
-    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                                            .registerTransportLayer()
-                                                                                            .registerMessagingLayer()
-                                                                                            .registerTransactionLayer()
-                                                                                            .registerSecurityLayer()
-                                                                                            .registerExtensionSoleconnLayer();
-
-    private enum ParsingState
-    {
-        HEADER,
-        PERFORMATIVES
-    }
-
-    private final MyConnectionHandler _connectionHandler;
-    private final ValueHandler _valueHandler;
-    private final BlockingQueue<Response<?>> _responseQueue;
-
-    private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
-    private volatile FrameHandler _frameHandler;
-    private volatile ParsingState _state = ParsingState.HEADER;
-
-    public InputHandler(final BlockingQueue<Response<?>> queue, final boolean isSasl)
-    {
-
-        _valueHandler = new ValueHandler(TYPE_REGISTRY);
-        _connectionHandler = new MyConnectionHandler();
-        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
-
-        _responseQueue = queue;
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
-    {
-        ByteBuf buf = (ByteBuf) msg;
-        QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
-        qpidBuf.put(buf.nioBuffer());
-        qpidBuf.flip();
-        LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
-
-        if (_inputBuffer.hasRemaining())
-        {
-            QpidByteBuffer old = _inputBuffer;
-            _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
-            _inputBuffer.put(old);
-            _inputBuffer.put(qpidBuf);
-            old.dispose();
-            qpidBuf.dispose();
-            _inputBuffer.flip();
-        }
-        else
-        {
-            _inputBuffer.dispose();
-            _inputBuffer = qpidBuf;
-        }
-
-        doParsing();
-
-        LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
-
-        if (_inputBuffer.hasRemaining())
-        {
-            _inputBuffer.compact();
-            _inputBuffer.flip();
-        }
-
-        ReferenceCountUtil.release(msg);
-    }
-
-    private void doParsing()
-    {
-        switch(_state)
-        {
-            case HEADER:
-                if (_inputBuffer.remaining() >= 8)
-                {
-                    byte[] header = new byte[8];
-                    _inputBuffer.get(header);
-                    _responseQueue.add(new HeaderResponse(header));
-                    _state = ParsingState.PERFORMATIVES;
-                    doParsing();
-                }
-                break;
-            case PERFORMATIVES:
-                _frameHandler.parse(_inputBuffer);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected state : " + _state);
-        }
-    }
-
-    private void resetInputHandlerAfterSaslOutcome()
-    {
-        _state = ParsingState.HEADER;
-        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
-    }
-
-    private class MyConnectionHandler implements ConnectionHandler
-    {
-        private volatile int _frameSize = 512;
-
-        @Override
-        public void receiveOpen(final int channel, final Open close)
-        {
-        }
-
-        @Override
-        public void receiveClose(final int channel, final Close close)
-        {
-
-        }
-
-        @Override
-        public void receiveBegin(final int channel, final Begin begin)
-        {
-
-        }
-
-        @Override
-        public void receiveEnd(final int channel, final End end)
-        {
-
-        }
-
-        @Override
-        public void receiveAttach(final int channel, final Attach attach)
-        {
-
-        }
-
-        @Override
-        public void receiveDetach(final int channel, final Detach detach)
-        {
-
-        }
-
-        @Override
-        public void receiveTransfer(final int channel, final Transfer transfer)
-        {
-
-        }
-
-        @Override
-        public void receiveDisposition(final int channel, final Disposition disposition)
-        {
-
-        }
-
-        @Override
-        public void receiveFlow(final int channel, final Flow flow)
-        {
-
-        }
-
-        @Override
-        public int getMaxFrameSize()
-        {
-            return _frameSize;
-        }
-
-        @Override
-        public int getChannelMax()
-        {
-            return UnsignedShort.MAX_VALUE.intValue();
-        }
-
-        @Override
-        public void handleError(final Error parsingError)
-        {
-            LOGGER.error("Unexpected error {}", parsingError);
-        }
-
-        @Override
-        public boolean closedForInput()
-        {
-            return false;
-        }
-
-        @Override
-        public void receive(final List<ChannelFrameBody> channelFrameBodies)
-        {
-            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
-            {
-                Response response;
-                Object val = channelFrameBody.getFrameBody();
-                int channel = channelFrameBody.getChannel();
-                if (val instanceof FrameBody)
-                {
-                    FrameBody frameBody = (FrameBody) val;
-                    if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
-                    {
-                        _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
-                    }
-                    response = new PerformativeResponse((short) channel, frameBody);
-                }
-                else if (val instanceof SaslFrameBody)
-                {
-                    SaslFrameBody frameBody = (SaslFrameBody) val;
-                    response = new SaslPerformativeResponse((short) channel, frameBody);
-
-                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
-                    {
-                        resetInputHandlerAfterSaslOutcome();
-                    }
-                }
-                else
-                {
-                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
-                }
-
-                _responseQueue.add(response);
-            }
-        }
-
-        @Override
-        public void receiveSaslInit(final SaslInit saslInit)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslChallenge(final SaslChallenge saslChallenge)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslResponse(final SaslResponse saslResponse)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslOutcome(final SaslOutcome saslOutcome)
-        {
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 52518ab..7d73ce8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,8 +20,6 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static com.google.common.util.concurrent.Futures.allAsList;
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,21 +28,19 @@ import static org.hamcrest.Matchers.is;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -81,8 +77,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
 
-public class Interaction
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
 {
     private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Begin _begin;
@@ -94,14 +91,11 @@ public class Interaction
     private final Flow _flow;
     private final Transfer _transfer;
     private final Disposition _disposition;
-    private final FrameTransport _transport;
     private final SaslInit _saslInit;
     private final SaslResponse _saslResponse;
     private byte[] _protocolHeader;
     private UnsignedShort _connectionChannel;
     private UnsignedShort _sessionChannel;
-    private Response<?> _latestResponse;
-    private ListenableFuture<?> _latestFuture;
     private int _deliveryIdCounter;
     private List<Transfer> _latestDelivery;
     private Object _decodedLatestDelivery;
@@ -109,10 +103,10 @@ public class Interaction
 
     Interaction(final FrameTransport frameTransport)
     {
+        super(frameTransport);
         final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
-        _transport = frameTransport;
 
-        _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+        _protocolHeader = frameTransport.getProtocolHeader();
 
         _saslInit = new SaslInit();
         _saslResponse = new SaslResponse();
@@ -154,6 +148,19 @@ public class Interaction
         _disposition.setFirst(UnsignedInteger.ZERO);
     }
 
+    public void doCloseConnection() throws Exception
+    {
+        Close close = new Close();
+
+        sendPerformative(close, UnsignedShort.valueOf((short) 0));
+        Response<?> response = getNextResponse();
+        if (!(response.getBody() instanceof Close))
+        {
+            throw new IllegalStateException(String.format(
+                    "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
+        }
+    }
+
     /////////////////////////
     // Protocol Negotiation //
     /////////////////////////
@@ -164,17 +171,15 @@ public class Interaction
         return this;
     }
 
-    public Interaction negotiateProtocol() throws Exception
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    @Override
+    protected Interaction getInteraction()
     {
-        final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
         return this;
     }
 
@@ -977,83 +982,35 @@ public class Interaction
 
     private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
+        SASLFrame transportFrame = new SASLFrame(frameBody);
+        sendPerformativeAndChainFuture(transportFrame, true);
     }
 
     private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
-    }
-
-    public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
-    {
-        sync();
-        _latestResponse = _transport.getNextResponse();
-        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
-        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
-            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        final TransportFrame transportFrame;
+        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
         {
-            return this;
-        }
-        acceptableResponseClasses.remove(null);
-        if (_latestResponse != null)
-        {
-            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            final QpidByteBuffer duplicate;
+            if (payload == null)
             {
-                if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
-                {
-                    return this;
-                }
+                duplicate = null;
+            }
+            else
+            {
+                duplicate = payload.duplicate();
+            }
+            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+            if (frameBody instanceof Transfer)
+            {
+                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+            }
+            if (duplicate != null)
+            {
+                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
             }
         }
-        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
-                                                      acceptableResponseClasses,
-                                                      _latestResponse == null ? null : _latestResponse.getBody()));
-    }
-
-    public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
-    {
-        if (_latestFuture != null)
-        {
-            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-            _latestFuture = null;
-        }
-        return this;
-    }
-
-    public Response<?> getLatestResponse() throws Exception
-    {
-        sync();
-        return _latestResponse;
-    }
-
-    public <T> T getLatestResponse(Class<T> type) throws Exception
-    {
-        sync();
-        if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
-        {
-            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
-                                                          type.getSimpleName(),
-                                                          _latestResponse.getBody()));
-        }
-
-        return (T) _latestResponse.getBody();
     }
 
     public Interaction flowHandleFromLinkHandle()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
deleted file mode 100644
index 029dc70..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-
-public class Matchers
-{
-    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
-    {
-        return new BaseMatcher<Response>()
-        {
-            @Override
-            public void describeTo(final Description description)
-            {
-                description.appendValue(new HeaderResponse(expectedHeader));
-            }
-
-            @Override
-            public boolean matches(final Object o)
-            {
-                if (o == null)
-                {
-                    return false;
-                }
-                if (!(o instanceof HeaderResponse))
-                {
-                    return false;
-                }
-                if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
-                {
-                    return false;
-                }
-                return true;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
deleted file mode 100644
index 68f4322..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class OutputHandler extends ChannelOutboundHandlerAdapter
-{
-    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                                            .registerTransportLayer()
-                                                                                            .registerMessagingLayer()
-                                                                                            .registerTransactionLayer()
-                                                                                            .registerSecurityLayer()
-                                                                                            .registerExtensionSoleconnLayer();
-
-
-    @Override
-    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
-    {
-
-        if (msg instanceof AMQFrame)
-        {
-            FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
-            {
-                @Override
-                public boolean isDirectBufferPreferred()
-                {
-                    return false;
-                }
-
-                @Override
-                public void send(final QpidByteBuffer msg)
-                {
-                    byte[] data = new byte[msg.remaining()];
-                    msg.get(data);
-                    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-                    buffer.writeBytes(data);
-                    try
-                    {
-                        OutputHandler.super.write(ctx, buffer, promise);
-                    }
-                    catch (Exception e)
-                    {
-                        promise.setFailure(e);
-                    }
-                }
-
-                @Override
-                public void flush()
-                {
-                }
-
-                @Override
-                public void close()
-                {
-
-                }
-            });
-            _frameWriter.send(((AMQFrame) msg));
-        }
-        else
-        {
-            super.write(ctx, msg, promise);
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
index 06a64dc..9e03a26 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
@@ -20,6 +20,7 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.tests.protocol.Response;
 
 public class PerformativeResponse implements Response<FrameBody>
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
deleted file mode 100644
index a7e341c..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-public interface Response<T>
-{
-    T getBody();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
index 08893e0..02ab3c9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.tests.protocol.Response;
 
 public class SaslPerformativeResponse implements Response<SaslFrameBody>
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
deleted file mode 100644
index ea3d164..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface SpecificationTest
-{
-    String section();
-    String description();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 8a150fa..1ca4419 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -54,6 +54,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 817be05..a3270a0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
@@ -108,7 +108,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 
             interaction.consumeResponse().getLatestResponse(Flow.class);
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
 
         assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
index c75bb20..524d991 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
@@ -46,10 +46,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -125,7 +124,7 @@ public class ManagementTest extends BrokerAdminUsingTestBase
             assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
             assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle())));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index a39b1b3..e85f356 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -39,10 +39,11 @@ import org.junit.Test;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class WebSocketTest extends BrokerAdminUsingTestBase
 {
@@ -73,7 +74,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
         {
-            final Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            final Open responseOpen = interaction
                                                .negotiateProtocol().consumeResponse()
                                                .open().consumeResponse()
                                                .getLatestResponse(Open.class);
@@ -84,7 +86,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -97,7 +99,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
         {
-            final Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            final Open responseOpen = interaction
                                                .negotiateProtocol().consumeResponse()
                                                .open().consumeResponse()
                                                .getLatestResponse(Open.class);
@@ -108,7 +111,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 6ba9058..36203a4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -48,7 +48,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class DeleteOnCloseTest extends BrokerAdminUsingTestBase

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 97fe3bb..1cf1ff0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index bcd155f..0a45410 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -52,9 +52,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 1853446..cc377cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -99,7 +99,7 @@ public class OutcomeTest extends BrokerAdminUsingTestBase
             assertThat(secondDeliveryPayload, is(equalTo("message2")));
 
             // verify that no unexpected performative is received by closing
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index eb72532..245c624 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -76,8 +76,8 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -439,7 +439,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .disposition();
 
             // verify that no unexpected performative is received by closing
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -688,8 +688,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
             assertThat(isSettled.get(), is(true));
 
             // verify no unexpected performative received by closing the connection
-           transport.doCloseConnection();
-
+            interaction.doCloseConnection();
         }
     }
 
@@ -796,7 +795,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .transfer()
                        .sync();
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 42f6114..bd7c113 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1496d13..fb61974 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -57,9 +57,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 619e5d9..db91db1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class ProtocolHeaderTest extends BrokerAdminUsingTestBase
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index b3f57c1..ab570da 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 
 public class OpenTest extends BrokerAdminUsingTestBase
@@ -77,7 +77,8 @@ public class OpenTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            Open responseOpen = interaction
                                          .negotiateProtocol().consumeResponse()
                                          .openContainerId("testContainerId")
                                          .open().consumeResponse()
@@ -88,7 +89,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index c559399..774cc00 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -43,7 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org