You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/06/21 10:00:48 UTC

[3/3] qpid-broker-j git commit: NO-JIRA: [Java Broker] Refactor AMQP 1.0 protocol tests

NO-JIRA: [Java Broker] Refactor AMQP 1.0 protocol tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/813ecbbc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/813ecbbc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/813ecbbc

Branch: refs/heads/master
Commit: 813ecbbc1e5308b0ac55e1da64fb42a6c6506410
Parents: 92b79cc
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed Jun 21 10:50:34 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Jun 21 10:51:40 2017 +0100

----------------------------------------------------------------------
 .../tests/protocol/v1_0/FrameTransport.java     | 219 +------
 .../qpid/tests/protocol/v1_0/Interaction.java   | 616 +++++++++++++++++++
 .../apache/qpid/tests/protocol/v1_0/Utils.java  |  71 +--
 .../bindmapjms/TemporaryDestinationTest.java    |  43 +-
 .../soleconn/CloseExistingPolicy.java           | 155 ++---
 .../v1_0/extensions/soleconn/MixedPolicy.java   | 118 ++--
 .../soleconn/RefuseConnectionPolicy.java        | 253 ++++----
 .../extensions/websocket/WebSocketTest.java     |  27 +-
 .../v1_0/messaging/DeleteOnCloseTest.java       | 138 ++---
 .../protocol/v1_0/messaging/MessageFormat.java  |  61 +-
 .../protocol/v1_0/messaging/TransferTest.java   | 371 ++++-------
 .../v1_0/transaction/DischargeTest.java         | 167 ++---
 .../v1_0/transport/ProtocolHeaderTest.java      |  20 +-
 .../v1_0/transport/connection/OpenTest.java     |  84 +--
 .../v1_0/transport/link/AttachTest.java         |  75 +--
 .../protocol/v1_0/transport/link/FlowTest.java  | 116 ++--
 .../v1_0/transport/security/sasl/SaslTest.java  | 169 +++--
 .../v1_0/transport/session/BeginTest.java       |  68 +-
 18 files changed, 1363 insertions(+), 1408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index b72ed4e..9aeb4e9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -23,18 +23,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
@@ -53,30 +47,19 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.core.Is;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class FrameTransport implements AutoCloseable
 {
     public static final long RESPONSE_TIMEOUT = 6000;
-    private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
 
     private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(100);
@@ -87,8 +70,6 @@ public class FrameTransport implements AutoCloseable
 
     private Channel _channel;
     private volatile boolean _channelClosedSeen = false;
-    private int _amqpConnectionId;
-    private short _amqpChannelId;
 
     public FrameTransport(final InetSocketAddress brokerAddress)
     {
@@ -158,7 +139,6 @@ public class FrameTransport implements AutoCloseable
         }
         finally
         {
-            AMQP_CONNECTION_IDS.remove(_amqpConnectionId);
             _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
         }
     }
@@ -166,21 +146,23 @@ public class FrameTransport implements AutoCloseable
     public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
     {
         Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
         ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
         buffer.writeBytes(bytes);
-        ChannelFuture channelFuture = _channel.writeAndFlush(buffer);
-        channelFuture.sync();
-        return JdkFutureAdapters.listenInPoolThread(channelFuture);
+        _channel.write(buffer, promise);
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
     }
 
     public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
     {
         Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
         final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
         TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
-        ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
-        channelFuture.sync();
-        return JdkFutureAdapters.listenInPoolThread(channelFuture);
+        _channel.write(transportFrame, promise);
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
     }
 
     public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
@@ -191,104 +173,17 @@ public class FrameTransport implements AutoCloseable
         return JdkFutureAdapters.listenInPoolThread(channelFuture);
     }
 
-    public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
-    {
-        return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId));
-    }
-
-    public ListenableFuture<Void> sendPipelined(final byte[] protocolHeader, final TransportFrame... frames)
-            throws InterruptedException
-    {
-        ChannelPromise promise = _channel.newPromise();
-        if (protocolHeader != null)
-        {
-            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
-            buffer.writeBytes(protocolHeader);
-            _channel.write(buffer);
-        }
-        for (TransportFrame frame : frames)
-        {
-            _channel.write(frame, promise);
-        }
-        _channel.flush();
-        return JdkFutureAdapters.listenInPoolThread(promise);
-    }
-
-    public ListenableFuture<Void> sendPipelined(final TransportFrame... frames) throws InterruptedException
-    {
-        return sendPipelined(null, frames);
-    }
-
     public <T extends Response<?>> T getNextResponse() throws Exception
     {
         return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
     }
 
