You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/02/08 00:56:31 UTC

[2/2] qpid-broker-j git commit: QPID-8091: [Broker-J] Add protocol tests for transaction timeout feature

QPID-8091: [Broker-J] Add protocol tests for transaction timeout feature


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c531ca0a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c531ca0a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c531ca0a

Branch: refs/heads/master
Commit: c531ca0ac28e5fd457b4b114674867b3bd2ee093
Parents: 46c49cf
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Feb 7 23:48:05 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Feb 7 23:48:46 2018 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_10/TransactionTest.java   | 142 +++++++++++++++++++
 .../tests/protocol/v0_8/TransactionTest.java    |  82 +++++++++++
 .../transaction/TransactionalTransferTest.java  | 118 +++++++++++++++
 .../apache/qpid/tests/utils/BrokerAdmin.java    |   4 +
 .../utils/EmbeddedBrokerPerClassAdminImpl.java  |  13 +-
 .../utils/ExternalQpidBrokerAdminImpl.java      |  12 ++
 .../apache/qpid/tests/utils/QpidTestRunner.java |  18 ++-
 7 files changed, 383 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
index 443aede..7259e66 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -21,8 +21,11 @@
 package org.apache.qpid.tests.protocol.v0_10;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.net.InetSocketAddress;
@@ -30,10 +33,24 @@ import java.net.InetSocketAddress;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.Range;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionTest extends BrokerAdminUsingTestBase
 {
@@ -89,4 +106,129 @@ public class TransactionTest extends BrokerAdminUsingTestBase
             assertThat(queueDepthMessages, is(equalTo(1)));
         }
     }
+
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void publishTransactionTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "test".getBytes(UTF_8);
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .tx().selectId(0).select()
+                       .message()
+                       .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
+                       .transferId(1)
+                       .transfer()
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            SessionCompleted completed;
+            do
+            {
+                completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+            }
+            while (!completed.getCommands().includes(1));
+
+            int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+            assertThat(queueDepthMessages, is(equalTo(0)));
+
+            Thread.sleep(transactionTimeout + 1000);
+
+            ExecutionException e = receiveResponse(interaction, ExecutionException.class);
+            assertThat(e.getErrorCode(), is(equalTo(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED)));
+            assertThat(e.getDescription(), containsString("transaction timed out"));
+
+            SessionDetach detach = receiveResponse(interaction, SessionDetach.class);
+            assertThat(detach.getName(), is(equalTo(sessionName)));
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void consumeTransactionTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
+
+        String testMessageBody = "testMessage";
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .tx().selectId(0).select()
+                       .message()
+                       .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+                       .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1)
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1)
+                       .flow();
+
+            MessageTransfer transfer = receiveResponse(interaction, MessageTransfer.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            RangeSet transfers = Range.newInstance(transfer.getId());
+            interaction.message().acceptId(3).acceptTransfers(transfers).accept()
+                       .session()
+                       .flushCompleted()
+                       .flush();
+
+            SessionCompleted completed = receiveResponse(interaction, SessionCompleted.class);
+
+            assertThat(completed.getCommands(), is(notNullValue()));
+            assertThat(completed.getCommands().includes(3), is(equalTo(true)));
+
+            Thread.sleep(transactionTimeout + 1000);
+
+            ExecutionException e = receiveResponse(interaction, ExecutionException.class);
+            assertThat(e.getErrorCode(), is(equalTo(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED)));
+            assertThat(e.getDescription(), containsString("transaction timed out"));
+
+            SessionDetach detach = receiveResponse(interaction, SessionDetach.class);
+            assertThat(detach.getName(), is(equalTo(sessionName)));
+        }
+    }
+
+    private <T> T receiveResponse(final Interaction interaction, Class<T> clazz) throws Exception
+    {
+        T result = null; // stays null until a response body of the requested type is seen
+        do
+        {
+            Response<?> response = interaction.consumeResponse().getLatestResponse();
+            if (clazz.isInstance(response.getBody()))
+            {
+                result = clazz.cast(response.getBody()); // checked cast via the class token, not an unchecked (T)
+            }
+        }
+        while (result == null); // responses whose body is of another type are skipped
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
index 93127ad..99fb224 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -37,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.*;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionTest extends BrokerAdminUsingTestBase
 {
@@ -366,4 +368,84 @@ public class TransactionTest extends BrokerAdminUsingTestBase
                        .channel().close().consumeResponse(ChannelCloseOkBody.class);
         }
     }
