You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/30 22:23:59 UTC

svn commit: r1307596 - in /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Fri Mar 30 20:23:59 2012
New Revision: 1307596

URL: http://svn.apache.org/viewvc?rev=1307596&view=rev
Log:
HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
Modified:
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java

Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt?rev=1307596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Fri Mar 30 20:23:59 2012
@@ -0,0 +1,8 @@
+Changes for HDFS-3042 branch.
+
+This change list will be merged into the trunk CHANGES.txt when the HDFS-3042
+branch is merged.
+------------------------------
+
+HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
+

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1307596&r1=1307595&r2=1307596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Fri Mar 30 20:23:59 2012
@@ -81,9 +81,15 @@ public class ActiveStandbyElector implem
    */
   public interface ActiveStandbyElectorCallback {
     /**
-     * This method is called when the app becomes the active leader
+     * This method is called when the app becomes the active leader.
+     * If the service fails to become active, it should throw
+     * ServiceFailedException. This will cause the elector to
+     * sleep for a short period, then re-join the election.
+     * 
+     * Callback implementations are expected to manage their own
+     * timeouts (e.g. when making an RPC to a remote node).
      */
-    void becomeActive();
+    void becomeActive() throws ServiceFailedException;
 
     /**
      * This method is called when the app becomes a standby
@@ -135,6 +141,7 @@ public class ActiveStandbyElector implem
   public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
 
   private static final int NUM_RETRIES = 3;
+  private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
 
   private static enum ConnectionState {
     DISCONNECTED, CONNECTED, TERMINATED
@@ -385,8 +392,11 @@ public class ActiveStandbyElector implem
     Code code = Code.get(rc);
     if (isSuccess(code)) {
       // we successfully created the znode. we are the leader. start monitoring
-      becomeActive();
-      monitorActiveStatus();
+      if (becomeActive()) {
+        monitorActiveStatus();
+      } else {
+        reJoinElectionAfterFailureToBecomeActive();
+      }
       return;
     }
 
@@ -442,7 +452,9 @@ public class ActiveStandbyElector implem
       // creation was retried
       if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
         // we own the lock znode. so we are the leader
-        becomeActive();
+        if (!becomeActive()) {
+          reJoinElectionAfterFailureToBecomeActive();
+        }
       } else {
         // we dont own the lock znode. so we are a standby.
         becomeStandby();
@@ -480,6 +492,17 @@ public class ActiveStandbyElector implem
   }
 
   /**
+   * We failed to become active. Re-join the election, but
+   * sleep for a short period after terminating our existing
+   * session, so that other nodes have a chance to become active.
+   * The failure to become active is already logged inside
+   * becomeActive().
+   */
+  private void reJoinElectionAfterFailureToBecomeActive() {
+    reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE);
+  }
+
+  /**
    * interface implementation of Zookeeper watch events (connection and node),
    * proxied by {@link WatcherWithClientRef}.
    */
@@ -516,7 +539,7 @@ public class ActiveStandbyElector implem
         // call listener to reconnect
         LOG.info("Session expired. Entering neutral mode and rejoining...");
         enterNeutralMode();
-        reJoinElection();
+        reJoinElection(0);
         break;
       default:
         fatalError("Unexpected Zookeeper watch event state: "
@@ -591,7 +614,7 @@ public class ActiveStandbyElector implem
     createLockNodeAsync();
   }
 