-    public <R extends Response<?>> R getNextResponse(Class<R> expectedResponseClass) throws Exception
-    {
-        R actualResponse = getNextResponse();
-        if (actualResponse == null)
-        {
-            throw new IllegalStateException(String.format("No response received within timeout %d - expecting %s",
-                                                          RESPONSE_TIMEOUT, expectedResponseClass.getName()));
-        }
-        else if (!expectedResponseClass.isAssignableFrom(actualResponse.getClass()))
-        {
-            throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
-                                                          expectedResponseClass.getName(),
-                                                          actualResponse.getClass().getName()));
-        }
-
-        return actualResponse;
-    }
-
-    public <T> T getNextResponseBody(Class<T> expectedFrameBodyClass) throws Exception
-    {
-        Response<T>  response = getNextResponse();
-        T actualFrameBody =  response.getBody();
-
-        if (!expectedFrameBodyClass.isAssignableFrom(actualFrameBody.getClass()))
-        {
-            throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
-                                                          expectedFrameBodyClass.getName(),
-                                                          actualFrameBody.getClass().getName()));
-        }
-
-        return actualFrameBody;
-    }
-
-    public void doProtocolNegotiation() throws Exception
-    {
-        byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
-        sendProtocolHeader(bytes);
-        HeaderResponse response = (HeaderResponse) getNextResponse();
-
-        if (!Arrays.equals(bytes, response.getBody()))
-        {
-            throw new IllegalStateException("Unexpected protocol header");
-        }
-    }
-
-    public void doOpenConnection() throws Exception
-    {
-        doProtocolNegotiation();
-        Open open = new Open();
-
-        open.setContainerId(String.format("testContainer-%d", getConnectionId()));
-        sendPerformative(open, UnsignedShort.valueOf((short) 0));
-        PerformativeResponse response = (PerformativeResponse) getNextResponse();
-        if (!(response.getBody() instanceof Open))
-        {
-            throw new IllegalStateException("Unexpected response to connection Open");
-        }
-    }
-
     public void doCloseConnection() throws Exception
     {
         Close close = new Close();
 
         sendPerformative(close, UnsignedShort.valueOf((short) 0));
-        PerformativeResponse response = (PerformativeResponse) getNextResponse();
+        PerformativeResponse response = getNextResponse();
         if (!(response.getBody() instanceof Close))
         {
             throw new IllegalStateException(String.format(
@@ -296,84 +191,6 @@ public class FrameTransport implements AutoCloseable
         }
     }
 
-    public void doBeginSession() throws Exception
-    {
-        doOpenConnection();
-        Begin begin = new Begin();
-        begin.setNextOutgoingId(UnsignedInteger.ZERO);
-        begin.setIncomingWindow(UnsignedInteger.ZERO);
-        begin.setOutgoingWindow(UnsignedInteger.ZERO);
-        _amqpChannelId = (short) 1;
-        sendPerformative(begin, UnsignedShort.valueOf(_amqpChannelId));
-        PerformativeResponse response = (PerformativeResponse) getNextResponse();
-        if (!(response.getBody() instanceof Begin))
-        {
-            throw new IllegalStateException(String.format(
-                    "Unexpected response to connection Begin. Expected Begin got '%s'", response.getBody()));
-        }
-    }
-
-    public void doAttachReceivingLink(String queueName) throws Exception
-    {
-        doAttachReceivingLink(UnsignedInteger.ZERO, queueName);
-    }
-
-    public void doAttachReceivingLink(final UnsignedInteger handle, String queueName) throws Exception
-    {
-        doBeginSession();
-        Role localRole = Role.RECEIVER;
-        Attach attach = new Attach();
-        attach.setName("testReceivingLink");
-        attach.setHandle(handle);
-        attach.setRole(localRole);
-        Source source = new Source();
-        source.setAddress(queueName);
-        attach.setSource(source);
-        Target target = new Target();
-        attach.setTarget(target);
-
-        sendPerformative(attach);
-        PerformativeResponse response = (PerformativeResponse) getNextResponse();
-
-        assertThat(response, is(notNullValue()));
-        assertThat(response.getBody(), is(instanceOf(Attach.class)));
-        Attach responseAttach = (Attach) response.getBody();
-        assertThat(responseAttach.getSource(), is(notNullValue()));
-    }
-
-    public void doAttachSendingLink(final UnsignedInteger handle,
-                                    final String destination) throws Exception
-    {
-        Attach attach = new Attach();
-        attach.setName("testSendingLink");
-        attach.setHandle(handle);
-        attach.setRole(Role.SENDER);
-        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
-        Source source = new Source();
-        attach.setSource(source);
-        Target target = new Target();
-        target.setAddress(destination);
-        attach.setTarget(target);
-        doAttachSendingLink(attach);
-    }
-
-    public void doAttachSendingLink(final Attach attach) throws Exception
-    {
-        doBeginSession();
-
-        sendPerformative(attach);
-        PerformativeResponse response = (PerformativeResponse) getNextResponse();
-
-        assertThat(response, is(notNullValue()));
-        assertThat(response.getBody(), is(instanceOf(Attach.class)));
-        Attach responseAttach = (Attach) response.getBody();
-        assertThat(responseAttach.getTarget(), is(notNullValue()));
-
-        PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse();
-        assertThat(flowResponse, Is.is(CoreMatchers.notNullValue()));
-        assertThat(flowResponse.getBody(), Is.is(CoreMatchers.instanceOf(Flow.class)));
-    }
-
     public void assertNoMoreResponses() throws Exception
     {
         Response response = getNextResponse();
@@ -386,19 +203,6 @@ public class FrameTransport implements AutoCloseable
         assertThat(_channelClosedSeen, is(true));
     }
 
-    private int getConnectionId()
-    {
-        if (_amqpConnectionId == 0)
-        {
-            _amqpConnectionId = 1;
-            while (!AMQP_CONNECTION_IDS.add(_amqpConnectionId))
-            {
-                ++_amqpConnectionId;
-            }
-        }
-        return _amqpConnectionId;
-    }
-
     private static class ChannelClosedResponse implements Response<Void>
     {
         @Override
@@ -413,4 +217,9 @@ public class FrameTransport implements AutoCloseable
             return null;
         }
     }
+
+    public Interaction newInteraction()
+    {
+        return new Interaction(this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
new file mode 100644
index 0000000..eac8770
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+public class Interaction
+{
+    private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final Begin _begin;
+    private final Open _open;
+    private final Close _close;
+    private final Attach _attach;
+    private final Detach _detach;
+    private final Flow _flow;
+    private final Transfer _transfer;
+    private final FrameTransport _transport;
+    private final SaslInit _saslInit;
+    private final SaslResponse _saslResponse;
+    private byte[] _protocolHeader;
+    private UnsignedShort _connectionChannel;
+    private UnsignedShort _sessionChannel;
+    private Response<?> _latestResponse;
+    private ListenableFuture<?> _latestFuture;
+
+    Interaction(final FrameTransport frameTransport)
+    {
+        final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
+        _transport = frameTransport;
+
+        _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+
+        _saslInit = new SaslInit();
+        _saslResponse = new SaslResponse();
+
+        _open = new Open();
+        _open.setContainerId(getConnectionId());
+        _close = new Close();
+        _connectionChannel = UnsignedShort.valueOf(0);
+
+        _begin = new Begin();
+        _begin.setNextOutgoingId(UnsignedInteger.ZERO);
+        _begin.setIncomingWindow(UnsignedInteger.ZERO);
+        _begin.setOutgoingWindow(UnsignedInteger.ZERO);
+        _sessionChannel = UnsignedShort.valueOf(1);
+
+        _attach = new Attach();
+        _attach.setName("testLink");
+        _attach.setHandle(defaultLinkHandle);
+        _attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        _attach.setSource(new Source());
+        _attach.setTarget(new Target());
+
+        _detach = new Detach();
+        _detach.setHandle(_attach.getHandle());
+
+        _flow = new Flow();
+        _flow.setNextIncomingId(UnsignedInteger.ZERO);
+        _flow.setIncomingWindow(UnsignedInteger.ZERO);
+        _flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        _flow.setOutgoingWindow(UnsignedInteger.ZERO);
+
+        _transfer = new Transfer();
+        _transfer.setHandle(defaultLinkHandle);
+        _transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+        _transfer.setDeliveryId(UnsignedInteger.ZERO);
+    }
+
+    /////////////////////////
+    // Protocol Negotiation //
+    /////////////////////////
+
+    public Interaction protocolHeader(byte[] header)
+    {
+        _protocolHeader = header;
+        return this;
+    }
+
+    public Interaction negotiateProtocol() throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+        return this;
+    }
+
+    //////////
+    // SASL //
+    //////////
+
+    public Interaction saslMechanism(final Symbol mechanism)
+    {
+        _saslInit.setMechanism(mechanism);
+        return this;
+    }
+
+    public Interaction saslInitialResponse(final Binary initialResponse)
+    {
+        _saslInit.setInitialResponse(initialResponse);
+        return this;
+    }
+
+    public Interaction saslInit() throws Exception
+    {
+        sendPerformativeAndChainFuture(_saslInit);
+        return this;
+    }
+
+    public Interaction saslResponseResponse(Binary response)
+    {
+        _saslResponse.setResponse(response);
+        return this;
+    }
+
+    public Interaction saslResponse() throws Exception
+    {
+        sendPerformativeAndChainFuture(_saslResponse);
+        return this;
+    }
+
+    ////////////////
+    // Connection //
+    ////////////////
+
+    public Interaction connectionChannel(UnsignedShort connectionChannel)
+    {
+        _connectionChannel = connectionChannel;
+        return this;
+    }
+
+    public Interaction openContainerId(String containerId)
+    {
+        _open.setContainerId(containerId);
+        return this;
+    }
+
+    public Interaction openHostname(String hostname)
+    {
+        _open.setHostname(hostname);
+        return this;
+    }
+
+    public Interaction openChannelMax(UnsignedShort channelMax)
+    {
+        _open.setChannelMax(channelMax);
+        return this;
+    }
+
+    public Interaction openDesiredCapabilities(final Symbol... desiredCapabilities)
+    {
+        _open.setDesiredCapabilities(desiredCapabilities);
+        return this;
+    }
+
+    public Interaction openProperties(final Map<Symbol, Object> properties)
+    {
+        _open.setProperties(properties);
+        return this;
+    }
+
+    public Interaction open() throws Exception
+    {
+        sendPerformativeAndChainFuture(_open, _connectionChannel);
+        return this;
+    }
+
+    public Interaction close() throws Exception
+    {
+        sendPerformativeAndChainFuture(_close, _connectionChannel);
+        return this;
+    }
+
+    private String getConnectionId()
+    {
+        int index = 1;
+        String containerId = String.format("testContainer-%d", index);
+        while (CONTAINER_IDS.contains(containerId))
+        {
+            ++index;
+            containerId = String.format("testContainer-%d", index);
+        }
+        CONTAINER_IDS.add(containerId);
+        return containerId;
+    }
+
+    /////////////
+    // Session //
+    /////////////
+
+    public Interaction sessionChannel(UnsignedShort sessionChannel)
+    {
+        _sessionChannel = sessionChannel;
+        return this;
+    }
+
+    public Interaction beginNextOutgoingId(UnsignedInteger nextOutgoingId)
+    {
+        _begin.setNextOutgoingId(nextOutgoingId);
+        return this;
+    }
+
+    public Interaction beginIncomingWindow(UnsignedInteger incomingWindow)
+    {
+        _begin.setIncomingWindow(incomingWindow);
+        return this;
+    }
+
+    public Interaction beginOutgoingWindow(UnsignedInteger outgoingWindow)
+    {
+        _begin.setOutgoingWindow(outgoingWindow);
+        return this;
+    }
+
+    public Interaction begin() throws Exception
+    {
+        sendPerformativeAndChainFuture(_begin, _sessionChannel);
+        return this;
+    }
+
+    //////////
+    // Link //
+    //////////
+
+
+    public Interaction attachName(String linkName)
+    {
+        _attach.setName(linkName);
+        return this;
+    }
+
+    public Interaction attachRole(Role role)
+    {
+        _attach.setRole(role);
+        return this;
+    }
+
+    public Interaction attachHandle(UnsignedInteger handle)
+    {
+        _attach.setHandle(handle);
+        _detach.setHandle(handle);
+        return this;
+    }
+
+    public Interaction attachInitialDeliveryCount(UnsignedInteger initialDeliveryCount)
+    {
+        _attach.setInitialDeliveryCount(initialDeliveryCount);
+        return this;
+    }
+
+    public Interaction attachRcvSettleMode(final ReceiverSettleMode rcvSettleMode)
+    {
+        _attach.setRcvSettleMode(rcvSettleMode);
+        return this;
+    }
+
+    public Interaction attachSource(Source source)
+    {
+        _attach.setSource(source);
+        return this;
+    }
+
+    public Interaction attachTarget(BaseTarget target)
+    {
+        _attach.setTarget(target);
+        return this;
+    }
+
+    public Interaction attachSourceAddress(String address)
+    {
+        Source source = (Source) _attach.getSource();
+        source.setAddress(address);
+        _attach.setSource(source);
+        return this;
+    }
+
+    public Interaction attachSourceOutcomes(final Symbol... outcomes)
+    {
+        Source source = ((Source) _attach.getSource());
+        source.setOutcomes(outcomes);
+        _attach.setSource(source);
+        return this;
+    }
+
+    public Interaction attachTargetAddress(final String address)
+    {
+        final Target target = ((Target) _attach.getTarget());
+        target.setAddress(address);
+        _attach.setTarget(target);
+        return this;
+    }
+
+    public Interaction attach() throws Exception
+    {
+        sendPerformativeAndChainFuture(_attach, _sessionChannel);
+        return this;
+    }
+
+    public Interaction detachClose(Boolean close)
+    {
+        _detach.setClosed(close);
+        return this;
+    }
+
+    public Interaction detach() throws Exception
+    {
+        sendPerformativeAndChainFuture(_detach, _sessionChannel);
+        return this;
+    }
+
+    //////////
+    // FLow //
+    //////////
+
+    public Interaction flowIncomingWindow(final UnsignedInteger incomingWindow)
+    {
+        _flow.setIncomingWindow(incomingWindow);
+        return this;
+    }
+
+    public Interaction flowNextIncomingId(final UnsignedInteger nextIncomingId)
+    {
+        _flow.setNextIncomingId(nextIncomingId);
+        return this;
+    }
+
+    public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow)
+    {
+        _flow.setOutgoingWindow(outgoingWindow);
+        return this;
+    }
+
+    public Interaction flowNextOutgoingId(final UnsignedInteger nextNextOutgoingId)
+    {
+        _flow.setNextOutgoingId(nextNextOutgoingId);
+        return this;
+    }
+
+    public Interaction flowEcho(final Boolean echo)
+    {
+        _flow.setEcho(echo);
+        return this;
+    }
+
+    public Interaction flowHandle(final UnsignedInteger handle)
+    {
+        _flow.setHandle(handle);
+        return this;
+    }
+
+    public Interaction flowAvailable(final UnsignedInteger available)
+    {
+        _flow.setAvailable(available);
+        return this;
+    }
+
+    public Interaction flowDeliveryCount(final UnsignedInteger deliveryCount)
+    {
+        _flow.setDeliveryCount(deliveryCount);
+        return this;
+    }
+
+    public Interaction flowLinkCredit(final UnsignedInteger linkCredit)
+    {
+        _flow.setLinkCredit(linkCredit);
+        return this;
+    }
+
+    public Interaction flowDrain(final Boolean drain)
+    {
+        _flow.setDrain(drain);
+        return this;
+    }
+
+    public Interaction flow() throws Exception
+    {
+        sendPerformativeAndChainFuture(_flow, _sessionChannel);
+        return this;
+    }
+
+    //////////////
+    // Transfer //
+    //////////////
+
+    public Interaction transferHandle(UnsignedInteger transferHandle)
+    {
+        _transfer.setHandle(transferHandle);
+        return this;
+    }
+
+    public Interaction transferDeliveryTag(final Binary deliveryTag)
+    {
+        _transfer.setDeliveryTag(deliveryTag);
+        return this;
+    }
+
+    public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+    {
+        _transfer.setDeliveryId(deliveryId);
+        return this;
+    }
+
+    public Interaction transferRcvSettleMode(final ReceiverSettleMode receiverSettleMode)
+    {
+        _transfer.setRcvSettleMode(receiverSettleMode);
+        return this;
+    }
+
+    public Interaction transferMore(final Boolean more)
+    {
+        _transfer.setMore(more);
+        return this;
+    }
+
+    public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
+    {
+        _transfer.setMessageFormat(messageFormat);
+        return this;
+    }
+
+    public Interaction transferPayload(final List<QpidByteBuffer> payload)
+    {
+        _transfer.setPayload(payload);
+        return this;
+    }
+
+    public Interaction transferPayloadData(final Object payload)
+    {
+        AmqpValue amqpValue = new AmqpValue(payload);
+        final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
+        final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
+        _transfer.setPayload(encodedForm);
+
+        section.dispose();
+        for (QpidByteBuffer qbb : encodedForm)
+        {
+            qbb.dispose();
+        }
+        return this;
+    }
+
+    public Interaction transferSettled(final Boolean settled)
+    {
+        _transfer.setSettled(settled);
+        return this;
+    }
+
+    public Interaction transfer() throws Exception
+    {
+        sendPerformativeAndChainFuture(_transfer, _sessionChannel);
+        return this;
+    }
+
+    //////////
+    // misc //
+    //////////
+
+    public Interaction sendPerformative(final FrameBody frameBody,
+                                        final UnsignedShort channel) throws Exception
+    {
+        sendPerformativeAndChainFuture(frameBody, channel);
+        return this;
+    }
+
+    public Interaction sendPerformative(final SaslFrameBody saslFrameBody) throws Exception
+    {
+        sendPerformativeAndChainFuture(saslFrameBody);
+        return this;
+    }
+
+    private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+    }
+
+    private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+    }
+
+    public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = _transport.getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return this;
+        }
+        acceptableResponseClasses.remove(null);
+        for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+        {
+            if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+            {
+                return this;
+            }
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses,
+                                                      _latestResponse == null ? null : _latestResponse.getBody()));
+    }
+
+    public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+            _latestFuture = null;
+        }
+        return this;
+    }
+
+    public Response<?> getLatestResponse() throws Exception
+    {
+        sync();
+        return _latestResponse;
+    }
+
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        sync();
+        if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(),
+                                                          _latestResponse.getBody()));
+        }
+
+        return (T) _latestResponse.getBody();
+    }
+
+    public Interaction flowHandleFromLinkHandle()
+    {
+        _flow.setHandle(_attach.getHandle());
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 5d4c52f..bfe05b8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -20,21 +20,15 @@
 
 package org.apache.qpid.tests.protocol.v1_0;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
 
-import org.hamcrest.core.Is;
-
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
@@ -45,39 +39,25 @@ public class Utils
     {
         try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
         {
-            transport.doBeginSession();
-
+            final Interaction interaction = transport.newInteraction();
+            final Attach attachValidationResponse = interaction.negotiateProtocol().consumeResponse()
+                                                               .open().consumeResponse()
+                                                               .begin().consumeResponse()
+                                                               .attachName("validationAttach")
+                                                               .attachRole(Role.RECEIVER)
+                                                               .attachSourceAddress(nodeAddress)
+                                                               .attach().consumeResponse()
+                                                               .getLatestResponse(Attach.class);
             final boolean queueExists;
-            Attach validationAttach = new Attach();
-            validationAttach.setName("validationAttach");
-            validationAttach.setHandle(UnsignedInteger.ZERO);
-            validationAttach.setRole(Role.RECEIVER);
-            Source validationSource = new Source();
-            validationSource.setAddress(nodeAddress);
-            validationAttach.setSource(validationSource);
-            validationAttach.setTarget(new Target());
-            transport.sendPerformative(validationAttach);
-            PerformativeResponse validationResponse = (PerformativeResponse) transport.getNextResponse();
-            assertThat(validationResponse, is(notNullValue()));
-            assertThat(validationResponse.getBody(), is(instanceOf(Attach.class)));
-            final Attach attachValidationResponse = (Attach) validationResponse.getBody();
             if (attachValidationResponse.getSource() != null)
             {
                 queueExists = true;
-                Detach validationDetach = new Detach();
-                validationDetach.setHandle(validationAttach.getHandle());
-                validationDetach.setClosed(true);
-                transport.sendPerformative(validationDetach);
-                PerformativeResponse validationDetachResponse = (PerformativeResponse) transport.getNextResponse();
-                assertThat(validationDetachResponse, is(notNullValue()));
-                assertThat(validationDetachResponse.getBody(), is(instanceOf(Detach.class)));
+                interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class);
             }
             else
             {
                 queueExists = false;
-                PerformativeResponse validationDetachResponse = (PerformativeResponse) transport.getNextResponse();
-                assertThat(validationDetachResponse, is(notNullValue()));
-                assertThat(validationDetachResponse.getBody(), is(instanceOf(Detach.class)));
+                interaction.consumeResponse().getLatestResponse(Detach.class);
             }
             return queueExists;
         }
@@ -88,25 +68,26 @@ public class Utils
     {
         try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
         {
-            transport.doAttachReceivingLink(queueName);
-            Flow flow = new Flow();
-            flow.setIncomingWindow(UnsignedInteger.ONE);
-            flow.setNextIncomingId(UnsignedInteger.ZERO);
-            flow.setOutgoingWindow(UnsignedInteger.ZERO);
-            flow.setNextOutgoingId(UnsignedInteger.ZERO);
-            flow.setHandle(UnsignedInteger.ZERO);
-            flow.setLinkCredit(UnsignedInteger.ONE);
-
-            transport.sendPerformative(flow);
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(queueName)
+                                                     .attach().consumeResponse()
+                                                     .flowIncomingWindow(UnsignedInteger.ONE)
+                                                     .flowNextIncomingId(UnsignedInteger.ZERO)
+                                                     .flowOutgoingWindow(UnsignedInteger.ZERO)
+                                                     .flowNextOutgoingId(UnsignedInteger.ZERO)
+                                                     .flowLinkCredit(UnsignedInteger.ONE)
+                                                     .flowHandleFromLinkHandle()
+                                                     .flow();
 
             MessageDecoder messageDecoder = new MessageDecoder();
             boolean hasMore;
             do
             {
-                PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
-                assertThat(response, Is.is(notNullValue()));
-                assertThat(response.getBody(), Is.is(instanceOf(Transfer.class)));
-                Transfer responseTransfer = (Transfer) response.getBody();
+                Transfer responseTransfer = interaction.consumeResponse().getLatestResponse(Transfer.class);
                 messageDecoder.addTransfer(responseTransfer);
                 hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
             }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index e9a6957..1a1c8bf 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,7 +20,6 @@
 
 package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -33,17 +32,17 @@ import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.Session_1_0;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
@@ -84,29 +83,21 @@ public class TemporaryDestinationTest extends ProtocolTestBase
 
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            transport.doBeginSession();
-
-            Attach attach = new Attach();
-            attach.setName("testSendingLink");
-            attach.setHandle(UnsignedInteger.ZERO);
-            attach.setRole(Role.SENDER);
-            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
-
-            attach.setSource(new Source());
-
             Target target = new Target();
             target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
             target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
             target.setDynamic(true);
             target.setCapabilities(targetCapabilities);
-            attach.setTarget(target);
 
-            transport.sendPerformative(attach);
+            final Interaction interaction = transport.newInteraction();
+            final Attach attachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse(Open.class)
+                                                     .begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.SENDER)
+                                                     .attachTarget(target)
+                                                     .attach().consumeResponse()
+                                                     .getLatestResponse(Attach.class);
 
-            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Attach.class)));
-            final Attach attachResponse = (Attach) response.getBody();
             assertThat(attachResponse.getSource(), is(notNullValue()));
             assertThat(attachResponse.getTarget(), is(notNullValue()));
 