+
+
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void publishTransactionTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .tx().select().consumeResponse(TxSelectOkBody.class)
+                       .basic().contentHeaderPropertiesContentType("text/plain")
+                       .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+                       .contentHeaderPropertiesDeliveryMode((byte)1)
+                       .contentHeaderPropertiesPriority((byte)1)
+                       .publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .content("Test")
+                       .publishMessage()
+                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+
+            Thread.sleep(transactionTimeout + 1000);
+
+            interaction.consumeResponse(ConnectionCloseBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void consumeTransactionTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
+
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .tx().select().consumeResponse(TxSelectOkBody.class)
+                       .basic().qosPrefetchCount(1)
+                       .qos()
+                       .consumeResponse(BasicQosOkBody.class)
+                       .channel().flow(true)
+                       .consumeResponse(ChannelFlowOkBody.class)
+                       .basic()
+                       .consumeConsumerTag("A")
+                       .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .consume()
+                       .consumeResponse(BasicConsumeOkBody.class)
+                       .consumeResponse(BasicDeliverBody.class);
+
+            BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
+            interaction.consumeResponse(ContentHeaderBody.class)
+                       .consumeResponse(ContentBody.class);
+
+            interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+                       .ack()
+                       .exchange().declarePassive(true).declare().consumeResponse(ExchangeDeclareOkBody.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            Thread.sleep(transactionTimeout + 1000);
+
+            interaction.consumeResponse(ConnectionCloseBody.class);
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index fb61974..a893d76 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -32,6 +33,7 @@ import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -62,6 +64,7 @@ import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionalTransferTest extends BrokerAdminUsingTestBase
 {
@@ -644,6 +647,121 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void transactionalPostingTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Disposition responseDisposition = interaction.negotiateProtocol()
+                                                         .consumeResponse()
+                                                         .open()
+                                                         .consumeResponse(Open.class)
+                                                         .begin()
+                                                         .consumeResponse(Begin.class)
+
+                                                         .txnAttachCoordinatorLink(txnState)
+                                                         .txnDeclare(txnState)
+
+                                                         .attachRole(Role.SENDER)
+                                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                         .attachHandle(linkHandle)
+                                                         .attach().consumeResponse(Attach.class)
+                                                         .consumeResponse(Flow.class)
+
+                                                         .transferHandle(linkHandle)
+                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         .transferTransactionalState(txnState.getCurrentTransactionId())
+                                                         .transfer()
+                                                         .consumeResponse(Disposition.class)
+                                                         .getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            Thread.sleep(transactionTimeout + 1000); // sleep for longer than the timeout value configured above
+
+            Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
+            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+            assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
+        }
+    }
+
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void transactionalRetirementTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout);
+
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState)
+                       .txnDeclare(txnState)
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.MAX_VALUE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .disposition()
+                       .sync();
+
+            Thread.sleep(transactionTimeout + 1000);
+            Response<?> response = interaction.consumeResponse(Close.class, Flow.class).getLatestResponse();
+            Close responseClose;
+            if (response.getBody() instanceof Close)
+            {
+                responseClose = (Close) response.getBody();
+            }
+            else
+            {
+                responseClose = interaction.consumeResponse().getLatestResponse(Close.class);
+            }
+            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+            assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT));
+        }
+    }
+
+
     private void assertUnknownTransactionIdError(final Response<?> response)
     {
         assertThat(response, is(notNullValue()));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
index 3ec35ad..da6790b 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.plugin.Pluggable;
 
 public interface BrokerAdmin extends Pluggable
 {
+    String KIND_BROKER_J = "broker-j";
     String TEST_QUEUE_NAME = "testQueue";
     Long RESTART_TIMEOUT = Long.getLong("brokerAdmin.restart_timeout", 10000);
 
@@ -56,6 +57,9 @@ public interface BrokerAdmin extends Pluggable
     String getValidUsername();
     String getValidPassword();
 
+    String getKind();
+
+    void configure(String settingName, Object settingValue);
 
 
     enum PortType

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 6016f56..9fab318 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
-import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -399,6 +398,18 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public String getKind()
+    {
+        return KIND_BROKER_J;
+    }
+
+    @Override
+    public void configure(final String settingName, final Object settingValue)
+    {
+        _currentVirtualHostNode.getVirtualHost().setAttributes(Collections.singletonMap(settingName, settingValue));
+    }
+
+    @Override
     public String getType()
     {
         return "EMBEDDED_BROKER_PER_CLASS";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index f359053..5f24546 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -156,6 +156,18 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public String getKind()
+    {
+        return KIND_BROKER_J;
+    }
+
+    @Override
+    public void configure(final String settingName, final Object settingValue)
+    {
+        throw new UnsupportedOperationException("External Qpid Broker does not support configuring");
+    }
+
+    @Override
     public String getType()
     {
         return "EXTERNAL_BROKER";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c531ca0a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
index 02cec34..d3c9be6 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
@@ -63,14 +63,22 @@ public class QpidTestRunner extends BlockJUnit4ClassRunner
     @Override
     protected void runChild(final FrameworkMethod method, final RunNotifier notifier)
     {
-        _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
-        try
+        BrokerSpecific brokerSpecific = method.getAnnotation(BrokerSpecific.class);
+        if (brokerSpecific != null && !brokerSpecific.kind().equalsIgnoreCase(_brokerAdmin.getKind()))
         {
-            super.runChild(method, notifier);
+            notifier.fireTestIgnored(describeChild(method));
         }
-        finally
+        else
         {
-            _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
+            _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
+            try
+            {
+                super.runChild(method, notifier);
+            }
+            finally
+            {
+                _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
+            }
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org