You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/09 16:09:22 UTC

qpid-broker-j git commit: QPID-8091: [Broker-J] Move transaction timeout protocol test to separate packages - this features is Broker-J specific.

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 63c315f07 -> d57815f89


QPID-8091: [Broker-J] Move transaction timeout protocol test to separate packages - this features is Broker-J specific.

Also refactored the new test broker configuration mechanism so that the configuration of the whole broker can be adjusted,
rather than just the virtualhost.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d57815f8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d57815f8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d57815f8

Branch: refs/heads/master
Commit: d57815f89427781bb3cf3d5f6c70b3b13a8604ff
Parents: 63c315f
Author: Keith Wall <kw...@apache.org>
Authored: Fri Feb 9 16:04:27 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Fri Feb 9 16:04:31 2018 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_10/TransactionTest.java   | 132 --------------
 .../TransactionTimeoutTest.java                 | 171 +++++++++++++++++++
 .../protocol/v0_8/ConnectionInteraction.java    |  11 ++
 .../tests/protocol/v0_8/TransactionTest.java    |  82 ---------
 .../TransactionTimeoutTest.java                 | 139 +++++++++++++++
 .../TransactionTimeoutTest.java                 | 171 +++++++++++++++++++
 .../transaction/TransactionalTransferTest.java  | 122 +------------
 .../apache/qpid/tests/utils/BrokerAdmin.java    |   3 -
 .../org/apache/qpid/tests/utils/ConfigItem.java |  36 ++++
 .../apache/qpid/tests/utils/ConfigItems.java    |  33 ++++
 .../utils/EmbeddedBrokerPerClassAdminImpl.java  |  13 +-
 .../utils/ExternalQpidBrokerAdminImpl.java      |   6 -
 .../apache/qpid/tests/utils/QpidTestRunner.java |   4 +
 13 files changed, 573 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
index 7c48e36..443aede 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -21,11 +21,8 @@
 package org.apache.qpid.tests.protocol.v0_10;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
-import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.net.InetSocketAddress;
@@ -33,20 +30,10 @@ import java.net.InetSocketAddress;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
-import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseCode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
-import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
-import org.apache.qpid.server.protocol.v0_10.transport.Range;
-import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionTest extends BrokerAdminUsingTestBase
 {
@@ -102,123 +89,4 @@ public class TransactionTest extends BrokerAdminUsingTestBase
             assertThat(queueDepthMessages, is(equalTo(1)));
         }
     }
