You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/21 16:35:45 UTC

[qpid-broker-j] 03/21: QPID-8349: [Tests][AMQP 1.0] Introduce QueueAdmin to delegate queue related operations in external broker admin

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit a8d14bfb38721b3b6674c312d4cc0c552372491b
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Aug 21 16:14:54 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Introduce QueueAdmin to delegate queue related operations in external broker admin
---
 .../tests/protocol/v1_0/ExistingQueueAdmin.java    | 224 +++++++++++++++++++++
 .../qpid/tests/protocol/v1_0/Interaction.java      |   7 +-
 .../protocol/v1_0/ExistingQueueAdminTest.java      | 146 ++++++++++++++
 ...nBrokerAdmin.java => BrokerAdminException.java} |  21 +-
 .../qpid/tests/utils/BrokerAdminFactory.java       |   2 +-
 .../utils/EmbeddedBrokerPerClassAdminImpl.java     |   2 +-
 .../tests/utils/ExternalQpidBrokerAdminImpl.java   |  29 ++-
 .../apache/qpid/tests/utils/NoOpQueueAdmin.java    |  60 ++++++
 .../utils/{RunBrokerAdmin.java => QueueAdmin.java} |  18 +-
 .../apache/qpid/tests/utils/QueueAdminFactory.java |  50 +++++
 .../apache/qpid/tests/utils/RunBrokerAdmin.java    |   4 +-
 .../qpid/tests/utils/BrokerAdminFactoryTest.java}  |  41 +++-
 .../utils/ExternalQpidBrokerAdminImplTest.java     | 103 ++++++++++
 .../qpid/tests/utils/NoOpQueueAdminTest.java       |  78 +++++++
 .../qpid/tests/utils/QueueAdminFactoryTest.java    | 133 ++++++++++++
 15 files changed, 882 insertions(+), 36 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