@@ -115,19 +106,11 @@ public class TemporaryDestinationTest extends ProtocolTestBase
 
             assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(true));
 
-            final PerformativeResponse flowResponse = ((PerformativeResponse) transport.getNextResponse());
-            if (flowResponse != null)
-            {
-                assertThat(flowResponse.getBody(), is(instanceOf(Flow.class)));
-            }
+            interaction.consumeResponse().getLatestResponse(Flow.class);
 
             transport.doCloseConnection();
         }
 
-        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            transport.doBeginSession();
-            assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
-        }
+        assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index 6d8007a..83303ad 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -20,12 +20,11 @@
 
 package org.apache.qpid.tests.protocol.v1_0.extensions.soleconn;
 
-import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY;
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
+import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.CLOSE_EXISTING;
 import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -48,7 +47,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 
 public class CloseExistingPolicy extends ProtocolTestBase
@@ -66,19 +65,16 @@ public class CloseExistingPolicy extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            transport.doProtocolNegotiation();
-            Open open = new Open();
-            open.setContainerId("testContainerId");
-            open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-            open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                        CLOSE_EXISTING));
-
-            transport.sendPerformative(open);
-            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
-
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Open.class)));
-            Open responseOpen = (Open) response.getBody();
+            Open responseOpen = transport.newInteraction()
+                                         .negotiateProtocol().consumeResponse()
+                                         .openContainerId("testContainerId")
+                                         .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                                         .openProperties(Collections.singletonMap(
+                                                 SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                 CLOSE_EXISTING))
+                                         .open().consumeResponse()
+                                         .getLatestResponse(Open.class);
+
             assertThat(Arrays.asList(responseOpen.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
             if (responseOpen.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
             {
@@ -94,42 +90,30 @@ public class CloseExistingPolicy extends ProtocolTestBase
     {
         try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
         {
-            transport1.doProtocolNegotiation();
-            Open open = new Open();
-            open.setContainerId("testContainerId");
-            open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-            open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                        CLOSE_EXISTING));
-
-            transport1.sendPerformative(open);
-            PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Open.class)));
+            final Interaction interaction1 = transport1.newInteraction();
+            interaction1.negotiateProtocol().consumeResponse()
+                        .openContainerId("testContainerId")
+                        .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                        .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                 CLOSE_EXISTING))
+                        .open().consumeResponse(Open.class);
 
             try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
             {
-                transport2.doProtocolNegotiation();
-                Open open2 = new Open();
-                open2.setContainerId("testContainerId");
-                open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-                open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                            CLOSE_EXISTING));
-
-                transport2.sendPerformative(open2);
-
-                final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
-                assertThat(closeResponse1, is(notNullValue()));
-                assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
-                Close close1 = (Close) closeResponse1.getBody();
+                final Interaction interaction2 = transport2.newInteraction();
+                interaction2.negotiateProtocol().consumeResponse()
+                            .openContainerId("testContainerId")
+                            .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                            .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                     CLOSE_EXISTING))
+                            .open();
+
+                final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));
                 assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
                 assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true))));
 
