You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/10 08:51:09 UTC

[3/3] qpid-broker-j git commit: QPID-8158: [Broker-J] [System Tests] Refactor MultiNodeTest and TwoNodeTest to use QpidTestRunner and BrokerAdmin for running the tests

QPID-8158: [Broker-J] [System Tests] Refactor MultiNodeTest and TwoNodeTest to use QpidTestRunner and BrokerAdmin for running the tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/88a12e8c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/88a12e8c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/88a12e8c

Branch: refs/heads/master
Commit: 88a12e8c5c9b7b912745e0382eb2661264003f19
Parents: 381202d
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Apr 10 01:51:37 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Apr 10 09:50:52 2018 +0100

----------------------------------------------------------------------
 bdbstore/systests/pom.xml                       |   34 +
 .../replication/GroupBrokerAdmin.java           |  654 +++++++++++
 .../berkeleydb/replication/GroupConfig.java     |   33 +
 .../replication/GroupJmsTestBase.java           |  110 ++
 .../berkeleydb/replication/MultiNodeTest.java   | 1023 ++++++++++--------
 .../berkeleydb/replication/TwoNodeTest.java     |  306 +++---
 pom.xml                                         |    7 +
 .../apache/qpid/tests/http/HttpTestBase.java    |   24 +-
 .../qpid/systests/AmqpManagementFacade.java     |   10 +-
 .../org/apache/qpid/systests/JmsTestBase.java   |   20 +-
 .../java/org/apache/qpid/systests/Utils.java    |   55 +
 systests/qpid-systests-spawn-admin/pom.xml      |  123 +++
 .../systests/admin/BrokerAdminException.java    |   34 +
 .../qpid/systests/admin/SpawnBrokerAdmin.java   | 1023 ++++++++++++++++++
 .../src/main/resources/spawn-broker.json        |   78 ++
 .../systests/admin/SpawnBrokerAdminTest.java    |  314 ++++++
 .../qpid/test/utils/QpidBrokerTestCase.java     |   13 +-
 .../utils/EmbeddedBrokerPerClassAdminImpl.java  |   29 -
 .../utils/LoggingBrokerAdminDecorator.java      |  188 ++++
 .../apache/qpid/tests/utils/QpidTestRunner.java |    8 +-
 .../apache/qpid/tests/utils/RunBrokerAdmin.java |   32 +
 21 files changed, 3419 insertions(+), 699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/pom.xml
----------------------------------------------------------------------
diff --git a/bdbstore/systests/pom.xml b/bdbstore/systests/pom.xml
index 3620718..eefb5bc 100644
--- a/bdbstore/systests/pom.xml
+++ b/bdbstore/systests/pom.xml
@@ -53,6 +53,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-systests-spawn-admin</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
@@ -66,6 +73,33 @@
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>build-classpath</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <outputFile>${project.build.directory}/qpid.build.classpath.txt</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <qpid.initialConfigurationLocation>classpath:spawn-broker.json</qpid.initialConfigurationLocation>
+            <qpid.systests.build.classpath.file>${project.build.directory}/qpid.build.classpath.txt</qpid.systests.build.classpath.file>
+            <qpid.amqp.version>${profile.broker.version}</qpid.amqp.version>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-resources-plugin</artifactId>
         <!--version specified in parent pluginManagement -->
         <executions>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java