new file mode 100644
index 0000000..313c4ff
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminException;
+import org.apache.qpid.tests.utils.QueueAdmin;
+
+@SuppressWarnings("unused")
+public class ExistingQueueAdmin implements QueueAdmin
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class);
+    private static final String ADMIN_LINK_NAME = "existingQueueAdminLink";
+
+    @Override
+    public void createQueue(final BrokerAdmin brokerAdmin, final String queueName)
+    {
+
+    }
+
+    @Override
+    public void deleteQueue(final BrokerAdmin brokerAdmin, final String queueName)
+    {
+        try
+        {
+            drainQueue(brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP), queueName);
+        }
+        catch (Exception e)
+        {
+            throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
+        }
+    }
+
+    @Override
+    public void putMessageOnQueue(final BrokerAdmin brokerAdmin, final String queueName, final String... message)
+    {
+        final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try
+        {
+            putMessageOnQueue(brokerAddress, queueName, message);
+        }
+        catch (Exception e)
+        {
+            throw new BrokerAdminException(String.format("Cannot put %d messages on a queue '%s'",
+                                                         message.length,
+                                                         queueName), e);
+        }
+    }
+
+    @Override
+    public boolean isDeleteQueueSupported()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isPutMessageOnQueueSupported()
+    {
+        return true;
+    }
+
+    private void putMessageOnQueue(final InetSocketAddress brokerAddress,
+                                   final String queueName,
+                                   final String... message) throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       .attachName(ADMIN_LINK_NAME)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(queueName)
+                       .attachSndSettleMode(SenderSettleMode.SETTLED)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       .getLatestResponse(Flow.class);
+
+            int tag = 0;
+            for (final String payload : message)
+            {
+                interaction.transferPayloadData(payload)
+                           .transferSettled(true)
+                           .transferDeliveryId()
+                           .transferDeliveryTag(new Binary(String.valueOf(tag).getBytes(UTF_8)))
+                           .transfer()
+                           .sync();
+                tag++;
+            }
+            closeInteraction(interaction);
+        }
+    }
+
+    private void closeInteraction(final Interaction interaction) throws Exception
+    {
+        interaction.detachClose(true)
+                   .detach()
+                   .consumeResponse(Detach.class)
+                   .end()
+                   .consumeResponse(End.class)
+                   .doCloseConnection();
+    }
+
+
+    private void drainQueue(final InetSocketAddress brokerAddress, final String queueName) throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse()
+                       .attachName(ADMIN_LINK_NAME)
+                       .attachRole(Role.RECEIVER)
+                       .attachSndSettleMode(SenderSettleMode.SETTLED)
+                       .attachSourceAddress(queueName)
+                       .attach().consumeResponse();
+
+            boolean received;
+            final Begin begin = interaction.getCachedResponse(Begin.class);
+            int nextIncomingId = begin.getNextOutgoingId().intValue();
+            do
+            {
+                received = receive(interaction, queueName, nextIncomingId);
+                nextIncomingId++;
+            }
+            while (received);
+            closeInteraction(interaction);
+        }
+    }
+
+    private boolean receive(final Interaction interaction, String queueName, int nextIncomingId) throws Exception
+    {
+        interaction.flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+                   .flowNextIncomingId(UnsignedInteger.valueOf(nextIncomingId))
+                   .flowLinkCredit(UnsignedInteger.ONE)
+                   .flowDrain(Boolean.TRUE)
+                   .flowHandleFromLinkHandle()
+                   .flowOutgoingWindow(UnsignedInteger.ZERO)
+                   .flowNextOutgoingId(UnsignedInteger.ZERO)
+                   .flow();
+
+        boolean messageReceived = false;
+        boolean flowReceived = false;
+        do
+        {
+            Response<?> latestResponse;
+            try
+            {
+                latestResponse = interaction.consumeResponse(Transfer.class, Flow.class).getLatestResponse();
+            }
+            catch (IllegalStateException e)
+            {
+                if (messageReceived)
+                {
+                    LOGGER.debug(
+                            "Message was received on draining queue '{}' but flow was not. Assuming successful receive...",
+                            queueName,
+                            e);
+                }
+                else
+                {
+                    LOGGER.warn(
+                            "Neither message no flow was received on draining queue '{}'.  Assuming no messages on the queue...",
+                            queueName,
+                            e);
+                }
+                return messageReceived;
+            }
+            if (latestResponse.getBody() instanceof Transfer)
+            {
+                Transfer responseTransfer = (Transfer) latestResponse.getBody();
+                if (!Boolean.TRUE.equals(responseTransfer.getMore()))
+                {
+                    messageReceived = true;
+                }
+            }
+            else if (latestResponse.getBody() instanceof Flow)
+            {
+                flowReceived = true;
+            }
+        }
+        while (!flowReceived);
+        return messageReceived;
+    }
+}
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 1437e19..4df4db0 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -650,10 +650,15 @@ public class Interaction extends AbstractInteraction<Interaction>
 
     public Interaction flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount()
     {
-        final Begin begin = (Begin) _latestResponses.get(Begin.class);
+        final Begin begin = getCachedResponse(Begin.class);
         return flowNextIncomingId(begin.getNextOutgoingId().add(UnsignedInteger.valueOf(_receivedDeliveryCount.get())));
     }
 