-
-    @Test
-    @BrokerSpecific(kind = KIND_BROKER_J)
-    public void publishTransactionTimeout() throws Exception
-    {
-        int transactionTimeout = 1000;
-        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
-        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            final Interaction interaction = transport.newInteraction();
-            byte[] sessionName = "test".getBytes(UTF_8);
-            interaction.openAnonymousConnection()
-                       .channelId(1)
-                       .attachSession(sessionName)
-                       .tx().selectId(0).select()
-                       .message()
-                       .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
-                       .transferId(1)
-                       .transfer()
-                       .session()
-                       .flushCompleted()
-                       .flush();
-
-            SessionCompleted completed;
-            do
-            {
-                completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
-            }
-            while (!completed.getCommands().includes(1));
-
-            int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(queueDepthMessages, is(equalTo(0)));
-
-            Thread.sleep(transactionTimeout + 1000);
-
-            ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
-            assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
-            assertThat(close.getReplyText(), containsString("transaction timed out"));
-
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-        }
-    }
-
-    @Test
-    @BrokerSpecific(kind = KIND_BROKER_J)
-    public void consumeTransactionTimeout() throws Exception
-    {
-        int transactionTimeout = 1000;
-        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
-        String testMessageBody = "testMessage";
-        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
-        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            final Interaction interaction = transport.newInteraction();
-            byte[] sessionName = "testSession".getBytes(UTF_8);
-            final String subscriberName = "testSubscriber";
-            interaction.openAnonymousConnection()
-                       .channelId(1)
-                       .attachSession(sessionName)
-                       .tx().selectId(0).select()
-                       .message()
-                       .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
-                       .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
-                       .subscribeDestination(subscriberName)
-                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
-                       .subscribeId(0)
-                       .subscribe()
-                       .message()
-                       .flowId(1)
-                       .flowDestination(subscriberName)
-                       .flowUnit(MessageCreditUnit.MESSAGE)
-                       .flowValue(1)
-                       .flow()
-                       .message()
-                       .flowId(2)
-                       .flowDestination(subscriberName)
-                       .flowUnit(MessageCreditUnit.BYTE)
-                       .flowValue(-1)
-                       .flow();
-
-            MessageTransfer transfer = receiveResponse(interaction, MessageTransfer.class);
-
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-
-            RangeSet transfers = Range.newInstance(transfer.getId());
-            interaction.message().acceptId(3).acceptTransfers(transfers).accept()
-                       .session()
-                       .flushCompleted()
-                       .flush();
-
-            SessionCompleted completed = receiveResponse(interaction, SessionCompleted.class);
-
-            assertThat(completed.getCommands(), is(notNullValue()));
-            assertThat(completed.getCommands().includes(3), is(equalTo(true)));
-
-            Thread.sleep(transactionTimeout + 1000);
-
-            ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
-            assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
-            assertThat(close.getReplyText(), containsString("transaction timed out"));
-        }
-    }
-
-    private <T> T receiveResponse(final Interaction interaction, Class<T> clazz) throws Exception
-    {
-        T result = null;
-        do
-        {
-            Response<?> response = interaction.consumeResponse().getLatestResponse();
-            if (clazz.isInstance(response.getBody()))
-            {
-                result = (T) response.getBody();
-            }
-        }
-        while (result == null);
-        return result;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java
new file mode 100644
index 0000000..e1f35d6
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extenstions/transactiontimeout/TransactionTimeoutTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extenstions.transactiontimeout;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseCode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.Range;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
+public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void publishTransactionTimeout() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(UTF_8);
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .tx().selectId(0).select()
+                       .message()
+                       .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+                       .transferId(1)
+                       .transfer()
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            SessionCompleted completed;
+            do
+            {
+                completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+            }
+            while (!completed.getCommands().includes(1));
+
+            int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(queueDepthMessages, is(equalTo(0)));
+
+            ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
+            assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
+            assertThat(close.getReplyText(), containsString("transaction timed out"));
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    public void consumeTransactionTimeout() throws Exception
+    {
+        String testMessageBody = "testMessage";
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .tx().selectId(0).select()
+                       .message()
+                       .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+                       .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1)
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1)
+                       .flow();
+
+            MessageTransfer transfer = receiveResponse(interaction, MessageTransfer.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            RangeSet transfers = Range.newInstance(transfer.getId());
+            interaction.message().acceptId(3).acceptTransfers(transfers).accept()
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            SessionCompleted completed = receiveResponse(interaction, SessionCompleted.class);
+
+            assertThat(completed.getCommands(), is(notNullValue()));
+            assertThat(completed.getCommands().includes(3), is(equalTo(true)));
+
+            ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
+            assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
+            assertThat(close.getReplyText(), containsString("transaction timed out"));
+        }
+    }
+
+    private <T> T receiveResponse(final Interaction interaction, Class<T> clazz) throws Exception
+    {
+        T result = null;
+        do
+        {
+            Response<?> response = interaction.consumeResponse().getLatestResponse();
+            if (clazz.isInstance(response.getBody()))
+            {
+                result = (T) response.getBody();
+            }
+        }
+        while (result == null);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
index d7b661f..bc3bc24 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -25,9 +25,11 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.ProtocolVersion;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody;
@@ -36,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody;
 public class ConnectionInteraction
 {
     private final Interaction _interaction;
+    private final ConnectionCloseOkBody _closeOkBody;
 
     private Map<String, Object> _startOkClientProperties = new HashMap<>();
     private String _startOkMechanism;
@@ -55,6 +58,9 @@ public class ConnectionInteraction
     public ConnectionInteraction(final Interaction interaction)
     {
         _interaction = interaction;
+        _closeOkBody = interaction.getProtocolVersion() == ProtocolVersion.v0_8
+                ? ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8
+                : ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9;
     }
 
 
@@ -129,4 +135,9 @@ public class ConnectionInteraction
                                                                      _closeClassId,
                                                                      _closeMethodId));
     }
