You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/09 16:09:22 UTC
qpid-broker-j git commit: QPID-8091: [Broker-J] Move transaction
timeout protocol test to separate packages - this features is Broker-J
specific.
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 63c315f07 -> d57815f89
QPID-8091: [Broker-J] Move transaction timeout protocol test to separate packages - this features is Broker-J specific.
Also refactored the new test broker configuration mechanism so that the configuration of the whole broker can be adjusted,
rather than just the virtualhost.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d57815f8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d57815f8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d57815f8
Branch: refs/heads/master
Commit: d57815f89427781bb3cf3d5f6c70b3b13a8604ff
Parents: 63c315f
Author: Keith Wall <kw...@apache.org>
Authored: Fri Feb 9 16:04:27 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Fri Feb 9 16:04:31 2018 +0000
----------------------------------------------------------------------
.../tests/protocol/v0_10/TransactionTest.java | 132 --------------
.../TransactionTimeoutTest.java | 171 +++++++++++++++++++
.../protocol/v0_8/ConnectionInteraction.java | 11 ++
.../tests/protocol/v0_8/TransactionTest.java | 82 ---------
.../TransactionTimeoutTest.java | 139 +++++++++++++++
.../TransactionTimeoutTest.java | 171 +++++++++++++++++++
.../transaction/TransactionalTransferTest.java | 122 +------------
.../apache/qpid/tests/utils/BrokerAdmin.java | 3 -
.../org/apache/qpid/tests/utils/ConfigItem.java | 36 ++++
.../apache/qpid/tests/utils/ConfigItems.java | 33 ++++
.../utils/EmbeddedBrokerPerClassAdminImpl.java | 13 +-
.../utils/ExternalQpidBrokerAdminImpl.java | 6 -
.../apache/qpid/tests/utils/QpidTestRunner.java | 4 +
13 files changed, 573 insertions(+), 350 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
index 7c48e36..443aede 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -21,11 +21,8 @@
package org.apache.qpid.tests.protocol.v0_10;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
-import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import java.net.InetSocketAddress;
@@ -33,20 +30,10 @@ import java.net.InetSocketAddress;
import org.junit.Before;
import org.junit.Test;
-import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
-import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseCode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.Range;
-import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.utils.BrokerSpecific;
public class TransactionTest extends BrokerAdminUsingTestBase
{
@@ -102,123 +89,4 @@ public class TransactionTest extends BrokerAdminUsingTestBase
assertThat(queueDepthMessages, is(equalTo(1)));
}
}
-
- @Test
- @BrokerSpecific(kind = KIND_BROKER_J)
- public void publishTransactionTimeout() throws Exception
- {
- int transactionTimeout = 1000;
- getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- final Interaction interaction = transport.newInteraction();
- byte[] sessionName = "test".getBytes(UTF_8);
- interaction.openAnonymousConnection()
- .channelId(1)
- .attachSession(sessionName)
- .tx().selectId(0).select()
- .message()
- .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
- .transferId(1)
- .transfer()
- .session()
- .flushCompleted()
- .flush();
-
- SessionCompleted completed;
- do
- {
- completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
- }
- while (!completed.getCommands().includes(1));
-
- int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(queueDepthMessages, is(equalTo(0)));
-
- Thread.sleep(transactionTimeout + 1000);
-
- ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
- assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
- assertThat(close.getReplyText(), containsString("transaction timed out"));
-
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
- }
-
- @Test
- @BrokerSpecific(kind = KIND_BROKER_J)
- public void consumeTransactionTimeout() throws Exception
- {
- int transactionTimeout = 1000;
- getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
- String testMessageBody = "testMessage";
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- final Interaction interaction = transport.newInteraction();
- byte[] sessionName = "testSession".getBytes(UTF_8);
- final String subscriberName = "testSubscriber";
- interaction.openAnonymousConnection()
- .channelId(1)
- .attachSession(sessionName)
- .tx().selectId(0).select()
- .message()
- .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
- .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
- .subscribeDestination(subscriberName)
- .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
- .subscribeId(0)
- .subscribe()
- .message()
- .flowId(1)
- .flowDestination(subscriberName)
- .flowUnit(MessageCreditUnit.MESSAGE)
- .flowValue(1)
- .flow()
- .message()
- .flowId(2)
- .flowDestination(subscriberName)
- .flowUnit(MessageCreditUnit.BYTE)
- .flowValue(-1)
- .flow();
-
- MessageTransfer transfer = receiveResponse(interaction, MessageTransfer.class);
-
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-
- RangeSet transfers = Range.newInstance(transfer.getId());
- interaction.message().acceptId(3).acceptTransfers(transfers).accept()
- .session()
- .flushCompleted()
- .flush();
-
- SessionCompleted completed = receiveResponse(interaction, SessionCompleted.class);
-
- assertThat(completed.getCommands(), is(notNullValue()));
- assertThat(completed.getCommands().includes(3), is(equalTo(true)));
-
- Thread.sleep(transactionTimeout + 1000);
-
- ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
- assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
- assertThat(close.getReplyText(), containsString("transaction timed out"));
- }
- }
-
- private <T> T receiveResponse(final Interaction interaction, Class<T> clazz) throws Exception
- {
- T result = null;
- do
- {
- Response<?> response = interaction.consumeResponse().getLatestResponse();
- if (clazz.isInstance(response.getBody()))
- {
- result = (T) response.getBody();
- }
- }
- while (result == null);
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java
new file mode 100644
index 0000000..e1f35d6
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extenstions.transactiontimeout;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseCode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.Range;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
+public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void publishTransactionTimeout() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(UTF_8);
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .tx().selectId(0).select()
+ .message()
+ .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferId(1)
+ .transfer()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ SessionCompleted completed;
+ do
+ {
+ completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+ }
+ while (!completed.getCommands().includes(1));
+
+ int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(queueDepthMessages, is(equalTo(0)));
+
+ ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
+ assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
+ assertThat(close.getReplyText(), containsString("transaction timed out"));
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ public void consumeTransactionTimeout() throws Exception
+ {
+ String testMessageBody = "testMessage";
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "testSession".getBytes(UTF_8);
+ final String subscriberName = "testSubscriber";
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .tx().selectId(0).select()
+ .message()
+ .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+ .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .message()
+ .flowId(1)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.MESSAGE)
+ .flowValue(1)
+ .flow()
+ .message()
+ .flowId(2)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.BYTE)
+ .flowValue(-1)
+ .flow();
+
+ MessageTransfer transfer = receiveResponse(interaction, MessageTransfer.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+ RangeSet transfers = Range.newInstance(transfer.getId());
+ interaction.message().acceptId(3).acceptTransfers(transfers).accept()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ SessionCompleted completed = receiveResponse(interaction, SessionCompleted.class);
+
+ assertThat(completed.getCommands(), is(notNullValue()));
+ assertThat(completed.getCommands().includes(3), is(equalTo(true)));
+
+ ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
+ assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
+ assertThat(close.getReplyText(), containsString("transaction timed out"));
+ }
+ }
+
+ private <T> T receiveResponse(final Interaction interaction, Class<T> clazz) throws Exception
+ {
+ T result = null;
+ do
+ {
+ Response<?> response = interaction.consumeResponse().getLatestResponse();
+ if (clazz.isInstance(response.getBody()))
+ {
+ result = (T) response.getBody();
+ }
+ }
+ while (result == null);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
index d7b661f..bc3bc24 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -25,9 +25,11 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody;
@@ -36,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody;
public class ConnectionInteraction
{
private final Interaction _interaction;
+ private final ConnectionCloseOkBody _closeOkBody;
private Map<String, Object> _startOkClientProperties = new HashMap<>();
private String _startOkMechanism;
@@ -55,6 +58,9 @@ public class ConnectionInteraction
public ConnectionInteraction(final Interaction interaction)
{
_interaction = interaction;
+ _closeOkBody = interaction.getProtocolVersion() == ProtocolVersion.v0_8
+ ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
+ : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9;
}
@@ -129,4 +135,9 @@ public class ConnectionInteraction
_closeClassId,
_closeMethodId));
}
+
+ public Interaction closeOk() throws Exception
+ {
+ return _interaction.sendPerformative(_closeOkBody);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
index 99fb224..93127ad 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
-import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,7 +37,6 @@ import org.apache.qpid.server.protocol.v0_8.transport.*;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.utils.BrokerSpecific;
public class TransactionTest extends BrokerAdminUsingTestBase
{
@@ -368,84 +366,4 @@ public class TransactionTest extends BrokerAdminUsingTestBase
.channel().close().consumeResponse(ChannelCloseOkBody.class);
}
}
-
-
- @Test
- @BrokerSpecific(kind = KIND_BROKER_J)
- public void publishTransactionTimeout() throws Exception
- {
- int transactionTimeout = 1000;
- getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- final Interaction interaction = transport.newInteraction();
- interaction.openAnonymousConnection()
- .channel().open().consumeResponse(ChannelOpenOkBody.class)
- .tx().select().consumeResponse(TxSelectOkBody.class)
- .basic().contentHeaderPropertiesContentType("text/plain")
- .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
- .contentHeaderPropertiesDeliveryMode((byte)1)
- .contentHeaderPropertiesPriority((byte)1)
- .publishExchange("")
- .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
- .content("Test")
- .publishMessage()
- .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-
- Thread.sleep(transactionTimeout + 1000);
-
- interaction.consumeResponse(ConnectionCloseBody.class);
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
- }
- }
-
- @Test
- @BrokerSpecific(kind = KIND_BROKER_J)
- public void consumeTransactionTimeout() throws Exception
- {
- int transactionTimeout = 1000;
- getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
-
- try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- final Interaction interaction = transport.newInteraction();
-
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-
- interaction.openAnonymousConnection()
- .channel().open().consumeResponse(ChannelOpenOkBody.class)
- .tx().select().consumeResponse(TxSelectOkBody.class)
- .basic().qosPrefetchCount(1)
- .qos()
- .consumeResponse(BasicQosOkBody.class)
- .channel().flow(true)
- .consumeResponse(ChannelFlowOkBody.class)
- .basic()
- .consumeConsumerTag("A")
- .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
- .consume()
- .consumeResponse(BasicConsumeOkBody.class)
- .consumeResponse(BasicDeliverBody.class);
-
- BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
- interaction.consumeResponse(ContentHeaderBody.class)
- .consumeResponse(ContentBody.class);
-
- interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
- .ack()
- .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
-
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-
- Thread.sleep(transactionTimeout + 1000);
-
- interaction.consumeResponse(ConnectionCloseBody.class);
- assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
new file mode 100644
index 0000000..179b845
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.transactiontimeout;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
+public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void publishTransactionTimeout() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .tx().select().consumeResponse(TxSelectOkBody.class)
+ .basic().contentHeaderPropertiesContentType("text/plain")
+ .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+ .contentHeaderPropertiesDeliveryMode((byte)1)
+ .contentHeaderPropertiesPriority((byte)1)
+ .publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .content("Test")
+ .publishMessage()
+ .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+
+ final ConnectionCloseBody close = interaction.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ assertThat(close.getReplyCode(), is(equalTo(ErrorCodes.RESOURCE_ERROR)));
+ assertThat(close.getReplyText().toString(), containsString("transaction timed out"));
+ interaction.connection().closeOk();
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ public void consumeTransactionTimeout() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .tx().select().consumeResponse(TxSelectOkBody.class)
+ .basic().qosPrefetchCount(1)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class)
+ .basic()
+ .consumeConsumerTag("A")
+ .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class)
+ .consumeResponse(BasicDeliverBody.class);
+
+ BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
+ interaction.consumeResponse(ContentHeaderBody.class)
+ .consumeResponse(ContentBody.class);
+
+ interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+ .ack()
+ .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+ final ConnectionCloseBody close = interaction.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ assertThat(close.getReplyCode(), is(equalTo(ErrorCodes.RESOURCE_ERROR)));
+ assertThat(close.getReplyText().toString(), containsString("transaction timed out"));
+ interaction.connection().closeOk();
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
new file mode 100644
index 0000000..d8608a4
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.transactiontimeout;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
+public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
+{
+ private static final String TEST_MESSAGE_CONTENT = "testMessageContent";
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ public void transactionalPostingTimeout() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ Disposition responseDisposition = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachHandle(linkHandle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+ Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
+ assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+ assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
+ }
+ }
+
+ @Test
+ public void transactionalRetirementTimeout() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach()
+ .consumeResponse(Attach.class)
+
+ .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.MAX_VALUE)
+ .flowHandleFromLinkHandle()
+ .flow()
+
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition()
+ .sync();
+
+ Response<?> response = interaction.consumeResponse(Close.class, Flow.class).getLatestResponse();
+ Close responseClose;
+ if (response.getBody() instanceof Close)
+ {
+ responseClose = (Close) response.getBody();
+ }
+ else
+ {
+ responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
+ }
+ assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+ assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index a893d76..85b5ecd 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.tests.protocol.v1_0.transaction;
-import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -33,7 +32,6 @@ import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
-import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -56,15 +54,14 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.utils.BrokerSpecific;
public class TransactionalTransferTest extends BrokerAdminUsingTestBase
{
@@ -647,121 +644,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
}
}
- @Test
- @BrokerSpecific(kind = KIND_BROKER_J)
- public void transactionalPostingTimeout() throws Exception
- {
- int transactionTimeout = 1000;
- getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- final UnsignedInteger linkHandle = UnsignedInteger.ONE;
-
- final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
- Disposition responseDisposition = interaction.negotiateProtocol()
- .consumeResponse()
- .open()
- .consumeResponse(Open.class)
- .begin()
- .consumeResponse(Begin.class)
-
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
-
- .attachRole(Role.SENDER)
- .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachHandle(linkHandle)
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
-
- .transferHandle(linkHandle)
- .transferPayloadData(TEST_MESSAGE_CONTENT)
- .transferTransactionalState(txnState.getCurrentTransactionId())
- .transfer()
- .consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
-
- assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
- assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
- assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
- assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
-
- Thread.sleep(transactionTimeout + 1000);
-
- Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
- assertThat(responseClose.getError(), is(Matchers.notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
- }
- }
-
- @Test
- @BrokerSpecific(kind = KIND_BROKER_J)
- public void transactionalRetirementTimeout() throws Exception
- {
- int transactionTimeout = 1000;
- getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
- getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
- try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
- {
- final Interaction interaction = transport.newInteraction();
- final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
- interaction.negotiateProtocol()
- .consumeResponse()
- .open()
- .consumeResponse(Open.class)
- .begin()
- .consumeResponse(Begin.class)
-
- .txnAttachCoordinatorLink(txnState)
- .txnDeclare(txnState)
-
- .attachRole(Role.RECEIVER)
- .attachHandle(UnsignedInteger.ONE)
- .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.FIRST)
- .attach()
- .consumeResponse(Attach.class)
-
- .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
- .flowOutgoingWindow(UnsignedInteger.ZERO)
- .flowNextOutgoingId(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.MAX_VALUE)
- .flowHandleFromLinkHandle()
- .flow()
-
- .receiveDelivery()
- .decodeLatestDelivery();
-
- Object data = interaction.getDecodedLatestDelivery();
- assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
-
- interaction.dispositionSettled(true)
- .dispositionRole(Role.RECEIVER)
- .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
- .disposition()
- .sync();
-
- Thread.sleep(transactionTimeout + 1000);
- Response<?> response = interaction.consumeResponse(Close.class, Flow.class).getLatestResponse();
- Close responseClose;
- if (response.getBody() instanceof Close)
- {
- responseClose = (Close) response.getBody();
- }
- else
- {
- responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
- }
- assertThat(responseClose.getError(), is(Matchers.notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
- }
- }
-
-
private void assertUnknownTransactionIdError(final Response<?> response)
{
assertThat(response, is(notNullValue()));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
index da6790b..5c4f974 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
@@ -59,9 +59,6 @@ public interface BrokerAdmin extends Pluggable
String getKind();
- void configure(String settingName, Object settingValue);
-
-
enum PortType
{
ANONYMOUS_AMQP,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java
new file mode 100644
index 0000000..493f865
--- /dev/null
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Repeatable(ConfigItems.class)
+@Target({ElementType.TYPE})
+public @interface ConfigItem
+{
+ String name();
+ String value();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java
new file mode 100644
index 0000000..9f76180
--- /dev/null
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface ConfigItems
+{
+ ConfigItem[] value();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 9fab318..5333c7a 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import javax.security.auth.Subject;
@@ -88,7 +89,11 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
_currentWorkDirectory = Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName())).toString();
- Map<String,String> context = new HashMap<>();
+ Map<String, String> context = new HashMap<>();
+ context.putAll(Arrays.stream((ConfigItem[]) testClass.getAnnotationsByType(ConfigItem.class))
+ .collect(Collectors.toMap(ConfigItem::name,
+ ConfigItem::value,
+ (name, value) -> value)));
context.put("qpid.work_dir", _currentWorkDirectory);
context.put("qpid.port.protocol_handshake_timeout", "1000000");
@@ -404,12 +409,6 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
}
@Override
- public void configure(final String settingName, final Object settingValue)
- {
- _currentVirtualHostNode.getVirtualHost().setAttributes(Collections.singletonMap(settingName, settingValue));
- }
-
- @Override
public String getType()
{
return "EMBEDDED_BROKER_PER_CLASS";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index 5f24546..5c3e1cc 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -162,12 +162,6 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
}
@Override
- public void configure(final String settingName, final Object settingValue)
- {
- throw new UnsupportedOperationException("External Qpid Broker does not support configuring");
- }
-
- @Override
public String getType()
{
return "EXTERNAL_BROKER";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
index d3c9be6..a03c129 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
@@ -64,6 +64,10 @@ public class QpidTestRunner extends BlockJUnit4ClassRunner
protected void runChild(final FrameworkMethod method, final RunNotifier notifier)
{
BrokerSpecific brokerSpecific = method.getAnnotation(BrokerSpecific.class);
+ if (brokerSpecific == null)
+ {
+ brokerSpecific = method.getClass().getAnnotation(BrokerSpecific.class);
+ }
if (brokerSpecific != null && !brokerSpecific.kind().equalsIgnoreCase(_brokerAdmin.getKind()))
{
notifier.fireTestIgnored(describeChild(method));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org