+    <T extends FrameBody> T getCachedResponse(final Class<T> responseClass)
+    {
+        return (T)_latestResponses.get(responseClass);
+    }
+
     public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow)
     {
         _flow.setOutgoingWindow(outgoingWindow);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdminTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdminTest.java
new file mode 100644
index 0000000..26b2050
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdminTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.InetSocketAddress;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.qpid.test.utils.UnitTestBase;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminException;
+import org.apache.qpid.tests.utils.EmbeddedBrokerPerClassAdminImpl;
+
+public class ExistingQueueAdminTest extends UnitTestBase
+{
+    private static BrokerAdmin _brokerAdmin;
+    private static InetSocketAddress _brokerAddress;
+
+    private ExistingQueueAdmin _queueAdmin;
+    private String _testQueueName;
+
+    @BeforeClass
+    public static void beforeSuite()
+    {
+        _brokerAdmin = new EmbeddedBrokerPerClassAdminImpl();
+        _brokerAdmin.beforeTestClass(ExistingQueueAdminTest.class);
+        _brokerAddress = _brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @AfterClass
+    public static void afterSuite()
+    {
+        _brokerAdmin.afterTestClass(ExistingQueueAdminTest.class);
+    }
+
+    @Before
+    public void before() throws NoSuchMethodException
+    {
+        _brokerAdmin.beforeTestMethod(getClass(), getClass().getMethod(getTestName()));
+        _brokerAdmin.createQueue(getTestName());
+        _queueAdmin = new ExistingQueueAdmin();
+        _testQueueName = getTestName();
+    }
+
+    @After
+    public void after() throws NoSuchMethodException
+    {
+        _brokerAdmin.afterTestMethod(getClass(), getClass().getMethod(getTestName()));
+    }
+
+
+    @Test
+    public void createQueue()
+    {
+        _queueAdmin.createQueue(_brokerAdmin, getTestName());
+    }
+
+    @Test
+    public void deleteQueue() throws Exception
+    {
+        final String[] messages = Utils.createTestMessageContents(2, _testQueueName);
+        _brokerAdmin.putMessageOnQueue(_testQueueName, messages);
+
+        _queueAdmin.deleteQueue(_brokerAdmin, _testQueueName);
+
+        final String controlMessage = String.format("controlMessage %s", _testQueueName);
+        _brokerAdmin.putMessageOnQueue(_testQueueName, controlMessage);
+        assertEquals(controlMessage, Utils.receiveMessage(_brokerAddress, _testQueueName));
+    }
+
+    @Test
+    public void deleteQueueNonExisting()
+    {
+        try
+        {
+            _queueAdmin.deleteQueue(_brokerAdmin, _testQueueName + "_NonExisting");
+            fail("Exception is expected");
+        }
+        catch (BrokerAdminException e)
+        {
+            // pass
+        }
+    }
+
+    @Test
+    public void putMessageOnQueue() throws Exception
+    {
+        final String[] messages = Utils.createTestMessageContents(2, _testQueueName);
+        _queueAdmin.putMessageOnQueue(_brokerAdmin, _testQueueName, messages);
+        assertEquals(messages[0], Utils.receiveMessage(_brokerAddress, _testQueueName));
+        assertEquals(messages[1], Utils.receiveMessage(_brokerAddress, _testQueueName));
+    }
+
+    @Test
+    public void putMessageOnQueueNonExisting()
+    {
+        final String[] messages = Utils.createTestMessageContents(2, _testQueueName);
+        try
+        {
+            _queueAdmin.putMessageOnQueue(_brokerAdmin, _testQueueName + "_NonExisting", messages);
+            fail("Exception is expected"); }
+        catch (BrokerAdminException e)
+        {
+            // pass
+        }
+    }
+
+    @Test
+    public void isDeleteQueueSupported()
+    {
+        assertFalse(_queueAdmin.isDeleteQueueSupported());
+    }
+
+    @Test
+    public void isPutMessageOnQueueSupported()
+    {
+        assertTrue(_queueAdmin.isPutMessageOnQueueSupported());
+    }
+}
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminException.java
similarity index 71%
copy from systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
copy to systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminException.java
index ef8b05a..1d0c65f 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminException.java
@@ -1,4 +1,4 @@
-package org.apache.qpid.tests.utils;/*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,14 +19,17 @@ package org.apache.qpid.tests.utils;/*
  *
  */
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+package org.apache.qpid.tests.utils;
 
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
-public @interface RunBrokerAdmin
+public class BrokerAdminException extends RuntimeException
 {
-    String type() default "";
+    public BrokerAdminException(final String message)
+    {
+        super(message);
+    }
+
+    public BrokerAdminException(final String message, final Throwable cause)
+    {
+        super(message, cause);
+    }
 }
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java
index 64d4eff..005b363 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java
@@ -32,7 +32,7 @@ public class BrokerAdminFactory
         BrokerAdmin brokerAdmin = adminFacades.get(type);
         if (brokerAdmin == null)
         {
-            throw new RuntimeException(String.format("Could not find BrokerAdmin implementation of type '%s'", type));
+            throw new BrokerAdminException(String.format("Could not find BrokerAdmin implementation of type '%s'", type));
         }
         return brokerAdmin;
     }
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 9ba6aed..f51c970 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -118,7 +118,7 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
         }
         catch (Exception e)
         {
-            throw new RuntimeException("Failed to start broker for test class", e);
+            throw new BrokerAdminException("Failed to start broker for test class", e);
         }
     }
 
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index db5d44d..e125c21 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -23,6 +23,8 @@ package org.apache.qpid.tests.utils;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
@@ -38,6 +40,19 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     private static final String EXTERNAL_BROKER = "EXTERNAL_BROKER";
     private static final String KIND_BROKER_UNKNOWN = "unknown";
 