-  private void reJoinElection() {
+  private void reJoinElection(int sleepTime) {
     LOG.info("Trying to re-establish ZK session");
     
     // Some of the test cases rely on expiring the ZK sessions and
@@ -604,12 +627,30 @@ public class ActiveStandbyElector implem
     sessionReestablishLockForTests.lock();
     try {
       terminateConnection();
+      sleepFor(sleepTime);
+      
       joinElectionInternal();
     } finally {
       sessionReestablishLockForTests.unlock();
     }
   }
-  
+
+  /**
+   * Sleep for the given number of milliseconds.
+   * This is non-static, and separated out, so that unit tests
+   * can override the behavior not to sleep.
+   */
+  @VisibleForTesting
+  protected void sleepFor(int sleepMs) {
+    if (sleepMs > 0) {
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   @VisibleForTesting
   void preventSessionReestablishmentForTests() {
     sessionReestablishLockForTests.lock();
@@ -640,11 +681,7 @@ public class ActiveStandbyElector implem
         success = true;
       } catch(IOException e) {
         LOG.warn(e);
-        try {
-          Thread.sleep(5000);
-        } catch(InterruptedException e1) {
-          LOG.warn(e1);
-        }
+        sleepFor(5000);
       }
       ++connectionRetryCount;
     }
@@ -675,20 +712,24 @@ public class ActiveStandbyElector implem
     terminateConnection();
   }
 
-  private void becomeActive() {
+  private boolean becomeActive() {
     assert wantToBeInElection;
-    if (state != State.ACTIVE) {
-      try {
-        Stat oldBreadcrumbStat = fenceOldActive();
-        writeBreadCrumbNode(oldBreadcrumbStat);
-      } catch (Exception e) {
-        LOG.warn("Exception handling the winning of election", e);
-        reJoinElection();
-        return;
-      }
+    if (state == State.ACTIVE) {
+      // already active
+      return true;
+    }
+    try {
+      Stat oldBreadcrumbStat = fenceOldActive();
+      writeBreadCrumbNode(oldBreadcrumbStat);
+      
       LOG.debug("Becoming active");
-      state = State.ACTIVE;
       appClient.becomeActive();
+      state = State.ACTIVE;
+      return true;
+    } catch (Exception e) {
+      LOG.warn("Exception handling the winning of election", e);
+      // Caller will handle quitting and rejoining the election.
+      return false;
     }
   }
 

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1307596&r1=1307595&r2=1307596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Fri Mar 30 20:23:59 2012
@@ -242,15 +242,20 @@ public abstract class ZKFailoverControll
     notifyAll();
   }
   
-  private synchronized void becomeActive() {
+  private synchronized void becomeActive() throws ServiceFailedException {
     LOG.info("Trying to make " + localTarget + " active...");
     try {
-      localTarget.getProxy().transitionToActive();
+      HAServiceProtocolHelper.transitionToActive(localTarget.getProxy());
       LOG.info("Successfully transitioned " + localTarget +
           " to active state");
     } catch (Throwable t) {
       LOG.fatal("Couldn't make " + localTarget + " active", t);
-      elector.quitElection(true);
+      if (t instanceof ServiceFailedException) {
+        throw (ServiceFailedException)t;
+      } else {
+        throw new ServiceFailedException("Couldn't transition to active",
+            t);
+      }
 /*
 * TODO:
 * we need to make sure that if we get fenced and then quickly restarted,
@@ -297,7 +302,7 @@ public abstract class ZKFailoverControll
    */
   class ElectorCallbacks implements ActiveStandbyElectorCallback {
     @Override
-    public void becomeActive() {
+    public void becomeActive() throws ServiceFailedException {
       ZKFailoverController.this.becomeActive();
     }
 

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java?rev=1307596&r1=1307595&r2=1307596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java Fri Mar 30 20:23:59 2012
@@ -19,16 +19,25 @@ package org.apache.hadoop.ha;
 
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ZooKeeperServer;
 
 public abstract class ActiveStandbyElectorTestUtil {
+  
+  private static final Log LOG = LogFactory.getLog(
+      ActiveStandbyElectorTestUtil.class);
+  private static final long LOG_INTERVAL_MS = 500;
 
   public static void waitForActiveLockData(TestContext ctx,
       ZooKeeperServer zks, String parentDir, byte[] activeData)
       throws Exception {
+    long st = System.currentTimeMillis();
+    long lastPrint = st;
     while (true) {
       if (ctx != null) {
         ctx.checkException();
@@ -42,10 +51,18 @@ public abstract class ActiveStandbyElect
             Arrays.equals(activeData, data)) {
           return;
         }
+        if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) {
+          LOG.info("Cur data: " + StringUtils.byteToHexString(data));
+          lastPrint = System.currentTimeMillis();
+        }
       } catch (NoNodeException nne) {
         if (activeData == null) {
           return;
         }
+        if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) {
+          LOG.info("Cur data: no node");
+          lastPrint = System.currentTimeMillis();
+        }
       }
       Thread.sleep(50);
     }

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1307596&r1=1307595&r2=1307596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Fri Mar 30 20:23:59 2012
@@ -51,6 +51,8 @@ public class TestActiveStandbyElector {
   private ActiveStandbyElectorTester elector;
 
   class ActiveStandbyElectorTester extends ActiveStandbyElector {
+    private int sleptFor = 0;
+    
     ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
         List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
       super(hostPort, timeout, parent, acl, app);
@@ -61,6 +63,14 @@ public class TestActiveStandbyElector {
       ++count;
       return mockZK;
     }
+    
+    @Override
+    protected void sleepFor(int ms) {
+      // don't sleep in unit tests! Instead, just record the amount of
+      // time slept
+      LOG.info("Would have slept for " + ms + "ms");
+      sleptFor += ms;
+    }
   }
 
   private static final String ZK_PARENT_NAME = "/parent/node";
@@ -147,6 +157,68 @@ public class TestActiveStandbyElector {
   }
   
   /**
+   * Verify that, when the callback fails to enter active state,
+   * the elector rejoins the election after sleeping for a short period.
+   */
+  @Test
+  public void testFailToBecomeActive() throws Exception {
+    mockNoPriorActive();
+    elector.joinElection(data);
+    Assert.assertEquals(0, elector.sleptFor);
+    
+    Mockito.doThrow(new ServiceFailedException("failed to become active"))
+        .when(mockApp).becomeActive();
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
+        ZK_LOCK_NAME);
+    // Should have tried to become active
+    Mockito.verify(mockApp).becomeActive();
+    
+    // should re-join
+    Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+    Assert.assertEquals(2, count);
+    Assert.assertTrue(elector.sleptFor > 0);
+  }
+  
+  /**
+   * Verify that, when the callback fails to enter active state after
+   * a ZK disconnect (i.e. from the StatCallback), the elector rejoins
+   * the election after sleeping for a short period.
+   */
+  @Test
+  public void testFailToBecomeActiveAfterZKDisconnect() throws Exception {
+    mockNoPriorActive();
+    elector.joinElection(data);
+    Assert.assertEquals(0, elector.sleptFor);
+
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
+        ZK_LOCK_NAME);
+    Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
+        ZK_LOCK_NAME);
+    verifyExistCall(1);
+
+    Stat stat = new Stat();
+    stat.setEphemeralOwner(1L);
+    Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+
+    // Fake failure to become active from within the stat callback
+    Mockito.doThrow(new ServiceFailedException("fail to become active"))
+        .when(mockApp).becomeActive();
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+    
+    // should re-join
+    Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+    Assert.assertEquals(2, count);
+    Assert.assertTrue(elector.sleptFor > 0);
+  }
+
+  
+  /**
    * Verify that, if there is a record of a prior active node, the
    * elector asks the application to fence it before becoming active.
    */

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1307596&r1=1307595&r2=1307596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Fri Mar 30 20:23:59 2012
@@ -273,7 +273,8 @@ public class TestZKFailoverController ex
       waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
       waitForActiveLockHolder(null);
 
-      Mockito.verify(svc2.proxy).transitionToActive();
+      Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce())
+        .transitionToActive();
 
       waitForHAState(svc1, HAServiceState.STANDBY);
       waitForHAState(svc2, HAServiceState.STANDBY);
@@ -283,6 +284,12 @@ public class TestZKFailoverController ex
       waitForHAState(svc1, HAServiceState.ACTIVE);
       waitForHAState(svc2, HAServiceState.STANDBY);
       waitForActiveLockHolder(svc1);
+      
+      // Ensure that we can fail back to thr2 once it is able
+      // to become active (e.g. the admin has restarted it)
+      LOG.info("Allowing svc2 to become active, expiring svc1");
+      svc2.failToBecomeActive = false;
+      expireAndVerifyFailover(thr1, thr2);
     } finally {
       stopFCs();
     }