+
+    public Interaction closeOk() throws Exception
+    {
+        return _interaction.sendPerformative(_closeOkBody);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
index 99fb224..93127ad 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
-import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -38,7 +37,6 @@ import org.apache.qpid.server.protocol.v0_8.transport.*;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionTest extends BrokerAdminUsingTestBase
 {
@@ -368,84 +366,4 @@ public class TransactionTest extends BrokerAdminUsingTestBase
                        .channel().close().consumeResponse(ChannelCloseOkBody.class);
         }
     }
-
-
-    @Test
-    @BrokerSpecific(kind = KIND_BROKER_J)
-    public void publishTransactionTimeout() throws Exception
-    {
-        int transactionTimeout = 1000;
-        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
-        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            final Interaction interaction = transport.newInteraction();
-            interaction.openAnonymousConnection()
-                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
-                       .tx().select().consumeResponse(TxSelectOkBody.class)
-                       .basic().contentHeaderPropertiesContentType("text/plain")
-                       .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
-                       .contentHeaderPropertiesDeliveryMode((byte)1)
-                       .contentHeaderPropertiesPriority((byte)1)
-                       .publishExchange("")
-                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
-                       .content("Test")
-                       .publishMessage()
-                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-
-            Thread.sleep(transactionTimeout + 1000);
-
-            interaction.consumeResponse(ConnectionCloseBody.class);
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
-        }
-    }
-
-    @Test
-    @BrokerSpecific(kind = KIND_BROKER_J)
-    public void consumeTransactionTimeout() throws Exception
-    {
-        int transactionTimeout = 1000;
-        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
-        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
-
-        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            final Interaction interaction = transport.newInteraction();
-
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-
-            interaction.openAnonymousConnection()
-                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
-                       .tx().select().consumeResponse(TxSelectOkBody.class)
-                       .basic().qosPrefetchCount(1)
-                       .qos()
-                       .consumeResponse(BasicQosOkBody.class)
-                       .channel().flow(true)
-                       .consumeResponse(ChannelFlowOkBody.class)
-                       .basic()
-                       .consumeConsumerTag("A")
-                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
-                       .consume()
-                       .consumeResponse(BasicConsumeOkBody.class)
-                       .consumeResponse(BasicDeliverBody.class);
-
-            BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
-            interaction.consumeResponse(ContentHeaderBody.class)
-                       .consumeResponse(ContentBody.class);
-
-            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
-                       .ack()
-                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
-
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-
-            Thread.sleep(transactionTimeout + 1000);
-
-            interaction.consumeResponse(ConnectionCloseBody.class);
-            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
new file mode 100644
index 0000000..179b845
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/transactiontimeout/TransactionTimeoutTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.transactiontimeout;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
+public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void publishTransactionTimeout() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .tx().select().consumeResponse(TxSelectOkBody.class)
+                       .basic().contentHeaderPropertiesContentType("text/plain")
+                       .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+                       .contentHeaderPropertiesDeliveryMode((byte)1)
+                       .contentHeaderPropertiesPriority((byte)1)
+                       .publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .content("Test")
+                       .publishMessage()
+                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+
+            final ConnectionCloseBody close = interaction.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+            assertThat(close.getReplyCode(), is(equalTo(ErrorCodes.RESOURCE_ERROR)));
+            assertThat(close.getReplyText().toString(), containsString("transaction timed out"));
+            interaction.connection().closeOk();
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    public void consumeTransactionTimeout() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .tx().select().consumeResponse(TxSelectOkBody.class)
+                       .basic().qosPrefetchCount(1)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic()
+                       .consumeConsumerTag("A")
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class)
+                       .consumeResponse(BasicDeliverBody.class);
+
+            BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
+            interaction.consumeResponse(ContentHeaderBody.class)
+                       .consumeResponse(ContentBody.class);
+
+            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+                       .ack()
+                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            final ConnectionCloseBody close = interaction.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+            assertThat(close.getReplyCode(), is(equalTo(ErrorCodes.RESOURCE_ERROR)));
+            assertThat(close.getReplyText().toString(), containsString("transaction timed out"));
+            interaction.connection().closeOk();
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
new file mode 100644
index 0000000..d8608a4
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.transactiontimeout;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "virtualhost.storeTransactionOpenTimeoutClose", value = "1000")
+public class TransactionTimeoutTest extends BrokerAdminUsingTestBase
+{
+    private static final String TEST_MESSAGE_CONTENT = "testMessageContent";
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    public void transactionalPostingTimeout() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Disposition responseDisposition = interaction.negotiateProtocol()
+                                                         .consumeResponse()
+                                                         .open()
+                                                         .consumeResponse(Open.class)
+                                                         .begin()
+                                                         .consumeResponse(Begin.class)
+
+                                                         .txnAttachCoordinatorLink(txnState)
+                                                         .txnDeclare(txnState)
+
+                                                         .attachRole(Role.SENDER)
+                                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                         .attachHandle(linkHandle)
+                                                         .attach().consumeResponse(Attach.class)
+                                                         .consumeResponse(Flow.class)
+
+                                                         .transferHandle(linkHandle)
+                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         .transferTransactionalState(txnState.getCurrentTransactionId())
+                                                         .transfer()
+                                                         .consumeResponse(Disposition.class)
+                                                         .getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
+            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+            assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
+        }
+    }
+
+    @Test
+    public void transactionalRetirementTimeout() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState)
+                       .txnDeclare(txnState)
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.MAX_VALUE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .disposition()
+                       .sync();
+
+            Response<?> response = interaction.consumeResponse(Close.class, Flow.class).getLatestResponse();
+            Close responseClose;
+            if (response.getBody() instanceof Close)
+            {
+                responseClose = (Close) response.getBody();
+            }
+            else
+            {
+                responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
+            }
+            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+            assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index a893d76..85b5ecd 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
-import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -33,7 +32,6 @@ import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
 
