You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/06/21 10:00:48 UTC
[3/3] qpid-broker-j git commit: NO-JIRA: [Java Broker] Refactor AMQP
1.0 protocol tests
NO-JIRA: [Java Broker] Refactor AMQP 1.0 protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/813ecbbc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/813ecbbc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/813ecbbc
Branch: refs/heads/master
Commit: 813ecbbc1e5308b0ac55e1da64fb42a6c6506410
Parents: 92b79cc
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed Jun 21 10:50:34 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed Jun 21 10:51:40 2017 +0100
----------------------------------------------------------------------
.../tests/protocol/v1_0/FrameTransport.java | 219 +------
.../qpid/tests/protocol/v1_0/Interaction.java | 616 +++++++++++++++++++
.../apache/qpid/tests/protocol/v1_0/Utils.java | 71 +--
.../bindmapjms/TemporaryDestinationTest.java | 43 +-
.../soleconn/CloseExistingPolicy.java | 155 ++---
.../v1_0/extensions/soleconn/MixedPolicy.java | 118 ++--
.../soleconn/RefuseConnectionPolicy.java | 253 ++++----
.../extensions/websocket/WebSocketTest.java | 27 +-
.../v1_0/messaging/DeleteOnCloseTest.java | 138 ++---
.../protocol/v1_0/messaging/MessageFormat.java | 61 +-
.../protocol/v1_0/messaging/TransferTest.java | 371 ++++-------
.../v1_0/transaction/DischargeTest.java | 167 ++---
.../v1_0/transport/ProtocolHeaderTest.java | 20 +-
.../v1_0/transport/connection/OpenTest.java | 84 +--
.../v1_0/transport/link/AttachTest.java | 75 +--
.../protocol/v1_0/transport/link/FlowTest.java | 116 ++--
.../v1_0/transport/security/sasl/SaslTest.java | 169 +++--
.../v1_0/transport/session/BeginTest.java | 68 +-
18 files changed, 1363 insertions(+), 1408 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index b72ed4e..9aeb4e9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -23,18 +23,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
@@ -53,30 +47,19 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.core.Is;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class FrameTransport implements AutoCloseable
{
public static final long RESPONSE_TIMEOUT = 6000;
- private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(100);
@@ -87,8 +70,6 @@ public class FrameTransport implements AutoCloseable
private Channel _channel;
private volatile boolean _channelClosedSeen = false;
- private int _amqpConnectionId;
- private short _amqpChannelId;
public FrameTransport(final InetSocketAddress brokerAddress)
{
@@ -158,7 +139,6 @@ public class FrameTransport implements AutoCloseable
}
finally
{
- AMQP_CONNECTION_IDS.remove(_amqpConnectionId);
_workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
}
}
@@ -166,21 +146,23 @@ public class FrameTransport implements AutoCloseable
public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
{
Preconditions.checkState(_channel != null, "Not connected");
+ ChannelPromise promise = _channel.newPromise();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bytes);
- ChannelFuture channelFuture = _channel.writeAndFlush(buffer);
- channelFuture.sync();
- return JdkFutureAdapters.listenInPoolThread(channelFuture);
+ _channel.write(buffer, promise);
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
}
public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
{
Preconditions.checkState(_channel != null, "Not connected");
+ ChannelPromise promise = _channel.newPromise();
final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
- ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
- channelFuture.sync();
- return JdkFutureAdapters.listenInPoolThread(channelFuture);
+ _channel.write(transportFrame, promise);
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
}
public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
@@ -191,104 +173,17 @@ public class FrameTransport implements AutoCloseable
return JdkFutureAdapters.listenInPoolThread(channelFuture);
}
- public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
- {
- return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId));
- }
-
- public ListenableFuture<Void> sendPipelined(final byte[] protocolHeader, final TransportFrame... frames)
- throws InterruptedException
- {
- ChannelPromise promise = _channel.newPromise();
- if (protocolHeader != null)
- {
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(protocolHeader);
- _channel.write(buffer);
- }
- for (TransportFrame frame : frames)
- {
- _channel.write(frame, promise);
- }
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
-
- public ListenableFuture<Void> sendPipelined(final TransportFrame... frames) throws InterruptedException
- {
- return sendPipelined(null, frames);
- }
-
public <T extends Response<?>> T getNextResponse() throws Exception
{
return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
}
- public <R extends Response<?>> R getNextResponse(Class<R> expectedResponseClass) throws Exception
- {
- R actualResponse = getNextResponse();
- if (actualResponse == null)
- {
- throw new IllegalStateException(String.format("No response received within timeout %d - expecting %s",
- RESPONSE_TIMEOUT, expectedResponseClass.getName()));
- }
- else if (!expectedResponseClass.isAssignableFrom(actualResponse.getClass()))
- {
- throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
- expectedResponseClass.getName(),
- actualResponse.getClass().getName()));
- }
-
- return actualResponse;
- }
-
- public <T> T getNextResponseBody(Class<T> expectedFrameBodyClass) throws Exception
- {
- Response<T> response = getNextResponse();
- T actualFrameBody = response.getBody();
-
- if (!expectedFrameBodyClass.isAssignableFrom(actualFrameBody.getClass()))
- {
- throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s",
- expectedFrameBodyClass.getName(),
- actualFrameBody.getClass().getName()));
- }
-
- return actualFrameBody;
- }
-
- public void doProtocolNegotiation() throws Exception
- {
- byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
- sendProtocolHeader(bytes);
- HeaderResponse response = (HeaderResponse) getNextResponse();
-
- if (!Arrays.equals(bytes, response.getBody()))
- {
- throw new IllegalStateException("Unexpected protocol header");
- }
- }
-
- public void doOpenConnection() throws Exception
- {
- doProtocolNegotiation();
- Open open = new Open();
-
- open.setContainerId(String.format("testContainer-%d", getConnectionId()));
- sendPerformative(open, UnsignedShort.valueOf((short) 0));
- PerformativeResponse response = (PerformativeResponse) getNextResponse();
- if (!(response.getBody() instanceof Open))
- {
- throw new IllegalStateException("Unexpected response to connection Open");
- }
- }
-
public void doCloseConnection() throws Exception
{
Close close = new Close();
sendPerformative(close, UnsignedShort.valueOf((short) 0));
- PerformativeResponse response = (PerformativeResponse) getNextResponse();
+ PerformativeResponse response = getNextResponse();
if (!(response.getBody() instanceof Close))
{
throw new IllegalStateException(String.format(
@@ -296,84 +191,6 @@ public class FrameTransport implements AutoCloseable
}
}
- public void doBeginSession() throws Exception
- {
- doOpenConnection();
- Begin begin = new Begin();
- begin.setNextOutgoingId(UnsignedInteger.ZERO);
- begin.setIncomingWindow(UnsignedInteger.ZERO);
- begin.setOutgoingWindow(UnsignedInteger.ZERO);
- _amqpChannelId = (short) 1;
- sendPerformative(begin, UnsignedShort.valueOf(_amqpChannelId));
- PerformativeResponse response = (PerformativeResponse) getNextResponse();
- if (!(response.getBody() instanceof Begin))
- {
- throw new IllegalStateException(String.format(
- "Unexpected response to connection Begin. Expected Begin got '%s'", response.getBody()));
- }
- }
-
- public void doAttachReceivingLink(String queueName) throws Exception
- {
- doAttachReceivingLink(UnsignedInteger.ZERO, queueName);
- }
-
- public void doAttachReceivingLink(final UnsignedInteger handle, String queueName) throws Exception
- {
- doBeginSession();
- Role localRole = Role.RECEIVER;
- Attach attach = new Attach();
- attach.setName("testReceivingLink");
- attach.setHandle(handle);
- attach.setRole(localRole);
- Source source = new Source();
- source.setAddress(queueName);
- attach.setSource(source);
- Target target = new Target();
- attach.setTarget(target);
-
- sendPerformative(attach);
- PerformativeResponse response = (PerformativeResponse) getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Attach.class)));
- Attach responseAttach = (Attach) response.getBody();
- assertThat(responseAttach.getSource(), is(notNullValue()));
- }
-
- public void doAttachSendingLink(final UnsignedInteger handle,
- final String destination) throws Exception
- {
- Attach attach = new Attach();
- attach.setName("testSendingLink");
- attach.setHandle(handle);
- attach.setRole(Role.SENDER);
- attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
- Source source = new Source();
- attach.setSource(source);
- Target target = new Target();
- target.setAddress(destination);
- attach.setTarget(target);
- doAttachSendingLink(attach);
- }
-
- public void doAttachSendingLink(final Attach attach) throws Exception
- {
- doBeginSession();
-
- sendPerformative(attach);
- PerformativeResponse response = (PerformativeResponse) getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Attach.class)));
- Attach responseAttach = (Attach) response.getBody();
- assertThat(responseAttach.getTarget(), is(notNullValue()));
-
- PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse();
- assertThat(flowResponse, Is.is(CoreMatchers.notNullValue()));
- assertThat(flowResponse.getBody(), Is.is(CoreMatchers.instanceOf(Flow.class)));
- }
-
public void assertNoMoreResponses() throws Exception
{
Response response = getNextResponse();
@@ -386,19 +203,6 @@ public class FrameTransport implements AutoCloseable
assertThat(_channelClosedSeen, is(true));
}
- private int getConnectionId()
- {
- if (_amqpConnectionId == 0)
- {
- _amqpConnectionId = 1;
- while (!AMQP_CONNECTION_IDS.add(_amqpConnectionId))
- {
- ++_amqpConnectionId;
- }
- }
- return _amqpConnectionId;
- }
-
private static class ChannelClosedResponse implements Response<Void>
{
@Override
@@ -413,4 +217,9 @@ public class FrameTransport implements AutoCloseable
return null;
}
}
+
+ public Interaction newInteraction()
+ {
+ return new Interaction(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
new file mode 100644
index 0000000..eac8770
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+public class Interaction
+{
+ private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
+ private final Begin _begin;
+ private final Open _open;
+ private final Close _close;
+ private final Attach _attach;
+ private final Detach _detach;
+ private final Flow _flow;
+ private final Transfer _transfer;
+ private final FrameTransport _transport;
+ private final SaslInit _saslInit;
+ private final SaslResponse _saslResponse;
+ private byte[] _protocolHeader;
+ private UnsignedShort _connectionChannel;
+ private UnsignedShort _sessionChannel;
+ private Response<?> _latestResponse;
+ private ListenableFuture<?> _latestFuture;
+
+ Interaction(final FrameTransport frameTransport)
+ {
+ final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
+ _transport = frameTransport;
+
+ _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+
+ _saslInit = new SaslInit();
+ _saslResponse = new SaslResponse();
+
+ _open = new Open();
+ _open.setContainerId(getConnectionId());
+ _close = new Close();
+ _connectionChannel = UnsignedShort.valueOf(0);
+
+ _begin = new Begin();
+ _begin.setNextOutgoingId(UnsignedInteger.ZERO);
+ _begin.setIncomingWindow(UnsignedInteger.ZERO);
+ _begin.setOutgoingWindow(UnsignedInteger.ZERO);
+ _sessionChannel = UnsignedShort.valueOf(1);
+
+ _attach = new Attach();
+ _attach.setName("testLink");
+ _attach.setHandle(defaultLinkHandle);
+ _attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ _attach.setSource(new Source());
+ _attach.setTarget(new Target());
+
+ _detach = new Detach();
+ _detach.setHandle(_attach.getHandle());
+
+ _flow = new Flow();
+ _flow.setNextIncomingId(UnsignedInteger.ZERO);
+ _flow.setIncomingWindow(UnsignedInteger.ZERO);
+ _flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ _flow.setOutgoingWindow(UnsignedInteger.ZERO);
+
+ _transfer = new Transfer();
+ _transfer.setHandle(defaultLinkHandle);
+ _transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+ _transfer.setDeliveryId(UnsignedInteger.ZERO);
+ }
+
+ /////////////////////////
+ // Protocol Negotiation //
+ /////////////////////////
+
+ public Interaction protocolHeader(byte[] header)
+ {
+ _protocolHeader = header;
+ return this;
+ }
+
+ public Interaction negotiateProtocol() throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return this;
+ }
+
+ //////////
+ // SASL //
+ //////////
+
+ public Interaction saslMechanism(final Symbol mechanism)
+ {
+ _saslInit.setMechanism(mechanism);
+ return this;
+ }
+
+ public Interaction saslInitialResponse(final Binary initialResponse)
+ {
+ _saslInit.setInitialResponse(initialResponse);
+ return this;
+ }
+
+ public Interaction saslInit() throws Exception
+ {
+ sendPerformativeAndChainFuture(_saslInit);
+ return this;
+ }
+
+ public Interaction saslResponseResponse(Binary response)
+ {
+ _saslResponse.setResponse(response);
+ return this;
+ }
+
+ public Interaction saslResponse() throws Exception
+ {
+ sendPerformativeAndChainFuture(_saslResponse);
+ return this;
+ }
+
+ ////////////////
+ // Connection //
+ ////////////////
+
+ public Interaction connectionChannel(UnsignedShort connectionChannel)
+ {
+ _connectionChannel = connectionChannel;
+ return this;
+ }
+
+ public Interaction openContainerId(String containerId)
+ {
+ _open.setContainerId(containerId);
+ return this;
+ }
+
+ public Interaction openHostname(String hostname)
+ {
+ _open.setHostname(hostname);
+ return this;
+ }
+
+ public Interaction openChannelMax(UnsignedShort channelMax)
+ {
+ _open.setChannelMax(channelMax);
+ return this;
+ }
+
+ public Interaction openDesiredCapabilities(final Symbol... desiredCapabilities)
+ {
+ _open.setDesiredCapabilities(desiredCapabilities);
+ return this;
+ }
+
+ public Interaction openProperties(final Map<Symbol, Object> properties)
+ {
+ _open.setProperties(properties);
+ return this;
+ }
+
+ public Interaction open() throws Exception
+ {
+ sendPerformativeAndChainFuture(_open, _connectionChannel);
+ return this;
+ }
+
+ public Interaction close() throws Exception
+ {
+ sendPerformativeAndChainFuture(_close, _connectionChannel);
+ return this;
+ }
+
+    /** Claims and returns the first unused "testContainer-N" id; Set.add on the concurrent set makes the claim atomic. */
+    private String getConnectionId()
+    {
+        int index = 1;
+        String containerId = String.format("testContainer-%d", index);
+        while (!CONTAINER_IDS.add(containerId))
+        {
+            ++index;
+            containerId = String.format("testContainer-%d", index);
+        }
+        return containerId;
+    }
+
+ /////////////
+ // Session //
+ /////////////
+
+ public Interaction sessionChannel(UnsignedShort sessionChannel)
+ {
+ _sessionChannel = sessionChannel;
+ return this;
+ }
+
+ public Interaction beginNextOutgoingId(UnsignedInteger nextOutgoingId)
+ {
+ _begin.setNextOutgoingId(nextOutgoingId);
+ return this;
+ }
+
+ public Interaction beginIncomingWindow(UnsignedInteger incomingWindow)
+ {
+ _begin.setIncomingWindow(incomingWindow);
+ return this;
+ }
+
+ public Interaction beginOutgoingWindow(UnsignedInteger outgoingWindow)
+ {
+ _begin.setOutgoingWindow(outgoingWindow);
+ return this;
+ }
+
+ public Interaction begin() throws Exception
+ {
+ sendPerformativeAndChainFuture(_begin, _sessionChannel);
+ return this;
+ }
+
+ //////////
+ // Link //
+ //////////
+
+
+ public Interaction attachName(String linkName)
+ {
+ _attach.setName(linkName);
+ return this;
+ }
+
+ public Interaction attachRole(Role role)
+ {
+ _attach.setRole(role);
+ return this;
+ }
+
+ public Interaction attachHandle(UnsignedInteger handle)
+ {
+ _attach.setHandle(handle);
+ _detach.setHandle(handle);
+ return this;
+ }
+
+ public Interaction attachInitialDeliveryCount(UnsignedInteger initialDeliveryCount)
+ {
+ _attach.setInitialDeliveryCount(initialDeliveryCount);
+ return this;
+ }
+
+ public Interaction attachRcvSettleMode(final ReceiverSettleMode rcvSettleMode)
+ {
+ _attach.setRcvSettleMode(rcvSettleMode);
+ return this;
+ }
+
+ public Interaction attachSource(Source source)
+ {
+ _attach.setSource(source);
+ return this;
+ }
+
+ public Interaction attachTarget(BaseTarget target)
+ {
+ _attach.setTarget(target);
+ return this;
+ }
+
+ public Interaction attachSourceAddress(String address)
+ {
+ Source source = (Source) _attach.getSource();
+ source.setAddress(address);
+ _attach.setSource(source);
+ return this;
+ }
+
+ public Interaction attachSourceOutcomes(final Symbol... outcomes)
+ {
+ Source source = ((Source) _attach.getSource());
+ source.setOutcomes(outcomes);
+ _attach.setSource(source);
+ return this;
+ }
+
+ public Interaction attachTargetAddress(final String address)
+ {
+ final Target target = ((Target) _attach.getTarget());
+ target.setAddress(address);
+ _attach.setTarget(target);
+ return this;
+ }
+
+ public Interaction attach() throws Exception
+ {
+ sendPerformativeAndChainFuture(_attach, _sessionChannel);
+ return this;
+ }
+
+ public Interaction detachClose(Boolean close)
+ {
+ _detach.setClosed(close);
+ return this;
+ }
+
+ public Interaction detach() throws Exception
+ {
+ sendPerformativeAndChainFuture(_detach, _sessionChannel);
+ return this;
+ }
+
+ //////////
+ // Flow //
+ //////////
+
+ public Interaction flowIncomingWindow(final UnsignedInteger incomingWindow)
+ {
+ _flow.setIncomingWindow(incomingWindow);
+ return this;
+ }
+
+ public Interaction flowNextIncomingId(final UnsignedInteger nextIncomingId)
+ {
+ _flow.setNextIncomingId(nextIncomingId);
+ return this;
+ }
+
+ public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow)
+ {
+ _flow.setOutgoingWindow(outgoingWindow);
+ return this;
+ }
+
+ public Interaction flowNextOutgoingId(final UnsignedInteger nextNextOutgoingId)
+ {
+ _flow.setNextOutgoingId(nextNextOutgoingId);
+ return this;
+ }
+
+ public Interaction flowEcho(final Boolean echo)
+ {
+ _flow.setEcho(echo);
+ return this;
+ }
+
+ public Interaction flowHandle(final UnsignedInteger handle)
+ {
+ _flow.setHandle(handle);
+ return this;
+ }
+
+ public Interaction flowAvailable(final UnsignedInteger available)
+ {
+ _flow.setAvailable(available);
+ return this;
+ }
+
+ public Interaction flowDeliveryCount(final UnsignedInteger deliveryCount)
+ {
+ _flow.setDeliveryCount(deliveryCount);
+ return this;
+ }
+
+ public Interaction flowLinkCredit(final UnsignedInteger linkCredit)
+ {
+ _flow.setLinkCredit(linkCredit);
+ return this;
+ }
+
+ public Interaction flowDrain(final Boolean drain)
+ {
+ _flow.setDrain(drain);
+ return this;
+ }
+
+ public Interaction flow() throws Exception
+ {
+ sendPerformativeAndChainFuture(_flow, _sessionChannel);
+ return this;
+ }
+
+ //////////////
+ // Transfer //
+ //////////////
+
+ public Interaction transferHandle(UnsignedInteger transferHandle)
+ {
+ _transfer.setHandle(transferHandle);
+ return this;
+ }
+
+ public Interaction transferDeliveryTag(final Binary deliveryTag)
+ {
+ _transfer.setDeliveryTag(deliveryTag);
+ return this;
+ }
+
+ public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+ {
+ _transfer.setDeliveryId(deliveryId);
+ return this;
+ }
+
+ public Interaction transferRcvSettleMode(final ReceiverSettleMode receiverSettleMode)
+ {
+ _transfer.setRcvSettleMode(receiverSettleMode);
+ return this;
+ }
+
+ public Interaction transferMore(final Boolean more)
+ {
+ _transfer.setMore(more);
+ return this;
+ }
+
+ public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
+ {
+ _transfer.setMessageFormat(messageFormat);
+ return this;
+ }
+
+ public Interaction transferPayload(final List<QpidByteBuffer> payload)
+ {
+ _transfer.setPayload(payload);
+ return this;
+ }
+
+ public Interaction transferPayloadData(final Object payload)
+ {
+ AmqpValue amqpValue = new AmqpValue(payload);
+ final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
+ final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
+ _transfer.setPayload(encodedForm);
+
+ section.dispose();
+ for (QpidByteBuffer qbb : encodedForm)
+ {
+ qbb.dispose();
+ }
+ return this;
+ }
+
+ public Interaction transferSettled(final Boolean settled)
+ {
+ _transfer.setSettled(settled);
+ return this;
+ }
+
+ public Interaction transfer() throws Exception
+ {
+ sendPerformativeAndChainFuture(_transfer, _sessionChannel);
+ return this;
+ }
+
+ //////////
+ // misc //
+ //////////
+
+ public Interaction sendPerformative(final FrameBody frameBody,
+ final UnsignedShort channel) throws Exception
+ {
+ sendPerformativeAndChainFuture(frameBody, channel);
+ return this;
+ }
+
+ public Interaction sendPerformative(final SaslFrameBody saslFrameBody) throws Exception
+ {
+ sendPerformativeAndChainFuture(saslFrameBody);
+ return this;
+ }
+
+ private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ }
+
+ private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ }
+
+    /**
+     * Waits for pending writes, reads the next response and validates its body; a null type accepts "no response".
+     */
+    public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = _transport.getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return this;
+        }
+        acceptableResponseClasses.remove(null);
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (acceptableResponseClasses.stream().anyMatch(type -> type.isInstance(body)))
+        {
+            return this;
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses, body));
+    }
+
+ public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ if (_latestFuture != null)
+ {
+ _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ _latestFuture = null;
+ }
+ return this;
+ }
+
+ public Response<?> getLatestResponse() throws Exception
+ {
+ sync();
+ return _latestResponse;
+ }
+
+    /** Returns the latest response body as {@code type}; throws IllegalStateException if it is absent or another type. */
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        sync();
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (!type.isInstance(body))
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(), body));
+        }
+        return type.cast(body);
+    }
+
+ public Interaction flowHandleFromLinkHandle()
+ {
+ _flow.setHandle(_attach.getHandle());
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 5d4c52f..bfe05b8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -20,21 +20,15 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.net.InetSocketAddress;
-import org.hamcrest.core.Is;
-
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -45,39 +39,25 @@ public class Utils
{
try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
{
- transport.doBeginSession();
-
+ final Interaction interaction = transport.newInteraction();
+ final Attach attachValidationResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachName("validationAttach")
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(nodeAddress)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
final boolean queueExists;
- Attach validationAttach = new Attach();
- validationAttach.setName("validationAttach");
- validationAttach.setHandle(UnsignedInteger.ZERO);
- validationAttach.setRole(Role.RECEIVER);
- Source validationSource = new Source();
- validationSource.setAddress(nodeAddress);
- validationAttach.setSource(validationSource);
- validationAttach.setTarget(new Target());
- transport.sendPerformative(validationAttach);
- PerformativeResponse validationResponse = (PerformativeResponse) transport.getNextResponse();
- assertThat(validationResponse, is(notNullValue()));
- assertThat(validationResponse.getBody(), is(instanceOf(Attach.class)));
- final Attach attachValidationResponse = (Attach) validationResponse.getBody();
if (attachValidationResponse.getSource() != null)
{
queueExists = true;
- Detach validationDetach = new Detach();
- validationDetach.setHandle(validationAttach.getHandle());
- validationDetach.setClosed(true);
- transport.sendPerformative(validationDetach);
- PerformativeResponse validationDetachResponse = (PerformativeResponse) transport.getNextResponse();
- assertThat(validationDetachResponse, is(notNullValue()));
- assertThat(validationDetachResponse.getBody(), is(instanceOf(Detach.class)));
+ interaction.detachClose(true).detach().consumeResponse().getLatestResponse(Detach.class);
}
else
{
queueExists = false;
- PerformativeResponse validationDetachResponse = (PerformativeResponse) transport.getNextResponse();
- assertThat(validationDetachResponse, is(notNullValue()));
- assertThat(validationDetachResponse.getBody(), is(instanceOf(Detach.class)));
+ interaction.consumeResponse().getLatestResponse(Detach.class);
}
return queueExists;
}
@@ -88,25 +68,26 @@ public class Utils
{
try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
{
- transport.doAttachReceivingLink(queueName);
- Flow flow = new Flow();
- flow.setIncomingWindow(UnsignedInteger.ONE);
- flow.setNextIncomingId(UnsignedInteger.ZERO);
- flow.setOutgoingWindow(UnsignedInteger.ZERO);
- flow.setNextOutgoingId(UnsignedInteger.ZERO);
- flow.setHandle(UnsignedInteger.ZERO);
- flow.setLinkCredit(UnsignedInteger.ONE);
-
- transport.sendPerformative(flow);
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(queueName)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
MessageDecoder messageDecoder = new MessageDecoder();
boolean hasMore;
do
{
- PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
- assertThat(response, Is.is(notNullValue()));
- assertThat(response.getBody(), Is.is(instanceOf(Transfer.class)));
- Transfer responseTransfer = (Transfer) response.getBody();
+ Transfer responseTransfer = interaction.consumeResponse().getLatestResponse(Transfer.class);
messageDecoder.addTransfer(responseTransfer);
hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index e9a6957..1a1c8bf 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,7 +20,6 @@
package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -33,17 +32,17 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
@@ -84,29 +83,21 @@ public class TemporaryDestinationTest extends ProtocolTestBase
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- transport.doBeginSession();
-
- Attach attach = new Attach();
- attach.setName("testSendingLink");
- attach.setHandle(UnsignedInteger.ZERO);
- attach.setRole(Role.SENDER);
- attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
-
- attach.setSource(new Source());
-
Target target = new Target();
target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
target.setDynamic(true);
target.setCapabilities(targetCapabilities);
- attach.setTarget(target);
- transport.sendPerformative(attach);
+ final Interaction interaction = transport.newInteraction();
+ final Attach attachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTarget(target)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
- PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Attach.class)));
- final Attach attachResponse = (Attach) response.getBody();
assertThat(attachResponse.getSource(), is(notNullValue()));
assertThat(attachResponse.getTarget(), is(notNullValue()));
@@ -115,19 +106,11 @@ public class TemporaryDestinationTest extends ProtocolTestBase
assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(true));
- final PerformativeResponse flowResponse = ((PerformativeResponse) transport.getNextResponse());
- if (flowResponse != null)
- {
- assertThat(flowResponse.getBody(), is(instanceOf(Flow.class)));
- }
+ interaction.consumeResponse().getLatestResponse(Flow.class);
transport.doCloseConnection();
}
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- transport.doBeginSession();
- assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
- }
+ assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index 6d8007a..83303ad 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -20,12 +20,11 @@
package org.apache.qpid.tests.protocol.v1_0.extensions.soleconn;
-import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_DETECTION_POLICY;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
+import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.CLOSE_EXISTING;
import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -48,7 +47,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
public class CloseExistingPolicy extends ProtocolTestBase
@@ -66,19 +65,16 @@ public class CloseExistingPolicy extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- transport.doProtocolNegotiation();
- Open open = new Open();
- open.setContainerId("testContainerId");
- open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport.sendPerformative(open);
- PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Open.class)));
- Open responseOpen = (Open) response.getBody();
+ Open responseOpen = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(
+ SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+
assertThat(Arrays.asList(responseOpen.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
if (responseOpen.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
{
@@ -94,42 +90,30 @@ public class CloseExistingPolicy extends ProtocolTestBase
{
try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
- transport1.doProtocolNegotiation();
- Open open = new Open();
- open.setContainerId("testContainerId");
- open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport1.sendPerformative(open);
- PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Open.class)));
+ final Interaction interaction1 = transport1.newInteraction();
+ interaction1.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open().consumeResponse(Open.class);
try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
- transport2.doProtocolNegotiation();
- Open open2 = new Open();
- open2.setContainerId("testContainerId");
- open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport2.sendPerformative(open2);
-
- final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
- assertThat(closeResponse1, is(notNullValue()));
- assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
- Close close1 = (Close) closeResponse1.getBody();
+ final Interaction interaction2 = transport2.newInteraction();
+ interaction2.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open();
+
+ final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true))));
- PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
- assertThat(response2, is(notNullValue()));
- assertThat(response2.getBody(), is(instanceOf(Open.class)));
- Open responseOpen2 = (Open) response2.getBody();
+ final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class);
assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
{
@@ -147,40 +131,28 @@ public class CloseExistingPolicy extends ProtocolTestBase
{
try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
- transport1.doProtocolNegotiation();
- Open open = new Open();
- open.setContainerId("testContainerId");
+ final Interaction interaction1 = transport1.newInteraction();
// Omit setting the desired capability to test weak detection
-
- transport1.sendPerformative(open);
- PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Open.class)));
+ interaction1.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .open().consumeResponse(Open.class);
try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
- transport2.doProtocolNegotiation();
- Open open2 = new Open();
- open2.setContainerId("testContainerId");
- open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport2.sendPerformative(open2);
-
- final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
- assertThat(closeResponse1, is(notNullValue()));
- assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
- Close close1 = (Close) closeResponse1.getBody();
+ final Interaction interaction2 = transport2.newInteraction();
+ interaction2.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open();
+
+ final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true))));
- PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
- assertThat(response2, is(notNullValue()));
- assertThat(response2.getBody(), is(instanceOf(Open.class)));
- Open responseOpen2 = (Open) response2.getBody();
+ final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class);
assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
{
@@ -197,19 +169,15 @@ public class CloseExistingPolicy extends ProtocolTestBase
{
try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
- transport1.doProtocolNegotiation();
- Open open = new Open();
- open.setContainerId("testContainerId");
- open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport1.sendPerformative(open);
- PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Open.class)));
- Open responseOpen = (Open) response.getBody();
+ final Interaction interaction1 = transport1.newInteraction();
+ Open responseOpen = interaction1.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(
+ SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
assertThat(Arrays.asList(responseOpen.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
if (responseOpen.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
{
@@ -219,25 +187,18 @@ public class CloseExistingPolicy extends ProtocolTestBase
try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
- transport2.doProtocolNegotiation();
- Open open2 = new Open();
- open2.setContainerId("testContainerId");
+ final Interaction interaction2 = transport2.newInteraction();
// Omit setting the desired capability to test strong detection
+ interaction2.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .open().sync();
- transport2.sendPerformative(open2);
-
- final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
- assertThat(closeResponse1, is(notNullValue()));
- assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
- Close close1 = (Close) closeResponse1.getBody();
+ final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
assertThat(close1.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
assertThat(close1.getError().getInfo(), is(equalTo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true))));
- PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
- assertThat(response2, is(notNullValue()));
- assertThat(response2.getBody(), is(instanceOf(Open.class)));
- Open responseOpen2 = (Open) response2.getBody();
+ final Open responseOpen2 = interaction2.consumeResponse().getLatestResponse(Open.class);
assertThat(Arrays.asList(responseOpen2.getOfferedCapabilities()), hasItem(SOLE_CONNECTION_FOR_CONTAINER));
if (responseOpen2.getProperties().containsKey(SOLE_CONNECTION_DETECTION_POLICY))
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/813ecbbc/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
index c579bf8..d11e2fc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
@@ -20,14 +20,10 @@
package org.apache.qpid.tests.protocol.v1_0.extensions.soleconn;
-import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_ENFORCEMENT_POLICY;
+import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties.SOLE_CONNECTION_FOR_CONTAINER;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.CLOSE_EXISTING;
import static org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy.REFUSE_CONNECTION;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
import java.net.InetSocketAddress;
import java.util.Collections;
@@ -35,12 +31,11 @@ import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
public class MixedPolicy extends ProtocolTestBase
@@ -58,55 +53,39 @@ public class MixedPolicy extends ProtocolTestBase
{
try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
- transport1.doProtocolNegotiation();
- Open open = new Open();
- open.setContainerId("testContainerId");
- open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport1.sendPerformative(open);
- PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Open.class)));
+ final Interaction interaction1 = transport1.newInteraction();
+ interaction1.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open().consumeResponse(Open.class);
try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
- transport2.doProtocolNegotiation();
- Open open2 = new Open();
- open2.setContainerId("testContainerId");
- open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- REFUSE_CONNECTION));
-
- transport2.sendPerformative(open2);
+ final Interaction interaction2 = transport2.newInteraction();
+ interaction2.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ REFUSE_CONNECTION))
+ .open().sync();
- final PerformativeResponse closeResponse1 = (PerformativeResponse) transport1.getNextResponse();
- assertThat(closeResponse1, is(notNullValue()));
- assertThat(closeResponse1.getBody(), is(instanceOf(Close.class)));
+ interaction1.consumeResponse(Close.class);
- PerformativeResponse response2 = (PerformativeResponse) transport2.getNextResponse();
- assertThat(response2, is(notNullValue()));
- assertThat(response2.getBody(), is(instanceOf(Open.class)));
+ interaction2.consumeResponse(Open.class);
try (FrameTransport transport3 = new FrameTransport(_brokerAddress).connect())
{
- transport3.doProtocolNegotiation();
- Open open3 = new Open();
- open3.setContainerId("testContainerId");
- open3.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open3.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport3.sendPerformative(open3);
-
- PerformativeResponse closeResponse3 = (PerformativeResponse) transport3.getNextResponse();
- assertThat(closeResponse3, is(notNullValue()));
- assertThat(closeResponse3.getBody(), is(instanceOf(Open.class)));
- PerformativeResponse closeResponse3b = (PerformativeResponse) transport3.getNextResponse();
- assertThat(closeResponse3b, is(notNullValue()));
- assertThat(closeResponse3b.getBody(), is(instanceOf(Close.class)));
+ final Interaction interaction3 = transport3.newInteraction();
+ interaction3.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open()
+ .consumeResponse(Open.class)
+ .consumeResponse(Close.class);
}
}
}
@@ -117,36 +96,25 @@ public class MixedPolicy extends ProtocolTestBase
{
try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
- transport1.doProtocolNegotiation();
- Open open = new Open();
- open.setContainerId("testContainerId");
- open.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- REFUSE_CONNECTION));
-
- transport1.sendPerformative(open);
- PerformativeResponse response = (PerformativeResponse) transport1.getNextResponse();
-
- assertThat(response, is(notNullValue()));
- assertThat(response.getBody(), is(instanceOf(Open.class)));
+ final Interaction interaction1 = transport1.newInteraction();
+ interaction1.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ REFUSE_CONNECTION))
+ .open().consumeResponse(Open.class);
try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
- transport2.doProtocolNegotiation();
- Open open2 = new Open();
- open2.setContainerId("testContainerId");
- open2.setDesiredCapabilities(new Symbol[]{SOLE_CONNECTION_FOR_CONTAINER});
- open2.setProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
- CLOSE_EXISTING));
-
- transport2.sendPerformative(open2);
-
- final PerformativeResponse openResponse2 = (PerformativeResponse) transport2.getNextResponse();
- assertThat(openResponse2, is(notNullValue()));
- assertThat(openResponse2.getBody(), is(instanceOf(Open.class)));
- final PerformativeResponse closeResponse2 = (PerformativeResponse) transport2.getNextResponse();
- assertThat(closeResponse2, is(notNullValue()));
- assertThat(closeResponse2.getBody(), is(instanceOf(Close.class)));
+ final Interaction interaction2 = transport2.newInteraction();
+ interaction2.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
+ .openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
+ CLOSE_EXISTING))
+ .open()
+ .consumeResponse(Open.class)
+ .consumeResponse(Close.class);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org