You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/24 01:05:40 UTC

svn commit: r1304676 - in /hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Sat Mar 24 00:05:40 2012
New Revision: 1304676

URL: http://svn.apache.org/viewvc?rev=1304676&view=rev
Log:
HADOOP-8163. Improve ActiveStandbyElector to provide hooks for fencing old active. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Sat Mar 24 00:05:40 2012
@@ -106,6 +106,9 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8184.  ProtoBuf RPC engine uses the IPC layer reply packet.
     (Sanjay Radia via szetszwo)
 
+    HADOOP-8163. Improve ActiveStandbyElector to provide hooks for
+    fencing old active. (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Sat Mar 24 00:05:40 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ha;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
@@ -37,6 +39,7 @@ import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException.Code;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 /**
  * 
@@ -106,6 +109,15 @@ public class ActiveStandbyElector implem
      * called to notify the app about it.
      */
     void notifyFatalError(String errorMessage);
+
+    /**
+     * If an old active has failed, rather than exited gracefully, then
+     * the new active may need to take some fencing actions against it
+     * before proceeding with failover.
+     * 
+     * @param oldActiveData the application data provided by the prior active
+     */
+    void fenceOldActive(byte[] oldActiveData);
   }
 
   /**
@@ -113,7 +125,9 @@ public class ActiveStandbyElector implem
    * classes
    */
   @VisibleForTesting
-  protected static final String LOCKFILENAME = "ActiveStandbyElectorLock";
+  protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
+  @VisibleForTesting
+  protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";
 
   public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
 
@@ -139,6 +153,7 @@ public class ActiveStandbyElector implem
   private final List<ACL> zkAcl;
   private byte[] appData;
   private final String zkLockFilePath;
