You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/09/03 12:08:14 UTC
[qpid-broker-j] 01/04: QPID-8349: [Tests][AMQP 1.0] Make protocol
tests for 0-9 consistent with protocol tests for 1.0
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 4130a71d17e3b28784d9f2af99b8fc30a0dc4afa
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Mon Sep 2 16:40:14 2019 +0100
QPID-8349: [Tests][AMQP 1.0] Make protocol tests for 0-9 consistent with protocol tests for 1.0
---
.../tests/protocol/v0_8/ConnectionInteraction.java | 5 +
.../qpid/tests/protocol/v0_8/FrameTransport.java | 28 +++-
.../qpid/tests/protocol/v0_8/Interaction.java | 72 ++++++++--
.../apache/qpid/tests/protocol/v0_8/BasicTest.java | 63 ++++-----
.../qpid/tests/protocol/v0_8/ChannelTest.java | 19 +--
.../qpid/tests/protocol/v0_8/ConnectionTest.java | 111 ++++-----------
.../qpid/tests/protocol/v0_8/ExchangeTest.java | 69 ++++------
.../qpid/tests/protocol/v0_8/LargeHeadersTest.java | 11 +-
.../qpid/tests/protocol/v0_8/LargeMessageTest.java | 14 +-
.../qpid/tests/protocol/v0_8/ProtocolTest.java | 18 +--
.../apache/qpid/tests/protocol/v0_8/QueueTest.java | 153 ++++++++++-----------
.../qpid/tests/protocol/v0_8/TransactionTest.java | 58 +++-----
.../authtimeout/AuthenticationTimeoutTest.java | 5 +-
.../v0_8/extension/basic/MalformedMessage.java | 19 ++-
.../basic/MalformedMessageValidation.java | 7 +-
.../extension/confirms/PublisherConfirmsTest.java | 24 ++--
.../v0_8/extension/exchange/ExchangeTest.java | 18 +--
.../extension/maxsize/MaximumMessageSizeTest.java | 7 +-
.../protocoltimeout/ProtocolHeaderTimeoutTest.java | 19 +--
.../protocol/v0_8/extension/queue/QueueTest.java | 17 +--
.../transactiontimeout/TransactionTimeoutTest.java | 11 +-
.../v0_8/extension/tx/AsyncTransactionTest.java | 14 +-
22 files changed, 325 insertions(+), 437 deletions(-)
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
index bc3bc24..d8256e1 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -140,4 +140,9 @@ public class ConnectionInteraction
{
return _interaction.sendPerformative(_closeOkBody);
}
+
+ public Interaction interaction()
+ {
+ return _interaction;
+ }
}
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
index cee4c0f..54982fe 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -27,21 +27,34 @@ import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+import org.apache.qpid.tests.utils.BrokerAdmin;
public class FrameTransport extends AbstractFrameTransport<Interaction>
{
private final byte[] _protocolHeader;
private ProtocolVersion _protocolVersion;
+ private final BrokerAdmin.PortType _portType;
+ private final BrokerAdmin _brokerAdmin;
- public FrameTransport(final InetSocketAddress brokerAddress)
+ public FrameTransport(final BrokerAdmin brokerAdmin)
{
- this(brokerAddress, Protocol.AMQP_0_9_1);
+ this(brokerAdmin, getPortType(brokerAdmin), Protocol.AMQP_0_9_1);
}
- FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
+ public FrameTransport(final BrokerAdmin brokerAdmin, final BrokerAdmin.PortType portType)
{
- super(brokerAddress, new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder());
+ this(brokerAdmin, portType, Protocol.AMQP_0_9_1);
+ }
+
+
+ FrameTransport(final BrokerAdmin brokerAdmin,
+ final BrokerAdmin.PortType portType,
+ final Protocol protocol)
+ {
+ super(brokerAdmin.getBrokerAddress(portType), new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder());
+ _portType = portType;
+ _brokerAdmin = brokerAdmin;
_protocolVersion = getProtocolVersion(protocol);
byte[] protocolHeader = null;
for(ProtocolEngineCreator installedEngine : (new QpidServiceLoader()).instancesOf(ProtocolEngineCreator.class))
@@ -68,7 +81,7 @@ public class FrameTransport extends AbstractFrameTransport<Interaction>
public Interaction newInteraction()
{
- return new Interaction(this);
+ return new Interaction(this, _brokerAdmin, _portType);
}
public byte[] getProtocolHeader()
@@ -100,4 +113,9 @@ public class FrameTransport extends AbstractFrameTransport<Interaction>
}
return protocolVersion;
}
+
+ private static BrokerAdmin.PortType getPortType(final BrokerAdmin brokerAdmin)
+ {
+ return brokerAdmin.isAnonymousSupported() ? BrokerAdmin.PortType.ANONYMOUS_AMQP : BrokerAdmin.PortType.AMQP;
+ }
}
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 0a4dbff..5e073ab 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
-import java.nio.charset.StandardCharsets;
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.server.security.auth.manager.AbstractScramAuthenticationManager.PLAIN;
+
import java.util.Arrays;
+import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ProtocolVersion;
@@ -29,14 +33,19 @@ import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.tests.protocol.AbstractInteraction;
import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.utils.BrokerAdmin;
public class Interaction extends AbstractInteraction<Interaction>
{
+ private final BrokerAdmin _brokerAdmin;
+ private final BrokerAdmin.PortType _portType;
private byte[] _protocolHeader;
private int _channelId;
private int _maximumPayloadSize = 512;
@@ -47,7 +56,7 @@ public class Interaction extends AbstractInteraction<Interaction>
private TxInteraction _txInteraction;
private ExchangeInteraction _exchangeInteraction;
- Interaction(final FrameTransport transport)
+ Interaction(final FrameTransport transport, final BrokerAdmin brokerAdmin, BrokerAdmin.PortType portType)
{
super(transport);
_connectionInteraction = new ConnectionInteraction(this);
@@ -57,6 +66,8 @@ public class Interaction extends AbstractInteraction<Interaction>
_txInteraction = new TxInteraction(this);
_exchangeInteraction = new ExchangeInteraction(this);
_protocolHeader = getTransport().getProtocolHeader();
+ _brokerAdmin = brokerAdmin;
+ _portType = portType;
}
@Override
@@ -95,12 +106,57 @@ public class Interaction extends AbstractInteraction<Interaction>
return this;
}
- public Interaction openAnonymousConnection() throws Exception
+ public Interaction negotiateOpen() throws Exception
+ {
+ authenticateConnection().connection().tuneOk()
+ .connection().open().consumeResponse(ConnectionOpenOkBody.class);
+ return this;
+ }
+
+ public Interaction authenticateConnection() throws Exception
+ {
+ if (_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS)
+ {
+ authenticateAnonymous();
+ }
+ else
+ {
+ final ConnectionStartBody start = negotiateProtocol().consumeResponse()
+ .getLatestResponse(ConnectionStartBody.class);
+ final String mechanisms = start.getMechanisms() == null ? "" : new String(start.getMechanisms(), US_ASCII);
+ final List<String> supportedMechanisms = Arrays.asList(mechanisms.split(" "));
+
+ if (supportedMechanisms.stream().noneMatch(m -> m.equalsIgnoreCase(PLAIN)))
+ {
+ if (supportedMechanisms.stream()
+ .noneMatch(m -> m.equalsIgnoreCase(AnonymousAuthenticationManager.MECHANISM_NAME)))
+ {
+ throw new IllegalStateException(String.format(
+ "PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", mechanisms));
+ }
+ else
+ {
+ authenticateAnonymous();
+ }
+ }
+ else
+ {
+ final byte[] initialResponse = String.format("\0%s\0%s",
+ _brokerAdmin.getValidUsername(),
+ _brokerAdmin.getValidPassword())
+ .getBytes(US_ASCII);
+ this.connection().startOkMechanism(PLAIN).startOk().consumeResponse(ConnectionSecureBody.class)
+ .connection().secureOk(initialResponse).consumeResponse(ConnectionTuneBody.class);
+ }
+ }
+ return this;
+ }
+
+ private void authenticateAnonymous() throws Exception
{
- return this.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism("ANONYMOUS").startOk().consumeResponse(ConnectionTuneBody.class)
- .connection().tuneOk()
- .connection().open().consumeResponse(ConnectionOpenOkBody.class);
+ this.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism(AnonymousAuthenticationManager.MECHANISM_NAME).startOk()
+ .consumeResponse(ConnectionTuneBody.class);
}
@@ -178,6 +234,6 @@ public class Interaction extends AbstractInteraction<Interaction>
byte[] contentData = new byte[payload.remaining()];
payload.get(contentData);
payload.dispose();
- return new String(contentData, StandardCharsets.UTF_8);
+ return new String(contentData, UTF_8);
}
}
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index f497f05..8c04613 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -27,7 +27,6 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
@@ -62,12 +61,10 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class BasicTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -75,10 +72,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.8.3.7", description = "publish a message")
public void publishMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().contentHeaderPropertiesContentType("text/plain")
.contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
@@ -99,10 +96,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.8.3.7", description = "indicate mandatory routing")
public void publishMandatoryMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().publishExchange("")
.publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
@@ -123,11 +120,11 @@ public class BasicTest extends BrokerAdminUsingTestBase
+ "Return method.")
public void publishUnrouteableMandatoryMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String messageContent = "Test";
- BasicReturnBody returned = interaction.openAnonymousConnection()
+ BasicReturnBody returned = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().publishExchange("")
.publishRoutingKey("unrouteable")
@@ -152,10 +149,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
+ "queue. If this flag is zero, the server silently drops the message.")
public void publishUnrouteableMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().publishExchange("")
.publishRoutingKey("unrouteable")
@@ -178,10 +175,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
byte deliveryMode = (byte) 2;
Map<String, Object> messageHeaders = Collections.singletonMap("test", "testValue");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(queueName).declareDurable(true).declare()
.consumeResponse(QueueDeclareOkBody.class)
@@ -202,11 +199,11 @@ public class BasicTest extends BrokerAdminUsingTestBase
getBrokerAdmin().restart();
assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1)));
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().getQueueName(queueName).getNoAck(true).get()
@@ -235,7 +232,7 @@ public class BasicTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.8.3.3", description = "start a queue consumer")
public void consumeMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String messageContent = "Test";
@@ -245,7 +242,7 @@ public class BasicTest extends BrokerAdminUsingTestBase
String messageContentType = "text/plain";
byte deliveryMode = (byte) 1;
byte priority = (byte) 2;
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().qosPrefetchCount(1)
@@ -309,13 +306,13 @@ public class BasicTest extends BrokerAdminUsingTestBase
+ "Note current broker behaviour is spec incompliant: broker ignores not valid delivery tags")
public void ackWithInvalidDeliveryTag() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
final long deliveryTag = 12345L;
String queueName = BrokerAdmin.TEST_QUEUE_NAME;
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().qosPrefetchCount(1)
@@ -339,10 +336,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
String messageContent = "message";
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- BasicGetOkBody response = interaction.openAnonymousConnection()
+ BasicGetOkBody response = interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().getQueueName(BrokerAdmin.TEST_QUEUE_NAME).get()
@@ -371,10 +368,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
String messageContent = "message";
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().getQueueName(BrokerAdmin.TEST_QUEUE_NAME).getNoAck(true).get()
@@ -396,10 +393,10 @@ public class BasicTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.8.3.10", description = "direct access to a queue")
public void getEmptyQueue() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().getQueueName(BrokerAdmin.TEST_QUEUE_NAME).get()
@@ -418,12 +415,12 @@ public class BasicTest extends BrokerAdminUsingTestBase
String messageContent2 = String.join("", Collections.nCopies(128, "2"));
String messageContent3 = String.join("", Collections.nCopies(256, "3"));
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic()
@@ -488,12 +485,12 @@ public class BasicTest extends BrokerAdminUsingTestBase
String messageContent = String.join("", Collections.nCopies(1024, "*"));
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.channel().flow(true)
@@ -538,12 +535,12 @@ public class BasicTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C", "D", "E", "F");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.channel().flow(true)
@@ -606,12 +603,12 @@ public class BasicTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "A", "B", "C");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().qosPrefetchCount(1)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
index 40d1918..72d13ba 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
@@ -20,37 +20,26 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
-import java.net.InetSocketAddress;
-
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class ChannelTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "1.5.2.1", description = "open a channel for use")
public void channelOpen() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.channel().open().consumeResponse(ChannelOpenOkBody.class);
}
@@ -60,11 +49,11 @@ public class ChannelTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.5.2.5", description = "request a channel close")
public void noFrameCanBeSentOnClosedChannel() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channelId(1)
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.channel().close().consumeResponse(ChannelCloseOkBody.class)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
index edaf04e..b44561c 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -34,12 +34,10 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assume.assumeThat;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.ErrorCodes;
@@ -63,19 +61,11 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
private static final String PLAIN = "PLAIN";
private static final String CRAM_MD5 = "CRAM-MD5";
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
-
@Test
@SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
public void connectionStart() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
ConnectionStartBody response =
@@ -90,14 +80,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.4.2.2", description = "select security mechanism and locale")
public void connectionStartOk() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse();
+ interaction.authenticateConnection();
interaction.getLatestResponse(ConnectionTuneBody.class);
}
@@ -110,7 +96,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ "further data.")
public void connectionStartOkUnsupportedMechanism() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol()
@@ -129,14 +115,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.4.2.6", description = "negotiate connection tuning parameters")
public void connectionTuneOkAndOpen() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(response.getFrameMax())
@@ -152,10 +134,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
public void connectionSecure() throws Exception
{
assumeThat(getBrokerAdmin().isSASLMechanismSupported(PLAIN), is(true));
-
- final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
-
- try (FrameTransport transport = new FrameTransport(addr).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final byte[] initialResponse = String.format("\0%s\0%s",
getBrokerAdmin().getValidUsername(),
@@ -194,9 +173,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
{
assumeThat(getBrokerAdmin().isSASLMechanismSupported(CRAM_MD5), is(true));
- final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
-
- try (FrameTransport transport = new FrameTransport(addr).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
final ConnectionStartBody start = interaction.negotiateProtocol()
@@ -235,9 +212,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
{
assumeThat(getBrokerAdmin().isSASLMechanismSupported(PLAIN), is(true));
- final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
-
- try (FrameTransport transport = new FrameTransport(addr).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final byte[] initialResponse = String.format("\0%s\0%s",
getBrokerAdmin().getValidUsername(),
@@ -271,14 +246,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ " frame-min-size [4096].")
public void tooSmallFrameSize() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(1024)
@@ -297,14 +268,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ " to assist implementors.")
public void tooLargeFrameSize() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
interaction.connection().tuneOkChannelMax(response.getChannelMax())
.tuneOkFrameMax(Long.MAX_VALUE)
@@ -321,14 +288,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ "oversized frame MUST signal a connection exception with reply code 501 (frame error).")
public void overlySizedContentBodyFrame() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final long frameMax = response.getFrameMax();
// Older Qpid JMS Client 0-x had a defect that meant they could send content body frames that were too
@@ -357,8 +320,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
// Server actually abruptly closes the connection. We might see a graceful TCP/IP close or a broken pipe.
try
{
- final ChannelClosedResponse closeResponse = interaction.consumeResponse()
- .getLatestResponse(ChannelClosedResponse.class);
+ interaction.consumeResponse().getLatestResponse(ChannelClosedResponse.class);
}
catch (ExecutionException e)
{
@@ -380,8 +342,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
public void authenticationBypassBySendingTuneOk() throws Exception
{
- final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
@@ -396,8 +357,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
public void authenticationBypassBySendingOpen() throws Exception
{
- final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
@@ -412,8 +372,7 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
public void authenticationBypassAfterSendingStartOk() throws Exception
{
- InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
interaction.negotiateProtocol()
@@ -431,14 +390,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ " heartbeat frames is negotiated during connection tuning.")
public void heartbeating() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final Long heartbeatPeriod = 1L;
@@ -469,14 +424,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "4.2.7", description = "Any sent octet is a valid substitute for a heartbeat")
public void heartbeatingIncomingTrafficIsNonHeartbeat() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final Long heartbeatPeriod = 1L;
@@ -507,14 +458,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
+ "or longer, it should close the connection without following the Connection.Close/Close-Ok handshaking")
public void heartbeatingNoIncomingTraffic() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody response = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
final Long heartbeatPeriod = 1L;
@@ -549,14 +496,10 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.4.2.7", description = "The client tried to work with an unknown virtual host.")
public void connectionOpenUnknownVirtualHost() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionTuneBody tune = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection().startOkMechanism(ANONYMOUS)
- .startOk()
- .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+ ConnectionTuneBody tune = interaction.authenticateConnection().getLatestResponse(ConnectionTuneBody.class);
ConnectionCloseBody close = interaction.connection()
.tuneOkChannelMax(tune.getChannelMax())
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
index 1428da9..c2fa14d 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
@@ -26,11 +26,9 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
import org.hamcrest.Matchers;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -52,22 +50,15 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class ExchangeTest extends BrokerAdminUsingTestBase
{
private static final String TEST_EXCHANGE = "testExchange";
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "1.6.2.1", description = "verify exchange exists, create if needed")
public void exchangeDeclare() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(TEST_EXCHANGE).declare()
.consumeResponse(ExchangeDeclareOkBody.class);
@@ -89,10 +80,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "use this to check whether an exchange exists without modifying the server state.")
public void exchangeDeclarePassive() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(TEST_EXCHANGE).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -114,10 +105,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "option is set, or the exchange already exists.")
public void exchangeDeclareAmqDisallowed() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declarePassive(true).declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -141,10 +132,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "used in the original Exchange.Declare method")
public void exchangeRedeclareDifferentType() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).declareName(TEST_EXCHANGE).declare()
.consumeResponse(ExchangeDeclareOkBody.class);
@@ -164,10 +155,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
description = "When [passive] set, all other method fields [of declare] except name and no-wait are ignored.")
public void exchangeRedeclarePassiveDifferentType() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
.declareName(TEST_EXCHANGE).declare()
@@ -187,10 +178,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "support.")
public void exchangeUnsupportedExchangeType() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
@@ -212,10 +203,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "exchanges) are purged if/when a server restarts.")
public void exchangeDeclareDurable() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareDurable(true).declareName(TEST_EXCHANGE).declare()
.consumeResponse(ExchangeDeclareOkBody.class);
@@ -224,10 +215,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
getBrokerAdmin().restart();
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExchangeBoundOkBody response = interaction.openAnonymousConnection()
+ ExchangeBoundOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.boundExchangeName(TEST_EXCHANGE)
@@ -244,10 +235,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "exchange are cancelled.")
public void exchangeDelete() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(TEST_EXCHANGE).declare()
.consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
@@ -278,10 +269,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
description = "The client MUST NOT attempt to delete an exchange that does not exist.")
public void exchangeDeleteExchangeNotFound() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody unknownExchange = interaction.openAnonymousConnection()
+ ChannelCloseBody unknownExchange = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().deleteExchangeName("unknownExchange").delete()
.consumeResponse().getLatestResponse(ChannelCloseBody.class);
@@ -299,10 +290,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(TEST_EXCHANGE).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -343,10 +334,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
+ "exchange type that it implements.")
public void exchangeDeleteAmqDisallowed() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.deleteExchangeName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).delete()
@@ -361,10 +352,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(TEST_EXCHANGE).declareAutoDelete(true).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -387,11 +378,11 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
@Test
public void exchangeDeclareWithAlternateExchange() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final String altExchName = "altExchange";
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.declareName(altExchName)
@@ -425,10 +416,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
@Test
public void exchangeDeclareWithUnknownAlternateExchange() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionCloseBody response = interaction.openAnonymousConnection()
+ ConnectionCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.declareName(TEST_EXCHANGE)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java
index cb01027..6f80584 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeHeadersTest.java
@@ -50,12 +50,10 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class LargeHeadersTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -64,17 +62,12 @@ public class LargeHeadersTest extends BrokerAdminUsingTestBase
public void headersFillContentHeaderFrame() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
- ConnectionTuneBody connTune = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection()
- .startOkMechanism("ANONYMOUS")
- .startOk()
- .consumeResponse(ConnectionTuneBody.class)
+ ConnectionTuneBody connTune = interaction.authenticateConnection()
.getLatestResponse(ConnectionTuneBody.class);
final String headerName = "test";
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeMessageTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeMessageTest.java
index 196258b..0361e78 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeMessageTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/LargeMessageTest.java
@@ -22,11 +22,8 @@ package org.apache.qpid.tests.protocol.v0_8;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.stream.IntStream;
import org.junit.Before;
@@ -49,30 +46,23 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class LargeMessageTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void multiBodyMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
String queueName = "testQueue";
- ConnectionTuneBody connTune = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection()
- .startOkMechanism("ANONYMOUS")
- .startOk()
- .consumeResponse(ConnectionTuneBody.class)
+ ConnectionTuneBody connTune = interaction.authenticateConnection()
.getLatestResponse(ConnectionTuneBody.class);
byte[] messageContent = new byte[(int)connTune.getFrameMax()];
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
index 8475bb9..42eee8c 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ProtocolTest.java
@@ -25,10 +25,8 @@ import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.ProtocolVersion;
@@ -37,18 +35,10 @@ import org.apache.qpid.server.protocol.v0_8.transport.AMQVersionAwareProtocolSes
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.transport.ByteBufferSender;
import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class ProtocolTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "4.2.2",
@@ -56,7 +46,7 @@ public class ProtocolTest extends BrokerAdminUsingTestBase
+ "write a valid protocol header to the socket, [...] and then close the socket connection.")
public void unrecognisedProtocolHeader() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
assumeThat(transport.getProtocolVersion(), is(equalTo(ProtocolVersion.v0_91)));
@@ -79,7 +69,7 @@ public class ProtocolTest extends BrokerAdminUsingTestBase
+ "the socket connection.")
public void unrecognisedProtocolVersion() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
assumeThat(transport.getProtocolVersion(), is(equalTo(ProtocolVersion.v0_91)));
@@ -99,7 +89,7 @@ public class ProtocolTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "4.2.2", description = "The server either accepts [...] the protocol header")
public void validProtocolVersion() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
@@ -116,7 +106,7 @@ public class ProtocolTest extends BrokerAdminUsingTestBase
+ "further data on it")
public void unrecognisedFrameType() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index a5b5c26..b41c812 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -28,12 +28,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isEmptyString;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import org.hamcrest.Matchers;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -62,22 +60,15 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class QueueTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
@SpecificationTest(section = "1.7.2.1", description = "declare queue, create if needed")
public void queueDeclare() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -94,10 +85,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
+ "exclusive, auto-delete, and arguments fields.")
public void queueDeclareEquivalent() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueInteraction queueInteraction = interaction.openAnonymousConnection()
+ QueueInteraction queueInteraction = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue();
QueueDeclareOkBody response = queueInteraction.declareName(BrokerAdmin.TEST_QUEUE_NAME)
@@ -128,10 +119,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -155,10 +146,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
+ "Durable queues remain active when a server restarts.")
public void queueDeclareDurable() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareDurable(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -171,10 +162,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
getBrokerAdmin().restart();
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -192,10 +183,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
+ "If there was no consumer ever on the queue, it won't be deleted.")
public void queueDeclareAutoDelete() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareAutoDelete(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -203,11 +194,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
}
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -236,10 +227,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
@Ignore("The server does not ignore the auto-delete field if the queue already exists.")
public void queueDeclareAutoDeletePreexistingQueue() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -259,20 +250,20 @@ public class QueueTest extends BrokerAdminUsingTestBase
+ "still-open connection.")
public void queueDeclareExclusive() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declareExclusive(true).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
- try(FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
- ConnectionCloseBody closeResponse = interaction2.openAnonymousConnection()
+ ConnectionCloseBody closeResponse = interaction2.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
@@ -281,10 +272,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
}
- try(FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport2 = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction2 = transport2.newInteraction();
- QueueDeclareOkBody response = interaction2.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction2.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -298,10 +289,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
+ "generated name and return this to the client in the Declare-Ok method.")
public void queueDeclareServerAssignedName() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
@@ -324,10 +315,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeleteOkBody response = interaction.openAnonymousConnection()
+ QueueDeleteOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
.consumeResponse().getLatestResponse(QueueDeleteOkBody.class);
@@ -341,10 +332,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
description = "The client MUST NOT attempt to delete a queue that does not exist.")
public void queueDeleteQueueNotFound() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
.consumeResponse().getLatestResponse(ChannelCloseBody.class);
@@ -362,12 +353,12 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport consumerTransport = new FrameTransport(_brokerAddress).connect();
- FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport consumerTransport = new FrameTransport(getBrokerAdmin()).connect();
+ FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final String consumerTag = "A";
final Interaction consumerInteraction = consumerTransport.newInteraction();
- final BasicInteraction basicInteraction = consumerInteraction.openAnonymousConnection()
+ final BasicInteraction basicInteraction = consumerInteraction.negotiateOpen()
.channel()
.open()
.consumeResponse(ChannelOpenOkBody.class)
@@ -376,7 +367,7 @@ public class QueueTest extends BrokerAdminUsingTestBase
.consumeResponse(BasicConsumeOkBody.class);
final Interaction deleterInteraction = transport.newInteraction();
- ChannelCloseBody response = deleterInteraction.openAnonymousConnection()
+ ChannelCloseBody response = deleterInteraction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).deleteIfUnused(true).delete()
.consumeResponse().getLatestResponse(ChannelCloseBody.class);
@@ -402,10 +393,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeleteOkBody deleteResponse = interaction.openAnonymousConnection()
+ QueueDeleteOkBody deleteResponse = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse(QueueDeclareOkBody.class)
@@ -422,10 +413,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueuePurgeOkBody response = interaction.openAnonymousConnection()
+ QueuePurgeOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().purgeName(BrokerAdmin.TEST_QUEUE_NAME).purge()
.consumeResponse().getLatestResponse(QueuePurgeOkBody.class);
@@ -445,10 +436,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.7.2.7", description = "The client MUST NOT attempt to purge a queue that does not exist.")
public void queuePurgeQueueNotFound() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().purgeName(BrokerAdmin.TEST_QUEUE_NAME).purge()
.consumeResponse().getLatestResponse(ChannelCloseBody.class);
@@ -462,11 +453,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -481,11 +472,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -522,11 +513,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
description = "The client MUST NOT attempt to bind a queue that does not exist.")
public void queueBindUnknownQueue() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
String testExchange = "testExchange";
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -542,11 +533,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
description = "The client MUST either specify a queue name or have previously declared a queue on the same channel")
public void queueBindDefaultQueue() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
String testExchange = "testExchange";
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse(QueueDeclareOkBody.class)
@@ -573,10 +564,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
String testExchange = "testExchange";
String testRoutingKey = "rk1";
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declareDurable(true).declare()
.consumeResponse(QueueDeclareOkBody.class)
@@ -599,10 +590,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
getBrokerAdmin().restart();
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ExchangeBoundOkBody response = interaction.openAnonymousConnection()
+ ExchangeBoundOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.boundExchangeName(testExchange)
@@ -622,10 +613,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
String testExchange = "testExchange";
String testRoutingKey = "rk1";
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declareDurable(true).declare()
.consumeResponse(QueueDeclareOkBody.class)
@@ -654,11 +645,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -682,11 +673,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -703,11 +694,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
+ "same channel")
public void queueUnbindDefaultQueue() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse(QueueDeclareOkBody.class)
@@ -727,11 +718,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -759,11 +750,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -791,11 +782,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- ChannelCloseBody response = interaction.openAnonymousConnection()
+ ChannelCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -822,11 +813,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String testExchange = "testExchange";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(testExchange).declare()
.consumeResponse(ExchangeDeclareOkBody.class)
@@ -841,11 +832,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
@Test
public void queueDeclareWithAlternateExchange() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final String altExchName = "altExchange";
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.declareName(altExchName)
@@ -879,10 +870,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
@Test
public void queueDeclareWithUnknownAlternateExchange() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionCloseBody response = interaction.openAnonymousConnection()
+ ConnectionCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue()
.declareName(BrokerAdmin.TEST_QUEUE_NAME)
@@ -900,11 +891,11 @@ public class QueueTest extends BrokerAdminUsingTestBase
final String content = "content";
final String routingKey = "rk1";
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
final Map<String, Object> messageProps = Collections.singletonMap("prop", 0);
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select()
.consumeResponse(TxSelectOkBody.class)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
index 93127ad..d922096 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
@@ -40,12 +39,10 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class TransactionTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -54,10 +51,10 @@ public class TransactionTest extends BrokerAdminUsingTestBase
+ "channels.")
public void commitNonTransactedChannel() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody res = interaction.openAnonymousConnection()
+ ChannelCloseBody res = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().commit()
.consumeResponse().getLatestResponse(ChannelCloseBody.class);
@@ -71,10 +68,10 @@ public class TransactionTest extends BrokerAdminUsingTestBase
+ "channels.")
public void rollbackNonTransactedChannel() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ChannelCloseBody res = interaction.openAnonymousConnection()
+ ChannelCloseBody res = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().rollback()
.consumeResponse().getLatestResponse(ChannelCloseBody.class);
@@ -87,10 +84,10 @@ public class TransactionTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.9.2.3", description = "commit the current transaction")
public void publishMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().contentHeaderPropertiesContentType("text/plain")
@@ -113,10 +110,10 @@ public class TransactionTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "1.9.2.5", description = "abandon the current transaction")
public void publishMessageRollback() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic()
@@ -139,13 +136,13 @@ public class TransactionTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().qosPrefetchCount(1)
@@ -182,13 +179,13 @@ public class TransactionTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().qosPrefetchCount(1)
@@ -243,13 +240,13 @@ public class TransactionTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().qosPrefetchCount(1)
@@ -293,18 +290,13 @@ public class TransactionTest extends BrokerAdminUsingTestBase
@Test
public void publishUnrouteableMandatoryMessageCloseWhenNoRoute() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
Map<String, Object> clientProperties = Collections.singletonMap(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, true);
- ConnectionCloseBody response = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class).connection()
- .startOkClientProperties(clientProperties).startOkMechanism("ANONYMOUS").startOk()
- .consumeResponse(ConnectionTuneBody.class)
- .connection().tuneOk()
- .connection().open()
- .consumeResponse(ConnectionOpenOkBody.class)
+ ConnectionCloseBody response = interaction.connection().startOkClientProperties(clientProperties)
+ .interaction().negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx()
.select()
@@ -325,24 +317,14 @@ public class TransactionTest extends BrokerAdminUsingTestBase
@Test
public void publishUnrouteableMandatory() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
Map<String, Object> clientProperties = Collections.singletonMap(ConnectionStartProperties.QPID_CLOSE_WHEN_NO_ROUTE, false);
final String messageContent = "Test";
- BasicReturnBody returned = interaction.negotiateProtocol()
- .consumeResponse(ConnectionStartBody.class)
- .connection()
- .startOkClientProperties(clientProperties)
- .startOkMechanism("ANONYMOUS")
- .startOk()
- .consumeResponse(ConnectionTuneBody.class)
- .connection()
- .tuneOk()
- .connection()
- .open()
- .consumeResponse(ConnectionOpenOkBody.class)
+ BasicReturnBody returned = interaction.connection().startOkClientProperties(clientProperties)
+ .interaction().negotiateOpen()
.channel()
.open()
.consumeResponse(ChannelOpenOkBody.class)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/authtimeout/AuthenticationTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/authtimeout/AuthenticationTimeoutTest.java
index 3e16ded..033c02f 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/authtimeout/AuthenticationTimeoutTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/authtimeout/AuthenticationTimeoutTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assume.assumeThat;
-import java.net.InetSocketAddress;
import java.util.Arrays;
import org.junit.Test;
@@ -46,9 +45,7 @@ public class AuthenticationTimeoutTest extends BrokerAdminUsingTestBase
{
assumeThat(getBrokerAdmin().isSASLMechanismSupported("PLAIN"), is(true));
- final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
-
- try (FrameTransport transport = new FrameTransport(addr).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin(), BrokerAdmin.PortType.AMQP).connect())
{
final Interaction interaction = transport.newInteraction();
final ConnectionStartBody start = interaction.negotiateProtocol()
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
index ccea026..9188de7 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
@@ -27,7 +27,6 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
@@ -68,13 +67,11 @@ import org.apache.qpid.tests.utils.ConfigItem;
@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
public class MalformedMessage extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final String CONTENT_TEXT = "Test";
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -106,10 +103,10 @@ public class MalformedMessage extends BrokerAdminUsingTestBase
basicContentHeaderProperties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue()
.bindName(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
@@ -140,10 +137,10 @@ public class MalformedMessage extends BrokerAdminUsingTestBase
basicContentHeaderProperties.setDeliveryMode(BasicContentHeaderProperties.PERSISTENT);
byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select()
.consumeResponse(TxSelectOkBody.class)
@@ -169,11 +166,11 @@ public class MalformedMessage extends BrokerAdminUsingTestBase
final String content2 = "message2";
final byte[] content2Bytes = content2.getBytes(StandardCharsets.UTF_8);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
String consumerTag = "A";
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open()
.consumeResponse(ChannelOpenOkBody.class)
.basic().qosPrefetchCount(1)
@@ -228,10 +225,10 @@ public class MalformedMessage extends BrokerAdminUsingTestBase
private void publishMalformedMessage(final FieldTable malformedHeader, final byte[] contentBytes) throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().publishExchange("")
.publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessageValidation.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessageValidation.java
index 6391081..924b860 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessageValidation.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessageValidation.java
@@ -25,7 +25,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import org.junit.Before;
@@ -47,13 +46,11 @@ import org.apache.qpid.tests.utils.ConfigItem;
@ConfigItem(name = "qpid.connection.forceValidation", value = "true")
public class MalformedMessageValidation extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final String CONTENT_TEXT = "Test";
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -62,10 +59,10 @@ public class MalformedMessageValidation extends BrokerAdminUsingTestBase
{
final FieldTable malformedHeader = createMalformedHeaders();
byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().publishExchange("")
.publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
index b5afa53..a5b35a3 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -51,12 +50,9 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
*/
public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -68,10 +64,10 @@ public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
+ "contains the sequence number of the confirmed message." )
public void publishMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- BasicAckBody ackBody = interaction.openAnonymousConnection()
+ BasicAckBody ackBody = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().confirmSelect().consumeResponse(ConfirmSelectOkBody.class)
.basic().publishExchange("")
@@ -91,10 +87,10 @@ public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
+ "the broker will send a basic.nack.")
public void publishUnrouteableMandatoryMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- BasicNackBody nackBody = interaction.openAnonymousConnection()
+ BasicNackBody nackBody = interaction.negotiateOpen()
.channel()
.open()
.consumeResponse(ChannelOpenOkBody.class)
@@ -119,10 +115,10 @@ public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
+ "confirmed or nack'd once")
public void publishUnrouteableMessage() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().confirmSelect().consumeResponse(ConfirmSelectOkBody.class)
.basic().publishExchange("")
@@ -138,10 +134,10 @@ public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
@Test
public void publishWithTransactionalConfirms() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- final BasicAckBody ackBody = interaction.openAnonymousConnection()
+ final BasicAckBody ackBody = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select()
.consumeResponse(TxSelectOkBody.class)
@@ -167,10 +163,10 @@ public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
@Test
public void publishUnroutableMessageWithTransactionalConfirms() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select()
.consumeResponse(TxSelectOkBody.class)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
index 8ad178a..e0b53f5 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/exchange/ExchangeTest.java
@@ -25,10 +25,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.ErrorCodes;
@@ -38,7 +36,6 @@ import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
import org.apache.qpid.tests.protocol.v0_8.Interaction;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.BrokerSpecific;
@@ -46,21 +43,14 @@ import org.apache.qpid.tests.utils.BrokerSpecific;
public class ExchangeTest extends BrokerAdminUsingTestBase
{
private static final String TEST_EXCHANGE = "testExchange";
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
public void exchangeDeclareValidWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange()
.declareName(TEST_EXCHANGE)
@@ -80,10 +70,10 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
@Test
public void exchangeDeclareInvalidWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionCloseBody response = interaction.openAnonymousConnection()
+ ConnectionCloseBody response = interaction.negotiateOpen()
.channel()
.open()
.consumeResponse(ChannelOpenOkBody.class)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSizeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSizeTest.java
index 16721be..94debf8 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSizeTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/maxsize/MaximumMessageSizeTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -48,12 +47,10 @@ import org.apache.qpid.tests.utils.ConfigItem;
@ConfigItem(name = "qpid.max_message_size", value = "1000")
public class MaximumMessageSizeTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -61,10 +58,10 @@ public class MaximumMessageSizeTest extends BrokerAdminUsingTestBase
public void limitExceeded() throws Exception
{
String content = Stream.generate(() -> String.valueOf('.')).limit(1001).collect(Collectors.joining());
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.basic().contentHeaderPropertiesContentType("text/plain")
.contentHeaderPropertiesDeliveryMode((byte) 1)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
index 54e89a8..7e10c48 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
@@ -19,36 +19,21 @@
*/
package org.apache.qpid.tests.protocol.v0_8.extension.protocoltimeout;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assume.assumeThat;
-
-import java.net.InetSocketAddress;
-
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
-import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.utils.ConfigItem;
@ConfigItem(name = AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT, value = "500")
public class ProtocolHeaderTimeoutTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
public void noProtocolHeader() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
transport.assertNoMoreResponsesAndChannelClosed();
}
@@ -57,7 +42,7 @@ public class ProtocolHeaderTimeoutTest extends BrokerAdminUsingTestBase
@Test
public void incompleteProtocolHeader() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final byte[] protocolHeader = transport.getProtocolHeader();
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
index 4bd9b5a..d441ba0 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/queue/QueueTest.java
@@ -27,13 +27,11 @@ import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -60,21 +58,14 @@ import org.apache.qpid.tests.utils.BrokerSpecific;
@BrokerSpecific(kind = KIND_BROKER_J)
public class QueueTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
-
- @Before
- public void setUp()
- {
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- }
@Test
public void queueDeclareUsingRealQueueAttributesInWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ QueueDeclareOkBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME)
.declareArguments(Collections.singletonMap("defaultFilters",
@@ -162,10 +153,10 @@ public class QueueTest extends BrokerAdminUsingTestBase
@Test
public void queueDeclareInvalidWireArguments() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- ConnectionCloseBody response = interaction.openAnonymousConnection()
+ ConnectionCloseBody response = interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.queue().declareName(BrokerAdmin.TEST_QUEUE_NAME)
.declareArguments(Collections.singletonMap("foo", "bar"))
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
index 179b845..56bbd31 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
import java.util.Collections;
import org.junit.Before;
@@ -53,22 +52,20 @@ import org.apache.qpid.tests.utils.ConfigItem;
@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@Test
public void publishTransactionTimeout() throws Exception
{
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().contentHeaderPropertiesContentType("text/plain")
@@ -96,13 +93,13 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
{
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try(FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().qosPrefetchCount(1)
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
index 446e78c..ea54621 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/tx/AsyncTransactionTest.java
@@ -25,8 +25,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import java.net.InetSocketAddress;
-
import org.junit.Before;
import org.junit.Test;
@@ -51,13 +49,11 @@ import org.apache.qpid.tests.utils.BrokerSpecific;
@BrokerSpecific(kind = KIND_BROKER_J)
public class AsyncTransactionTest extends BrokerAdminUsingTestBase
{
- private InetSocketAddress _brokerAddress;
private static final int MESSAGE_COUNT = 10;
@Before
public void setUp()
{
- _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
}
@@ -66,7 +62,7 @@ public class AsyncTransactionTest extends BrokerAdminUsingTestBase
{
publishPersistentMessages();
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(MESSAGE_COUNT)));
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = createConsumerInteraction(transport);
@@ -88,7 +84,7 @@ public class AsyncTransactionTest extends BrokerAdminUsingTestBase
{
publishPersistentMessages();
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(MESSAGE_COUNT)));
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = createConsumerInteraction(transport);
@@ -108,10 +104,10 @@ public class AsyncTransactionTest extends BrokerAdminUsingTestBase
private void publishPersistentMessages() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class);
for (int i = 0; i < MESSAGE_COUNT; i++)
{
@@ -130,7 +126,7 @@ public class AsyncTransactionTest extends BrokerAdminUsingTestBase
throws Exception
{
final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
+ interaction.negotiateOpen()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.tx().select().consumeResponse(TxSelectOkBody.class)
.basic().qosPrefetchCount(MESSAGE_COUNT)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org