new file mode 100644
index 0000000..c11666d
--- /dev/null
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupBrokerAdmin.java
@@ -0,0 +1,654 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.apache.qpid.systests.admin.SpawnBrokerAdmin.SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.sleepycat.je.rep.ReplicationConfig;
+
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
+import org.apache.qpid.systests.admin.BrokerAdminException;
+import org.apache.qpid.systests.admin.SpawnBrokerAdmin;
+import org.apache.qpid.test.utils.PortHelper;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+
+@SuppressWarnings("unused")
+@PluggableService
+public class GroupBrokerAdmin implements BrokerAdmin
+{
+    private static final int WAIT_LIMIT = Integer.getInteger("qpid.test.ha.await", 10000);
+    private static final String AMQP_NODE_TYPE = "org.apache.qpid.VirtualHostNode";
+    private static final String AMQP_REMOTE_NODE_TYPE = "org.apache.qpid.server.model.RemoteReplicationNode";
+    private static final String ROLE_UNKNOWN = "UNKNOWN";
+    private static final String ROLE_MASTER = "MASTER";
+    private static final String ROLE_REPLICA = "REPLICA";
+    private static final String NODE_TYPE = "BDB_HA";
+    private static final String HOST = "127.0.0.1";
+
+    private GroupMember[] _members;
+    private ListeningExecutorService _executorService;
+    private Map<String, String> _lastKnownRoles = new ConcurrentHashMap<>();
+
+    /**
+     * Spawns one broker per group node and prepares the group membership data.
+     * <p>
+     * The node count and group name are read from the {@link GroupConfig} annotation of the test class; when the
+     * annotation is absent, 2 nodes and the group name "test-ha" are used. The brokers are started in parallel and
+     * the start-up is bounded by the system property named by {@code SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME}
+     * (default 30000 ms). If anything fails, all spawned brokers are cleaned up and the executor is shut down
+     * before the failure propagates.
+     *
+     * @param testClass the test class about to be run
+     */
+    @Override
+    public void beforeTestClass(final Class testClass)
+    {
+        GroupConfig runBrokerAdmin = (GroupConfig) testClass.getAnnotation(GroupConfig.class);
+        int numberOfNodes = runBrokerAdmin == null ? 2 : runBrokerAdmin.numberOfNodes();
+        String groupName = runBrokerAdmin == null ? "test-ha" : runBrokerAdmin.groupName();
+        _executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numberOfNodes));
+
+        SpawnBrokerAdmin[] admins =
+                Stream.generate(SpawnBrokerAdmin::new).limit(numberOfNodes).toArray(SpawnBrokerAdmin[]::new);
+
+        boolean started = false;
+        try
+        {
+            int startupTimeout = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);
+            awaitFuture(startupTimeout, invokeParallel(Arrays.stream(admins).map(a -> (Callable<Void>) () -> {
+                a.beforeTestClass(testClass);
+                return null;
+            }).collect(Collectors.toList())));
+
+            _members = initializeGroupData(groupName, admins);
+            started = true;
+        }
+        finally
+        {
+            if (!started)
+            {
+                cleanUpAfterFailedStartup(admins, testClass);
+            }
+        }
+    }
+
+    /**
+     * Best-effort clean-up used when group start-up failed: every admin gets its clean-up call even if an
+     * earlier one throws, and the executor is always shut down.
+     */
+    private void cleanUpAfterFailedStartup(final SpawnBrokerAdmin[] admins, final Class testClass)
+    {
+        try
+        {
+            for (SpawnBrokerAdmin a : admins)
+            {
+                try
+                {
+                    a.afterTestClass(testClass);
+                }
+                catch (RuntimeException ignored)
+                {
+                    // Deliberately ignored: the start-up failure is already propagating and must not be
+                    // masked, and the remaining brokers still have to be stopped.
+                }
+            }
+        }
+        finally
+        {
+            _executorService.shutdown();
+        }
+    }
+
+    /**
+     * Creates the HA virtual host node on every group member and waits for the group to form.
+     * <p>
+     * The first member is set up on its own and awaited until it reports MASTER or REPLICA. The remaining
+     * members are then set up sequentially for groups of up to two nodes, or in parallel for larger groups,
+     * and recorded with role UNKNOWN until the final role check.
+     *
+     * @param testClass the test class being run
+     * @param method    the test method about to be run
+     */
+    @Override
+    public void beforeTestMethod(final Class testClass, final Method method)
+    {
+        _lastKnownRoles.clear();
+
+        final GroupMember seed = _members[0];
+        seed.getAdmin().beforeTestMethod(seed.getName(), NODE_TYPE, seed.getNodeAttributes());
+        final Object seedRole = awaitNodeRoleReplicaOrMaster(seed);
+        _lastKnownRoles.put(seed.getName(), String.valueOf(seedRole));
+
+        final ListenableFuture<Void> othersCreated;
+        if (_members.length <= 2)
+        {
+            for (GroupMember member : Arrays.asList(_members).subList(1, _members.length))
+            {
+                member.getAdmin().beforeTestMethod(member.getName(), NODE_TYPE, member.getNodeAttributes());
+                _lastKnownRoles.put(member.getName(), ROLE_UNKNOWN);
+            }
+            othersCreated = Futures.immediateFuture(null);
+        }
+        else
+        {
+            final List<Callable<Void>> creations =
+                    Arrays.stream(_members).skip(1).map(member -> (Callable<Void>) () -> {
+                        member.getAdmin()
+                              .beforeTestMethod(member.getName(), NODE_TYPE, member.getNodeAttributes());
+                        _lastKnownRoles.put(member.getName(), ROLE_UNKNOWN);
+                        return null;
+                    }).collect(Collectors.toList());
+            othersCreated = invokeParallel(creations);
+        }
+
+        awaitFuture(WAIT_LIMIT, othersCreated);
+        awaitAllTransitionIntoReplicaOrMaster();
+    }
+
+
+    /**
+     * Runs the per-method clean-up of every group member in parallel, then forgets all recorded roles.
+     *
+     * @param testClass the test class being run
+     * @param method    the test method that has just finished
+     */
+    @Override
+    public void afterTestMethod(final Class testClass, final Method method)
+    {
+        final List<Callable<Void>> cleanUps = Arrays.stream(_members).map(member -> (Callable<Void>) () -> {
+            member.getAdmin().afterTestMethod(testClass, method);
+            return null;
+        }).collect(Collectors.toList());
+
+        awaitFuture(WAIT_LIMIT, invokeParallel(cleanUps));
+        _lastKnownRoles.clear();
+    }
+
+    /**
+     * Runs the per-class clean-up of every group member in parallel and shuts the executor down.
+     * <p>
+     * Tolerates being called after a failed {@code beforeTestClass}: in that case no members were recorded and
+     * only the executor shutdown is attempted.
+     *
+     * @param testClass the test class that has finished
+     */
+    @Override
+    public void afterTestClass(final Class testClass)
+    {
+        try
+        {
+            // _members stays null when group start-up failed; there is nothing to clean up then
+            if (_members != null)
+            {
+                awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
+                    m.getAdmin().afterTestClass(testClass);
+                    return null;
+                }).collect(Collectors.toList())));
+            }
+        }
+        finally
+        {
+            if (_executorService != null)
+            {
+                _executorService.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Returns the address of the given port type on the node last recorded as MASTER.
+     *
+     * @param portType the kind of port to resolve
+     * @return the address reported by the last known master's admin
+     */
+    @Override
+    public InetSocketAddress getBrokerAddress(final PortType portType)
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.getBrokerAddress(portType);
+    }
+
+    /**
+     * Creates a queue via the admin of the node last recorded as MASTER.
+     *
+     * @param queueName name of the queue to create
+     */
+    @Override
+    public void createQueue(final String queueName)
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        master.createQueue(queueName);
+    }
+
+    /**
+     * Deletes a queue via the admin of the node last recorded as MASTER.
+     *
+     * @param queueName name of the queue to delete
+     */
+    @Override
+    public void deleteQueue(final String queueName)
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        master.deleteQueue(queueName);
+    }
+
+    /**
+     * Puts the given messages on a queue via the admin of the node last recorded as MASTER.
+     *
+     * @param queueName name of the target queue
+     * @param messages  message bodies to enqueue
+     */
+    @Override
+    public void putMessageOnQueue(final String queueName, final String... messages)
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        master.putMessageOnQueue(queueName, messages);
+    }
+
+    /**
+     * Reads the message depth of a queue from the node last recorded as MASTER.
+     *
+     * @param testQueueName name of the queue to inspect
+     * @return the depth reported by the last known master's admin
+     */
+    @Override
+    public int getQueueDepthMessages(final String testQueueName)
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.getQueueDepthMessages(testQueueName);
+    }
+
+    /**
+     * Reports whether restart is supported, as answered by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's answer
+     */
+    @Override
+    public boolean supportsRestart()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.supportsRestart();
+    }
+
+    /**
+     * Restarts every group member in parallel and waits for the roles to settle.
+     * <p>
+     * Each member is marked UNKNOWN before its restart is requested. The value returned by the member's own
+     * {@code restart()} is not inspected here; completion is judged by the subsequent role check.
+     *
+     * @return an already completed future
+     */
+    @Override
+    public ListenableFuture<Void> restart()
+    {
+        final List<Callable<Void>> restarts = Arrays.stream(_members).map(member -> (Callable<Void>) () -> {
+            _lastKnownRoles.put(member.getName(), ROLE_UNKNOWN);
+            member.getAdmin().restart();
+            return null;
+        }).collect(Collectors.toList());
+
+        awaitFuture(WAIT_LIMIT, invokeParallel(restarts));
+        awaitAllTransitionIntoReplicaOrMaster();
+
+        return Futures.immediateFuture(null);
+    }
+
+    /**
+     * Reports SASL support as answered by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's answer
+     */
+    @Override
+    public boolean isSASLSupported()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.isSASLSupported();
+    }
+
+    /**
+     * Reports support for a SASL mechanism as answered by the admin of the node last recorded as MASTER.
+     *
+     * @param mechanismName the SASL mechanism to query
+     * @return the last known master admin's answer
+     */
+    @Override
+    public boolean isSASLMechanismSupported(final String mechanismName)
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.isSASLMechanismSupported(mechanismName);
+    }
+
+    /**
+     * Reports WebSocket support as answered by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's answer
+     */
+    @Override
+    public boolean isWebSocketSupported()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.isWebSocketSupported();
+    }
+
+    /**
+     * Reports queue-depth support as answered by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's answer
+     */
+    @Override
+    public boolean isQueueDepthSupported()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.isQueueDepthSupported();
+    }
+
+    /**
+     * Reports management support as answered by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's answer
+     */
+    @Override
+    public boolean isManagementSupported()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.isManagementSupported();
+    }
+
+    /**
+     * Returns the user name supplied by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's valid user name
+     */
+    @Override
+    public String getValidUsername()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.getValidUsername();
+    }
+
+    /**
+     * Returns the password supplied by the admin of the node last recorded as MASTER.
+     *
+     * @return the last known master admin's valid password
+     */
+    @Override
+    public String getValidPassword()
+    {
+        final SpawnBrokerAdmin master = getLastKnownMasterAdmin();
+        return master.getValidPassword();
+    }
+
+    @Override
+    public String getKind() // broker kind of this admin: always KIND_BROKER_J
+    {
+        return KIND_BROKER_J;
+    }
+
+    @Override
+    public String getType() // admin type name; GroupJmsTestBase selects this admin with @RunBrokerAdmin(type = "BDB-HA")
+    {
+        return "BDB-HA";
+    }
+
+    /**
+     * Stops every group member in parallel, marking each one UNKNOWN just before its stop is requested.
+     */
+    public void stop()
+    {
+        final List<Callable<Void>> stops = Arrays.stream(_members).map(member -> (Callable<Void>) () -> {
+            _lastKnownRoles.put(member.getName(), ROLE_UNKNOWN);
+            member.getAdmin().stop();
+            return null;
+        }).collect(Collectors.toList());
+
+        awaitFuture(WAIT_LIMIT, invokeParallel(stops));
+    }
+
+    /**
+     * Starts every group member and waits for the roles to settle; same as {@code start(true)}.
+     */
+    public void start()
+    {
+        final boolean assertRoles = true;
+        start(assertRoles);
+    }
+
+    /**
+     * Starts every group member in parallel.
+     *
+     * @param assertRoles when {@code true}, additionally waits for the members to report MASTER or REPLICA
+     */
+    public void start(boolean assertRoles)
+    {
+        final List<Callable<Void>> starts = Arrays.stream(_members).map(member -> (Callable<Void>) () -> {
+            member.getAdmin().start();
+            return null;
+        }).collect(Collectors.toList());
+        awaitFuture(WAIT_LIMIT, invokeParallel(starts));
+
+        if (!assertRoles)
+        {
+            return;
+        }
+        awaitAllTransitionIntoReplicaOrMaster();
+    }
+
+    /**
+     * Starts a single group member and waits until it reports MASTER or REPLICA.
+     *
+     * @param amqpPort AMQP port identifying the member
+     */
+    public void startNode(final int amqpPort)
+    {
+        getMemberByAmqpPort(amqpPort).getAdmin().start();
+        awaitNodeRole(amqpPort, ROLE_MASTER, ROLE_REPLICA);
+    }
+
+    /**
+     * Stops a single group member and records its role as UNKNOWN afterwards.
+     *
+     * @param amqpPort AMQP port identifying the member
+     */
+    public void stopNode(final int amqpPort)
+    {
+        final GroupMember target = getMemberByAmqpPort(amqpPort);
+        final SpawnBrokerAdmin targetAdmin = target.getAdmin();
+        targetAdmin.stop();
+        _lastKnownRoles.put(target.getName(), ROLE_UNKNOWN);
+    }
+
+    /**
+     * Lists the AMQP ports of all group members, in member order.
+     *
+     * @return a new array holding one AMQP port per member
+     */
+    public int[] getGroupAmqpPorts()
+    {
+        return Arrays.stream(_members).mapToInt(GroupMember::getAmqpPort).toArray();
+    }
+
+    /**
+     * Lists the BDB ports recorded for all group members, in member order.
+     *
+     * @return a new array holding one BDB port per member
+     */
+    public int[] getBdbPorts()
+    {
+        return Arrays.stream(_members).mapToInt(GroupMember::getBdbPort).toArray();
+    }
+
+    /**
+     * Returns the AMQP port of the first group member whose port is not excluded.
+     *
+     * @param exclude AMQP ports to skip
+     * @return the first non-excluded AMQP port, in member order
+     * @throws BrokerAdminException if every member's port is excluded
+     */
+    public int getAmqpPort(final int... exclude)
+    {
+        final Set<Integer> skipped = Arrays.stream(exclude).boxed().collect(Collectors.toSet());
+        for (GroupMember member : _members)
+        {
+            final int candidate = member.getAmqpPort();
+            if (!skipped.contains(candidate))
+            {
+                return candidate;
+            }
+        }
+        throw new BrokerAdminException("Amqp Port not found");
+    }
+
+    public String getHost() // host name used for every group member: the HOST constant ("127.0.0.1")
+    {
+        return HOST;
+    }
+
+    /**
+     * Collects a thread dump from every group member.
+     *
+     * @return a new map from each member's AMQP port to the dump returned by its admin
+     */
+    public Map<Integer, String> groupThreadDumps()
+    {
+        final Map<Integer, String> dumpsByAmqpPort = new HashMap<>();
+        for (GroupMember member : _members)
+        {
+            final String dump = member.getAdmin().dumpThreads();
+            dumpsByAmqpPort.put(member.getAmqpPort(), dump);
+        }
+        return dumpsByAmqpPort;
+    }
+
+    /**
+     * Builds the "host:port" string from the shared host and the BDB port recorded for the first member.
+     *
+     * @return the helper address in {@code host:port} form
+     */
+    public String getHelperHostPort()
+    {
+        final int helperBdbPort = _members[0].getBdbPort();
+        return HOST + ":" + helperBdbPort;
+    }
+
+    /**
+     * Fetches the virtual host node attributes of a group member through that member's admin.
+     *
+     * @param amqpPort AMQP port identifying the member
+     * @return the attributes returned by the member's admin
+     */
+    public Map<String, Object> getNodeAttributes(final int amqpPort)
+    {
+        final GroupMember target = getMemberByAmqpPort(amqpPort);
+        final SpawnBrokerAdmin targetAdmin = target.getAdmin();
+        return targetAdmin.getAttributes(true, target.getName(), AMQP_NODE_TYPE);
+    }
+
+    /**
+     * Updates the virtual host node attributes of a group member through that member's admin.
+     *
+     * @param amqpPort   AMQP port identifying the member
+     * @param attributes attribute values to apply
+     */
+    public void setNodeAttributes(final int amqpPort, final Map<String, Object> attributes)
+    {
+        final GroupMember target = getMemberByAmqpPort(amqpPort);
+        final SpawnBrokerAdmin targetAdmin = target.getAdmin();
+        targetAdmin.update(true, target.getName(), AMQP_NODE_TYPE, attributes);
+    }
+
+    /**
+     * Sets the designated-primary attribute of a group member; the flag is sent as its string form.
+     *
+     * @param brokerPort        AMQP port identifying the member
+     * @param designatedPrimary the new value of the attribute
+     */
+    public void setDesignatedPrimary(int brokerPort, boolean designatedPrimary)
+    {
+        final Map<String, Object> change =
+                Collections.singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, String.valueOf(designatedPrimary));
+        setNodeAttributes(brokerPort, change);
+    }
+
+    /**
+     * Waits until a group member reports one of the given roles and records the role that was attained.
+     *
+     * @param amqpPort AMQP port identifying the member
+     * @param role     acceptable role values
+     */
+    public void awaitNodeRole(final int amqpPort, String... role)
+    {
+        final Object attained =
+                awaitNodeToAttainAttributeValue(amqpPort, BDBHAVirtualHostNode.ROLE, (Object[]) role);
+        final String nodeName = getMemberByAmqpPort(amqpPort).getName();
+        _lastKnownRoles.put(nodeName, String.valueOf(attained));
+    }
+
+    /**
+     * Waits, for at most {@code WAIT_LIMIT}, until a virtual host node attribute of a group member takes one
+     * of the given values.
+     *
+     * @param amqpPort       AMQP port identifying the member
+     * @param attributeName  name of the attribute to watch
+     * @param attributeValue acceptable values
+     * @return whatever the member admin's {@code awaitAttributeValue} returns
+     */
+    public Object awaitNodeToAttainAttributeValue(final int amqpPort,
+                                                  final String attributeName,
+                                                  final Object... attributeValue)
+    {
+        final GroupMember target = getMemberByAmqpPort(amqpPort);
+        final SpawnBrokerAdmin targetAdmin = target.getAdmin();
+        final String nodeName = target.getName();
+        return targetAdmin.awaitAttributeValue(WAIT_LIMIT, true, nodeName, AMQP_NODE_TYPE, attributeName, attributeValue);
+    }
+
+    /**
+     * Fetches, through one member's admin, the attributes of the remote replication node representing
+     * another member. The remote node is addressed as {@code <member name>/<remote member name>}.
+     *
+     * @param amqpPort       AMQP port of the member that is queried
+     * @param remoteAmqpPort AMQP port of the member whose remote node is read
+     * @return the attributes returned by the queried member's admin
+     */
+    public Map<String, Object> getRemoteNodeAttributes(final int amqpPort, final int remoteAmqpPort)
+    {
+        final GroupMember local = getMemberByAmqpPort(amqpPort);
+        final GroupMember remote = getMemberByAmqpPort(remoteAmqpPort);
+        final String remoteNodePath = local.getName() + "/" + remote.getName();
+        return local.getAdmin().getAttributes(true, remoteNodePath, AMQP_REMOTE_NODE_TYPE);
+    }
+
+    /**
+     * Updates, through one member's admin, the attributes of the remote replication node representing
+     * another member. The remote node is addressed as {@code <member name>/<remote member name>}.
+     *
+     * @param amqpPort       AMQP port of the member that receives the update
+     * @param remoteAmqpPort AMQP port of the member whose remote node is updated
+     * @param attributes     attribute values to apply
+     */
+    public void setRemoteNodeAttributes(final int amqpPort,
+                                        final int remoteAmqpPort,
+                                        final Map<String, Object> attributes)
+    {
+        final GroupMember local = getMemberByAmqpPort(amqpPort);
+        final GroupMember remote = getMemberByAmqpPort(remoteAmqpPort);
+        final String remoteNodePath = local.getName() + "/" + remote.getName();
+        local.getAdmin().update(true, remoteNodePath, AMQP_REMOTE_NODE_TYPE, attributes);
+    }
+
+    /**
+     * Waits, for at most {@code WAIT_LIMIT}, until the remote replication node that represents one member on
+     * another member reports one of the requested roles.
+     *
+     * @param amqpPort       AMQP port of the member that is queried
+     * @param remoteAmqpPort AMQP port of the member whose remote node is watched
+     * @param role           acceptable roles; when none are given, REPLICA and MASTER are accepted
+     */
+    public void awaitRemoteNodeRole(final int amqpPort, final int remoteAmqpPort, final String... role)
+    {
+        // Honour the caller's roles; the REPLICA/MASTER pair remains the default for calls without roles.
+        final Object[] expectedRoles = role == null || role.length == 0
+                ? new Object[]{ROLE_REPLICA, ROLE_MASTER}
+                : role;
+
+        GroupMember member = getMemberByAmqpPort(amqpPort);
+        GroupMember member2 = getMemberByAmqpPort(remoteAmqpPort);
+        member.getAdmin().awaitAttributeValue(WAIT_LIMIT,
+                                              true,
+                                              member.getName() + "/" + member2.getName(),
+                                              AMQP_REMOTE_NODE_TYPE,
+                                              BDBHARemoteReplicationNode.ROLE,
+                                              expectedRoles);
+    }
+
+    /**
+     * Looks up the admin of the member listening on the given AMQP port.
+     *
+     * @param amqpPort AMQP port identifying the member
+     * @return that member's admin
+     */
+    private SpawnBrokerAdmin getNodeAdmin(final int amqpPort)
+    {
+        return getMemberByAmqpPort(amqpPort).getAdmin();
+    }
+
+    private SpawnBrokerAdmin getLastKnownMasterAdmin()
+    {
+        return getLastKnownMaster().getAdmin();
+    }
+
+    /**
+     * Allocates one BDB port per broker and builds the virtual host node attributes for every group member.
+     * <p>
+     * Every node is named {@code node-<bdbPort>}, uses the first node's address as helper address and lists
+     * all node addresses as permitted nodes. All nodes except the first also name the first node as helper.
+     *
+     * @param groupName name of the replication group
+     * @param admins    the already started broker admins, one per node
+     * @return one {@link GroupMember} per admin, in the same order
+     */
+    private GroupMember[] initializeGroupData(final String groupName, final SpawnBrokerAdmin[] admins)
+    {
+        PortHelper helper = new PortHelper();
+        int[] ports = new int[admins.length];
+        String[] addresses = new String[admins.length];
+        int port = -1;
+        for (int i = 0; i < admins.length; i++)
+        {
+            // each later lookup starts just above the previously allocated port
+            port = i == 0 ? helper.getNextAvailable() : helper.getNextAvailable(port + 1);
+            addresses[i] = HOST + ":" + port;
+            ports[i] = port;
+        }
+
+        Map<String, String> context = new HashMap<>();
+        context.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
+        context.put(ReplicationConfig.ELECTIONS_PRIMARY_RETRIES, "0");
+        context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, "{\"type\":\"BDB_HA\"}");
+
+        String permitted = objectToJson(addresses);
+        String contextAsString = objectToJson(context);
+
+        GroupMember[] members = new GroupMember[admins.length];
+        for (int i = 0; i < admins.length; i++)
+        {
+            String nodeName = "node-" + ports[i];
+
+            Map<String, Object> nodeAttributes = new HashMap<>();
+            nodeAttributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+            nodeAttributes.put(BDBHAVirtualHostNode.NAME, nodeName);
+            nodeAttributes.put(BDBHAVirtualHostNode.ADDRESS, addresses[i]);
+            nodeAttributes.put(BDBHAVirtualHostNode.TYPE, BDBHAVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE);
+            nodeAttributes.put(BDBHAVirtualHostNode.DEFAULT_VIRTUAL_HOST_NODE, true);
+            nodeAttributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, addresses[0]);
+            if (i > 0)
+            {
+                nodeAttributes.put(BDBHAVirtualHostNode.HELPER_NODE_NAME, "node-" + ports[0]);
+            }
+            nodeAttributes.put(BDBHAVirtualHostNode.PERMITTED_NODES, permitted);
+            nodeAttributes.put(BDBHAVirtualHostNode.CONTEXT, contextAsString);
+            // Use this node's own BDB port: the loop variable 'port' still holds the LAST allocated port,
+            // which would give every member the same, wrong BDB port.
+            members[i] = new GroupMember(nodeName,
+                                         admins[i].getBrokerAddress(PortType.AMQP).getPort(),
+                                         ports[i],
+                                         admins[i],
+                                         nodeAttributes);
+        }
+        return members;
+    }
+
+    /**
+     * Finds the group member listening on the given AMQP port.
+     *
+     * @param amqpPort AMQP port to look for
+     * @return the first member with that AMQP port
+     * @throws BrokerAdminException if no member uses the port
+     */
+    private GroupMember getMemberByAmqpPort(final int amqpPort)
+    {
+        for (GroupMember candidate : _members)
+        {
+            if (candidate.getAmqpPort() == amqpPort)
+            {
+                return candidate;
+            }
+        }
+        throw new BrokerAdminException(String.format("Could not find node by amqp port %d", amqpPort));
+    }
+
+    /**
+     * Runs all tasks on the group executor and returns a single future for the whole batch.
+     * <p>
+     * The individual results are discarded: the returned future completes with {@code null} once every task
+     * has succeeded, or fails if any task failed. If the calling thread is interrupted while submitting, its
+     * interrupt status is restored and a failed future carrying the {@link InterruptedException} is returned.
+     *
+     * @param tasks the tasks to execute
+     * @param <T>   result type of the tasks
+     * @return a future representing completion of all tasks
+     */
+    private <T> ListenableFuture<T> invokeParallel(Collection<Callable<T>> tasks)
+    {
+        try
+        {
+            @SuppressWarnings("unchecked")
+            List<ListenableFuture<T>> futures = (List) _executorService.invokeAll(tasks);
+            ListenableFuture<List<T>> combinedFuture = Futures.allAsList(futures);
+            return Futures.transform(combinedFuture, input -> null, _executorService);
+        }
+        catch (InterruptedException e)
+        {
+            // Thread.interrupted() would CLEAR the flag; re-assert it so callers can observe the interrupt
+            Thread.currentThread().interrupt();
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    /**
+     * Blocks until the future completes or the wait limit elapses, translating every failure into a
+     * {@link BrokerAdminException}.
+     *
+     * @param waitLimit maximum time to wait, in milliseconds
+     * @param future    the future to wait for
+     * @param <T>       result type of the future
+     * @throws BrokerAdminException if the wait is interrupted, the operation failed, or the limit elapsed
+     */
+    private <T> void awaitFuture(long waitLimit, ListenableFuture<T> future)
+    {
+        try
+        {
+            future.get(waitLimit, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            // Thread.interrupted() would CLEAR the flag; re-assert it so callers can observe the interrupt
+            Thread.currentThread().interrupt();
+            throw new BrokerAdminException("Interrupted", e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new BrokerAdminException("Operation failed", e.getCause());
+        }
+        catch (TimeoutException e)
+        {
+            // keep the cause and report the limit so a timeout can be diagnosed from the stack trace
+            throw new BrokerAdminException(String.format("Timeout after %d ms", waitLimit), e);
+        }
+    }
+
+    private void awaitAllTransitionIntoReplicaOrMaster()
+    {
+        awaitFuture(WAIT_LIMIT, invokeParallel(Arrays.stream(_members).map(m -> (Callable<Void>) () -> {
+            awaitNodeRoleReplicaOrMaster(m);
+            return null;
+        }).collect(Collectors.toList())));
+
+        if (_lastKnownRoles.values().stream().noneMatch(role -> ROLE_MASTER.equals(role) || ROLE_REPLICA.equals(role)))
+        {
+            throw new BrokerAdminException("Unexpected node roles " + Joiner.on(", ").withKeyValueSeparator(" -> ")
+                                                                            .join(_lastKnownRoles));
+        }
+    }
+
+    private Object awaitNodeRoleReplicaOrMaster(final GroupMember m)
+    {
+        Object result = m.getAdmin().awaitAttributeValue(WAIT_LIMIT,
+                                                         true,
+                                                         m.getName(),
+                                                         AMQP_NODE_TYPE,
+                                                         BDBHAVirtualHostNode.ROLE,
+                                                         ROLE_REPLICA,
+                                                         ROLE_MASTER);
+        _lastKnownRoles.put(m.getName(), String.valueOf(result));
+        return result;
+    }
+
+
+    /**
+     * Finds the member whose last recorded role is MASTER.
+     *
+     * @return the first member recorded as MASTER
+     * @throws BrokerAdminException if no role entry is MASTER or no member carries the recorded name
+     */
+    private GroupMember getLastKnownMaster()
+    {
+        String masterName = null;
+        for (Map.Entry<String, String> recorded : _lastKnownRoles.entrySet())
+        {
+            if (ROLE_MASTER.equals(recorded.getValue()))
+            {
+                masterName = recorded.getKey();
+                break;
+            }
+        }
+        if (masterName == null)
+        {
+            throw new BrokerAdminException("Master node is not found");
+        }
+
+        for (GroupMember candidate : _members)
+        {
+            if (masterName.equals(candidate.getName()))
+            {
+                return candidate;
+            }
+        }
+        throw new BrokerAdminException("Master node is not found");
+    }
+
+    private String objectToJson(final Object object)
+    {
+        try
+        {
+            return new ObjectMapper().writeValueAsString(object);
+        }
+        catch (JsonProcessingException e)
+        {
+            throw new BrokerAdminException("Cannot convert object to json", e);
+        }
+    }
+
+    /**
+     * Immutable description of one node of the group: its name, its AMQP and BDB ports, the admin controlling
+     * its broker and the attributes used to create its virtual host node.
+     * <p>
+     * Declared static because it never touches the enclosing {@code GroupBrokerAdmin} instance.
+     */
+    private static final class GroupMember
+    {
+        private final Map<String, Object> _nodeAttributes;
+        private final SpawnBrokerAdmin _admin;
+        private final int _bdbPort;
+        private final int _amqpPort;
+        private final String _name;
+
+        private GroupMember(final String name,
+                            final int amqpPort,
+                            final int bdbPort,
+                            final SpawnBrokerAdmin admin,
+                            final Map<String, Object> nodeAttributes)
+        {
+            _name = name;
+            _admin = admin;
+            _bdbPort = bdbPort;
+            _amqpPort = amqpPort;
+            _nodeAttributes = nodeAttributes;
+        }
+
+        /** @return the node name */
+        private String getName()
+        {
+            return _name;
+        }
+
+        /** @return the admin controlling this node's broker */
+        private SpawnBrokerAdmin getAdmin()
+        {
+            return _admin;
+        }
+
+        /** @return the BDB port recorded for this node */
+        private int getBdbPort()
+        {
+            return _bdbPort;
+        }
+
+        /** @return the AMQP port recorded for this node */
+        private int getAmqpPort()
+        {
+            return _amqpPort;
+        }
+
+        /** @return the attributes used to create this node's virtual host node (not copied) */
+        private Map<String, Object> getNodeAttributes()
+        {
+            return _nodeAttributes;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java
new file mode 100644
index 0000000..4ed8b62
--- /dev/null
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupConfig.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Type-level annotation describing the replication group a test class needs. It is retained at runtime so
+ * that it can be read reflectively from the test class.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE})
+public @interface GroupConfig
+{
+    /** Number of nodes in the group; defaults to 2. */
+    int numberOfNodes() default 2;
+
+    /** Name of the replication group; defaults to "test". */
+    String groupName() default "test";
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java
new file mode 100644
index 0000000..1c2e6fc
--- /dev/null
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupJmsTestBase.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.systests.ConnectionBuilder;
+import org.apache.qpid.systests.JmsProvider;
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
+
+@RunBrokerAdmin(type = "BDB-HA")
+public class GroupJmsTestBase extends JmsTestBase
+{
+    private static final int FAILOVER_CYCLECOUNT = 40;
+    private static final int FAILOVER_CONNECTDELAY = 1000;
+    static final int SHORT_FAILOVER_CYCLECOUNT = 2;
+    static final int SHORT_FAILOVER_CONNECTDELAY = 200;
+
+    @Override
+    public GroupBrokerAdmin getBrokerAdmin()
+    {
+        return (GroupBrokerAdmin) super.getBrokerAdmin();
+    }
+
+    @Override
+    public ConnectionBuilder getConnectionBuilder()
+    {
+        final ConnectionBuilder connectionBuilder = getJmsProvider().getConnectionBuilder()
+                                                                    .setClientId(getTestName())
+                                                                    .setFailoverReconnectDelay(FAILOVER_CONNECTDELAY)
+                                                                    .setFailoverReconnectAttempts(FAILOVER_CYCLECOUNT)
+                                                                    .setVirtualHost("test")
+                                                                    .setFailover(true)
+                                                                    .setHost(getBrokerAdmin().getHost());
+        int[] ports = getBrokerAdmin().getGroupAmqpPorts();
+        for (int i = 0; i < ports.length; i++)
+        {
+            int port = ports[i];
+            if (i == 0)
+            {
+                connectionBuilder.setPort(port);
+            }
+            else
+            {
+                connectionBuilder.addFailoverPort(port);
+            }
+        }
+        return connectionBuilder;
+    }
+
+    protected void assertProduceConsume(final Queue queue) throws Exception
+    {
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+
+    protected JmsProvider getJmsProvider()
+    {
+        return Utils.getJmsProvider();
+    }
+
+    protected Queue createTestQueue(final Connection connection) throws JMSException
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            return getJmsProvider().createQueue(session, getTestName());
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org