You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/19 17:30:23 UTC

svn commit: r1569814 - in /qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore: src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ systests/src/main/java/org/apache/qpid/server/store/berkeleydb/

Author: orudyy
Date: Wed Feb 19 16:30:23 2014
New Revision: 1569814

URL: http://svn.apache.org/r1569814
Log:
QPID-5409: Restart the former master environment on MASTER transfer

Modified:
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1569814&r1=1569813&r2=1569814&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Wed Feb 19 16:30:23 2014
@@ -150,6 +150,7 @@ public class ReplicatedEnvironmentFacade
 
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
+    private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
 
     public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
     {
@@ -229,36 +230,41 @@ public class ReplicatedEnvironmentFacade
         boolean restart = (dbe instanceof InsufficientReplicasException || dbe instanceof InsufficientReplicasException || dbe instanceof RestartRequiredException);
         if (restart)
         {
-            if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+            tryToRestartEnvironment(dbe);
+        }
+        return new AMQStoreException(contextMessage, dbe);
+    }
+
+    private void tryToRestartEnvironment(final DatabaseException dbe)
+    {
+        if (_state.compareAndSet(State.OPEN, State.RESTARTING))
+        {
+            if (dbe != null && LOGGER.isDebugEnabled())
             {
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
-                }
+                LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
+            }
 
-                _environmentJobExecutor.execute(new Runnable()
+            _environmentJobExecutor.execute(new Runnable()
+            {
+                @Override
+                public void run()
                 {
-                    @Override
-                    public void run()
+                    try
                     {
-                        try
-                        {
-                            restartEnvironment(dbe);
-                        }
-                        catch (Exception e)
-                        {
-                            LOGGER.error("Exception on environment restart", e);
-                        }
+                        restartEnvironment();
                     }
-                });
+                    catch (Exception e)
+                    {
+                        LOGGER.error("Exception on environment restart", e);
+                    }
+                }
+            });
 
-            }
-            else
-            {
-                LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
-            }
         }
-        return new AMQStoreException(contextMessage, dbe);
+        else
+        {
+            LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
+        }
     }
 
     @Override
@@ -379,6 +385,12 @@ public class ReplicatedEnvironmentFacade
         {
             listener.stateChange(stateChangeEvent);
         }
+
+        if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
+        {
+            tryToRestartEnvironment(null);
+        }
+        _lastKnownEnvironmentState = state;
     }
 
     private void reopenDatabases()
@@ -726,7 +738,7 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    private void restartEnvironment(DatabaseException dbe)
+    private void restartEnvironment()
     {
         LOGGER.info("Restarting environment");
 
@@ -983,15 +995,30 @@ public class ReplicatedEnvironmentFacade
         @Override
         public void run()
         {
-            String groupName = _configuration.getGroupName();
-            if (LOGGER.isDebugEnabled())
+            if (_state.get() == State.OPEN)
             {
-                LOGGER.debug("Checking for changes in the group " + groupName);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName() + " on node " + _configuration.getName());
+                }
+
+                try
+                {
+                    detectGroupChangesAndNotify();
+                }
+                catch(DatabaseException e)
+                {
+                    handleDatabaseException("Exception on replication group check", e);
+                }
             }
+        }
 
