You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/02/05 16:40:51 UTC

svn commit: r1564813 - in /qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore: src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ systests/src/main/java/org/apache/qpid...

Author: kwall
Date: Wed Feb  5 15:40:50 2014
New Revision: 1564813

URL: http://svn.apache.org/r1564813
Log:
QPID-5409: Add functionality to RemoteReplicationNode to change the role attribute from REPLICA to MASTER

Added:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java?rev=1564813&r1=1564812&r2=1564813&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java Wed Feb  5 15:40:50 2014
@@ -21,11 +21,18 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.security.AccessControlException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -36,9 +43,12 @@ import org.apache.qpid.server.model.UUID
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.adapter.AbstractAdapter;
 import org.apache.qpid.server.model.adapter.NoStatistics;
+import org.apache.qpid.server.util.MapValueConverter;
 
 import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
 
 /**
@@ -48,23 +58,32 @@ public class RemoteReplicationNode exten
 {
     private static final Logger LOGGER = Logger.getLogger(RemoteReplicationNode.class);
 
+    @SuppressWarnings("serial")
+    private static final Map<String, Type> ATTRIBUTE_TYPES = new HashMap<String, Type>()
+    {{
+        put(ROLE, String.class);
+    }};
+
     private final com.sleepycat.je.rep.ReplicationNode _replicationNode;
-    private final VirtualHost _virtualHost;
     private final String _hostPort;
     private final String _groupName;
+    private final DbPing _dbPing;
+    private final ReplicationGroupAdmin _replicationGroupAdmin;
 
     private volatile String _role;
     private volatile long _joinTime;
     private volatile long _lastTransactionId;
 
-    public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost, TaskExecutor taskExecutor)
+    public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost,
+            TaskExecutor taskExecutor, DbPing dbPing, ReplicationGroupAdmin admin)
     {
         super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor);
         addParent(VirtualHost.class, virtualHost);
         _groupName = groupName;
         _hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort();
         _replicationNode = replicationNode;
-        _virtualHost = virtualHost;
+        _dbPing = dbPing;
+        _replicationGroupAdmin = admin;
     }
 
     @Override
@@ -154,7 +173,6 @@ public class RemoteReplicationNode exten
         }
     }
 
-
     @Override
     public Object getAttribute(String name)
     {
@@ -207,15 +225,13 @@ public class RemoteReplicationNode exten
 
     public void updateNodeState()
     {
-        Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
-        DbPing ping = new DbPing(_replicationNode, _groupName, monitorTimeout.intValue());
         String oldRole = _role;
         long oldJoinTime = _joinTime;
         long oldTransactionId = _lastTransactionId;
 
         try
         {
-            NodeState state = ping.getNodeState();
+            NodeState state = _dbPing.getNodeState();
             _role = state.getNodeState().name();
             _joinTime = state.getJoinTime();
             _lastTransactionId = state.getCurrentTxnEndVLSN();
@@ -252,4 +268,73 @@ public class RemoteReplicationNode exten
     {
         return ReplicationNode.AVAILABLE_ATTRIBUTES;
     }
+
+    @Override
+    public void changeAttributes(Map<String, Object> attributes)
+            throws IllegalStateException, AccessControlException,
+            IllegalArgumentException
+    {
+        checkWhetherImmutableAttributeChanged(attributes);
+        Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+
+        if (convertedAttributes.containsKey(ROLE))
+        {
+            String currentRole = (String)getAttribute(ROLE);
+            if (!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+            {
+                throw new IllegalConfigurationException("Cannot transfer mastership when not a replica");
+            }
+
+            String role  = (String)convertedAttributes.get(ROLE);
+
+            if (ReplicatedEnvironment.State.MASTER.name().equals(role) )
+            {
+                try
+                {
+                    String nodeName = getName();
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("Trying to transfer master to " + nodeName);
+                    }
+
+                    _replicationGroupAdmin.transferMaster(Collections.singleton(nodeName), ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("The mastership has been transfered to " + nodeName);
+                    }
+                }
+                catch(Exception e)
+                {
+                    throw new IllegalConfigurationException("Cannot transfer mastership to " + getName(), e);
+                }
+            }
+            else
+            {
+                throw new IllegalConfigurationException("Changing role to other value then "
+                        + ReplicatedEnvironment.State.MASTER.name() + " is unsupported");
+            }
+        }
+
+        super.changeAttributes(convertedAttributes);
+    }
+
+    private void checkWhetherImmutableAttributeChanged(Map<String, Object> attributes)
+    {
+        Set<String> immutableAttributeNames = new HashSet<String>(getAttributeNames());
+        immutableAttributeNames.remove(ROLE);
+        for (String attributeName : immutableAttributeNames)
+        {
+            if (attributes.containsKey(attributeName))
+            {
+                // the REST layer adds the name to the attributes map, so a name equal to the current one is not a change
+                if (attributeName.equals(NAME) && getName().equals(attributes.get(NAME)))
+                {
+                    continue;
+                }
+                throw new IllegalConfigurationException("Cannot change value of immutable attribute " + attributeName);
+            }
+        }
+    }
+
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java?rev=1564813&r1=1564812&r2=1564813&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java Wed Feb  5 15:40:50 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.store.berkeleydb.replication;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,6 +33,8 @@ import org.apache.qpid.server.store.berk
 
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.Durability.SyncPolicy;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
 
 //TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class?
 public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -82,7 +85,13 @@ public class ReplicatedEnvironmentFacade
             attributes.put(ReplicationNode.NAME, replicationNode.getName());
             attributes.put(ReplicationNode.GROUP_NAME, groupName);
             attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort());
-            return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor());
+
+            Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
+            DbPing dbPing = new DbPing(replicationNode, groupName, monitorTimeout.intValue());
+
+            ReplicationGroupAdmin replicationGroupAdmin = new ReplicationGroupAdmin(groupName, Collections.singleton(replicationNode.getSocketAddress()));
+
+            return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor(), dbPing, replicationGroupAdmin);
         }
 
         @Override

Added: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java?rev=1564813&view=auto
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java (added)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java Wed Feb  5 15:40:50 2014
@@ -0,0 +1,133 @@
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.apache.qpid.server.model.ReplicationNode.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.ReplicatedEnvironment.State;
+import com.sleepycat.je.rep.ReplicationNode;
+import com.sleepycat.je.rep.util.DbPing;
+import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+
+public class RemoteReplicationNodeTest extends QpidTestCase
+{
+
+    private RemoteReplicationNode _node;
+    private String _groupName;
+    private VirtualHost _virtualHost;
+    private TaskExecutor _taskExecutor;
+    private ReplicationNode _replicationNode;
+    private String _nodeName;
+    private int _port;
+    private DbPing _dbPing;
+    private ReplicationGroupAdmin _remoteReplicationAdmin;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _groupName = getTestName();
+        _nodeName = getTestName() + "Name";
+        _port = 5000;
+        _replicationNode = mock(ReplicationNode.class);
+        _virtualHost = mock(VirtualHost.class);
+        _taskExecutor = mock(TaskExecutor.class);
+        _dbPing = mock(DbPing.class);
+        _remoteReplicationAdmin = mock(ReplicationGroupAdmin.class);
+
+        when(_taskExecutor.isTaskExecutorThread()).thenReturn(true);
+        when(_replicationNode.getName()).thenReturn(_nodeName);
+        when(_replicationNode.getHostName()).thenReturn("localhost");
+        when(_replicationNode.getPort()).thenReturn(_port);
+
+        _node = new RemoteReplicationNode(_replicationNode, _groupName, _virtualHost, _taskExecutor, _dbPing, _remoteReplicationAdmin);
+    }
+
+    public void testGetAttribute() throws Exception
+    {
+        State state = State.MASTER;
+        long joinTime = System.currentTimeMillis();
+        long currentTxnEndVLSN = 3;
+
+        updateNodeState(state, joinTime, currentTxnEndVLSN);
+
+        assertEquals("Unexpected name", _nodeName, _node.getAttribute(NAME));
+        assertEquals("Unexpected group name", _groupName, _node.getAttribute(GROUP_NAME));
+        assertEquals("Unexpected state", state.name(), _node.getAttribute(ROLE));
+        assertEquals("Unexpected transaction id", currentTxnEndVLSN, _node.getAttribute(LAST_KNOWN_REPLICATION_TRANSACTION_ID));
+        assertEquals("Unexpected join time", joinTime, _node.getAttribute(JOIN_TIME));
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testSetRoleAttribute() throws Exception
+    {
+        updateNodeState();
+        _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name()));
+        when(_remoteReplicationAdmin.transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class))).thenReturn(_nodeName);
+
+        verify(_remoteReplicationAdmin).transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class));
+    }
+
+    public void testSetImmutableAttributesThrowException() throws Exception
+    {
+        Map<String, Object> changeAttributeMap = new HashMap<String, Object>();
+        changeAttributeMap.put(GROUP_NAME, "newGroupName");
+        changeAttributeMap.put(HELPER_HOST_PORT, "newhost:1234");
+        changeAttributeMap.put(HOST_PORT, "newhost:1234");
+        changeAttributeMap.put(COALESCING_SYNC, Boolean.FALSE);
+        changeAttributeMap.put(DURABILITY, "durability");
+        changeAttributeMap.put(JOIN_TIME, 1000l);
+        changeAttributeMap.put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, 10001l);
+        changeAttributeMap.put(NAME, "newName");
+        changeAttributeMap.put(STORE_PATH, "/not/used");
+        changeAttributeMap.put(PARAMETERS, Collections.emptyMap());
+        changeAttributeMap.put(REPLICATION_PARAMETERS, Collections.emptyMap());
+
+        for (Entry<String, Object> entry : changeAttributeMap.entrySet())
+        {
+            assertSetAttributesThrowsException(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void assertSetAttributesThrowsException(String attributeName, Object attributeValue) throws Exception
+    {
+        updateNodeState();
+
+        try
+        {
+            _node.setAttributes(Collections.<String, Object>singletonMap(attributeName, attributeValue));
+            fail("Operation to change attribute '" + attributeName + "' should fail");
+        }
+        catch(IllegalConfigurationException e)
+        {
+            // pass
+        }
+    }
+
+    private void updateNodeState() throws Exception
+    {
+        updateNodeState( State.REPLICA, System.currentTimeMillis(), 3);
+    }
+
+    private void updateNodeState(State state, long joinTime, long currentTxnEndVLSN) throws Exception
+    {
+        NodeState nodeState = new NodeState(_nodeName, _groupName, state, null, null, joinTime, currentTxnEndVLSN, 2, 1, 0, null, 0.0);
+        when(_dbPing.getNodeState()).thenReturn(nodeState);
+        _node.updateNodeState();
+    }
+}

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1564813&r1=1564812&r2=1564813&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java Wed Feb  5 15:40:50 2014
@@ -120,7 +120,7 @@ public class HAClusterBlackboxTest exten
         connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
-    public void testTransferMaster() throws Exception
+    public void testTransferMasterFromLocalNode() throws Exception
     {
         final Connection connection = getConnection(_brokerFailoverUrl);
 
@@ -145,6 +145,33 @@ public class HAClusterBlackboxTest exten
         assertProducingConsuming(connection);
     }
 
+    public void testTransferMasterFromRemoteNode() throws Exception
+    {
+        final Connection connection = getConnection(_brokerFailoverUrl);
+
+        ((AMQConnection)connection).setConnectionListener(_failoverAwaitingListener);
+
+        final int activeBrokerPort = _clusterCreator.getBrokerPortNumberFromConnection(connection);
+        LOGGER.info("Active connection port " + activeBrokerPort);
+
+        final int inactiveBrokerPort = _clusterCreator.getPortNumberOfAnInactiveBroker(connection);
+        LOGGER.info("Update role attribute on inactive broker on port " + inactiveBrokerPort);
+
+        String nodeName = _clusterCreator.getNodeNameForBrokerPort(inactiveBrokerPort);
+        _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA");
+        Map<String, Object> attributes = _clusterCreator.getReplicationNodeAttributes(activeBrokerPort, nodeName);
+        assertEquals("Inactive broker has unexpeced role", "REPLICA", attributes.get(ReplicationNode.ROLE));
+
+        _clusterCreator.setReplicationNodeAttributes(activeBrokerPort, nodeName, Collections.<String, Object>singletonMap(ReplicationNode.ROLE, "MASTER"));
+
+        _failoverAwaitingListener.assertFailoverOccurs(20000);
+        LOGGER.info("Listener has finished");
+
+        attributes = _clusterCreator.getReplicationNodeAttributes(inactiveBrokerPort);
+        assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE));
+
+        assertProducingConsuming(connection);
+    }
     public void testQuorumOverride() throws Exception
     {
         final Connection connection = getConnection(_brokerFailoverUrl);

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java?rev=1564813&r1=1564812&r2=1564813&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java Wed Feb  5 15:40:50 2014
@@ -221,19 +221,8 @@ public class HAClusterTwoNodeTest extend
 
     private void awaitNodeToAttainRole(int brokerPort, String desiredRole) throws Exception
     {
-        final long startTime = System.currentTimeMillis();
-        Map<String, Object> data = Collections.emptyMap();
-
-        while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
-        {
-            _logger.debug("Awaiting node to transit into " + desiredRole + " role");
-            data = _clusterCreator.getReplicationNodeAttributes(brokerPort);
-            if (!desiredRole.equals(data.get(ReplicationNode.ROLE)))
-            {
-                Thread.sleep(1000);
-            }
-        }
-        assertEquals("Node is in unexpected role", desiredRole, data.get(ReplicationNode.ROLE));
+        String nodeName = _clusterCreator.getNodeNameForBrokerPort(brokerPort);
+        _clusterCreator.awaitNodeToAttainRole(brokerPort, nodeName, desiredRole);
     }
 
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java?rev=1564813&r1=1564812&r2=1564813&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java Wed Feb  5 15:40:50 2014
@@ -20,8 +20,10 @@
 package org.apache.qpid.server.store.berkeleydb;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -39,6 +41,8 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 
+import junit.framework.Assert;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionURL;
@@ -50,6 +54,8 @@ import org.apache.qpid.systest.rest.Rest
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
 import org.apache.qpid.url.URLSyntaxException;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
 
 public class HATestClusterCreator
 {
@@ -384,8 +390,14 @@ public class HATestClusterCreator
 
     public void setReplicationNodeAttributes(int brokerPort, Map<String, Object> attributeMap) throws Exception
     {
-        RestTestHelper restHelper = createRestTestHelper(brokerPort);
         String replicationNodeName = getNodeNameForBrokerPort(brokerPort);
+        setReplicationNodeAttributes(brokerPort, replicationNodeName, attributeMap);
+    }
+
+    public void setReplicationNodeAttributes(int brokerPort, String replicationNodeName, Map<String, Object> attributeMap)
+            throws IOException, JsonGenerationException, JsonMappingException, Exception
+    {
+        RestTestHelper restHelper = createRestTestHelper(brokerPort);
         int status = restHelper.submitRequest("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName , "PUT", attributeMap);
         if (status != 200)
         {
@@ -396,6 +408,11 @@ public class HATestClusterCreator
     public Map<String, Object> getReplicationNodeAttributes(int brokerPort) throws Exception
     {
         String replicationNodeName = getNodeNameForBrokerPort(brokerPort);
+        return getReplicationNodeAttributes(brokerPort, replicationNodeName);
+    }
+
+    public Map<String, Object> getReplicationNodeAttributes(int brokerPort, String replicationNodeName) throws IOException
+    {
         RestTestHelper restHelper = createRestTestHelper(brokerPort);
         return restHelper.getJsonAsSingletonList("/rest/replicationnode/" + _virtualHostName + "/" + replicationNodeName );
     }
@@ -406,4 +423,20 @@ public class HATestClusterCreator
         return RestTestHelper.createRestTestHelperWithDefaultCredentials(httpPort);
     }
 
+    public void awaitNodeToAttainRole(int brokerPort, String nodeName, String desiredRole) throws Exception
+    {
+        final long startTime = System.currentTimeMillis();
+        Map<String, Object> data = Collections.emptyMap();
+
+        while(!desiredRole.equals(data.get(ReplicationNode.ROLE)) && (System.currentTimeMillis() - startTime) < 30000)
+        {
+            LOGGER.debug("Awaiting node to transit into " + desiredRole + " role");
+            data = getReplicationNodeAttributes(brokerPort, nodeName);
+            if (!desiredRole.equals(data.get(ReplicationNode.ROLE)))
+            {
+                Thread.sleep(1000);
+            }
+        }
+        Assert.assertEquals("Node is in unexpected role", desiredRole, data.get(ReplicationNode.ROLE));
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org