+    private final QueueAdmin _queueAdmin;
+    private final Set<String> _createdQueues;
+
+    public ExternalQpidBrokerAdminImpl()
+    {
+       this(new QueueAdminFactory().create());
+    }
+
+    ExternalQpidBrokerAdminImpl(QueueAdmin queueAdmin)
+    {
+        _queueAdmin = queueAdmin;
+        _createdQueues = new HashSet<>();
+    }
     @Override
     public void beforeTestClass(final Class testClass)
     {
@@ -54,6 +69,8 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     public void afterTestMethod(final Class testClass, final Method method)
     {
         LOGGER.debug("afterTestMethod");
+        new HashSet<>(_createdQueues).forEach(this::deleteQueue);
+        _createdQueues.clear();
     }
 
     @Override
@@ -86,19 +103,21 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     @Override
     public void createQueue(final String queueName)
     {
-        LOGGER.debug(String.format("creation of queue '%s' requested", queueName));
+        _queueAdmin.createQueue(this, queueName);
+        _createdQueues.add(queueName);
     }
 
     @Override
     public void deleteQueue(final String queueName)
     {
-        LOGGER.debug(String.format("deletion of queue '%s' requested", queueName));
+        _queueAdmin.deleteQueue(this, queueName);
+        _createdQueues.remove(queueName);
     }
 
     @Override
     public void putMessageOnQueue(final String queueName, final String... messages)
     {
-        LOGGER.debug(String.format("puting of %d messages on queue '%s' requested", messages.length, queueName));
+        _queueAdmin.putMessageOnQueue(this, queueName, messages);
     }
 
     @Override
@@ -177,13 +196,13 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     @Override
     public boolean isPutMessageOnQueueSupported()
     {
-        return false;
+        return _queueAdmin.isPutMessageOnQueueSupported();
     }
 
     @Override
     public boolean isDeleteQueueSupported()
     {
-        return false;
+        return _queueAdmin.isDeleteQueueSupported();
     }
 
 }
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/NoOpQueueAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/NoOpQueueAdmin.java
new file mode 100644
index 0000000..ed94122
--- /dev/null
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/NoOpQueueAdmin.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NoOpQueueAdmin implements QueueAdmin
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(NoOpQueueAdmin.class);
+
+    @Override
+    public void createQueue(BrokerAdmin brokerAdmin, final String queueName)
+    {
+        LOGGER.debug(String.format("creation of queue '%s' requested", queueName));
+    }
+
+    @Override
+    public void deleteQueue(BrokerAdmin brokerAdmin, final String queueName)
+    {
+        LOGGER.debug(String.format("deletion of queue '%s' requested", queueName));
+    }
+
+    @Override
+    public void putMessageOnQueue(BrokerAdmin brokerAdmin, final String queueName, final String... messages)
+    {
+        LOGGER.debug(String.format("putting of %d messages on queue '%s' requested", messages.length, queueName));
+    }
+
+    @Override
+    public boolean isDeleteQueueSupported()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isPutMessageOnQueueSupported()
+    {
+        return false;
+    }
+}
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QueueAdmin.java
similarity index 68%
copy from systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
copy to systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QueueAdmin.java
index ef8b05a..31266b7 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QueueAdmin.java
@@ -1,4 +1,4 @@
-package org.apache.qpid.tests.utils;/*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,14 +19,14 @@ package org.apache.qpid.tests.utils;/*
  *
  */
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+package org.apache.qpid.tests.utils;
 
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
-public @interface RunBrokerAdmin
+public interface QueueAdmin
 {
-    String type() default "";
+    void createQueue(BrokerAdmin brokerAdmin, String queueName);
+    void deleteQueue(BrokerAdmin brokerAdmin, String queueName);
+    void putMessageOnQueue(BrokerAdmin brokerAdmin, String queueName, String... messages);
+
+    boolean isDeleteQueueSupported();
+    boolean isPutMessageOnQueueSupported();
 }
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QueueAdminFactory.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QueueAdminFactory.java
new file mode 100644
index 0000000..020e805
--- /dev/null
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QueueAdminFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class QueueAdminFactory
+{
+    static final String QUEUE_ADMIN_TYPE_PROPERTY_NAME = "qpid.tests.protocol.broker.external.queueAdmin";
+    private static final Logger LOGGER = LoggerFactory.getLogger(QueueAdminFactory.class);
+
+    @SuppressWarnings("unchecked")
+    QueueAdmin create() throws BrokerAdminException
+    {
+        final String queueAdminClassName =
+                System.getProperty(QUEUE_ADMIN_TYPE_PROPERTY_NAME, NoOpQueueAdmin.class.getName());
+        LOGGER.debug(String.format("Using queue admin of type '%s'", queueAdminClassName));
+        try
+        {
+            final Class<? extends QueueAdmin> queueCreatorClass =
+                    (Class<? extends QueueAdmin>) Class.forName(queueAdminClassName);
+            return queueCreatorClass.newInstance();
+        }
+        catch (ClassNotFoundException | InstantiationException | IllegalAccessException e)
+        {
+            throw new BrokerAdminException(String.format("Unable to instantiate queue admin of type '%s'",
+                                                         queueAdminClassName), e);
+        }
+    }
+}
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
index ef8b05a..4d7104d 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/RunBrokerAdmin.java
@@ -1,4 +1,4 @@
-package org.apache.qpid.tests.utils;/*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,6 +19,8 @@ package org.apache.qpid.tests.utils;/*
  *
  */
 