+  private final String zkBreadCrumbPath;
   private final String znodeWorkingDir;
 
   /**
@@ -182,7 +197,8 @@ public class ActiveStandbyElector implem
     zkAcl = acl;
     appClient = app;
     znodeWorkingDir = parentZnodeName;
-    zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME;
+    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
+    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;    
 
     // createConnection for future API calls
     createConnection();
@@ -204,6 +220,7 @@ public class ActiveStandbyElector implem
    */
   public synchronized void joinElection(byte[] data)
       throws HadoopIllegalArgumentException {
+    
     LOG.debug("Attempting active election");
 
     if (data == null) {
@@ -215,6 +232,49 @@ public class ActiveStandbyElector implem
 
     joinElectionInternal();
   }
+  
+  /**
+   * @return true if the configured parent znode exists
+   */
+  public synchronized boolean parentZNodeExists()
+      throws IOException, InterruptedException {
+    Preconditions.checkState(zkClient != null);
+    try {
+      return zkClient.exists(znodeWorkingDir, false) != null;
+    } catch (KeeperException e) {
+      throw new IOException("Couldn't determine existence of znode '" +
+          znodeWorkingDir + "'", e);
+    }
+  }
+
+  /**
+   * Utility function to ensure that the configured base znode exists.
+   * This recursively creates the znode as well as all of its parents.
+   */
+  public synchronized void ensureParentZNode()
+      throws IOException, InterruptedException {
+    String pathParts[] = znodeWorkingDir.split("/");
+    Preconditions.checkArgument(pathParts.length >= 1 &&
+        "".equals(pathParts[0]),
+        "Invalid path: %s", znodeWorkingDir);
+    
+    StringBuilder sb = new StringBuilder();
+    for (int i = 1; i < pathParts.length; i++) {
+      sb.append("/").append(pathParts[i]);
+      String prefixPath = sb.toString();
+      LOG.debug("Ensuring existence of " + prefixPath);
+      try {
+        createWithRetries(prefixPath, new byte[]{}, zkAcl, CreateMode.PERSISTENT);
+      } catch (KeeperException e) {
+        if (isNodeExists(e.code())) {
+          // This is OK - just ensuring existence.
+          continue;
+        } else {
+          throw new IOException("Couldn't create " + prefixPath, e);
+        }
+      }
+    }
+  }
 
   /**
    * Any service instance can drop out of the election by calling quitElection. 
@@ -225,9 +285,17 @@ public class ActiveStandbyElector implem
    * call joinElection(). <br/>
    * This allows service instances to take themselves out of rotation for known
    * impending unavailable states (e.g. long GC pause or software upgrade).
+   * 
+   * @param needFence true if the underlying daemon may need to be fenced
+   * if a failover occurs due to dropping out of the election.
    */
-  public synchronized void quitElection() {
+  public synchronized void quitElection(boolean needFence) {
     LOG.debug("Yielding from election");
+    if (!needFence && state == State.ACTIVE) {
+      // If active is gracefully going back to standby mode, remove
+      // our permanent znode so no one fences us.
+      tryDeleteOwnBreadCrumbNode();
+    }
     reset();
   }
 
@@ -260,7 +328,7 @@ public class ActiveStandbyElector implem
       return zkClient.getData(zkLockFilePath, false, stat);
     } catch(KeeperException e) {
       Code code = e.code();
-      if (operationNodeDoesNotExist(code)) {
+      if (isNodeDoesNotExist(code)) {
         // handle the commonly expected cases that make sense for us
         throw new ActiveNotFoundException();
       } else {
@@ -284,14 +352,14 @@ public class ActiveStandbyElector implem
     }
 
     Code code = Code.get(rc);
-    if (operationSuccess(code)) {
+    if (isSuccess(code)) {
       // we successfully created the znode. we are the leader. start monitoring
       becomeActive();
       monitorActiveStatus();
       return;
     }
 
-    if (operationNodeExists(code)) {
+    if (isNodeExists(code)) {
       if (createRetryCount == 0) {
         // znode exists and we did not retry the operation. so a different
         // instance has created it. become standby and monitor lock.
@@ -306,14 +374,14 @@ public class ActiveStandbyElector implem
     }
 
     String errorMessage = "Received create error from Zookeeper. code:"
-        + code.toString();
+        + code.toString() + " for path " + path;
     LOG.debug(errorMessage);
 
-    if (operationRetry(code)) {
+    if (shouldRetry(code)) {
       if (createRetryCount < NUM_RETRIES) {
         LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
         ++createRetryCount;
-        createNode();
+        createLockNodeAsync();
         return;
       }
       errorMessage = errorMessage
@@ -338,7 +406,7 @@ public class ActiveStandbyElector implem
     }
 
     Code code = Code.get(rc);
-    if (operationSuccess(code)) {
+    if (isSuccess(code)) {
       // the following owner check completes verification in case the lock znode
       // creation was retried
       if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
@@ -352,7 +420,7 @@ public class ActiveStandbyElector implem
       return;
     }
 
-    if (operationNodeDoesNotExist(code)) {
+    if (isNodeDoesNotExist(code)) {
       // the lock znode disappeared before we started monitoring it
       enterNeutralMode();
       joinElectionInternal();
@@ -363,10 +431,10 @@ public class ActiveStandbyElector implem
         + code.toString();
     LOG.debug(errorMessage);
 
-    if (operationRetry(code)) {
+    if (shouldRetry(code)) {
       if (statRetryCount < NUM_RETRIES) {
         ++statRetryCount;
-        monitorNode();
+        monitorLockNodeAsync();
         return;
       }
       errorMessage = errorMessage
@@ -470,7 +538,7 @@ public class ActiveStandbyElector implem
   private void monitorActiveStatus() {
     LOG.debug("Monitoring active leader");
     statRetryCount = 0;
-    monitorNode();
+    monitorLockNodeAsync();
   }
 
   private void joinElectionInternal() {
@@ -482,7 +550,7 @@ public class ActiveStandbyElector implem
     }
 
     createRetryCount = 0;
-    createNode();
+    createLockNodeAsync();
   }
 
   private void reJoinElection() {
@@ -515,7 +583,7 @@ public class ActiveStandbyElector implem
   private void createConnection() throws IOException {
     zkClient = getNewZooKeeper();
   }
-
+  
   private void terminateConnection() {
     if (zkClient == null) {
       return;
@@ -538,12 +606,110 @@ public class ActiveStandbyElector implem
 
   private void becomeActive() {
     if (state != State.ACTIVE) {
+      try {
+        Stat oldBreadcrumbStat = fenceOldActive();
+        writeBreadCrumbNode(oldBreadcrumbStat);
+      } catch (Exception e) {
+        LOG.warn("Exception handling the winning of election", e);
+        reJoinElection();
+        return;
+      }
       LOG.debug("Becoming active");
       state = State.ACTIVE;
       appClient.becomeActive();
     }
   }
 
+  /**
+   * Write the "ActiveBreadCrumb" node, indicating that this node may need
+   * to be fenced on failover.
+   * @param oldBreadcrumbStat 
+   */
+  private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
+      throws KeeperException, InterruptedException {
+    LOG.info("Writing znode " + zkBreadCrumbPath +
+        " to indicate that the local node is the most recent active...");
+    if (oldBreadcrumbStat == null) {
+      // No previous active, just create the node
+      createWithRetries(zkBreadCrumbPath, appData, zkAcl,
+        CreateMode.PERSISTENT);
+    } else {
+      // There was a previous active, update the node
+      setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
+    }
+  }
+  
+  /**
+   * Try to delete the "ActiveBreadCrumb" node when gracefully giving up
+   * active status.
+   * If this fails, it will simply warn, since the graceful release behavior
+   * is only an optimization.
+   */
+  private void tryDeleteOwnBreadCrumbNode() {
+    assert state == State.ACTIVE;
+    LOG.info("Deleting bread-crumb of active node...");
+    
+    // Sanity check the data. This shouldn't be strictly necessary,
+    // but better to play it safe.
+    Stat stat = new Stat();
+    byte[] data = null;
+    try {
+      data = zkClient.getData(zkBreadCrumbPath, false, stat);
+
+      if (!Arrays.equals(data, appData)) {
+        throw new IllegalStateException(
+            "We thought we were active, but in fact " +
+            "the active znode had the wrong data: " +
+            StringUtils.byteToHexString(data) + " (stat=" + stat + ")");
+      }
+      
+      deleteWithRetries(zkBreadCrumbPath, stat.getVersion());
+    } catch (Exception e) {
+      LOG.warn("Unable to delete our own bread-crumb of being active at " +
+          zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " +
+          "Expecting to be fenced by the next active.");
+    }
+  }
+
+  /**
+   * If there is a breadcrumb node indicating that another node may need
+   * fencing, try to fence that node.
+   * @return the Stat of the breadcrumb node that was read, or null
+   * if no breadcrumb node existed
+   */
+  private Stat fenceOldActive() throws InterruptedException, KeeperException {
+    final Stat stat = new Stat();
+    byte[] data;
+    LOG.info("Checking for any old active which needs to be fenced...");
+    try {
+      data = zkDoWithRetries(new ZKAction<byte[]>() {
+        @Override
+        public byte[] run() throws KeeperException, InterruptedException {
+          return zkClient.getData(zkBreadCrumbPath, false, stat);
+        }
+      });
+    } catch (KeeperException ke) {
+      if (isNodeDoesNotExist(ke.code())) {
+        LOG.info("No old node to fence");
+        return null;
+      }
+      
+      // If we failed to read for any other reason, then likely we lost
+      // our session, or we don't have permissions, etc. In any case,
+      // we probably shouldn't become active, and failing the whole
+      // thing is the best bet.
+      throw ke;
+    }
+
+    LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
+    if (Arrays.equals(data, appData)) {
+      LOG.info("But old node has our own data, so don't need to fence it.");
+    } else {
+      appClient.fenceOldActive(data);
+    }
+    return stat;
+  }
+
   private void becomeStandby() {
     if (state != State.STANDBY) {
       LOG.debug("Becoming standby");
@@ -560,28 +726,76 @@ public class ActiveStandbyElector implem
     }
   }
 
-  private void createNode() {
+  private void createLockNodeAsync() {
     zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this,
         null);
   }
 
-  private void monitorNode() {
-    zkClient.exists(zkLockFilePath, true, this, null);
+  private void monitorLockNodeAsync() {
+    zkClient.exists(zkLockFilePath, this, this, null);
+  }
+
+  private String createWithRetries(final String path, final byte[] data,
+      final List<ACL> acl, final CreateMode mode)
+      throws InterruptedException, KeeperException {
+    return zkDoWithRetries(new ZKAction<String>() {
+      public String run() throws KeeperException, InterruptedException {
+        return zkClient.create(path, data, acl, mode);
+      }
+    });
+  }
+
+  private Stat setDataWithRetries(final String path, final byte[] data,
+      final int version) throws InterruptedException, KeeperException {
+    return zkDoWithRetries(new ZKAction<Stat>() {
+      public Stat run() throws KeeperException, InterruptedException {
+        return zkClient.setData(path, data, version);
+      }
+    });
+  }
+  
+  private void deleteWithRetries(final String path, final int version)
+      throws KeeperException, InterruptedException {
+    zkDoWithRetries(new ZKAction<Void>() {
+      public Void run() throws KeeperException, InterruptedException {
+        zkClient.delete(path, version);
+        return null;
+      }
+    });
+  }
+
+  private static <T> T zkDoWithRetries(ZKAction<T> action)
+      throws KeeperException, InterruptedException {
+    int retry = 0;
+    while (true) {
+      try {
+        return action.run();
+      } catch (KeeperException ke) {
+        if (shouldRetry(ke.code()) && ++retry < NUM_RETRIES) {
+          continue;
+        }
+        throw ke;
+      }
+    }
+  }
+  
+  private interface ZKAction<T> {
+    T run() throws KeeperException, InterruptedException; 
   }
 
-  private boolean operationSuccess(Code code) {
+  private static boolean isSuccess(Code code) {
     return (code == Code.OK);
   }
 
-  private boolean operationNodeExists(Code code) {
+  private static boolean isNodeExists(Code code) {
     return (code == Code.NODEEXISTS);
   }
 
-  private boolean operationNodeDoesNotExist(Code code) {
+  private static boolean isNodeDoesNotExist(Code code) {
     return (code == Code.NONODE);
   }
 
-  private boolean operationRetry(Code code) {
+  private static boolean shouldRetry(Code code) {
     switch (code) {
     case CONNECTIONLOSS:
     case OPERATIONTIMEOUT:

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Sat Mar 24 00:05:40 2012
@@ -42,12 +42,12 @@ import org.apache.hadoop.ha.ActiveStandb
 
 public class TestActiveStandbyElector {
 
-  static ZooKeeper mockZK;
-  static int count;
-  static ActiveStandbyElectorCallback mockApp;
-  static final byte[] data = new byte[8];
+  private ZooKeeper mockZK;
+  private int count;
+  private ActiveStandbyElectorCallback mockApp;
+  private final byte[] data = new byte[8];
 
-  ActiveStandbyElectorTester elector;
+  private ActiveStandbyElectorTester elector;
 
   class ActiveStandbyElectorTester extends ActiveStandbyElector {
     ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
@@ -57,26 +57,46 @@ public class TestActiveStandbyElector {
 
     @Override
     public ZooKeeper getNewZooKeeper() {
-      ++TestActiveStandbyElector.count;
-      return TestActiveStandbyElector.mockZK;
+      ++count;
+      return mockZK;
     }
-
   }
 
-  private static final String zkParentName = "/zookeeper";
-  private static final String zkLockPathName = "/zookeeper/"
-      + ActiveStandbyElector.LOCKFILENAME;
+  private static final String ZK_PARENT_NAME = "/parent/node";
+  private static final String ZK_LOCK_NAME = ZK_PARENT_NAME + "/" +
+      ActiveStandbyElector.LOCK_FILENAME;
+  private static final String ZK_BREADCRUMB_NAME = ZK_PARENT_NAME + "/" +
+      ActiveStandbyElector.BREADCRUMB_FILENAME;
 
   @Before
   public void init() throws IOException {
     count = 0;
     mockZK = Mockito.mock(ZooKeeper.class);
     mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
-    elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName,
+    elector = new ActiveStandbyElectorTester("hostPort", 1000, ZK_PARENT_NAME,
         Ids.OPEN_ACL_UNSAFE, mockApp);
   }
 
   /**
+   * Set up the mock ZK to return no info for a prior active in ZK.
+   */
+  private void mockNoPriorActive() throws Exception {
+    Mockito.doThrow(new KeeperException.NoNodeException()).when(mockZK)
+        .getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(),
+            Mockito.<Stat>any());
+  }
+  
+  /**
+   * Set up the mock to return info for some prior active node in ZK.
+   */
+  private void mockPriorActive(byte[] data) throws Exception {
+    Mockito.doReturn(data).when(mockZK)
+        .getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(),
+            Mockito.<Stat>any());
+  }
+
+
+  /**
    * verify that joinElection checks for null data
    */
   @Test(expected = HadoopIllegalArgumentException.class)
@@ -90,7 +110,7 @@ public class TestActiveStandbyElector {
   @Test
   public void testJoinElection() {
     elector.joinElection(data);
-    Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
   }
 
@@ -99,30 +119,74 @@ public class TestActiveStandbyElector {
    * started
    */
   @Test
-  public void testCreateNodeResultBecomeActive() {
+  public void testCreateNodeResultBecomeActive() throws Exception {
+    mockNoPriorActive();
+    
     elector.joinElection(data);
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
 
     // monitor callback verifies the leader is ephemeral owner of lock but does
     // not call becomeActive since its already active
     Stat stat = new Stat();
     stat.setEphemeralOwner(1L);
     Mockito.when(mockZK.getSessionId()).thenReturn(1L);
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
     // should not call neutral mode/standby/active
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     // another joinElection not called.
-    Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
     // no new monitor called
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
+  }
+  
+  /**
+   * Verify that, if there is a record of a prior active node, the
+   * elector asks the application to fence it before becoming active.
+   */
+  @Test
+  public void testFencesOldActive() throws Exception {
+    byte[] fakeOldActiveData = new byte[0];
+    mockPriorActive(fakeOldActiveData);
+    
+    elector.joinElection(data);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    // Application fences active.
+    Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive(
+        fakeOldActiveData);
+    // Updates breadcrumb node to new data
+    Mockito.verify(mockZK, Mockito.times(1)).setData(
+        Mockito.eq(ZK_BREADCRUMB_NAME),
+        Mockito.eq(data),
+        Mockito.eq(0));
+    // Then it becomes active itself
+    Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+  }
+  
+  @Test
+  public void testQuitElectionRemovesBreadcrumbNode() throws Exception {
+    mockNoPriorActive();
+    elector.joinElection(data);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    // Writes its own active info
+    Mockito.verify(mockZK, Mockito.times(1)).create(
+        Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(data),
+        Mockito.eq(Ids.OPEN_ACL_UNSAFE),
+        Mockito.eq(CreateMode.PERSISTENT));
+    mockPriorActive(data);
+    
+    elector.quitElection(false);
+    
+    // Deletes its own active data
+    Mockito.verify(mockZK, Mockito.times(1)).delete(
+        Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(0));
   }
 
   /**
@@ -133,11 +197,10 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultBecomeStandby() {
     elector.joinElection(data);
 
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
   }
 
   /**
@@ -147,10 +210,11 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultError() {
     elector.joinElection(data);
 
-    elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
-        "Received create error from Zookeeper. code:APIERROR");
+        "Received create error from Zookeeper. code:APIERROR " +
+        "for path " + ZK_LOCK_NAME);
   }
 
   /**
@@ -158,42 +222,43 @@ public class TestActiveStandbyElector {
    * becomes active if they match. monitoring is started.
    */
   @Test
-  public void testCreateNodeResultRetryBecomeActive() {
+  public void testCreateNodeResultRetryBecomeActive() throws Exception {
+    mockNoPriorActive();
+    
     elector.joinElection(data);
 
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     // 4 errors results in fatalError
     Mockito
         .verify(mockApp, Mockito.times(1))
         .notifyFatalError(
-            "Received create error from Zookeeper. code:CONNECTIONLOSS. "+
+            "Received create error from Zookeeper. code:CONNECTIONLOSS " +
+            "for path " + ZK_LOCK_NAME + ". " +
             "Not retrying further znode create connection errors.");
 
     elector.joinElection(data);
     // recreate connection via getNewZooKeeper
-    Assert.assertEquals(2, TestActiveStandbyElector.count);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    Assert.assertEquals(2, count);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    verifyExistCall(1);
 
     Stat stat = new Stat();
     stat.setEphemeralOwner(1L);
     Mockito.when(mockZK.getSessionId()).thenReturn(1L);
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
-    Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data,
+    verifyExistCall(1);
+    Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
   }
 
@@ -205,20 +270,18 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultRetryBecomeStandby() {
     elector.joinElection(data);
 
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    verifyExistCall(1);
 
     Stat stat = new Stat();
     stat.setEphemeralOwner(0);
     Mockito.when(mockZK.getSessionId()).thenReturn(1L);
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
   }
 
   /**
@@ -230,19 +293,18 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultRetryNoNode() {
     elector.joinElection(data);
 
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
+    verifyExistCall(1);
 
-    elector.processResult(Code.NONODE.intValue(), zkLockPathName, null,
+    elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null,
         (Stat) null);
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
-    Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
   }
 
@@ -251,13 +313,13 @@ public class TestActiveStandbyElector {
    */
   @Test
   public void testStatNodeRetry() {
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
         (Stat) null);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
         (Stat) null);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
         (Stat) null);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
         (Stat) null);
     Mockito
         .verify(mockApp, Mockito.times(1))
@@ -271,7 +333,7 @@ public class TestActiveStandbyElector {
    */
   @Test
   public void testStatNodeError() {
-    elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), zkLockPathName,
+    elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME,
         null, (Stat) null);
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
@@ -282,7 +344,8 @@ public class TestActiveStandbyElector {
    * verify behavior of watcher.process callback with non-node event
    */
   @Test
-  public void testProcessCallbackEventNone() {
+  public void testProcessCallbackEventNone() throws Exception {
+    mockNoPriorActive();
     elector.joinElection(data);
 
     WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
@@ -306,8 +369,7 @@ public class TestActiveStandbyElector {
     Mockito.when(mockEvent.getState()).thenReturn(
         Event.KeeperState.SyncConnected);
     elector.process(mockEvent);
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
 
     // session expired should enter safe mode and initiate re-election
     // re-election checked via checking re-creation of new zookeeper and
@@ -318,17 +380,16 @@ public class TestActiveStandbyElector {
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
     // called getNewZooKeeper to create new session. first call was in
     // constructor
-    Assert.assertEquals(2, TestActiveStandbyElector.count);
+    Assert.assertEquals(2, count);
     // once in initial joinElection and one now
-    Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
 
     // create znode success. become master and monitor
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
-    Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(2);
 
     // error event results in fatal error
     Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
@@ -343,32 +404,30 @@ public class TestActiveStandbyElector {
    * verify behavior of watcher.process with node event
    */
   @Test
-  public void testProcessCallbackEventNode() {
+  public void testProcessCallbackEventNode() throws Exception {
+    mockNoPriorActive();
     elector.joinElection(data);
 
     // make the object go into the monitoring state
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
 
     WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
-    Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+    Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
 
     // monitoring should be setup again after event is received
     Mockito.when(mockEvent.getType()).thenReturn(
         Event.EventType.NodeDataChanged);
     elector.process(mockEvent);
-    Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(2);
 
     // monitoring should be setup again after event is received
     Mockito.when(mockEvent.getType()).thenReturn(
         Event.EventType.NodeChildrenChanged);
     elector.process(mockEvent);
-    Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(3);
 
     // lock node deletion when in standby mode should create znode again
     // successful znode creation enters active state and sets monitor
@@ -377,13 +436,12 @@ public class TestActiveStandbyElector {
     // enterNeutralMode not called when app is standby and leader is lost
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     // once in initial joinElection() and one now
-    Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
-    Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(4);
 
     // lock node deletion in active mode should enter neutral mode and create
     // znode again successful znode creation enters active state and sets
@@ -392,13 +450,12 @@ public class TestActiveStandbyElector {
     elector.process(mockEvent);
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
     // another joinElection called
-    Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
-    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
-    Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(5);
 
     // bad path name results in fatal error
     Mockito.when(mockEvent.getPath()).thenReturn(null);
@@ -406,11 +463,15 @@ public class TestActiveStandbyElector {
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
         "Unexpected watch error from Zookeeper");
     // fatal error means no new connection other than one from constructor
-    Assert.assertEquals(1, TestActiveStandbyElector.count);
+    Assert.assertEquals(1, count);
     // no new watches after fatal error
-    Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(5);
+
+  }
 
+  private void verifyExistCall(int times) {
+    Mockito.verify(mockZK, Mockito.times(times)).exists(
+        ZK_LOCK_NAME, elector, elector, null);
   }
 
   /**
@@ -421,14 +482,13 @@ public class TestActiveStandbyElector {
     elector.joinElection(data);
 
     // make the object go into the monitoring standby state
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
 
     WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
-    Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+    Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
 
     // notify node deletion
     // monitoring should be setup again after event is received
@@ -437,16 +497,15 @@ public class TestActiveStandbyElector {
     // is standby. no need to notify anything now
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     // another joinElection called.
-    Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+    Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
     // lost election
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     // still standby. so no need to notify again
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     // monitor is set again
-    Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(2);
   }
 
   /**
@@ -454,22 +513,20 @@ public class TestActiveStandbyElector {
    * next call to joinElection creates new connection and performs election
    */
   @Test
-  public void testQuitElection() throws InterruptedException {
-    elector.quitElection();
+  public void testQuitElection() throws Exception {
+    elector.quitElection(true);
     Mockito.verify(mockZK, Mockito.times(1)).close();
     // no watches added
-    Mockito.verify(mockZK, Mockito.times(0)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(0);
 
     byte[] data = new byte[8];
     elector.joinElection(data);
     // getNewZooKeeper called 2 times. once in constructor and once now
-    Assert.assertEquals(2, TestActiveStandbyElector.count);
-    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
-        zkLockPathName);
+    Assert.assertEquals(2, count);
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+        ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
-    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
-        elector, null);
+    verifyExistCall(1);
 
   }
 
@@ -488,16 +545,16 @@ public class TestActiveStandbyElector {
     // get valid active data
     byte[] data = new byte[8];
     Mockito.when(
-        mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+        mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
             Mockito.<Stat> anyObject())).thenReturn(data);
     Assert.assertEquals(data, elector.getActiveData());
     Mockito.verify(mockZK, Mockito.times(1)).getData(
-        Mockito.eq(zkLockPathName), Mockito.eq(false),
+        Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
         Mockito.<Stat> anyObject());
 
     // active does not exist
     Mockito.when(
-        mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+        mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
             Mockito.<Stat> anyObject())).thenThrow(
         new KeeperException.NoNodeException());
     try {
@@ -505,23 +562,65 @@ public class TestActiveStandbyElector {
       Assert.fail("ActiveNotFoundException expected");
     } catch(ActiveNotFoundException e) {
       Mockito.verify(mockZK, Mockito.times(2)).getData(
-          Mockito.eq(zkLockPathName), Mockito.eq(false),
+          Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
           Mockito.<Stat> anyObject());
     }
 
     // error getting active data rethrows keeperexception
     try {
       Mockito.when(
-          mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+          mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
               Mockito.<Stat> anyObject())).thenThrow(
           new KeeperException.AuthFailedException());
       elector.getActiveData();
       Assert.fail("KeeperException.AuthFailedException expected");
     } catch(KeeperException.AuthFailedException ke) {
       Mockito.verify(mockZK, Mockito.times(3)).getData(
-          Mockito.eq(zkLockPathName), Mockito.eq(false),
+          Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
           Mockito.<Stat> anyObject());
     }
   }
 
+  /**
+   * Test that ensureParentZNode() creates each component of the parent
+   * path in turn, as a PERSISTENT znode with OPEN_ACL_UNSAFE.
+   */
+  @Test
+  public void testEnsureBaseNode() throws Exception {
+    elector.ensureParentZNode();
+    StringBuilder prefix = new StringBuilder();
+    for (String part : ZK_PARENT_NAME.split("/")) {
+      // split() of an absolute path yields a leading empty segment; skip it
+      if (part.isEmpty()) continue;
+      prefix.append("/").append(part);
+      Mockito.verify(mockZK).create(
+          Mockito.eq(prefix.toString()), Mockito.<byte[]>any(),
+          Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
+    }
+  }
+  
+  /**
+   * Test for a bug encountered during development of HADOOP-8163:
+   * ensureParentZNode() should give up and throw an IOException once
+   * 3 attempts to create any part of the path have all failed.
+   */
+  @Test
+  public void testEnsureBaseNodeFails() throws Exception {
+    Mockito.doThrow(new KeeperException.ConnectionLossException())
+      .when(mockZK).create(
+          Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
+          Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
+    try {
+      elector.ensureParentZNode();
+      Assert.fail("Did not throw!");
+    } catch (IOException ioe) {
+      if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) {
+        throw ioe;
+      }
+    }
+    // Should have made exactly three create attempts before giving up
+    Mockito.verify(mockZK, Mockito.times(3)).create(
+        Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
+        Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java Sat Mar 24 00:05:40 2012
@@ -18,19 +18,24 @@
 
 package org.apache.hadoop.ha;
 
+import static org.junit.Assert.*;
+
 import java.io.File;
 import java.io.IOException;
-import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.log4j.Level;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.test.ClientBase;
 
 /**
@@ -39,7 +44,17 @@ import org.apache.zookeeper.test.ClientB
 public class TestActiveStandbyElectorRealZK extends ClientBase {
   static final int NUM_ELECTORS = 2;
   static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
-  static int currentClientIndex = 0;
+  
+  static {
+    ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(
+        Level.ALL);
+  }
+  
+  int activeIndex = -1;
+  int standbyIndex = -1;
+  static final String PARENT_DIR = "/" + UUID.randomUUID();
+
+  ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS];
   
   @Override
   public void setUp() throws Exception {
@@ -48,20 +63,6 @@ public class TestActiveStandbyElectorRea
     super.setUp();
   }
 
-  class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector {
-    ActiveStandbyElectorTesterRealZK(String hostPort, int timeout,
-        String parent, List<ACL> acl, ActiveStandbyElectorCallback app)
-        throws IOException {
-      super(hostPort, timeout, parent, acl, app);
-    }
-
-    @Override
-    public ZooKeeper getNewZooKeeper() {
-      return TestActiveStandbyElectorRealZK.zkClient[
-                             TestActiveStandbyElectorRealZK.currentClientIndex];
-    }
-  }
-
   /**
    * The class object runs on a thread and waits for a signal to start from the 
    * test object. On getting the signal it joins the election and thus by doing 
@@ -71,71 +72,48 @@ public class TestActiveStandbyElectorRea
    * an unexpected fatal error. this lets another thread object to become a 
    * leader.
    */
-  class ThreadRunner implements Runnable, ActiveStandbyElectorCallback {
+  class ThreadRunner extends TestingThread
+      implements  ActiveStandbyElectorCallback {
     int index;
-    TestActiveStandbyElectorRealZK test;
-    boolean wait = true;
+    
+    CountDownLatch hasBecomeActive = new CountDownLatch(1);
 
-    ThreadRunner(int i, TestActiveStandbyElectorRealZK s) {
-      index = i;
-      test = s;
+    ThreadRunner(TestContext ctx,
+        int idx) {
+      super(ctx);
+      index = idx;
     }
 
     @Override
-    public void run() {
+    public void doWork() throws Exception {
       LOG.info("starting " + index);
-      while(true) {
-        synchronized (test) {
-          // wait for test start signal to come
-          if (!test.start) {
-            try {
-              test.wait();
-            } catch(InterruptedException e) {
-              Assert.fail(e.getMessage());
-            }
-          } else {
-            break;
-          }
-        }
-      }
       // join election
-      byte[] data = new byte[8];
-      ActiveStandbyElector elector = test.elector[index];
+      byte[] data = new byte[1];
+      data[0] = (byte)index;
+      
+      ActiveStandbyElector elector = electors[index];
       LOG.info("joining " + index);
       elector.joinElection(data);
-      try {
-        while(true) {
-          synchronized (this) {
-            // wait for elector to become active/fatal error
-            if (wait) {
-              // wait to become active
-              // wait capped at 30s to prevent hung test
-              wait(30000);
-            } else {
-              break;
-            }
-          }
-        }
-        Thread.sleep(1000);
-        // quit election to allow other elector to become active
-        elector.quitElection();
-      } catch(InterruptedException e) {
-        Assert.fail(e.getMessage());
-      }
+
+      assertTrue("elector " + index + " did not become active within 30s",
+          hasBecomeActive.await(30, TimeUnit.SECONDS));
+      Thread.sleep(1000);
+      // quit election to allow other elector to become active
+      elector.quitElection(true);
+
       LOG.info("ending " + index);
     }
 
     @Override
     public synchronized void becomeActive() {
-      test.reportActive(index);
+      reportActive(index);
       LOG.info("active " + index);
-      wait = false;
-      notifyAll();
+      hasBecomeActive.countDown();
     }
 
     @Override
     public synchronized void becomeStandby() {
-      test.reportStandby(index);
+      reportStandby(index);
       LOG.info("standby " + index);
     }
 
@@ -147,19 +125,16 @@ public class TestActiveStandbyElectorRea
     @Override
     public synchronized void notifyFatalError(String errorMessage) {
       LOG.info("fatal " + index + " .Error message:" + errorMessage);
-      wait = false;
-      notifyAll();
+      this.interrupt();
     }
-  }
-
-  boolean start = false;
-  int activeIndex = -1;
-  int standbyIndex = -1;
-  String parentDir = "/" + java.util.UUID.randomUUID().toString();
 
-  ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS];
-  ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS];
-  Thread[] thread = new Thread[NUM_ELECTORS];
+    @Override
+    public void fenceOldActive(byte[] data) {
+      LOG.info("fenceOldActive " + index);
+      // should not fence itself
+      Assert.assertTrue(index != data[0]);
+    }
+  }
 
   synchronized void reportActive(int index) {
     if (activeIndex == -1) {
@@ -187,45 +162,37 @@ public class TestActiveStandbyElectorRea
    * the standby now becomes active. these electors run on different threads and 
    * callback to the test class to report active and standby where the outcome 
    * is verified
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   * @throws KeeperException
+   * @throws Exception 
    */
   @Test
-  public void testActiveStandbyTransition() throws IOException,
-      InterruptedException, KeeperException {
-    LOG.info("starting test with parentDir:" + parentDir);
-    start = false;
-    byte[] data = new byte[8];
-    // create random working directory
-    createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE,
-        CreateMode.PERSISTENT);
-
-    for(currentClientIndex = 0; 
-        currentClientIndex < NUM_ELECTORS; 
-        ++currentClientIndex) {
-      LOG.info("creating " + currentClientIndex);
-      zkClient[currentClientIndex] = createClient();
-      threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex,
-          this);
-      elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK(
-          "hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE,
-          threadRunner[currentClientIndex]);
-      zkClient[currentClientIndex].register(elector[currentClientIndex]);
-      thread[currentClientIndex] = new Thread(threadRunner[currentClientIndex]);
-      thread[currentClientIndex].start();
-    }
-
-    synchronized (this) {
-      // signal threads to start
-      LOG.info("signaling threads");
-      start = true;
-      notifyAll();
-    }
+  public void testActiveStandbyTransition() throws Exception {
+    LOG.info("starting test with parentDir:" + PARENT_DIR);
 
-    for(int i = 0; i < thread.length; i++) {
-      thread[i].join();
+    TestContext ctx = new TestContext();
+    
+    for(int i = 0; i < NUM_ELECTORS; i++) {
+      LOG.info("creating " + i);
+      final ZooKeeper zk = createClient();
+      assert zk != null;
+      
+      ThreadRunner tr = new ThreadRunner(ctx, i);
+      electors[i] = new ActiveStandbyElector(
+          "hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
+          tr) {
+        @Override
+        protected synchronized ZooKeeper getNewZooKeeper()
+            throws IOException {
+          return zk;
+        }
+      };
+      ctx.addThread(tr);
     }
+
+    assertFalse(electors[0].parentZNodeExists());
+    electors[0].ensureParentZNode();
+    assertTrue(electors[0].parentZNodeExists());
+
+    ctx.startThreads();
+    ctx.stop();
   }
 }