-                PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
-                assertThat(response2, is(notNullValue()));
-                assertThat(response2.getBody(), is(instanceOf(Open.class)));
-                Open responseOpen2 = (Open) response2.getBody();
+                final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class);
                 assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
                 if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
                 {
@@ -147,40 +131,28 @@ public class CloseExistingPolicy extends ProtocolTestBase
     {
         try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
         {
-            transport1.doProtocolNegotiation();
-            Open open = new Open();
-            open.setContainerId("testContainerId");
+            final Interaction interaction1 = transport1.newInteraction();
             // Omit setting the desired capability to test weak detection
-
-            transport1.sendPerformative(open);
-            PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Open.class)));
+            interaction1.negotiateProtocol().consumeResponse()
+                        .openContainerId("testContainerId")
+                        .open().consumeResponse(Open.class);
 
             try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
             {
-                transport2.doProtocolNegotiation();
-                Open open2 = new Open();
-                open2.setContainerId("testContainerId");
-                open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-                open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                             CLOSE_EXISTING));
-
-                transport2.sendPerformative(open2);
-
-                final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
-                assertThat(closeResponse1, is(notNullValue()));
-                assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
-                Close close1 = (Close) closeResponse1.getBody();
+                final Interaction interaction2 = transport2.newInteraction();
+                interaction2.negotiateProtocol().consumeResponse()
+                            .openContainerId("testContainerId")
+                            .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                            .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                     CLOSE_EXISTING))
+                            .open();
+
+                final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));
                 assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
                 assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true))));
 