+        private void detectGroupChangesAndNotify()
+        {
+            String groupName = _configuration.getGroupName();
             ReplicatedEnvironment env = _environment;
             ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
-            if (env != null && env.isValid())
+            if (env != null)
             {
                 ReplicationGroup group = env.getGroup();
                 Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1569814&r1=1569813&r2=1569814&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java Wed Feb 19 16:30:23 2014
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.model.ReplicationNode;
@@ -48,6 +47,7 @@ import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.rep.InsufficientReplicasException;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
 import com.sleepycat.je.rep.ReplicationConfig;
 import com.sleepycat.je.rep.StateChangeEvent;
@@ -55,7 +55,6 @@ import com.sleepycat.je.rep.StateChangeL
 
 public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
 {
-    private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class);
 
     private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
     private static final int LISTENER_TIMEOUT = 5;
@@ -132,6 +131,123 @@ public class ReplicatedEnvironmentFacade
         assertNull("Environment should be null after facade close", e);
     }
 
+    public void testTransferMasterToSelf() throws Exception
+    {
+        final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
+        final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+        StateChangeListener stateChangeListener = new StateChangeListener(){
+
+            @Override
+            public void stateChange(StateChangeEvent event) throws RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    firstNodeReplicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    firstNodeMasterStateLatch.countDown();
+                }
+            }
+        };
+        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+        assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node1NodeHostPort = "localhost:" + replica1Port;
+        ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+        assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String node2NodeHostPort = "localhost:" + replica2Port;
+        final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+        final CountDownLatch masterStateLatch = new CountDownLatch(1);
+        StateChangeListener testStateChangeListener = new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent event) throws RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    replicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    masterStateLatch.countDown();
+                }
+            }
+        };
+        ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
+        assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+        thirdNode.transferMasterToSelfAsynchronously();
+        assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
+        assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+    }
+
+    public void testTransferMasterAnotherNode() throws Exception
+    {
+        final CountDownLatch firstNodeReplicaStateLatch = new CountDownLatch(1);
+        final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+        StateChangeListener stateChangeListener = new StateChangeListener(){
+
+            @Override
+            public void stateChange(StateChangeEvent event) throws RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    firstNodeReplicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    firstNodeMasterStateLatch.countDown();
+                }
+            }
+        };
+        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, stateChangeListener, new NoopReplicationGroupListener());
+        assertTrue("Environment did not become a master", firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node1NodeHostPort = "localhost:" + replica1Port;
+        ReplicatedEnvironmentFacade secondNode = addReplica(TEST_NODE_NAME + "_1", node1NodeHostPort);
+        assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String node2NodeHostPort = "localhost:" + replica2Port;
+        final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+        final CountDownLatch masterStateLatch = new CountDownLatch(1);
+        StateChangeListener testStateChangeListener = new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent event) throws RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    replicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    masterStateLatch.countDown();
+                }
+            }
+        };
+        String thirdNodeName = TEST_NODE_NAME + "_2";
+        ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, testStateChangeListener, new NoopReplicationGroupListener());
+        assertTrue("Environment did not become a replica", replicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+        firstNode.transferMasterAsynchronously(thirdNodeName);
+        assertTrue("Environment did not become a master", masterStateLatch.await(10, TimeUnit.SECONDS));
+        assertTrue("First node environment did not become a replica", firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Unexpected state", ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+    }
+
     public void testOpenDatabases() throws Exception
     {
         EnvironmentFacade ef = createMaster();
@@ -404,7 +520,6 @@ public class ReplicatedEnvironmentFacade
 
     public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
     {
-        long startTime = System.currentTimeMillis();
 
         ReplicatedEnvironmentFacade master = createMaster();
 
@@ -417,9 +532,7 @@ public class ReplicatedEnvironmentFacade
         String replica2NodeName = TEST_NODE_NAME + "_2";
         String replica2NodeHostPort = "localhost:" + replica2Port;
         ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
-        
-        long setUpTime = System.currentTimeMillis();
-        LOGGER.debug("XXX Start Up Time " + (setUpTime - startTime));
+
         String databaseName = "test";
 
         DatabaseConfig dbConfig = createDatabase(master, databaseName);
@@ -428,8 +541,6 @@ public class ReplicatedEnvironmentFacade
         replica1.close();
         replica2.close();
 
-        long closeTime = System.currentTimeMillis();
-        LOGGER.debug("XXX Env close  Time " + (closeTime - setUpTime));
         Environment e = master.getEnvironment();
         Database db = master.getOpenDatabase(databaseName);
         try
@@ -441,22 +552,17 @@ public class ReplicatedEnvironmentFacade
         {
             master.handleDatabaseException(null, ex);
         }
-        long openDatabaseTime = System.currentTimeMillis();
-        LOGGER.debug("XXX Open db Time " + (openDatabaseTime - closeTime ));
 
         replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
         replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
 
-        long reopenTime = System.currentTimeMillis();
-        LOGGER.debug("XXX Restart Time " + (reopenTime - openDatabaseTime ));
         // Need to poll to await the remote node updating itself
         long timeout = System.currentTimeMillis() + 5000;
         while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
         {
             Thread.sleep(200);
         }
-        long recoverTime = System.currentTimeMillis();
-        LOGGER.debug("XXX Recover Time " + (recoverTime - reopenTime));
+
         assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
                 State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java?rev=1569814&r1=1569813&r2=1569814&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterBlackboxTest.java Wed Feb 19 16:30:23 2014
@@ -143,6 +143,9 @@ public class HAClusterBlackboxTest exten
         assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE));
 
         assertProducingConsuming(connection);
+
+        String nodeName = _clusterCreator.getNodeNameForBrokerPort(activeBrokerPort);
+        _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA");
     }
 
     public void testTransferMasterFromRemoteNode() throws Exception
@@ -171,7 +174,10 @@ public class HAClusterBlackboxTest exten
         assertEquals("Inactive broker has unexpeced role", "MASTER", attributes.get(ReplicationNode.ROLE));
 
         assertProducingConsuming(connection);
+
+        _clusterCreator.awaitNodeToAttainRole(activeBrokerPort, nodeName, "REPLICA");
     }
+
     public void testQuorumOverride() throws Exception
     {
         final Connection connection = getConnection(_brokerFailoverUrl);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org