You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/18 01:42:20 UTC

[2/3] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 35ee4e7..ad56ea9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -69,9 +69,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -459,7 +459,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                 }
             }
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 
@@ -588,7 +588,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                            .disposition();
             }
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             if (getBrokerAdmin().isQueueDepthSupported())
             {
@@ -672,7 +672,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
                        .dispositionRole(Role.RECEIVER)
                        .disposition();
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
 
             if (getBrokerAdmin().isQueueDepthSupported())
             {
@@ -771,7 +771,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
             }
             while (!settled);
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index cf12b05..6fea928 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -49,7 +49,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class SaslTest extends BrokerAdminUsingTestBase
 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 352fd19..24cb435 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -36,10 +36,11 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 
 public class BeginTest extends BrokerAdminUsingTestBase
 {
@@ -74,7 +75,8 @@ public class BeginTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
             final UnsignedShort channel = UnsignedShort.valueOf(37);
-            Begin responseBegin = transport.newInteraction()
+            Interaction interaction = transport.newInteraction();
+            Begin responseBegin = interaction
                                            .negotiateProtocol().consumeResponse()
                                            .open().consumeResponse(Open.class)
                                            .sessionChannel(channel)
@@ -85,7 +87,7 @@ public class BeginTest extends BrokerAdminUsingTestBase
             assertThat(responseBegin.getOutgoingWindow(), is(instanceOf(UnsignedInteger.class)));
             assertThat(responseBegin.getNextOutgoingId(), is(instanceOf(UnsignedInteger.class)));
 
-            transport.doCloseConnection();
+            interaction.doCloseConnection();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/pom.xml b/systests/protocol-tests-core/pom.xml
new file mode 100644
index 0000000..fa561ed
--- /dev/null
+++ b/systests/protocol-tests-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  ~
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-parent</artifactId>
+        <version>7.1.0-SNAPSHOT</version>
+        <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>protocol-tests-core</artifactId>
+    <name>Apache Qpid Broker-J Protocol Tests Core</name>
+    <description>Core classes for Apache Qpid protocol tests</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-buffer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-integration</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
new file mode 100644
index 0000000..daf500d
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Base class for protocol test transports. It connects a Netty NIO socket channel to the broker
+ * address, installs the input and output handlers, and exposes the responses queued by the input
+ * handler to the test through {@link #getNextResponse()}.
+ */
+public abstract class FrameTransport implements AutoCloseable
+{
+    /** Maximum time, in milliseconds, to wait for a response (defaults to 6000). */
+    public static final long RESPONSE_TIMEOUT =
+            Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+    /** Marker added to the response queue when the channel's close future completes. */
+    private static final Response<Void> CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+    private final EventLoopGroup _workerGroup;
+    private final InetSocketAddress _brokerAddress;
+    private final InputHandler _inputHandler;
+    private final OutputHandler _outputHandler;
+
+    private volatile Channel _channel;
+    private volatile boolean _channelClosedSeen = false;
+
+    /**
+     * Creates a transport that is not yet connected.
+     *
+     * @param brokerAddress address that {@link #connect()} will connect to
+     * @param inputDecoder decoder handed to the input handler
+     * @param outputEncoder encoder handed to the output handler
+     */
+    public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+    {
+        _brokerAddress = brokerAddress;
+        _inputHandler = new InputHandler(_queue, inputDecoder);
+        _outputHandler = new OutputHandler(outputEncoder);
+        _workerGroup = new NioEventLoopGroup();
+    }
+
+    public InetSocketAddress getBrokerAddress()
+    {
+        return _brokerAddress;
+    }
+
+    /**
+     * Connects to the broker address and blocks until the connection attempt completes. When the
+     * channel later closes, a channel-closed marker is added to the response queue.
+     *
+     * @return this transport, so the call can be chained
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
+     *                          interrupted while waiting; the interrupt status is restored first
+     */
+    public FrameTransport connect()
+    {
+        try
+        {
+            Bootstrap b = new Bootstrap();
+            b.group(_workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new ChannelInitializer<SocketChannel>()
+            {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception
+                {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    buildInputOutputPipeline(pipeline);
+                }
+            });
+
+            _channel = b.connect(_brokerAddress).sync().channel();
+            _channel.closeFuture().addListener(future ->
+                                               {
+                                                   _channelClosedSeen = true;
+                                                   _queue.add(CHANNEL_CLOSED_RESPONSE);
+                                               });
+        }
+        catch (InterruptedException e)
+        {
+            // Restore the interrupt status so callers further up can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        return this;
+    }
+
+    /**
+     * Adds the input handler followed by the output handler to the pipeline. Subclasses may
+     * override this to build a different pipeline.
+     */
+    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+    {
+        pipeline.addLast(_inputHandler).addLast(_outputHandler);
+    }
+
+    /**
+     * Disconnects and closes the channel, if connected, and then shuts down the worker group. The
+     * worker group is shut down even when closing the channel fails.
+     */
+    @Override
+    public void close() throws Exception
+    {
+        try
+        {
+            if (_channel != null)
+            {
+                _channel.disconnect().sync();
+                _channel.close().sync();
+                _channel = null;
+            }
+        }
+        finally
+        {
+            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+        }
+    }
+
+    /**
+     * Writes the given bytes to the channel and flushes, without waiting for the write to finish.
+     *
+     * @param bytes raw protocol header bytes
+     * @return future tracking completion of the write
+     * @throws IllegalStateException if the transport is not connected
+     */
+    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(bytes);
+        _channel.write(buffer, promise);
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+
+    /**
+     * Writes an object to the channel and flushes.
+     *
+     * @param data object passed down the channel pipeline
+     * @param sync when {@code true}, block until the write completes and return an already
+     *             completed future; otherwise return a future tracking the pending write
+     * @return future for the write
+     * @throws IllegalStateException if the transport is not connected
+     */
+    public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        if (!sync)
+        {
+            ChannelPromise promise = _channel.newPromise();
+            _channel.write(data, promise);
+            _channel.flush();
+            return JdkFutureAdapters.listenInPoolThread(promise);
+        }
+        else
+        {
+            ChannelFuture channelFuture = _channel.writeAndFlush(data);
+            channelFuture.sync();
+            return Futures.immediateFuture(null);
+        }
+    }
+
+    /**
+     * Takes the next queued response, waiting up to {@link #RESPONSE_TIMEOUT} milliseconds.
+     *
+     * @return the next response, or {@code null} if none arrived within the timeout
+     */
+    @SuppressWarnings("unchecked") // the caller chooses T; the queue only guarantees Response<?>
+    public <T extends Response<?>> T getNextResponse() throws Exception
+    {
+        return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Asserts that the next response is either absent after the timeout or the channel-closed marker.
+     */
+    public void assertNoMoreResponses() throws Exception
+    {
+        Response response = getNextResponse();
+        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+    }
+
+    /** As {@link #assertNoMoreResponses()}, and additionally asserts the channel close was seen. */
+    public void assertNoMoreResponsesAndChannelClosed() throws Exception
+    {
+        assertNoMoreResponses();
+        assertThat(_channelClosedSeen, is(true));
+    }
+
+    /** Response with a {@code null} body used to signal that the channel has closed. */
+    private static class ChannelClosedResponse implements Response<Void>
+    {
+        @Override
+        public String toString()
+        {
+            return "ChannelClosed";
+        }
+
+        @Override
+        public Void getBody()
+        {
+            return null;
+        }
+    }
+
+    public abstract byte[] getProtocolHeader();
+
+    protected abstract Interaction newInteraction();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
new file mode 100644
index 0000000..9767b40
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+/**
+ * {@link Response} whose body is the raw bytes of a protocol header.
+ */
+public class HeaderResponse implements Response<byte[]>
+{
+    private final byte[] _header;
+
+    /**
+     * @param header header bytes; the array is stored as given, not copied
+     */
+    public HeaderResponse(final byte[] header)
+    {
+        _header = header;
+    }
+
+    /**
+     * @return the header bytes supplied at construction (the same array instance)
+     */
+    @Override
+    public byte[] getBody()
+    {
+        return _header;
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder text = new StringBuilder("HeaderResponse{");
+        text.append("_header=").append(Arrays.toString(_header));
+        text.append('}');
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
new file mode 100644
index 0000000..369cfd1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
+ * Turns bytes received from the broker into {@link Response} objects.
+ */
+public interface InputDecoder
+{
+    /**
+     * Decodes responses from the given buffer.
+     *
+     * @param inputBuffer buffer positioned at the first undecoded byte; {@link InputHandler} treats
+     *                    whatever is still remaining after this call as unconsumed input and keeps
+     *                    it for the next read
+     * @return the decoded responses, which {@link InputHandler} adds to the response queue
+     * @throws Exception as declared by the implementation
+     */
+    Collection<Response<?>> decode(ByteBuffer inputBuffer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
new file mode 100644
index 0000000..2d5fb45
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inbound Netty handler that accumulates received bytes, passes them to the {@link InputDecoder}
+ * and adds the decoded responses to the response queue. Bytes the decoder leaves unread are kept
+ * and prepended to the data of the next read.
+ */
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+
+    private final BlockingQueue<Response<?>> _responseQueue;
+    private final InputDecoder _inputDecoder;
+
+    // Bytes received but not yet consumed by the decoder; kept flipped, ready for reading.
+    private ByteBuffer _inputBuffer = ByteBuffer.allocate(0);
+
+    InputHandler(final BlockingQueue<Response<?>> queue, InputDecoder inputDecoder)
+    {
+        _responseQueue = queue;
+        _inputDecoder = inputDecoder;
+    }
+
+    /**
+     * Copies the incoming {@link ByteBuf} onto any bytes left over from earlier reads, runs the
+     * decoder over the combined data and queues the responses it returns.
+     *
+     * @param ctx channel handler context (unused)
+     * @param msg the received message; expected to be a {@link ByteBuf}, released before returning
+     * @throws Exception if the decoder throws
+     */
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+    {
+        try
+        {
+            ByteBuf buf = (ByteBuf) msg;
+            ByteBuffer byteBuffer = ByteBuffer.allocate(buf.readableBytes());
+            byteBuffer.put(buf.nioBuffer());
+            byteBuffer.flip();
+            LOGGER.debug("Incoming {} byte(s)", byteBuffer.remaining());
+
+            if (_inputBuffer.hasRemaining())
+            {
+                // Prepend the leftover bytes from the previous read to the new data.
+                ByteBuffer old = _inputBuffer;
+                _inputBuffer = ByteBuffer.allocate(_inputBuffer.remaining() + byteBuffer.remaining());
+                _inputBuffer.put(old);
+                _inputBuffer.put(byteBuffer);
+                _inputBuffer.flip();
+            }
+            else
+            {
+                _inputBuffer = byteBuffer;
+            }
+
+            _responseQueue.addAll(_inputDecoder.decode(_inputBuffer));
+
+            LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
+
+            if (_inputBuffer.hasRemaining())
+            {
+                // Move the unread bytes to the front and switch back to read mode.
+                _inputBuffer.compact();
+                _inputBuffer.flip();
+            }
+        }
+        finally
+        {
+            // Release even if decoding or queueing throws; otherwise the buffer would leak.
+            ReferenceCountUtil.release(msg);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
new file mode 100644
index 0000000..238c0a5
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -0,0 +1,196 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Base class for a fluent sequence of protocol exchanges over a {@link FrameTransport}. It tracks
+ * the future of the most recent sends and the most recently consumed response.
+ *
+ * @param <I> concrete interaction type returned by the fluent methods
+ */
+public abstract class Interaction<I extends Interaction>
+{
+    private final FrameTransport _transport;
+    private ListenableFuture<?> _latestFuture;
+    private Response<?> _latestResponse;
+
+    public Interaction(final FrameTransport frameTransport)
+    {
+        _transport = frameTransport;
+    }
+
+    /**
+     * Waits for pending sends, takes the next response and checks the type of its body.
+     * <p>
+     * With no types given, any response is accepted, but receiving no response is an error. A
+     * {@code null} entry among the types means that receiving no response is also acceptable.
+     *
+     * @param responseTypes acceptable classes for the response body
+     * @return this interaction
+     * @throws IllegalStateException if the response is not acceptable; this includes a response
+     *                               with a {@code null} body when specific types were requested
+     */
+    public I consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return getInteraction();
+        }
+        acceptableResponseClasses.remove(null);
+        // A response may carry a null body (e.g. a channel-closed marker); it matches no class.
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (body != null)
+        {
+            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            {
+                if (acceptableResponseClass.isInstance(body))
+                {
+                    return getInteraction();
+                }
+            }
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses,
+                                                      describeLatestResponse()));
+    }
+
+    protected Response<?> getNextResponse() throws Exception
+    {
+        return _transport.getNextResponse();
+    }
+
+    /**
+     * Waits up to {@link FrameTransport#RESPONSE_TIMEOUT} milliseconds for the pending sends, if any.
+     *
+     * @return this interaction
+     */
+    public I sync() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+            _latestFuture = null;
+        }
+        return getInteraction();
+    }
+
+    public Response<?> getLatestResponse() throws Exception
+    {
+        sync();
+        return _latestResponse;
+    }
+
+    /**
+     * Returns the body of the latest consumed response as the requested type.
+     *
+     * @param type expected class of the body
+     * @return the body, cast to {@code type}
+     * @throws IllegalStateException if there is no latest response, or its body is {@code null} or
+     *                               not an instance of {@code type}
+     */
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        sync();
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (!type.isInstance(body))
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(),
+                                                          describeLatestResponse()));
+        }
+
+        return type.cast(body);
+    }
+
+    /**
+     * Sends a frame body through the transport and adds the resulting future to the pending sends.
+     *
+     * @param frameBody object to send
+     * @param sync passed through to {@link FrameTransport#sendPerformative(Object, boolean)}
+     * @return the future of this send only
+     */
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+        chainFuture(future);
+        return future;
+    }
+
+    /**
+     * Sends the protocol header returned by {@link #getProtocolHeader()} and adds the resulting
+     * future to the pending sends.
+     *
+     * @return this interaction
+     */
+    public I negotiateProtocol() throws Exception
+    {
+        chainFuture(_transport.sendProtocolHeader(getProtocolHeader()));
+        return getInteraction();
+    }
+
+    protected FrameTransport getTransport()
+    {
+        return _transport;
+    }
+
+    protected abstract byte[] getProtocolHeader();
+
+    protected abstract I getInteraction();
+
+    /** Combines the given future with the pending one so that {@link #sync()} waits for both. */
+    private void chainFuture(final ListenableFuture<Void> future)
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+    }
+
+    /** Value shown in error messages: the body, or the response itself when the body is null. */
+    private Object describeLatestResponse()
+    {
+        if (_latestResponse == null)
+        {
+            return null;
+        }
+        final Object body = _latestResponse.getBody();
+        return body == null ? _latestResponse : body;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
new file mode 100644
index 0000000..292ae9a
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+/**
+ * Hamcrest matchers for protocol test responses.
+ */
+public class Matchers
+{
+    /**
+     * Creates a matcher that accepts only a {@link HeaderResponse} whose body equals the given bytes.
+     *
+     * @param expectedHeader header bytes the response body must equal
+     * @return the matcher; {@code null} and responses of other types do not match
+     */
+    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+    {
+        return new BaseMatcher<Response>()
+        {
+            @Override
+            public boolean matches(final Object candidate)
+            {
+                // instanceof is false for null, so no separate null check is needed.
+                return candidate instanceof HeaderResponse
+                       && Arrays.equals(expectedHeader, ((HeaderResponse) candidate).getBody());
+            }
+
+            @Override
+            public void describeTo(final Description description)
+            {
+                description.appendValue(new HeaderResponse(expectedHeader));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
new file mode 100644
index 0000000..a6a4a47
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Strategy for encoding an outbound message object into the bytes to be written.
+ * <p>
+ * {@link OutputHandler} calls the encoder for every message passed to its {@code write} method.
+ */
+@FunctionalInterface
+public interface OutputEncoder
+{
+    /**
+     * Encodes the given message.
+     *
+     * @param msg the object being written
+     * @return the encoded bytes, or {@code null} to signal that {@code msg} is not encoded here;
+     *         {@link OutputHandler} then forwards {@code msg} itself instead of encoded bytes
+     */
+    ByteBuffer encode(Object msg);
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
new file mode 100644
index 0000000..40a2ca7
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Outbound Netty handler that runs every written message through an {@link OutputEncoder}.
+ * <p>
+ * Messages for which the encoder returns {@code null} are handed to the superclass unchanged.
+ */
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+    private final OutputEncoder _outputEncoder;
+
+    /**
+     * @param outputEncoder the encoder consulted for each message passed to {@code write}
+     */
+    OutputHandler(final OutputEncoder outputEncoder)
+    {
+        _outputEncoder = outputEncoder;
+    }
+
+    /**
+     * Writes the encoded form of {@code msg} when the encoder produces one; otherwise delegates
+     * the original message to the superclass implementation.
+     */
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+    {
+        ByteBuffer byteBuffer = _outputEncoder.encode(msg);
+        if (byteBuffer != null)
+        {
+            send(ctx, byteBuffer, promise);
+        }
+        else
+        {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    /**
+     * Copies the remaining bytes of {@code dataByteBuffer} into a Netty buffer and writes it.
+     * An exception thrown by the write is reported through {@code promise} instead of rethrown.
+     */
+    private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+    {
+        // Size the buffer to the payload and copy straight from the NIO buffer,
+        // avoiding an intermediate byte[] copy.
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(dataByteBuffer.remaining());
+        buffer.writeBytes(dataByteBuffer);
+        try
+        {
+            super.write(ctx, buffer, promise);
+        }
+        catch (Exception e)
+        {
+            promise.setFailure(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
new file mode 100644
index 0000000..debc06f
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+/**
+ * A protocol test response carrying a body.
+ *
+ * @param <T> the type of the body carried by the response
+ */
+public interface Response<T>
+{
+    /**
+     * @return the body of this response
+     */
+    T getBody();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
new file mode 100644
index 0000000..db6d7a1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a test method with a section reference and a description.
+ * <p>
+ * The annotation is retained at runtime and is only applicable to methods.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SpecificationTest
+{
+    /** The section reference; presumably the protocol specification section under test (confirm). */
+    String section();
+
+    /** Description of what is verified; presumably taken from the specification text (confirm). */
+    String description();
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org