-                PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
-                assertThat(response2, is(notNullValue()));
-                assertThat(response2.getBody(), is(instanceOf(Open.class)));
-                Open responseOpen2 = (Open) response2.getBody();
+                final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class);
                 assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
                 if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
                 {
@@ -197,19 +169,15 @@ public class CloseExistingPolicy extends ProtocolTestBase
     {
         try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
         {
-            transport1.doProtocolNegotiation();
-            Open open = new Open();
-            open.setContainerId("testContainerId");
-            open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-            open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                        CLOSE_EXISTING));
-
-            transport1.sendPerformative(open);
-            PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Open.class)));
-            Open responseOpen = (Open) response.getBody();
+            final Interaction interaction1 = transport1.newInteraction();
+            Open responseOpen = interaction1.negotiateProtocol().consumeResponse()
+                                            .openContainerId("testContainerId")
+                                            .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                                            .openProperties(Collections.singletonMap(
+                                                    SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                    CLOSE_EXISTING))
+                                            .open().consumeResponse()
+                                            .getLatestResponse(Open.class);
             assertThat(Arrays.asList(responseOpen.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
             if (responseOpen.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
             {
@@ -219,25 +187,18 @@ public class CloseExistingPolicy extends ProtocolTestBase
 
             try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
             {
-                transport2.doProtocolNegotiation();
-                Open open2 = new Open();
-                open2.setContainerId("testContainerId");
+                final Interaction interaction2 = transport2.newInteraction();
                 // Omit setting the desired capability to test strong detection
+                interaction2.negotiateProtocol().consumeResponse()
+                            .openContainerId("testContainerId")
+                            .open().sync();
 
-                transport2.sendPerformative(open2);
-
-                final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
-                assertThat(closeResponse1, is(notNullValue()));
-                assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
-                Close close1 = (Close) closeResponse1.getBody();
+                final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
                 assertThat(close1.getError(), is(notNullValue()));
                 assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
                 assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true))));
 
-                PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
-                assertThat(response2, is(notNullValue()));
-                assertThat(response2.getBody(), is(instanceOf(Open.class)));
-                Open responseOpen2 = (Open) response2.getBody();
+                final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class);
                 assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
                 if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
                 {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
index c579bf8..d11e2fc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
@@ -20,14 +20,10 @@
 
 package org.apache.qpid.tests.protocol.v1_0.extensions.soleconn;
 
-import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
+import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.CLOSE_EXISTING;
 import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.REFUSE_CONNECTION;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -35,12 +31,11 @@ import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 
 public class MixedPolicy extends ProtocolTestBase
@@ -58,55 +53,39 @@ public class MixedPolicy extends ProtocolTestBase
     {
         try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
         {
-            transport1.doProtocolNegotiation();
-            Open open = new Open();
-            open.setContainerId("testContainerId");
-            open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-            open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                        CLOSE_EXISTING));
-
-            transport1.sendPerformative(open);
-            PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Open.class)));
+            final Interaction interaction1 = transport1.newInteraction();
+            interaction1.negotiateProtocol().consumeResponse()
+                        .openContainerId("testContainerId")
+                        .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                        .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                 CLOSE_EXISTING))
+                        .open().consumeResponse(Open.class);
 
             try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
             {
-                transport2.doProtocolNegotiation();
-                Open open2 = new Open();
-                open2.setContainerId("testContainerId");
-                open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-                open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                             REFUSE_CONNECTION));
-
-                transport2.sendPerformative(open2);
+                final Interaction interaction2 = transport2.newInteraction();
+                interaction2.negotiateProtocol().consumeResponse()
+                            .openContainerId("testContainerId")
+                            .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                            .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                     REFUSE_CONNECTION))
+                            .open().sync();
 
