You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/10 14:44:40 UTC

[1/6] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

Repository: qpid-broker-j
Updated Branches:
  refs/heads/7.0.x 9a7628595 -> 7d8391ff7


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index ed72ba3..1a38cf0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 35ee4e7..ad56ea9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -69,9 +69,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -459,7 +459,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                 }
             }
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -588,7 +588,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                            .disposition();
             }
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             if (getBrokerAdmin().isQueueDepthSupported())
             {
@@ -672,7 +672,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                        .dispositionRole(Role.RECEIVER)
                        .disposition();
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             if (getBrokerAdmin().isQueueDepthSupported())
             {
@@ -771,7 +771,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
             }
             while (!settled);
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index cf12b05..6fea928 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -49,7 +49,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class SaslTest extends BrokerAdminUsingTestBase
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 352fd19..24cb435 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -36,10 +36,11 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class BeginTest extends BrokerAdminUsingTestBase
 {
@@ -74,7 +75,8 @@ public class BeginTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
             final UnsignedShort channel = UnsignedShort.valueOf(37);
-            Begin responseBegin = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            Begin responseBegin = interaction
                                            .negotiateProtocol().consumeResponse()
                                            .open().consumeResponse(Open.class)
                                            .sessionChannel(channel)
@@ -85,7 +87,7 @@ public class BeginTest extends BrokerAdminUsingTestBase
             assertThat(responseBegin.getOutgoingWindow(), is(instanceOf(UnsignedInteger.class)));
             assertThat(responseBegin.getNextOutgoingId(), is(instanceOf(UnsignedInteger.class)));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/pom.xml b/systests/protocol-tests-core/pom.xml
new file mode 100644
index 0000000..8c3bc87
--- /dev/null
+++ b/systests/protocol-tests-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  ~
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-parent</artifactId>
+        <version>7.0.1-SNAPSHOT</version>
+        <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>protocol-tests-core</artifactId>
+    <name>Apache Qpid Broker-J Protocol Tests Core</name>
+    <description>Core classes for Apache Qpid protocol tests</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
new file mode 100644
index 0000000..daf500d
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Protocol-agnostic base class for a Netty-backed test transport.
+ * <p>
+ * The transport connects to the broker address supplied at construction time,
+ * writes protocol headers and performatives to the channel, and exposes the
+ * decoded responses through a bounded queue that is drained with
+ * {@link #getNextResponse()}. When the channel closes, a marker response is
+ * appended to the queue so that waiting callers can observe the closure.
+ */
+public abstract class FrameTransport implements AutoCloseable
+{
+    /**
+     * Time, in milliseconds, to wait for a response. Read from the system
+     * property {@code qpid.tests.protocol.frameTransport.responseTimeout};
+     * defaults to 6000.
+     */
+    public static final long RESPONSE_TIMEOUT =
+            Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+    /** Marker put on the response queue once the channel's close future completes. */
+    private static final Response<Void> CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+    private final EventLoopGroup _workerGroup;
+    private final InetSocketAddress _brokerAddress;
+    private final InputHandler _inputHandler;
+    private final OutputHandler _outputHandler;
+
+    private volatile Channel _channel;
+    private volatile boolean _channelClosedSeen = false;
+
+    /**
+     * Creates a transport that is not yet connected.
+     *
+     * @param brokerAddress address that {@link #connect()} will connect to
+     * @param inputDecoder  decoder handed to the inbound handler
+     * @param outputEncoder encoder handed to the outbound handler
+     */
+    public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+    {
+        _brokerAddress = brokerAddress;
+        _inputHandler = new InputHandler(_queue, inputDecoder);
+        _outputHandler = new OutputHandler(outputEncoder);
+        _workerGroup = new NioEventLoopGroup();
+    }
+
+    /**
+     * @return the broker address supplied at construction time
+     */
+    public InetSocketAddress getBrokerAddress()
+    {
+        return _brokerAddress;
+    }
+
+    /**
+     * Opens the channel to the broker and blocks until the connect attempt completes.
+     *
+     * @return this transport, to allow chaining
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
+     *                          thread is interrupted while waiting; the interrupt status
+     *                          of the thread is restored before the exception is thrown
+     */
+    public FrameTransport connect()
+    {
+        try
+        {
+            Bootstrap b = new Bootstrap();
+            b.group(_workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new ChannelInitializer<SocketChannel>()
+            {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception
+                {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    buildInputOutputPipeline(pipeline);
+                }
+            });
+
+            _channel = b.connect(_brokerAddress).sync().channel();
+            // Record the closure and wake up anybody blocked in getNextResponse().
+            _channel.closeFuture().addListener(future ->
+                                               {
+                                                   _channelClosedSeen = true;
+                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
+                                               });
+        }
+        catch (InterruptedException e)
+        {
+            // Do not lose the interrupt: code further up the stack may rely on the flag.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        return this;
+    }
+
+    /**
+     * Installs the inbound and outbound handlers on the channel pipeline.
+     * Subclasses may override to add further handlers.
+     *
+     * @param pipeline pipeline of the channel being initialised
+     */
+    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+    {
+        pipeline.addLast(_inputHandler).addLast(_outputHandler);
+    }
+
+    /**
+     * Disconnects and closes the channel (if one was opened) and then shuts down
+     * the worker group. The worker group is shut down even if closing the channel fails.
+     */
+    @Override
+    public void close() throws Exception
+    {
+        try
+        {
+            if (_channel != null)
+            {
+                _channel.disconnect().sync();
+                _channel.close().sync();
+                _channel = null;
+            }
+        }
+        finally
+        {
+            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+        }
+    }
+
+    /**
+     * Writes the given raw bytes to the channel and flushes.
+     *
+     * @param bytes bytes to send
+     * @return future completing when the write completes
+     * @throws IllegalStateException if the transport is not connected
+     */
+    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(bytes);
+        _channel.write(buffer, promise);
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+
+    /**
+     * Writes the given object to the channel and flushes.
+     *
+     * @param data object passed down the channel pipeline
+     * @param sync when {@code true}, block until the write has completed and return an
+     *             already-completed future; otherwise return a future tracking the write
+     * @return future completing when the write completes
+     * @throws IllegalStateException if the transport is not connected
+     */
+    public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        if (!sync)
+        {
+            ChannelPromise promise = _channel.newPromise();
+            _channel.write(data, promise);
+            _channel.flush();
+            return JdkFutureAdapters.listenInPoolThread(promise);
+        }
+        else
+        {
+            ChannelFuture channelFuture = _channel.writeAndFlush(data);
+            channelFuture.sync();
+            return Futures.immediateFuture(null);
+        }
+    }
+
+    /**
+     * Waits up to {@link #RESPONSE_TIMEOUT} milliseconds for the next queued response.
+     *
+     * @param <T> response type expected by the caller; the cast is unchecked, so a
+     *            mismatch surfaces as a {@link ClassCastException} at the call site
+     * @return the next response, or {@code null} if none arrived within the timeout
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends Response<?>> T getNextResponse() throws Exception
+    {
+        return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Asserts that the next response is either absent (timeout) or the
+     * channel-closed marker.
+     */
+    public void assertNoMoreResponses() throws Exception
+    {
+        Response response = getNextResponse();
+        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+    }
+
+    /**
+     * As {@link #assertNoMoreResponses()}, and additionally asserts that the
+     * channel closure has been observed.
+     */
+    public void assertNoMoreResponsesAndChannelClosed() throws Exception
+    {
+        assertNoMoreResponses();
+        assertThat(_channelClosedSeen, is(true));
+    }
+
+    /** Marker response signalling that the channel was closed; it carries no body. */
+    private static class ChannelClosedResponse implements Response<Void>
+    {
+        @Override
+        public String toString()
+        {
+            return "ChannelClosed";
+        }
+
+        @Override
+        public Void getBody()
+        {
+            return null;
+        }
+    }
+
+    /**
+     * @return the protocol header bytes for the protocol implemented by the subclass
+     */
+    public abstract byte[] getProtocolHeader();
+
+    /**
+     * @return a new interaction bound to this transport
+     */
+    protected abstract Interaction newInteraction();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
new file mode 100644
index 0000000..9767b40
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+/**
+ * {@link Response} whose body is a raw protocol header, held as a byte array.
+ * The array is stored and returned by reference; no copy is made.
+ */
+public class HeaderResponse implements Response<byte[]>
+{
+    private final byte[] _header;
+
+    /**
+     * @param header the header bytes to wrap
+     */
+    public HeaderResponse(final byte[] header)
+    {
+        this._header = header;
+    }
+
+    /**
+     * @return textual form listing the wrapped header bytes
+     */
+    @Override
+    public String toString()
+    {
+        final String headerText = Arrays.toString(_header);
+        return String.format("HeaderResponse{_header=%s}", headerText);
+    }
+
+    /**
+     * @return the wrapped header bytes
+     */
+    @Override
+    public byte[] getBody()
+    {
+        return this._header;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
new file mode 100644
index 0000000..369cfd1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
+ * Strategy for turning raw inbound bytes into {@link Response} objects.
+ */
+public interface InputDecoder
+{
+    /**
+     * Decodes the content of the given buffer into responses.
+     *
+     * @param inputBuffer buffer holding the bytes received so far
+     *                    (NOTE(review): implementations are presumably expected to
+     *                    advance the buffer position past the bytes they consumed,
+     *                    leaving incomplete data remaining - confirm against callers)
+     * @return the responses decoded from the buffer
+     * @throws Exception if decoding fails
+     */
+    Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
new file mode 100644
index 0000000..2d5fb45
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+
+    private final BlockingQueue<Response<?>> _responseQueue;
+    private final InputDecoder _inputDecoder;
+
+    private ByteBuffer _inputBuffer = ByteBuffer.allocate(0);
+
+    InputHandler(final BlockingQueue<Response<?>> queue, InputDecoder inputDecoder)
+    {
+        _responseQueue = queue;
+        _inputDecoder = inputDecoder;
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+    {
+        ByteBuf buf = (ByteBuf) msg;
+        ByteBuffer byteBuffer = ByteBuffer.allocate(buf.readableBytes());
+        byteBuffer.put(buf.nioBuffer());
+        byteBuffer.flip();
+        LOGGER.debug("Incoming {} byte(s)", byteBuffer.remaining());
+
+        if (_inputBuffer.hasRemaining())
+        {
+            ByteBuffer old = _inputBuffer;
+            _inputBuffer = ByteBuffer.allocate(_inputBuffer.remaining() + byteBuffer.remaining());
+            _inputBuffer.put(old);
+            _inputBuffer.put(byteBuffer);
+            _inputBuffer.flip();
+        }
+        else
+        {
+            _inputBuffer = byteBuffer;
+        }
+
+        _responseQueue.addAll(_inputDecoder.decode(_inputBuffer));
+
+        LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
+
+        if (_inputBuffer.hasRemaining())
+        {
+            _inputBuffer.compact();
+            _inputBuffer.flip();
+        }
+
+        ReferenceCountUtil.release(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
new file mode 100644
index 0000000..238c0a5
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Base class for a fluent, protocol-specific conversation with the broker
+ * carried over a {@link FrameTransport}.
+ * <p>
+ * Sends are tracked through a chained future so that {@link #sync()} can wait
+ * for all outstanding writes; the most recently consumed response is retained
+ * and can be retrieved with {@link #getLatestResponse()}.
+ *
+ * @param <I> concrete interaction type returned by the fluent methods
+ */
+public abstract class Interaction<I extends Interaction>
+{
+    private final FrameTransport _transport;
+    private ListenableFuture<?> _latestFuture;
+    private Response<?> _latestResponse;
+
+    /**
+     * @param frameTransport transport used for sending and receiving
+     */
+    public Interaction(final FrameTransport frameTransport)
+    {
+        _transport = frameTransport;
+    }
+
+    /**
+     * Waits for outstanding writes, fetches the next response and checks its body
+     * against the acceptable types.
+     * <ul>
+     * <li>With no types given, any non-null response is accepted.</li>
+     * <li>A {@code null} entry among the types means that the absence of a
+     * response (timeout) is acceptable.</li>
+     * <li>Otherwise the response body must be an instance of one of the types.</li>
+     * </ul>
+     *
+     * @param responseTypes acceptable body types; may contain {@code null}
+     * @return this interaction
+     * @throws IllegalStateException if the response does not match; this includes a
+     *                               response without a body (such as the transport's
+     *                               channel-closed marker) when body types were requested
+     */
+    public I consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return getInteraction();
+        }
+        acceptableResponseClasses.remove(null);
+        // A response may legitimately have no body; treat that as "no match" instead of failing with an NPE.
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (body != null)
+        {
+            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            {
+                if (acceptableResponseClass.isInstance(body))
+                {
+                    return getInteraction();
+                }
+            }
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses,
+                                                      body == null ? _latestResponse : body));
+    }
+
+    /**
+     * @return the next response from the transport, or {@code null} on timeout
+     */
+    protected Response<?> getNextResponse() throws Exception
+    {
+        return _transport.getNextResponse();
+    }
+
+    /**
+     * Waits, for at most {@link FrameTransport#RESPONSE_TIMEOUT} milliseconds, until
+     * all writes issued so far have completed.
+     *
+     * @return this interaction
+     */
+    public I sync() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+            _latestFuture = null;
+        }
+        return getInteraction();
+    }
+
+    /**
+     * @return the response obtained by the last {@link #consumeResponse(Class[])} call;
+     *         may be {@code null}
+     */
+    public Response<?> getLatestResponse() throws Exception
+    {
+        sync();
+        return _latestResponse;
+    }
+
+    /**
+     * Returns the body of the latest response, checked against the given type.
+     *
+     * @param type expected body type
+     * @param <T>  expected body type
+     * @return the body of the latest response
+     * @throws IllegalStateException if there is no latest response, it has no body,
+     *                               or the body is not an instance of {@code type}
+     */
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        sync();
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (body == null || !type.isInstance(body))
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(),
+                                                          body == null ? _latestResponse : body));
+        }
+
+        return type.cast(body);
+    }
+
+    /**
+     * Sends the frame body through the transport and records the resulting future
+     * so that {@link #sync()} waits for it.
+     *
+     * @param frameBody object to send
+     * @param sync      passed through to {@link FrameTransport#sendPerformative(Object, boolean)}
+     * @return the future of this individual send
+     */
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+        chainFuture(future);
+        return future;
+    }
+
+    /**
+     * Sends the protocol header returned by {@link #getProtocolHeader()}.
+     *
+     * @return this interaction
+     */
+    public I negotiateProtocol() throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
+        chainFuture(future);
+        return getInteraction();
+    }
+
+    /** Combines the given future with any not-yet-synced earlier futures. */
+    private void chainFuture(final ListenableFuture<Void> future)
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+    }
+
+    /**
+     * @return the transport this interaction runs over
+     */
+    protected FrameTransport getTransport()
+    {
+        return _transport;
+    }
+
+    /**
+     * @return the protocol header bytes sent by {@link #negotiateProtocol()}
+     */
+    protected abstract byte[] getProtocolHeader();
+
+    /**
+     * @return this interaction, typed as the concrete subclass
+     */
+    protected abstract I getInteraction();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
new file mode 100644
index 0000000..292ae9a
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+public class Matchers
+{
+    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+    {
+        return new BaseMatcher<Response>()
+        {
+            @Override
+            public void describeTo(final Description description)
+            {
+                description.appendValue(new HeaderResponse(expectedHeader));
+            }
+
+            @Override
+            public boolean matches(final Object o)
+            {
+                if (o == null)
+                {
+                    return false;
+                }
+                if (!(o instanceof HeaderResponse))
+                {
+                    return false;
+                }
+                if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
+                {
+                    return false;
+                }
+                return true;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
new file mode 100644
index 0000000..a6a4a47
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+public interface OutputEncoder
+{
+    ByteBuffer encode(Object msg);
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
new file mode 100644
index 0000000..40a2ca7
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+    private final OutputEncoder _outputEncoder;
+
+    OutputHandler(final OutputEncoder outputEncoder)
+    {
+        _outputEncoder = outputEncoder;
+    }
+
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+    {
+        ByteBuffer byteBuffer = _outputEncoder.encode(msg);
+        if (byteBuffer != null)
+        {
+            send(ctx, byteBuffer, promise);
+        }
+        else
+        {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+    {
+        byte[] data = new byte[dataByteBuffer.remaining()];
+        dataByteBuffer.get(data);
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(data);
+        try
+        {
+            OutputHandler.super.write(ctx, buffer, promise);
+        }
+        catch (Exception e)
+        {
+            promise.setFailure(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
new file mode 100644
index 0000000..debc06f
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+public interface Response<T>
+{
+    T getBody();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
new file mode 100644
index 0000000..db6d7a1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface SpecificationTest
+{
+    String section();
+    String description();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/6] qpid-broker-j git commit: QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer

Posted by kw...@apache.org.
QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer

Cherry picked from 89e01ec


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/33cb908e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/33cb908e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/33cb908e

Branch: refs/heads/7.0.x
Commit: 33cb908e42bd09cbde3e663e686d95481fe6911e
Parents: d1a6ca2
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 17:33:00 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:30 2018 +0000

----------------------------------------------------------------------
 .../protocol/v1_0/framing/FrameHandler.java     | 11 ++++-
 .../qpid/tests/protocol/v1_0/FrameDecoder.java  | 39 ++++++++-------
 .../v1_0/transport/security/sasl/SaslTest.java  | 50 +++++++++++++++-----
 3 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/33cb908e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index f7e4029..23d08c3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -202,19 +202,26 @@ public class FrameHandler implements ProtocolHandler
                             return frameBody;
                         }
                     });
+
+                    if (_isSasl)
+                    {
+                        break;
+                    }
                 }
                 catch (AmqpErrorException ex)
                 {
                     frameParsingError = ex.getError();
                 }
             }
-            _connectionHandler.receive(channelFrameBodies);
 
             if (frameParsingError != null)
             {
                 _connectionHandler.handleError(frameParsingError);
                 _errored = true;
-
+            }
+            else
+            {
+                _connectionHandler.receive(channelFrameBodies);
             }
         }
         catch (RuntimeException e)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/33cb908e/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 4dc06cb..0c94ad7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -90,27 +90,34 @@ public class FrameDecoder implements InputDecoder
     @Override
     public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
     {
-        List<Response<?>> responses = new ArrayList<>();
+
         QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
-        switch(_state)
+        int remaining;
+
+        do
         {
-            case HEADER:
-                if (inputBuffer.remaining() >= 8)
-                {
-                    byte[] header = new byte[8];
-                    inputBuffer.get(header);
-                    responses.add(new HeaderResponse(header));
-                    _state = ParsingState.PERFORMATIVES;
+            remaining = qpidByteBuffer.remaining();
+            switch(_state)
+            {
+                case HEADER:
+                    if (inputBuffer.remaining() >= 8)
+                    {
+                        byte[] header = new byte[8];
+                        inputBuffer.get(header);
+                        _connectionHandler._responseQueue.add(new HeaderResponse(header));
+                        _state = ParsingState.PERFORMATIVES;
+                    }
+                    break;
+                case PERFORMATIVES:
                     _frameHandler.parse(qpidByteBuffer);
-                }
-                break;
-            case PERFORMATIVES:
-                _frameHandler.parse(qpidByteBuffer);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected state : " + _state);
+                    break;
+                default:
+                    throw new IllegalStateException("Unexpected state : " + _state);
+            }
         }
+        while (qpidByteBuffer.remaining() != remaining);
 
+        List<Response<?>> responses = new ArrayList<>();
         Response<?> r;
         while((r = _connectionHandler._responseQueue.poll())!=null)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/33cb908e/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 1ca3f20..1b81507 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -36,18 +37,23 @@ import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 import javax.xml.bind.DatatypeConverter;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameEncoder;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -106,7 +112,6 @@ public class SaslTest extends BrokerAdminUsingTestBase
         }
     }
 
-    @Ignore("QPID-8042")
     @Test
     @SpecificationTest(section = "2.4.2",
             description = "For applications that use many short-lived connections,"
@@ -118,17 +123,38 @@ public class SaslTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
         try (FrameTransport transport = new FrameTransport(addr, true).connect())
         {
-            final Binary initialResponse = new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
             final Interaction interaction = transport.newInteraction();
-            interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
-                       .negotiateProtocol()
-                       .saslMechanism(PLAIN)
-                       .saslInitialResponse(initialResponse)
-                       .saslInit()
-                       .protocolHeader(AMQP_HEADER_BYTES)
-                       .negotiateProtocol()
-                       .openContainerId("testContainerId")
-                       .open();
+            FrameEncoder frameEncoder = new FrameEncoder();
+
+            SaslInit saslInit = new SaslInit();
+            saslInit.setMechanism(PLAIN);
+            saslInit.setInitialResponse(new Binary(String.format("\0%s\0%s", _username, _password)
+                                                         .getBytes(StandardCharsets.US_ASCII)));
+            ByteBuffer saslInitByteBuffer = frameEncoder.encode(new SASLFrame(saslInit));
+
+            Open open = new Open();
+            open.setContainerId("containerId");
+            ByteBuffer openByteBuffer = frameEncoder.encode(new TransportFrame(0, open));
+
+            int initSize = saslInitByteBuffer.remaining();
+            int openSize = openByteBuffer.remaining();
+            int dataLength = SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize + openSize;
+            byte[] data = new byte[dataLength];
+
+            System.arraycopy(SASL_AMQP_HEADER_BYTES, 0, data, 0, SASL_AMQP_HEADER_BYTES.length);
+            saslInitByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length, initSize);
+            System.arraycopy(AMQP_HEADER_BYTES,
+                             0,
+                             data,
+                             SASL_AMQP_HEADER_BYTES.length + initSize,
+                             AMQP_HEADER_BYTES.length);
+            openByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize, openSize);
+
+            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+            buffer.writeBytes(data);
+
+            transport.sendPerformative(buffer);
+
 
             final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
             assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/6] qpid-broker-j git commit: QPID-8042: [Broker-J][AMQP 1.0] Add protocol test for pipelined connection open

Posted by kw...@apache.org.
QPID-8042: [Broker-J][AMQP 1.0] Add protocol test for pipelined connection open

Cherry picked from 9daed1e


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/88fd1245
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/88fd1245
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/88fd1245

Branch: refs/heads/7.0.x
Commit: 88fd1245c6b4ef4fc47314079f24329f5fd0646e
Parents: deab458
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 20 11:19:42 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:15 2018 +0000

----------------------------------------------------------------------
 .../v1_0/transport/security/sasl/SaslTest.java  | 47 +++++++++++++++++++-
 .../apache/qpid/tests/protocol/Interaction.java |  8 ++++
 2 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88fd1245/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 6fea928..d7f3bc1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -37,6 +37,7 @@ import javax.crypto.spec.SecretKeySpec;
 import javax.xml.bind.DatatypeConverter;
 
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -45,11 +46,12 @@ import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class SaslTest extends BrokerAdminUsingTestBase
 {
@@ -104,6 +106,47 @@ public class SaslTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Ignore("QPID-8042")
+    @Test
+    @SpecificationTest(section = "2.4.2",
+            description = "For applications that use many short-lived connections,"
+                          + " it MAY be desirable to pipeline the connection negotiation process."
+                          + " A peer MAY do this by starting to send subsequent frames before receiving"
+                          + " the partner’s connection header or open frame")
+    public void saslSuccessfulAuthenticationWithPipelinedFrames() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(addr, true).connect())
+        {
+            final Binary initialResponse = new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
+            final Interaction interaction = transport.newInteraction();
+            interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
+                       .negotiateProtocol()
+                       .saslMechanism(PLAIN)
+                       .saslInitialResponse(initialResponse)
+                       .saslInit()
+                       .protocolHeader(AMQP_HEADER_BYTES)
+                       .negotiateProtocol()
+                       .openContainerId("testContainerId")
+                       .open();
+
+            final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
+            assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+            SaslMechanisms saslMechanismsResponse = interaction.consumeResponse().getLatestResponse(SaslMechanisms.class);
+            assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN));
+
+            SaslOutcome saslOutcome = interaction.consumeResponse().getLatestResponse(SaslOutcome.class);
+            assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK));
+
+            final byte[] headerResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
+            assertThat(headerResponse, is(equalTo(AMQP_HEADER_BYTES)));
+
+            interaction.consumeResponse().getLatestResponse(Open.class);
+            interaction.doCloseConnection();
+        }
+    }
+
     @Test
     @SpecificationTest(section = "5.3.2",
             description = "SASL Negotiation [...] challenge/response step occurs once")

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88fd1245/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
index 238c0a5..2390227 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -92,6 +92,14 @@ public abstract class Interaction<I extends Interaction>
     public <T> T getLatestResponse(Class<T> type) throws Exception
     {
         sync();
+
+        if (_latestResponse.getBody() == null)
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(),
+                                                          _latestResponse.getClass()));
+        }
+
         if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
         {
             throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/6] qpid-broker-j git commit: QPID-8042: [System Tests] Improve pipe-lining of frames in protocol tests

Posted by kw...@apache.org.
QPID-8042: [System Tests] Improve pipe-lining of frames in protocol tests

Cherry picked from 2cbb629


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d1a6ca21
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d1a6ca21
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d1a6ca21

Branch: refs/heads/7.0.x
Commit: d1a6ca216637907315de91575058e7b6d56e1011
Parents: 88fd124
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 20 16:55:39 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:30 2018 +0000

----------------------------------------------------------------------
 .../qpid/tests/protocol/v1_0/Interaction.java   |  6 +-
 .../soleconn/CloseExistingPolicy.java           |  6 +-
 .../v1_0/transport/security/sasl/SaslTest.java  |  2 +-
 .../qpid/tests/protocol/FrameTransport.java     | 26 +++------
 .../apache/qpid/tests/protocol/Interaction.java |  8 +--
 .../qpid/tests/protocol/OutputHandler.java      | 59 +++++++++++++++++++-
 6 files changed, 76 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 7d73ce8..4aad6ee 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -153,7 +153,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
         Close close = new Close();
 
         sendPerformative(close, UnsignedShort.valueOf((short) 0));
-        Response<?> response = getNextResponse();
+        Response<?> response = consumeResponse().getLatestResponse();
         if (!(response.getBody() instanceof Close))
         {
             throw new IllegalStateException(String.format(
@@ -983,7 +983,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
     private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
     {
         SASLFrame transportFrame = new SASLFrame(frameBody);
-        sendPerformativeAndChainFuture(transportFrame, true);
+        sendPerformativeAndChainFuture(transportFrame);
     }
 
     private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
@@ -1001,7 +1001,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
                 duplicate = payload.duplicate();
             }
             transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
-            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame);
             if (frameBody instanceof Transfer)
             {
                 listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index 4bee689..5be5482 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -106,7 +106,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
                             .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
                             .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
                                                                      CLOSE_EXISTING))
-                            .open();
+                            .open()
+                            .sync();
 
                 final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));
@@ -145,7 +146,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
                             .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
                             .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
                                                                      CLOSE_EXISTING))
-                            .open();
+                            .open()
+                            .sync();
 
                 final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index d7f3bc1..1ca3f20 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -258,7 +258,7 @@ public class SaslTest extends BrokerAdminUsingTestBase
             assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
 
             interaction.consumeResponse(SaslMechanisms.class);
-            interaction.open();
+            interaction.open().sync();
 
             transport.assertNoMoreResponsesAndChannelClosed();
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
index daf500d..28dc02e 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -31,14 +31,12 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -138,26 +136,15 @@ public abstract class FrameTransport implements AutoCloseable
         ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
         buffer.writeBytes(bytes);
         _channel.write(buffer, promise);
-        _channel.flush();
         return JdkFutureAdapters.listenInPoolThread(promise);
     }
 
-    public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+    public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
     {
         Preconditions.checkState(_channel != null, "Not connected");
-        if (!sync)
-        {
-            ChannelPromise promise = _channel.newPromise();
-            _channel.write(data, promise);
-            _channel.flush();
-            return JdkFutureAdapters.listenInPoolThread(promise);
-        }
-        else
-        {
-            ChannelFuture channelFuture = _channel.writeAndFlush(data);
-            channelFuture.sync();
-            return Futures.immediateFuture(null);
-        }
+        ChannelPromise promise = _channel.newPromise();
+        _channel.write(data, promise);
+        return JdkFutureAdapters.listenInPoolThread(promise);
     }
 
     public <T extends Response<?>> T getNextResponse() throws Exception
@@ -177,6 +164,11 @@ public abstract class FrameTransport implements AutoCloseable
         assertThat(_channelClosedSeen, is(true));
     }
 
+    public void flush()
+    {
+        _channel.flush();
+    }
+
     private static class ChannelClosedResponse implements Response<Void>
     {
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
index 2390227..b6e631d 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -75,6 +75,7 @@ public abstract class Interaction<I extends Interaction>
 
     public I sync() throws InterruptedException, ExecutionException, TimeoutException
     {
+        _transport.flush();
         if (_latestFuture != null)
         {
             _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -85,14 +86,11 @@ public abstract class Interaction<I extends Interaction>
 
     public Response<?> getLatestResponse() throws Exception
     {
-        sync();
         return _latestResponse;
     }
 
     public <T> T getLatestResponse(Class<T> type) throws Exception
     {
-        sync();
-
         if (_latestResponse.getBody() == null)
         {
             throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
@@ -110,9 +108,9 @@ public abstract class Interaction<I extends Interaction>
         return (T) _latestResponse.getBody();
     }
 
-    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
         if (_latestFuture != null)
         {
             _latestFuture = allAsList(_latestFuture, future);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
index 40a2ca7..5d40447 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -21,6 +21,8 @@
 package org.apache.qpid.tests.protocol;
 
 import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -31,10 +33,14 @@ import io.netty.channel.ChannelPromise;
 public class OutputHandler extends ChannelOutboundHandlerAdapter
 {
     private final OutputEncoder _outputEncoder;
+    private Queue<ByteBufferPromisePair> _cachedEncodedFramePromisePairs;
+    private int _encodedSize;
 
     OutputHandler(final OutputEncoder outputEncoder)
     {
         _outputEncoder = outputEncoder;
+        _cachedEncodedFramePromisePairs = new LinkedList<>();
+        _encodedSize = 0;
     }
 
     @Override
@@ -51,12 +57,44 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
         }
     }
 
-    private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+    private synchronized void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
     {
-        byte[] data = new byte[dataByteBuffer.remaining()];
-        dataByteBuffer.get(data);
+        _cachedEncodedFramePromisePairs.add(new ByteBufferPromisePair(dataByteBuffer, promise));
+        _encodedSize += dataByteBuffer.remaining();
+    }
+
+
+    @Override
+    public synchronized void flush(final ChannelHandlerContext ctx) throws Exception
+    {
+        final ChannelPromise promise = ctx.channel().newPromise();
+        byte[] data  = new byte[_encodedSize];
+
+        int offset = 0;
+        while(offset < _encodedSize)
+        {
+            ByteBufferPromisePair currentPair = _cachedEncodedFramePromisePairs.poll();
+            int remaining = currentPair.byteBuffer.remaining();
+            currentPair.byteBuffer.get(data, offset, remaining) ;
+            offset += remaining;
+
+            promise.addListener(future -> {
+                if (future.isSuccess())
+                {
+                    currentPair.channelPromise.setSuccess();
+                }
+                else
+                {
+                    currentPair.channelPromise.setFailure(future.cause());
+                }
+            });
+        }
+
+        _encodedSize = 0;
+
         ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
         buffer.writeBytes(data);
+
         try
         {
             OutputHandler.super.write(ctx, buffer, promise);
@@ -65,5 +103,20 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
         {
             promise.setFailure(e);
         }
+
+        super.flush(ctx);
+    }
+
+    class ByteBufferPromisePair
+    {
+        private ByteBuffer byteBuffer;
+        private ChannelPromise channelPromise;
+
+        ByteBufferPromisePair(final ByteBuffer byteBuffer, final ChannelPromise channelPromise)
+        {
+            this.byteBuffer = byteBuffer;
+            this.channelPromise = channelPromise;
+        }
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/6] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

Posted by kw...@apache.org.
QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

Cherry picked from 06e53d7 with manual resolutions.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/deab4580
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/deab4580
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/deab4580

Branch: refs/heads/7.0.x
Commit: deab4580b7a876cfe16d35fe40804ac19e277cfc
Parents: 9a76285
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:31:29 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:34:28 2018 +0000

----------------------------------------------------------------------
 pom.xml                                         |  13 +
 systests/protocol-tests-amqp-1-0/pom.xml        |  35 +--
 .../qpid/tests/protocol/v1_0/FrameDecoder.java  | 276 +++++++++++++++++
 .../qpid/tests/protocol/v1_0/FrameEncoder.java  |  93 ++++++
 .../tests/protocol/v1_0/FrameTransport.java     | 205 +------------
 .../tests/protocol/v1_0/HeaderResponse.java     |  46 ---
 .../qpid/tests/protocol/v1_0/InputHandler.java  | 305 -------------------
 .../qpid/tests/protocol/v1_0/Interaction.java   | 141 +++------
 .../qpid/tests/protocol/v1_0/Matchers.java      |  60 ----
 .../qpid/tests/protocol/v1_0/OutputHandler.java |  96 ------
 .../protocol/v1_0/PerformativeResponse.java     |   1 +
 .../qpid/tests/protocol/v1_0/Response.java      |  25 --
 .../protocol/v1_0/SaslPerformativeResponse.java |   1 +
 .../tests/protocol/v1_0/SpecificationTest.java  |  34 ---
 .../tests/protocol/v1_0/DecodeErrorTest.java    |   2 +
 .../bindmapjms/TemporaryDestinationTest.java    |   4 +-
 .../extensions/management/ManagementTest.java   |   5 +-
 .../extensions/websocket/WebSocketTest.java     |  13 +-
 .../v1_0/messaging/DeleteOnCloseTest.java       |   2 +-
 .../protocol/v1_0/messaging/MessageFormat.java  |   4 +-
 .../v1_0/messaging/MultiTransferTest.java       |   4 +-
 .../protocol/v1_0/messaging/OutcomeTest.java    |   4 +-
 .../protocol/v1_0/messaging/TransferTest.java   |  11 +-
 .../v1_0/transaction/DischargeTest.java         |   2 +-
 .../transaction/TransactionalTransferTest.java  |   4 +-
 .../v1_0/transport/ProtocolHeaderTest.java      |   2 +-
 .../v1_0/transport/connection/OpenTest.java     |   7 +-
 .../v1_0/transport/link/AttachTest.java         |   2 +-
 .../protocol/v1_0/transport/link/FlowTest.java  |   2 +-
 .../transport/link/ResumeDeliveriesTest.java    |  12 +-
 .../v1_0/transport/security/sasl/SaslTest.java  |   2 +-
 .../v1_0/transport/session/BeginTest.java       |   8 +-
 systests/protocol-tests-core/pom.xml            |  75 +++++
 .../qpid/tests/protocol/FrameTransport.java     | 198 ++++++++++++
 .../qpid/tests/protocol/HeaderResponse.java     |  46 +++
 .../qpid/tests/protocol/InputDecoder.java       |  30 ++
 .../qpid/tests/protocol/InputHandler.java       |  81 +++++
 .../apache/qpid/tests/protocol/Interaction.java | 141 +++++++++
 .../apache/qpid/tests/protocol/Matchers.java    |  60 ++++
 .../qpid/tests/protocol/OutputEncoder.java      |  29 ++
 .../qpid/tests/protocol/OutputHandler.java      |  69 +++++
 .../apache/qpid/tests/protocol/Response.java    |  25 ++
 .../qpid/tests/protocol/SpecificationTest.java  |  34 +++
 43 files changed, 1282 insertions(+), 927 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1262002..1287501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
     <module>systests</module>
     <module>systests/systests-utils</module>
     <module>systests/qpid-systests-jms_2.0</module>
+    <module>systests/protocol-tests-core</module>
     <module>systests/protocol-tests-amqp-1-0</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
@@ -407,6 +408,18 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-core</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>protocol-tests-amqp-1-0</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index 56fc1f7..c392cf7 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -56,6 +56,11 @@
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
+            <artifactId>protocol-tests-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-systests-utils</artifactId>
         </dependency>
 
@@ -94,38 +99,9 @@
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-bdbstore</artifactId>
             <scope>test</scope>
-            <optional>true</optional>
         </dependency>
 
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-buffer</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-codec-http</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-handler</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-transport</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-library</artifactId>
         </dependency>
@@ -133,6 +109,7 @@
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-integration</artifactId>
         </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
new file mode 100644
index 0000000..4dc06cb
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FrameDecoder.class);
+    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                            .registerTransportLayer()
+                                                                                            .registerMessagingLayer()
+                                                                                            .registerTransactionLayer()
+                                                                                            .registerSecurityLayer()
+                                                                                            .registerExtensionSoleconnLayer();
+    private final MyConnectionHandler _connectionHandler;
+    private volatile FrameHandler _frameHandler;
+
+    private enum ParsingState
+    {
+        HEADER,
+        PERFORMATIVES;
+    }
+
+    private final ValueHandler _valueHandler;
+
+    private volatile ParsingState _state = ParsingState.HEADER;
+
+    public FrameDecoder(final boolean isSasl)
+    {
+        _valueHandler = new ValueHandler(TYPE_REGISTRY);
+        _connectionHandler = new MyConnectionHandler();
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
+    }
+
+    @Override
+    public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
+    {
+        List<Response<?>> responses = new ArrayList<>();
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
+        switch(_state)
+        {
+            case HEADER:
+                if (inputBuffer.remaining() >= 8)
+                {
+                    byte[] header = new byte[8];
+                    inputBuffer.get(header);
+                    responses.add(new HeaderResponse(header));
+                    _state = ParsingState.PERFORMATIVES;
+                    _frameHandler.parse(qpidByteBuffer);
+                }
+                break;
+            case PERFORMATIVES:
+                _frameHandler.parse(qpidByteBuffer);
+                break;
+            default:
+                throw new IllegalStateException("Unexpected state : " + _state);
+        }
+
+        Response<?> r;
+        while((r = _connectionHandler._responseQueue.poll())!=null)
+        {
+            responses.add(r);
+        }
+        return responses;
+    }
+
+    private void resetInputHandlerAfterSaslOutcome()
+    {
+        _state = ParsingState.HEADER;
+        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+    }
+
+    private class MyConnectionHandler implements ConnectionHandler
+    {
+        private volatile int _frameSize = 512;
+        private Queue<Response<?>> _responseQueue = new ConcurrentLinkedQueue<>();
+
+        @Override
+        public void receiveOpen(final int channel, final Open close)
+        {
+        }
+
+        @Override
+        public void receiveClose(final int channel, final Close close)
+        {
+
+        }
+
+        @Override
+        public void receiveBegin(final int channel, final Begin begin)
+        {
+
+        }
+
+        @Override
+        public void receiveEnd(final int channel, final End end)
+        {
+
+        }
+
+        @Override
+        public void receiveAttach(final int channel, final Attach attach)
+        {
+
+        }
+
+        @Override
+        public void receiveDetach(final int channel, final Detach detach)
+        {
+
+        }
+
+        @Override
+        public void receiveTransfer(final int channel, final Transfer transfer)
+        {
+
+        }
+
+        @Override
+        public void receiveDisposition(final int channel, final Disposition disposition)
+        {
+
+        }
+
+        @Override
+        public void receiveFlow(final int channel, final Flow flow)
+        {
+
+        }
+
+        @Override
+        public int getMaxFrameSize()
+        {
+            return _frameSize;
+        }
+
+        @Override
+        public int getChannelMax()
+        {
+            return UnsignedShort.MAX_VALUE.intValue();
+        }
+
+        @Override
+        public void handleError(final Error parsingError)
+        {
+            LOGGER.error("Unexpected error {}", parsingError);
+        }
+
+        @Override
+        public boolean closedForInput()
+        {
+            return false;
+        }
+
+        @Override
+        public void receive(final List<ChannelFrameBody> channelFrameBodies)
+        {
+            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
+            {
+                Response response;
+                Object val = channelFrameBody.getFrameBody();
+                int channel = channelFrameBody.getChannel();
+                if (val instanceof FrameBody)
+                {
+                    FrameBody frameBody = (FrameBody) val;
+                    if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
+                    {
+                        _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
+                    }
+                    response = new PerformativeResponse((short) channel, frameBody);
+                }
+                else if (val instanceof SaslFrameBody)
+                {
+                    SaslFrameBody frameBody = (SaslFrameBody) val;
+                    response = new SaslPerformativeResponse((short) channel, frameBody);
+
+                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+                    {
+                        resetInputHandlerAfterSaslOutcome();
+                    }
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
+                }
+
+                _responseQueue.add(response);
+            }
+        }
+
+        @Override
+        public void receiveSaslInit(final SaslInit saslInit)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslResponse(final SaslResponse saslResponse)
+        {
+
+        }
+
+        @Override
+        public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+        {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
new file mode 100644
index 0000000..56d6e6f
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                            .registerTransportLayer()
+                                                                                            .registerMessagingLayer()
+                                                                                            .registerTransactionLayer()
+                                                                                            .registerSecurityLayer()
+                                                                                            .registerExtensionSoleconnLayer();
+
+    @Override
+    public ByteBuffer encode(final Object msg)
+    {
+        if (msg instanceof AMQFrame)
+        {
+            List<ByteBuffer> buffers = new ArrayList<>();
+            FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+            {
+                @Override
+                public boolean isDirectBufferPreferred()
+                {
+                    return false;
+                }
+
+                @Override
+                public void send(final QpidByteBuffer msg)
+                {
+                    byte[] data = new byte[msg.remaining()];
+                    msg.get(data);
+                    buffers.add(ByteBuffer.wrap(data));
+                }
+
+                @Override
+                public void flush()
+                {
+                }
+
+                @Override
+                public void close()
+                {
+
+                }
+            });
+            _frameWriter.send(((AMQFrame) msg));
+
+            int remaining = 0;
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                remaining += byteBuffer.remaining();
+            }
+            ByteBuffer result = ByteBuffer.allocate(remaining);
+            for (ByteBuffer byteBuffer: buffers)
+            {
+                result.put(byteBuffer);
+            }
+            result.flip();
+            return result;
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4d53751..dd59757 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -19,58 +19,12 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class FrameTransport implements AutoCloseable
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
 {
-    public static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout",6000);
-    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
-    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-
-    private final EventLoopGroup _workerGroup;
-    private final InetSocketAddress _brokerAddress;
-    private final boolean _isSasl;
-
-    private Channel _channel;
-    private volatile boolean _channelClosedSeen = false;
-
     public FrameTransport(final InetSocketAddress brokerAddress)
     {
         this(brokerAddress, false);
@@ -78,165 +32,20 @@ public class FrameTransport implements AutoCloseable
 
     public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
     {
-        _brokerAddress = brokerAddress;
-        _isSasl = isSasl;
-        _workerGroup = new NioEventLoopGroup();
-    }
-
-    public InetSocketAddress getBrokerAddress()
-    {
-        return _brokerAddress;
+        super(brokerAddress, new FrameDecoder(isSasl), new FrameEncoder());
     }
 
+    @Override
     public FrameTransport connect()
     {
-        try
-        {
-            Bootstrap b = new Bootstrap();
-            b.group(_workerGroup);
-            b.channel(NioSocketChannel.class);
-            b.option(ChannelOption.SO_KEEPALIVE, true);
-            b.handler(new ChannelInitializer<SocketChannel>()
-            {
-                @Override
-                public void initChannel(SocketChannel ch) throws Exception
-                {
-                    ChannelPipeline pipeline = ch.pipeline();
-                    buildInputOutputPipeline(pipeline);
-                }
-            });
-
-            _channel = b.connect(_brokerAddress).sync().channel();
-            _channel.closeFuture().addListener(future ->
-                                               {
-                                                   _channelClosedSeen = true;
-                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
-                                               });
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
+        super.connect();
         return this;
     }
 
-    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
-    {
-        pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
-    }
-
     @Override
-    public void close() throws Exception
-    {
-        try
-        {
-            if (_channel != null)
-            {
-                _channel.disconnect().sync();
-                _channel.close().sync();
-                _channel = null;
-            }
-        }
-        finally
-        {
-            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
-        }
-    }
-
-    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
-    {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-        buffer.writeBytes(bytes);
-        _channel.write(buffer, promise);
-        _channel.flush();
-        return JdkFutureAdapters.listenInPoolThread(promise);
-    }
-
-    public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
+    public byte[] getProtocolHeader()
     {
-        Preconditions.checkState(_channel != null, "Not connected");
-        ChannelPromise promise = _channel.newPromise();
-        final TransportFrame transportFrame;
-        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
-        {
-            final QpidByteBuffer duplicate;
-            if (payload == null)
-            {
-                duplicate = null;
-            }
-            else
-            {
-                duplicate = payload.duplicate();
-            }
-            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
-            _channel.write(transportFrame, promise);
-            _channel.flush();
-            final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
-            if (frameBody instanceof Transfer)
-            {
-                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
-            }
-            if (duplicate != null)
-            {
-                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
-            }
-            return listenableFuture;
-        }
-    }
-
-    public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
-    {
-        SASLFrame transportFrame = new SASLFrame(frameBody);
-        ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
-        channelFuture.sync();
-        return JdkFutureAdapters.listenInPoolThread(channelFuture);
-    }
-
-    public <T extends Response<?>> T getNextResponse() throws Exception
-    {
-        return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-    }
-
-    public void doCloseConnection() throws Exception
-    {
-        Close close = new Close();
-
-        sendPerformative(close, UnsignedShort.valueOf((short) 0));
-        PerformativeResponse response = getNextResponse();
-        if (!(response.getBody() instanceof Close))
-        {
-            throw new IllegalStateException(String.format(
-                    "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
-        }
-    }
-
-    public void assertNoMoreResponses() throws Exception
-    {
-        Response response = getNextResponse();
-        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
-    }
-
-    public void assertNoMoreResponsesAndChannelClosed() throws Exception
-    {
-        assertNoMoreResponses();
-        assertThat(_channelClosedSeen, is(true));
-    }
-
-    private static class ChannelClosedResponse implements Response<Void>
-    {
-        @Override
-        public String toString()
-        {
-            return "ChannelClosed";
-        }
-
-        @Override
-        public Void getBody()
-        {
-            return null;
-        }
+        return "AMQP\0\1\0\0".getBytes(UTF_8);
     }
 
     public Interaction newInteraction()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
deleted file mode 100644
index 9503113..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-public class HeaderResponse implements Response<byte[]>
-{
-    private final byte[] _header;
-
-    public HeaderResponse(final byte[] header)
-    {
-        _header = header;
-    }
-
-    @Override
-    public byte[] getBody()
-    {
-        return _header;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "HeaderResponse{" +
-               "_header=" + Arrays.toString(_header) +
-               '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
deleted file mode 100644
index e3acd24..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.ReferenceCountUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class InputHandler extends ChannelInboundHandlerAdapter
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
-    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                                            .registerTransportLayer()
-                                                                                            .registerMessagingLayer()
-                                                                                            .registerTransactionLayer()
-                                                                                            .registerSecurityLayer()
-                                                                                            .registerExtensionSoleconnLayer();
-
-    private enum ParsingState
-    {
-        HEADER,
-        PERFORMATIVES
-    }
-
-    private final MyConnectionHandler _connectionHandler;
-    private final ValueHandler _valueHandler;
-    private final BlockingQueue<Response<?>> _responseQueue;
-
-    private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
-    private volatile FrameHandler _frameHandler;
-    private volatile ParsingState _state = ParsingState.HEADER;
-
-    public InputHandler(final BlockingQueue<Response<?>> queue, final boolean isSasl)
-    {
-
-        _valueHandler = new ValueHandler(TYPE_REGISTRY);
-        _connectionHandler = new MyConnectionHandler();
-        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
-
-        _responseQueue = queue;
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
-    {
-        ByteBuf buf = (ByteBuf) msg;
-        QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
-        qpidBuf.put(buf.nioBuffer());
-        qpidBuf.flip();
-        LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
-
-        if (_inputBuffer.hasRemaining())
-        {
-            QpidByteBuffer old = _inputBuffer;
-            _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
-            _inputBuffer.put(old);
-            _inputBuffer.put(qpidBuf);
-            old.dispose();
-            qpidBuf.dispose();
-            _inputBuffer.flip();
-        }
-        else
-        {
-            _inputBuffer.dispose();
-            _inputBuffer = qpidBuf;
-        }
-
-        doParsing();
-
-        LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
-
-        if (_inputBuffer.hasRemaining())
-        {
-            _inputBuffer.compact();
-            _inputBuffer.flip();
-        }
-
-        ReferenceCountUtil.release(msg);
-    }
-
-    private void doParsing()
-    {
-        switch(_state)
-        {
-            case HEADER:
-                if (_inputBuffer.remaining() >= 8)
-                {
-                    byte[] header = new byte[8];
-                    _inputBuffer.get(header);
-                    _responseQueue.add(new HeaderResponse(header));
-                    _state = ParsingState.PERFORMATIVES;
-                    doParsing();
-                }
-                break;
-            case PERFORMATIVES:
-                _frameHandler.parse(_inputBuffer);
-                break;
-            default:
-                throw new IllegalStateException("Unexpected state : " + _state);
-        }
-    }
-
-    private void resetInputHandlerAfterSaslOutcome()
-    {
-        _state = ParsingState.HEADER;
-        _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
-    }
-
-    private class MyConnectionHandler implements ConnectionHandler
-    {
-        private volatile int _frameSize = 512;
-
-        @Override
-        public void receiveOpen(final int channel, final Open close)
-        {
-        }
-
-        @Override
-        public void receiveClose(final int channel, final Close close)
-        {
-
-        }
-
-        @Override
-        public void receiveBegin(final int channel, final Begin begin)
-        {
-
-        }
-
-        @Override
-        public void receiveEnd(final int channel, final End end)
-        {
-
-        }
-
-        @Override
-        public void receiveAttach(final int channel, final Attach attach)
-        {
-
-        }
-
-        @Override
-        public void receiveDetach(final int channel, final Detach detach)
-        {
-
-        }
-
-        @Override
-        public void receiveTransfer(final int channel, final Transfer transfer)
-        {
-
-        }
-
-        @Override
-        public void receiveDisposition(final int channel, final Disposition disposition)
-        {
-
-        }
-
-        @Override
-        public void receiveFlow(final int channel, final Flow flow)
-        {
-
-        }
-
-        @Override
-        public int getMaxFrameSize()
-        {
-            return _frameSize;
-        }
-
-        @Override
-        public int getChannelMax()
-        {
-            return UnsignedShort.MAX_VALUE.intValue();
-        }
-
-        @Override
-        public void handleError(final Error parsingError)
-        {
-            LOGGER.error("Unexpected error {}", parsingError);
-        }
-
-        @Override
-        public boolean closedForInput()
-        {
-            return false;
-        }
-
-        @Override
-        public void receive(final List<ChannelFrameBody> channelFrameBodies)
-        {
-            for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
-            {
-                Response response;
-                Object val = channelFrameBody.getFrameBody();
-                int channel = channelFrameBody.getChannel();
-                if (val instanceof FrameBody)
-                {
-                    FrameBody frameBody = (FrameBody) val;
-                    if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
-                    {
-                        _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
-                    }
-                    response = new PerformativeResponse((short) channel, frameBody);
-                }
-                else if (val instanceof SaslFrameBody)
-                {
-                    SaslFrameBody frameBody = (SaslFrameBody) val;
-                    response = new SaslPerformativeResponse((short) channel, frameBody);
-
-                    if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
-                    {
-                        resetInputHandlerAfterSaslOutcome();
-                    }
-                }
-                else
-                {
-                    throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
-                }
-
-                _responseQueue.add(response);
-            }
-        }
-
-        @Override
-        public void receiveSaslInit(final SaslInit saslInit)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslChallenge(final SaslChallenge saslChallenge)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslResponse(final SaslResponse saslResponse)
-        {
-
-        }
-
-        @Override
-        public void receiveSaslOutcome(final SaslOutcome saslOutcome)
-        {
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 52518ab..7d73ce8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,8 +20,6 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static com.google.common.util.concurrent.Futures.allAsList;
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,21 +28,19 @@ import static org.hamcrest.Matchers.is;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -81,8 +77,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
 
-public class Interaction
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
 {
     private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Begin _begin;
@@ -94,14 +91,11 @@ public class Interaction
     private final Flow _flow;
     private final Transfer _transfer;
     private final Disposition _disposition;
-    private final FrameTransport _transport;
     private final SaslInit _saslInit;
     private final SaslResponse _saslResponse;
     private byte[] _protocolHeader;
     private UnsignedShort _connectionChannel;
     private UnsignedShort _sessionChannel;
-    private Response<?> _latestResponse;
-    private ListenableFuture<?> _latestFuture;
     private int _deliveryIdCounter;
     private List<Transfer> _latestDelivery;
     private Object _decodedLatestDelivery;
@@ -109,10 +103,10 @@ public class Interaction
 
     Interaction(final FrameTransport frameTransport)
     {
+        super(frameTransport);
         final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
-        _transport = frameTransport;
 
-        _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+        _protocolHeader = frameTransport.getProtocolHeader();
 
         _saslInit = new SaslInit();
         _saslResponse = new SaslResponse();
@@ -154,6 +148,19 @@ public class Interaction
         _disposition.setFirst(UnsignedInteger.ZERO);
     }
 
+    public void doCloseConnection() throws Exception
+    {
+        Close close = new Close();
+
+        sendPerformative(close, UnsignedShort.valueOf((short) 0));
+        Response<?> response = getNextResponse();
+        if (!(response.getBody() instanceof Close))
+        {
+            throw new IllegalStateException(String.format(
+                    "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
+        }
+    }
+
     /////////////////////////
     // Protocol Negotiation //
     /////////////////////////
@@ -164,17 +171,15 @@ public class Interaction
         return this;
     }
 
-    public Interaction negotiateProtocol() throws Exception
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    @Override
+    protected Interaction getInteraction()
     {
-        final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
         return this;
     }
 
@@ -977,83 +982,35 @@ public class Interaction
 
     private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
+        SASLFrame transportFrame = new SASLFrame(frameBody);
+        sendPerformativeAndChainFuture(transportFrame, true);
     }
 
     private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
     {
-        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
-        if (_latestFuture != null)
-        {
-            _latestFuture = allAsList(_latestFuture, future);
-        }
-        else
-        {
-            _latestFuture = future;
-        }
-    }
-
-    public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
-    {
-        sync();
-        _latestResponse = _transport.getNextResponse();
-        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
-        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
-            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        final TransportFrame transportFrame;
+        try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
         {
-            return this;
-        }
-        acceptableResponseClasses.remove(null);
-        if (_latestResponse != null)
-        {
-            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            final QpidByteBuffer duplicate;
+            if (payload == null)
             {
-                if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
-                {
-                    return this;
-                }
+                duplicate = null;
+            }
+            else
+            {
+                duplicate = payload.duplicate();
+            }
+            transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+            ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+            if (frameBody instanceof Transfer)
+            {
+                listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+            }
+            if (duplicate != null)
+            {
+                listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
             }
         }
-        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
-                                                      acceptableResponseClasses,
-                                                      _latestResponse == null ? null : _latestResponse.getBody()));
-    }
-
-    public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
-    {
-        if (_latestFuture != null)
-        {
-            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
-            _latestFuture = null;
-        }
-        return this;
-    }
-
-    public Response<?> getLatestResponse() throws Exception
-    {
-        sync();
-        return _latestResponse;
-    }
-
-    public <T> T getLatestResponse(Class<T> type) throws Exception
-    {
-        sync();
-        if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
-        {
-            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
-                                                          type.getSimpleName(),
-                                                          _latestResponse.getBody()));
-        }
-
-        return (T) _latestResponse.getBody();
     }
 
     public Interaction flowHandleFromLinkHandle()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
deleted file mode 100644
index 029dc70..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-
-public class Matchers
-{
-    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
-    {
-        return new BaseMatcher<Response>()
-        {
-            @Override
-            public void describeTo(final Description description)
-            {
-                description.appendValue(new HeaderResponse(expectedHeader));
-            }
-
-            @Override
-            public boolean matches(final Object o)
-            {
-                if (o == null)
-                {
-                    return false;
-                }
-                if (!(o instanceof HeaderResponse))
-                {
-                    return false;
-                }
-                if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
-                {
-                    return false;
-                }
-                return true;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
deleted file mode 100644
index 68f4322..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class OutputHandler extends ChannelOutboundHandlerAdapter
-{
-    private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
-                                                                                            .registerTransportLayer()
-                                                                                            .registerMessagingLayer()
-                                                                                            .registerTransactionLayer()
-                                                                                            .registerSecurityLayer()
-                                                                                            .registerExtensionSoleconnLayer();
-
-
-    @Override
-    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
-    {
-
-        if (msg instanceof AMQFrame)
-        {
-            FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
-            {
-                @Override
-                public boolean isDirectBufferPreferred()
-                {
-                    return false;
-                }
-
-                @Override
-                public void send(final QpidByteBuffer msg)
-                {
-                    byte[] data = new byte[msg.remaining()];
-                    msg.get(data);
-                    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-                    buffer.writeBytes(data);
-                    try
-                    {
-                        OutputHandler.super.write(ctx, buffer, promise);
-                    }
-                    catch (Exception e)
-                    {
-                        promise.setFailure(e);
-                    }
-                }
-
-                @Override
-                public void flush()
-                {
-                }
-
-                @Override
-                public void close()
-                {
-
-                }
-            });
-            _frameWriter.send(((AMQFrame) msg));
-        }
-        else
-        {
-            super.write(ctx, msg, promise);
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
index 06a64dc..9e03a26 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
@@ -20,6 +20,7 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.tests.protocol.Response;
 
 public class PerformativeResponse implements Response<FrameBody>
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
deleted file mode 100644
index a7e341c..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-public interface Response<T>
-{
-    T getBody();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
index 08893e0..02ab3c9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -21,6 +21,7 @@
 package org.apache.qpid.tests.protocol.v1_0;
 
 import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.tests.protocol.Response;
 
 public class SaslPerformativeResponse implements Response<SaslFrameBody>
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
deleted file mode 100644
index ea3d164..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface SpecificationTest
-{
-    String section();
-    String description();
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 8a150fa..1ca4419 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -54,6 +54,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 817be05..a3270a0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
@@ -108,7 +108,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 
             interaction.consumeResponse().getLatestResponse(Flow.class);
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
 
         assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
index c75bb20..524d991 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
@@ -46,10 +46,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -125,7 +124,7 @@ public class ManagementTest extends BrokerAdminUsingTestBase
             assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
             assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle())));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index a39b1b3..e85f356 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -39,10 +39,11 @@ import org.junit.Test;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class WebSocketTest extends BrokerAdminUsingTestBase
 {
@@ -73,7 +74,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
         {
-            final Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            final Open responseOpen = interaction
                                                .negotiateProtocol().consumeResponse()
                                                .open().consumeResponse()
                                                .getLatestResponse(Open.class);
@@ -84,7 +86,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -97,7 +99,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
         try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
         {
-            final Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            final Open responseOpen = interaction
                                                .negotiateProtocol().consumeResponse()
                                                .open().consumeResponse()
                                                .getLatestResponse(Open.class);
@@ -108,7 +111,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 6ba9058..36203a4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -48,7 +48,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class DeleteOnCloseTest extends BrokerAdminUsingTestBase

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 97fe3bb..1cf1ff0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index bcd155f..0a45410 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -52,9 +52,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 1853446..cc377cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -99,7 +99,7 @@ public class OutcomeTest extends BrokerAdminUsingTestBase
             assertThat(secondDeliveryPayload, is(equalTo("message2")));
 
             // verify that no unexpected performative is received by closing
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index eb72532..245c624 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -76,8 +76,8 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -439,7 +439,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .disposition();
 
             // verify that no unexpected performative is received by closing
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -688,8 +688,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
             assertThat(isSettled.get(), is(true));
 
             // verify no unexpected performative received by closing the connection
-           transport.doCloseConnection();
-
+            interaction.doCloseConnection();
         }
     }
 
@@ -796,7 +795,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .transfer()
                        .sync();
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 42f6114..bd7c113 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1496d13..fb61974 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -57,9 +57,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 619e5d9..db91db1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class ProtocolHeaderTest extends BrokerAdminUsingTestBase
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index b3f57c1..ab570da 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 
 public class OpenTest extends BrokerAdminUsingTestBase
@@ -77,7 +77,8 @@ public class OpenTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Open responseOpen = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            Open responseOpen = interaction
                                          .negotiateProtocol().consumeResponse()
                                          .openContainerId("testContainerId")
                                          .open().consumeResponse()
@@ -88,7 +89,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
             assertThat(responseOpen.getChannelMax().intValue(),
                        is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index c559399..774cc00 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -43,7 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[6/6] qpid-broker-j git commit: QPID-8042: Fix defect that prevents pipelining with header within protocol tests

Posted by kw...@apache.org.
QPID-8042: Fix defect that prevents pipelining with header within protocol tests

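Revert saslSuccessfulAuthenticationWithPipelinedFrames to a readable style: the pipelined
SASL/AMQP exchange is built with the fluent Interaction API rather than hand-assembled byte buffers.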

Cherry picked from 1c382be


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7d8391ff
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7d8391ff
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7d8391ff

Branch: refs/heads/7.0.x
Commit: 7d8391ff7f5946e46c5c3840c628d80debe9e433
Parents: 33cb908
Author: Keith Wall <kw...@apache.org>
Authored: Wed Jan 10 11:02:49 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:30 2018 +0000

----------------------------------------------------------------------
 .../v1_0/transport/security/sasl/SaslTest.java  | 49 +++++---------------
 .../qpid/tests/protocol/OutputHandler.java      |  9 ++++
 2 files changed, 20 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7d8391ff/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 1b81507..ee657f3 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -37,23 +36,17 @@ import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 import javax.xml.bind.DatatypeConverter;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.protocol.v1_0.FrameEncoder;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -123,38 +116,18 @@ public class SaslTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
         try (FrameTransport transport = new FrameTransport(addr, true).connect())
         {
+            final Binary initialResponse =
+                    new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
             final Interaction interaction = transport.newInteraction();
-            FrameEncoder frameEncoder = new FrameEncoder();
-
-            SaslInit saslInit = new SaslInit();
-            saslInit.setMechanism(PLAIN);
-            saslInit.setInitialResponse(new Binary(String.format("\0%s\0%s", _username, _password)
-                                                         .getBytes(StandardCharsets.US_ASCII)));
-            ByteBuffer saslInitByteBuffer = frameEncoder.encode(new SASLFrame(saslInit));
-
-            Open open = new Open();
-            open.setContainerId("containerId");
-            ByteBuffer openByteBuffer = frameEncoder.encode(new TransportFrame(0, open));
-
-            int initSize = saslInitByteBuffer.remaining();
-            int openSize = openByteBuffer.remaining();
-            int dataLength = SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize + openSize;
-            byte[] data = new byte[dataLength];
-
-            System.arraycopy(SASL_AMQP_HEADER_BYTES, 0, data, 0, SASL_AMQP_HEADER_BYTES.length);
-            saslInitByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length, initSize);
-            System.arraycopy(AMQP_HEADER_BYTES,
-                             0,
-                             data,
-                             SASL_AMQP_HEADER_BYTES.length + initSize,
-                             AMQP_HEADER_BYTES.length);
-            openByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize, openSize);
-
-            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-            buffer.writeBytes(data);
-
-            transport.sendPerformative(buffer);
-
+            interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
+                       .negotiateProtocol()
+                       .saslMechanism(PLAIN)
+                       .saslInitialResponse(initialResponse)
+                       .saslInit()
+                       .protocolHeader(AMQP_HEADER_BYTES)
+                       .negotiateProtocol()
+                       .openContainerId("testContainerId")
+                       .open();
 
             final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
             assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7d8391ff/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
index 5d40447..fae1ce4 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -51,6 +51,15 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
         {
             send(ctx, byteBuffer, promise);
         }
+        else if (msg instanceof ByteBuf)
+        {
+            ByteBuf buf = (ByteBuf) msg;
+            final ByteBuffer bytes = ByteBuffer.allocate(buf.readableBytes());
+            buf.readBytes(bytes.array());
+            buf.release();
+
+            send(ctx, bytes, promise);
+        }
         else
         {
             super.write(ctx, msg, promise);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org