+package org.apache.qpid.tests.utils;
+
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/BrokerAdminFactoryTest.java
similarity index 50%
copy from systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java
copy to systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/BrokerAdminFactoryTest.java
index 64d4eff..45bd733 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdminFactory.java
+++ b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/BrokerAdminFactoryTest.java
@@ -1,4 +1,5 @@
 /*
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,20 +21,47 @@
 
 package org.apache.qpid.tests.utils;
 
-import java.util.Map;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Before;
+import org.junit.Test;
 
-import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.test.utils.UnitTestBase;
 
-public class BrokerAdminFactory
+/**
+ * Unit tests for {@link BrokerAdminFactory#createInstance(String)}.
+ */
+public class BrokerAdminFactoryTest extends UnitTestBase
 {
-    BrokerAdmin createInstance(String type)
+    private BrokerAdminFactory _factory;
+
+    @Before
+    public void setUp()
+    {
+        _factory = new BrokerAdminFactory();
+    }
+
+    /** The type constant of {@link EmbeddedBrokerPerClassAdminImpl} must yield an instance of that class. */
+    @Test
+    public void createInstanceForExistingType()
     {
-        Map<String, BrokerAdmin> adminFacades = new QpidServiceLoader().getInstancesByType(BrokerAdmin.class);
-        BrokerAdmin brokerAdmin = adminFacades.get(type);
-        if (brokerAdmin == null)
+        final BrokerAdmin admin = _factory.createInstance(EmbeddedBrokerPerClassAdminImpl.TYPE);
+        assertTrue(admin instanceof EmbeddedBrokerPerClassAdminImpl);
+    }
+
+    /** The type {@code "foo"} must be rejected with a {@link BrokerAdminException}. */
+    @Test
+    public void createInstanceForNonExistingType()
+    {
+        try
+        {
+            _factory.createInstance("foo");
+            fail("Exception is expected");
+        }
+        catch (BrokerAdminException e)
         {
-            throw new RuntimeException(String.format("Could not find BrokerAdmin implementation of type '%s'", type));
+            // pass
         }
-        return brokerAdmin;
     }
 }