-                final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
-                assertThat(closeResponse1, is(notNullValue()));
-                assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
+                interaction1.consumeResponse(Close.class);
 
-                PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
-                assertThat(response2, is(notNullValue()));
-                assertThat(response2.getBody(), is(instanceOf(Open.class)));
+                interaction2.consumeResponse(Open.class);
 
                 try (FrameTransport transport3 = new FrameTransport(_brokerAddress).connect())
                 {
-                    transport3.doProtocolNegotiation();
-                    Open open3 = new Open();
-                    open3.setContainerId("testContainerId");
-                    open3.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-                    open3.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                                 CLOSE_EXISTING));
-
-                    transport3.sendPerformative(open3);
-
-                    PerformativeResponse closeResponse3 = (PerformativeResponse) transport3.getNextResponse();
-                    assertThat(closeResponse3, is(notNullValue()));
-                    assertThat(closeResponse3.getBody(), is(instanceOf(Open.class)));
-                    PerformativeResponse closeResponse3b = (PerformativeResponse) transport3.getNextResponse();
-                    assertThat(closeResponse3b, is(notNullValue()));
-                    assertThat(closeResponse3b.getBody(), is(instanceOf(Close.class)));
+                    final Interaction interaction3 = transport3.newInteraction();
+                    interaction3.negotiateProtocol().consumeResponse()
+                                .openContainerId("testContainerId")
+                                .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                                .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                         CLOSE_EXISTING))
+                                .open()
+                                .consumeResponse(Open.class)
+                                .consumeResponse(Close.class);
                 }
             }
         }