-import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -56,15 +54,14 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionalTransferTest extends BrokerAdminUsingTestBase
 {
@@ -647,121 +644,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
         }
     }
 
-    @Test
-    @BrokerSpecific(kind = KIND_BROKER_J)
-    public void transactionalPostingTimeout() throws Exception
-    {
-        int transactionTimeout = 1000;
-            getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
-        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
-
-            final Interaction interaction = transport.newInteraction();
-            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
-            Disposition responseDisposition = interaction.negotiateProtocol()
-                                                         .consumeResponse()
-                                                         .open()
-                                                         .consumeResponse(Open.class)
-                                                         .begin()
-                                                         .consumeResponse(Begin.class)
-
-                                                         .txnAttachCoordinatorLink(txnState)
-                                                         .txnDeclare(txnState)
-
-                                                         .attachRole(Role.SENDER)
-                                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                                         .attachHandle(linkHandle)
-                                                         .attach().consumeResponse(Attach.class)
-                                                         .consumeResponse(Flow.class)
-
-                                                         .transferHandle(linkHandle)
-                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
-                                                         .transferTransactionalState(txnState.getCurrentTransactionId())
-                                                         .transfer()
-                                                         .consumeResponse(Disposition.class)
-                                                         .getLatestResponse(Disposition.class);
-
-            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
-            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
-            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
-            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
-
-            Thread.sleep(transactionTimeout + 1000);
-
-            Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
-        }
-    }
-
-    @Test
-    @BrokerSpecific(kind = KIND_BROKER_J)
-    public void transactionalRetirementTimeout() throws Exception
-    {
-        int transactionTimeout = 1000;
-        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
-
-        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
-        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
-        {
-            final Interaction interaction = transport.newInteraction();
-            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
-            interaction.negotiateProtocol()
-                       .consumeResponse()
-                       .open()
-                       .consumeResponse(Open.class)
-                       .begin()
-                       .consumeResponse(Begin.class)
-
-                       .txnAttachCoordinatorLink(txnState)
-                       .txnDeclare(txnState)
-
-                       .attachRole(Role.RECEIVER)
-                       .attachHandle(UnsignedInteger.ONE)
-                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
-                       .attach()
-                       .consumeResponse(Attach.class)
-
-                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
-                       .flowOutgoingWindow(UnsignedInteger.ZERO)
-                       .flowNextOutgoingId(UnsignedInteger.ZERO)
-                       .flowLinkCredit(UnsignedInteger.MAX_VALUE)
-                       .flowHandleFromLinkHandle()
-                       .flow()
-
-                       .receiveDelivery()
-                       .decodeLatestDelivery();
-
-            Object data = interaction.getDecodedLatestDelivery();
-            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
-
-            interaction.dispositionSettled(true)
-                       .dispositionRole(Role.RECEIVER)
-                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
-                       .disposition()
-                       .sync();
-
-            Thread.sleep(transactionTimeout + 1000);
-            Response<?> response = interaction.consumeResponse(Close.class, Flow.class).getLatestResponse();
-            Close responseClose;
-            if (response.getBody() instanceof Close)
-            {
-                responseClose = (Close) response.getBody();
-            }
-            else
-            {
-                responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
-            }
-            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
-        }
-    }
-
-
     private void assertUnknownTransactionIdError(final Response<?> response)
     {
         assertThat(response, is(notNullValue()));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
index da6790b..5c4f974 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
@@ -59,9 +59,6 @@ public interface BrokerAdmin extends Pluggable
 
     String getKind();
 
-    void configure(String settingName, Object settingValue);
-
-
     enum PortType
     {
         ANONYMOUS_AMQP,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java
new file mode 100644
index 0000000..493f865
--- /dev/null
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItem.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Repeatable(ConfigItems.class)
+@Target({ElementType.TYPE})
+public @interface ConfigItem
+{
+    String name();
+    String value();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java
new file mode 100644
index 0000000..9f76180
--- /dev/null
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ConfigItems.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface ConfigItems
+{
+    ConfigItem[] value();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 9fab318..5333c7a 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -36,6 +36,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import javax.security.auth.Subject;
 
@@ -88,7 +89,11 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
             String timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System.currentTimeMillis()));
             _currentWorkDirectory = Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName())).toString();
 