diff --git a/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImplTest.java b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImplTest.java
new file mode 100644
index 0000000..8ebcc22
--- /dev/null
+++ b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImplTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.test.utils.UnitTestBase;
+
+/**
+ * Unit tests for {@link ExternalQpidBrokerAdminImpl} built on top of a mocked {@link QueueAdmin}.
+ * Each test checks which calls, if any, reach the mock.
+ */
+public class ExternalQpidBrokerAdminImplTest extends UnitTestBase
+{
+    private ExternalQpidBrokerAdminImpl _admin;
+    private QueueAdmin _queueAdmin;
+
+    @Before
+    public void setUp()
+    {
+        _queueAdmin = mock(QueueAdmin.class);
+        _admin = new ExternalQpidBrokerAdminImpl(_queueAdmin);
+    }
+
+    /** Creating a queue must reach the queue admin with the broker admin and the queue name. */
+    @Test
+    public void createQueue()
+    {
+        final String name = getTestName();
+
+        _admin.createQueue(name);
+
+        verify(_queueAdmin).createQueue(_admin, name);
+    }
+
+    /** Deleting a previously created queue must reach the queue admin. */
+    @Test
+    public void deleteQueue()
+    {
+        final String name = getTestName();
+        _admin.createQueue(name);
+
+        _admin.deleteQueue(name);
+
+        verify(_queueAdmin).deleteQueue(_admin, name);
+    }
+
+    /** Putting a message must reach the queue admin with the same queue name and message. */
+    @Test
+    public void putMessageOnQueue()
+    {
+        final String name = getTestName();
+        final String payload = "Test Message";
+
+        _admin.putMessageOnQueue(name, payload);
+
+        verify(_queueAdmin).putMessageOnQueue(_admin, name, payload);
+    }
+
+    /** Expects {@code false} when the queue admin is an unstubbed mock. */
+    @Test
+    public void isPutMessageOnQueueSupported()
+    {
+        final boolean supported = _admin.isPutMessageOnQueueSupported();
+        assertFalse(supported);
+    }
+
+    /** Expects {@code false} when the queue admin is an unstubbed mock. */
+    @Test
+    public void isDeleteQueueSupported()
+    {
+        final boolean supported = _admin.isDeleteQueueSupported();
+        assertFalse(supported);
+    }
+
+    /** Both queues created in this test must be deleted through the queue admin by {@code afterTestMethod}. */
+    @Test
+    public void afterTestMethod()
+    {
+        final String firstQueue = getTestName();
+        final String secondQueue = getTestName() + "_2";
+        _admin.createQueue(firstQueue);
+        _admin.createQueue(secondQueue);
+
+        _admin.afterTestMethod(null, null);
+
+        verify(_queueAdmin).deleteQueue(_admin, firstQueue);
+        verify(_queueAdmin).deleteQueue(_admin, secondQueue);
+    }
+
+    /** {@code beforeTestMethod} must leave the queue admin untouched. */
+    @Test
+    public void beforeTestMethod()
+    {
+        _admin.beforeTestMethod(null, null);
+
+        verifyZeroInteractions(_queueAdmin);
+    }
+}
diff --git a/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/NoOpQueueAdminTest.java b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/NoOpQueueAdminTest.java
new file mode 100644
index 0000000..45f9a5c
--- /dev/null
+++ b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/NoOpQueueAdminTest.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.test.utils.UnitTestBase;
+
+/**
+ * Unit tests for {@link NoOpQueueAdmin}: no operation may interact with the supplied
+ * {@link BrokerAdmin}, and both capability queries are expected to return {@code false}.
+ */
+public class NoOpQueueAdminTest extends UnitTestBase
+{
+    private NoOpQueueAdmin _admin;
+    private BrokerAdmin _brokerAdmin;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAdmin = mock(BrokerAdmin.class);
+        _admin = new NoOpQueueAdmin();
+    }
+
+    /** Queue creation must not call into the broker admin. */
+    @Test
+    public void createQueue()
+    {
+        final String name = getTestName();
+        _admin.createQueue(_brokerAdmin, name);
+
+        verifyZeroInteractions(_brokerAdmin);
+    }
+
+    /** Queue deletion must not call into the broker admin. */
+    @Test
+    public void deleteQueue()
+    {
+        final String name = getTestName();
+        _admin.deleteQueue(_brokerAdmin, name);
+
+        verifyZeroInteractions(_brokerAdmin);
+    }
+
+    /** Putting a message (here without any message argument) must not call into the broker admin. */
+    @Test
+    public void putMessageOnQueue()
+    {
+        final String name = getTestName();
+        _admin.putMessageOnQueue(_brokerAdmin, name);
+
+        verifyZeroInteractions(_brokerAdmin);
+    }
+
+    /** Queue deletion is expected to be reported as unsupported. */
+    @Test
+    public void isDeleteQueueSupported()
+    {
+        final boolean supported = _admin.isDeleteQueueSupported();
+        assertFalse(supported);
+    }
+
+    /** Putting messages is expected to be reported as unsupported. */
+    @Test
+    public void isPutMessageOnQueueSupported()
+    {
+        final boolean supported = _admin.isPutMessageOnQueueSupported();
+        assertFalse(supported);
+    }
+}
diff --git a/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/QueueAdminFactoryTest.java b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/QueueAdminFactoryTest.java
new file mode 100644
index 0000000..5074bd4
--- /dev/null
+++ b/systests/systests-utils/src/test/java/org/apache/qpid/tests/utils/QueueAdminFactoryTest.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.utils;
+
+import static org.apache.qpid.tests.utils.QueueAdminFactory.QUEUE_ADMIN_TYPE_PROPERTY_NAME;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.test.utils.UnitTestBase;
+
+/**
+ * Unit tests for {@link QueueAdminFactory#create()}. Each test sets the system property named by
+ * {@code QUEUE_ADMIN_TYPE_PROPERTY_NAME} to the queue admin type under test; the property's
+ * original value is captured before each test and put back afterwards.
+ */
+public class QueueAdminFactoryTest extends UnitTestBase
+{
+    private QueueAdminFactory _factory;
+    private String _preservedAdminType;
+
+    @Before
+    public void setUp()
+    {
+        _preservedAdminType = System.getProperty(QUEUE_ADMIN_TYPE_PROPERTY_NAME);
+        _factory = new QueueAdminFactory();
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (_preservedAdminType != null)
+        {
+            System.setProperty(QUEUE_ADMIN_TYPE_PROPERTY_NAME, _preservedAdminType);
+        }
+        else
+        {
+            System.clearProperty(QUEUE_ADMIN_TYPE_PROPERTY_NAME);
+        }
+    }
+
+    /** The type {@code "foo"} must be rejected. */
+    @Test
+    public void testQueueAdminCreationForNonExistingType()
+    {
+        assertCreationFails("foo");
+    }
+
+    /** A class whose only constructor is private must be rejected. */
+    @Test
+    public void testQueueAdminCreationForExistingTypeWithPrivateConstructor()
+    {
+        assertCreationFails(TestQueueAdmin2.class.getName());
+    }
+
+    /** A class whose constructor throws {@link InstantiationException} must be rejected. */
+    @Test
+    public void testQueueAdminCreationForExistingTypeThrowingInstantiationException()
+    {
+        assertCreationFails(TestQueueAdmin3.class.getName());
+    }
+
+    /** A class with a default constructor must be instantiated by the factory. */
+    @Test
+    public void testQueueAdminCreationForExistingType()
+    {
+        System.setProperty(QUEUE_ADMIN_TYPE_PROPERTY_NAME, TestQueueAdmin.class.getName());
+
+        final QueueAdmin created = _factory.create();
+
+        assertTrue(created instanceof TestQueueAdmin);
+    }
+
+    /**
+     * Points the system property at {@code queueAdminType} and expects
+     * {@link QueueAdminFactory#create()} to throw a {@link BrokerAdminException}.
+     */
+    private void assertCreationFails(final String queueAdminType)
+    {
+        System.setProperty(QUEUE_ADMIN_TYPE_PROPERTY_NAME, queueAdminType);
+        try
+        {
+            _factory.create();
+            fail("Exception is expected");
+        }
+        catch (BrokerAdminException e)
+        {
+            // pass
+        }
+    }
+
+    /** Queue admin with the default constructor. */
+    @SuppressWarnings("WeakerAccess")
+    public static class TestQueueAdmin extends NoOpQueueAdmin
+    {
+    }
+
+    /** Queue admin whose only constructor is private. */
+    @SuppressWarnings("WeakerAccess")
+    public static class TestQueueAdmin2 extends NoOpQueueAdmin
+    {
+        private TestQueueAdmin2()
+        {
+        }
+    }
+
+    /** Queue admin whose constructor always throws {@link InstantiationException}. */
+    public static class TestQueueAdmin3 extends NoOpQueueAdmin
+    {
+        public TestQueueAdmin3() throws InstantiationException
+        {
+            throw new InstantiationException("Test");
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org