@@ -117,36 +96,25 @@ public class MixedPolicy extends ProtocolTestBase
     {
         try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
         {
-            transport1.doProtocolNegotiation();
-            Open open = new Open();
-            open.setContainerId("testContainerId");
-            open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-            open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                        REFUSE_CONNECTION));
-
-            transport1.sendPerformative(open);
-            PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
-            assertThat(response, is(notNullValue()));
-            assertThat(response.getBody(), is(instanceOf(Open.class)));
+            final Interaction interaction1 = transport1.newInteraction();
+            interaction1.negotiateProtocol().consumeResponse()
+                        .openContainerId("testContainerId")
+                        .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                        .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                 REFUSE_CONNECTION))
+                        .open().consumeResponse(Open.class);
 
             try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
             {
-                transport2.doProtocolNegotiation();
-                Open open2 = new Open();
-                open2.setContainerId("testContainerId");
-                open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
-                open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
-                                                             CLOSE_EXISTING));
-
-                transport2.sendPerformative(open2);
-
-                final PerformativeResponse openResponse2 = (PerformativeResponse) transport2.getNextResponse();
-                assertThat(openResponse2, is(notNullValue()));
-                assertThat(openResponse2.getBody(), is(instanceOf(Open.class)));
-                final PerformativeResponse closeResponse2 = (PerformativeResponse) transport2.getNextResponse();
-                assertThat(closeResponse2, is(notNullValue()));
-                assertThat(closeResponse2.getBody(), is(instanceOf(Close.class)));
+                final Interaction interaction2 = transport2.newInteraction();
+                interaction2.negotiateProtocol().consumeResponse()
+                            .openContainerId("testContainerId")
+                            .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+                            .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+                                                                     CLOSE_EXISTING))
+                            .open()
+                            .consumeResponse(Open.class)
+                            .consumeResponse(Close.class);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org