-            Map<String,String> context = new HashMap<>();
+            Map<String, String> context = new HashMap<>();
+            context.putAll(Arrays.stream((ConfigItem[]) testClass.getAnnotationsByType(ConfigItem.class))
+                                 .collect(Collectors.toMap(ConfigItem::name,
+                                                           ConfigItem::value,
+                                                           (name, value) -> value)));
             context.put("qpid.work_dir", _currentWorkDirectory);
             context.put("qpid.port.protocol_handshake_timeout", "1000000");
 
@@ -404,12 +409,6 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
     }
 
     @Override
-    public void configure(final String settingName, final Object settingValue)
-    {
-        _currentVirtualHostNode.getVirtualHost().setAttributes(Collections.singletonMap(settingName, settingValue));
-    }
-
-    @Override
     public String getType()
     {
         return "EMBEDDED_BROKER_PER_CLASS";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index 5f24546..5c3e1cc 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -162,12 +162,6 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     }
 
     @Override
-    public void configure(final String settingName, final Object settingValue)
-    {
-        throw new UnsupportedOperationException("External Qpid Broker does not support configuring");
-    }
-
-    @Override
     public String getType()
     {
         return "EXTERNAL_BROKER";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d57815f8/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
index d3c9be6..a03c129 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
@@ -64,6 +64,10 @@ public class QpidTestRunner extends BlockJUnit4ClassRunner
     protected void runChild(final FrameworkMethod method, final RunNotifier notifier)
     {
         BrokerSpecific brokerSpecific = method.getAnnotation(BrokerSpecific.class);
+        if (brokerSpecific == null)
+        {
+            brokerSpecific = method.getClass().getAnnotation(BrokerSpecific.class);
+        }
         if (brokerSpecific != null && !brokerSpecific.kind().equalsIgnoreCase(_brokerAdmin.getKind()))
         {
             notifier.fireTestIgnored(describeChild(method));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org