You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/26 21:31:33 UTC

svn commit: r1305509 - in /hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/test/

Author: todd
Date: Mon Mar 26 19:31:32 2012
New Revision: 1305509

URL: http://svn.apache.org/viewvc?rev=1305509&view=rev
Log:
HADOOP-8212. Improve ActiveStandbyElector's behavior when session expires. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1305509&r1=1305508&r2=1305509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Mon Mar 26 19:31:32 2012
@@ -112,6 +112,9 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8193. Refactor FailoverController/HAAdmin code to add an abstract
     class for "target" services. (todd)
 
+    HADOOP-8212. Improve ActiveStandbyElector's behavior when session expires
+    (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1305509&r1=1305508&r2=1305509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Mon Mar 26 19:31:32 2012
@@ -21,6 +21,8 @@ package org.apache.hadoop.ha;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,7 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.AsyncCallback.*;
@@ -60,8 +63,7 @@ import com.google.common.base.Preconditi
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ActiveStandbyElector implements Watcher, StringCallback,
-    StatCallback {
+public class ActiveStandbyElector implements StatCallback, StringCallback {
 
   /**
    * Callback interface to interact with the ActiveStandbyElector object. <br/>
@@ -156,6 +158,9 @@ public class ActiveStandbyElector implem
   private final String zkBreadCrumbPath;
   private final String znodeWorkingDir;
 
+  private Lock sessionReestablishLockForTests = new ReentrantLock();
+  private boolean wantToBeInElection;
+  
   /**
    * Create a new ActiveStandbyElector object <br/>
    * The elector is created by providing to it the Zookeeper configuration, the
@@ -274,6 +279,8 @@ public class ActiveStandbyElector implem
         }
       }
     }
+    
+    LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
   }
 
   /**
@@ -290,13 +297,14 @@ public class ActiveStandbyElector implem
    * if a failover occurs due to dropping out of the election.
    */
   public synchronized void quitElection(boolean needFence) {
-    LOG.debug("Yielding from election");
+    LOG.info("Yielding from election");
     if (!needFence && state == State.ACTIVE) {
       // If active is gracefully going back to standby mode, remove
       // our permanent znode so no one fences us.
       tryDeleteOwnBreadCrumbNode();
     }
     reset();
+    wantToBeInElection = false;
   }
 
   /**
@@ -343,13 +351,9 @@ public class ActiveStandbyElector implem
   @Override
   public synchronized void processResult(int rc, String path, Object ctx,
       String name) {
+    if (isStaleClient(ctx)) return;
     LOG.debug("CreateNode result: " + rc + " for path: " + path
         + " connectionState: " + zkConnectionState);
-    if (zkClient == null) {
-      // zkClient is nulled before closing the connection
-      // this is the callback with session expired after we closed the session
-      return;
-    }
 
     Code code = Code.get(rc);
     if (isSuccess(code)) {
@@ -386,6 +390,10 @@ public class ActiveStandbyElector implem
       }
       errorMessage = errorMessage
           + ". Not retrying further znode create connection errors.";
+    } else if (isSessionExpired(code)) {
+      // This isn't fatal - the client Watcher will re-join the election
+      LOG.warn("Lock acquisition failed because session was lost");
+      return;
     }
 
     fatalError(errorMessage);
@@ -397,13 +405,9 @@ public class ActiveStandbyElector implem
   @Override
   public synchronized void processResult(int rc, String path, Object ctx,
       Stat stat) {
+    if (isStaleClient(ctx)) return;
     LOG.debug("StatNode result: " + rc + " for path: " + path
         + " connectionState: " + zkConnectionState);
-    if (zkClient == null) {
-      // zkClient is nulled before closing the connection
-      // this is the callback with session expired after we closed the session
-      return;
-    }
 
     Code code = Code.get(rc);
     if (isSuccess(code)) {
@@ -447,22 +451,18 @@ public class ActiveStandbyElector implem
   /**
    * interface implementation of Zookeeper watch events (connection and node)
    */
-  @Override
-  public synchronized void process(WatchedEvent event) {
+  synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
     Event.EventType eventType = event.getType();
+    if (isStaleClient(zk)) return;
     LOG.debug("Watcher event type: " + eventType + " with state:"
         + event.getState() + " for path:" + event.getPath()
         + " connectionState: " + zkConnectionState);
-    if (zkClient == null) {
-      // zkClient is nulled before closing the connection
-      // this is the callback with session expired after we closed the session
-      return;
-    }
 
     if (eventType == Event.EventType.None) {
       // the connection state has changed
       switch (event.getState()) {
       case SyncConnected:
+        LOG.info("Session connected.");
         // if the listener was asked to move to safe state then it needs to
         // be undone
         ConnectionState prevConnectionState = zkConnectionState;
@@ -472,6 +472,8 @@ public class ActiveStandbyElector implem
         }
         break;
       case Disconnected:
+        LOG.info("Session disconnected. Entering neutral mode...");
+
         // ask the app to move to safe state because zookeeper connection
         // is not active and we dont know our state
         zkConnectionState = ConnectionState.DISCONNECTED;
@@ -480,6 +482,7 @@ public class ActiveStandbyElector implem
       case Expired:
         // the connection got terminated because of session timeout
         // call listener to reconnect
+        LOG.info("Session expired. Entering neutral mode and rejoining...");
         enterNeutralMode();
         reJoinElection();
         break;
@@ -527,7 +530,9 @@ public class ActiveStandbyElector implem
    * @throws IOException
    */
   protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
-    return new ZooKeeper(zkHostPort, zkSessionTimeout, this);
+    ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
+    zk.register(new WatcherWithClientRef(zk));
+    return zk;
   }
 
   private void fatalError(String errorMessage) {
@@ -550,13 +555,42 @@ public class ActiveStandbyElector implem
     }
 
     createRetryCount = 0;
+    wantToBeInElection = true;
     createLockNodeAsync();
   }
 
   private void reJoinElection() {
-    LOG.debug("Trying to re-establish ZK session");
-    terminateConnection();
-    joinElectionInternal();
+    LOG.info("Trying to re-establish ZK session");
+    
+    // Some of the test cases rely on expiring the ZK sessions and
+    // ensuring that the other node takes over. But, there's a race
+    // where the original lease holder could reconnect faster than the other
+    // thread manages to take the lock itself. This lock allows the
+    // tests to block the reconnection. It's a shame that this leaked
+    // into non-test code, but outside of tests the lock is only acquired
+    // here, so it will never be contended.
+    sessionReestablishLockForTests.lock();
+    try {
+      terminateConnection();
+      joinElectionInternal();
+    } finally {
+      sessionReestablishLockForTests.unlock();
+    }
+  }
+  
+  @VisibleForTesting
+  void preventSessionReestablishmentForTests() {
+    sessionReestablishLockForTests.lock();
+  }
+  
+  @VisibleForTesting
+  void allowSessionReestablishmentForTests() {
+    sessionReestablishLockForTests.unlock();
+  }
+  
+  @VisibleForTesting
+  long getZKSessionIdForTests() {
+    return zkClient.getSessionId();
   }
 
   private boolean reEstablishSession() {
@@ -605,6 +639,7 @@ public class ActiveStandbyElector implem
   }
 
   private void becomeActive() {
+    assert wantToBeInElection;
     if (state != State.ACTIVE) {
       try {
         Stat oldBreadcrumbStat = fenceOldActive();
@@ -727,12 +762,14 @@ public class ActiveStandbyElector implem
   }
 
   private void createLockNodeAsync() {
-    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this,
-        null);
+    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
+        this, zkClient);
   }
 
   private void monitorLockNodeAsync() {
-    zkClient.exists(zkLockFilePath, this, this, null);
+    zkClient.exists(zkLockFilePath, 
+        new WatcherWithClientRef(zkClient), this,
+        zkClient);
   }
 
   private String createWithRetries(final String path, final byte[] data,
@@ -778,10 +815,47 @@ public class ActiveStandbyElector implem
       }
     }
   }
-  
+
   private interface ZKAction<T> {
     T run() throws KeeperException, InterruptedException; 
   }
+  
+  /**
+   * The callbacks and watchers pass a reference to the ZK client
+   * which made the original call. We don't want to take action
+   * based on any callbacks from prior clients after we quit
+   * the election.
+   * @param ctx the ZK client passed into the watcher
+   * @return true if it does not match the current client (i.e. it is stale)
+   */
+  private synchronized boolean isStaleClient(Object ctx) {
+    Preconditions.checkNotNull(ctx);
+    if (zkClient != (ZooKeeper)ctx) {
+      LOG.warn("Ignoring stale result from old client with sessionId " +
+          String.format("0x%08x", ((ZooKeeper)ctx).getSessionId()));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Watcher implementation which keeps a reference around to the
+   * original ZK connection, and passes it back along with any
+   * events.
+   */
+  private final class WatcherWithClientRef implements Watcher {
+    private final ZooKeeper zk;
+
+    private WatcherWithClientRef(ZooKeeper zk) {
+      this.zk = zk;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      ActiveStandbyElector.this.processWatchEvent(
+          zk, event);
+    }
+  }
 
   private static boolean isSuccess(Code code) {
     return (code == Code.OK);
@@ -794,6 +868,10 @@ public class ActiveStandbyElector implem
   private static boolean isNodeDoesNotExist(Code code) {
     return (code == Code.NONODE);
   }
+  
+  private static boolean isSessionExpired(Code code) {
+    return (code == Code.SESSIONEXPIRED);
+  }
 
   private static boolean shouldRetry(Code code) {
     switch (code) {

Added: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java?rev=1305509&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java Mon Mar 26 19:31:32 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+public abstract class ActiveStandbyElectorTestUtil {
+
+  public static void waitForActiveLockData(TestContext ctx,
+      ZooKeeperServer zks, String parentDir, byte[] activeData)
+      throws Exception {
+    while (true) {
+      if (ctx != null) {
+        ctx.checkException();
+      }
+      try {
+        Stat stat = new Stat();
+        byte[] data = zks.getZKDatabase().getData(
+          parentDir + "/" +
+          ActiveStandbyElector.LOCK_FILENAME, stat, null);
+        if (activeData != null &&
+            Arrays.equals(activeData, data)) {
+          return;
+        }
+      } catch (NoNodeException nne) {
+        if (activeData == null) {
+          return;
+        }
+      }
+      Thread.sleep(50);
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1305509&r1=1305508&r2=1305509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Mon Mar 26 19:31:32 2012
@@ -26,6 +26,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.data.ACL;
@@ -111,7 +112,7 @@ public class TestActiveStandbyElector {
   public void testJoinElection() {
     elector.joinElection(data);
     Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
   }
 
   /**
@@ -123,7 +124,7 @@ public class TestActiveStandbyElector {
     mockNoPriorActive();
     
     elector.joinElection(data);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     verifyExistCall(1);
@@ -133,14 +134,14 @@ public class TestActiveStandbyElector {
     Stat stat = new Stat();
     stat.setEphemeralOwner(1L);
     Mockito.when(mockZK.getSessionId()).thenReturn(1L);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
     // should not call neutral mode/standby/active
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     // another joinElection not called.
     Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
     // no new monitor called
     verifyExistCall(1);
   }
@@ -155,7 +156,7 @@ public class TestActiveStandbyElector {
     mockPriorActive(fakeOldActiveData);
     
     elector.joinElection(data);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     // Application fences active.
     Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive(
@@ -173,7 +174,7 @@ public class TestActiveStandbyElector {
   public void testQuitElectionRemovesBreadcrumbNode() throws Exception {
     mockNoPriorActive();
     elector.joinElection(data);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     // Writes its own active info
     Mockito.verify(mockZK, Mockito.times(1)).create(
@@ -197,7 +198,7 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultBecomeStandby() {
     elector.joinElection(data);
 
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);
@@ -210,7 +211,7 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultError() {
     elector.joinElection(data);
 
-    elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
         "Received create error from Zookeeper. code:APIERROR " +
@@ -227,13 +228,13 @@ public class TestActiveStandbyElector {
     
     elector.joinElection(data);
 
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     // 4 errors results in fatalError
     Mockito
@@ -246,20 +247,20 @@ public class TestActiveStandbyElector {
     elector.joinElection(data);
     // recreate connection via getNewZooKeeper
     Assert.assertEquals(2, count);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     verifyExistCall(1);
 
     Stat stat = new Stat();
     stat.setEphemeralOwner(1L);
     Mockito.when(mockZK.getSessionId()).thenReturn(1L);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     verifyExistCall(1);
     Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
   }
 
   /**
@@ -270,16 +271,16 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultRetryBecomeStandby() {
     elector.joinElection(data);
 
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     verifyExistCall(1);
 
     Stat stat = new Stat();
     stat.setEphemeralOwner(0);
     Mockito.when(mockZK.getSessionId()).thenReturn(1L);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);
   }
@@ -293,19 +294,19 @@ public class TestActiveStandbyElector {
   public void testCreateNodeResultRetryNoNode() {
     elector.joinElection(data);
 
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     verifyExistCall(1);
 
-    elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, mockZK,
         (Stat) null);
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
     Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
   }
 
   /**
@@ -313,13 +314,13 @@ public class TestActiveStandbyElector {
    */
   @Test
   public void testStatNodeRetry() {
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         (Stat) null);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         (Stat) null);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         (Stat) null);
-    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
         (Stat) null);
     Mockito
         .verify(mockApp, Mockito.times(1))
@@ -334,7 +335,7 @@ public class TestActiveStandbyElector {
   @Test
   public void testStatNodeError() {
     elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME,
-        null, (Stat) null);
+        mockZK, (Stat) null);
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
         "Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY");
@@ -354,7 +355,7 @@ public class TestActiveStandbyElector {
     // first SyncConnected should not do anything
     Mockito.when(mockEvent.getState()).thenReturn(
         Event.KeeperState.SyncConnected);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
         Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(),
         Mockito.<Object> anyObject());
@@ -362,20 +363,20 @@ public class TestActiveStandbyElector {
     // disconnection should enter safe mode
     Mockito.when(mockEvent.getState()).thenReturn(
         Event.KeeperState.Disconnected);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
 
     // re-connection should monitor master status
     Mockito.when(mockEvent.getState()).thenReturn(
         Event.KeeperState.SyncConnected);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     verifyExistCall(1);
 
     // session expired should enter safe mode and initiate re-election
     // re-election checked via checking re-creation of new zookeeper and
     // call to create lock znode
     Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     // already in safe mode above. should not enter safe mode again
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
     // called getNewZooKeeper to create new session. first call was in
@@ -383,17 +384,17 @@ public class TestActiveStandbyElector {
     Assert.assertEquals(2, count);
     // once in initial joinElection and one now
     Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
 
     // create znode success. become master and monitor
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     verifyExistCall(2);
 
     // error event results in fatal error
     Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
         "Unexpected Zookeeper watch event state: AuthFailed");
     // only 1 state change callback is called at a time
@@ -409,7 +410,7 @@ public class TestActiveStandbyElector {
     elector.joinElection(data);
 
     // make the object go into the monitoring state
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);
@@ -420,25 +421,25 @@ public class TestActiveStandbyElector {
     // monitoring should be setup again after event is received
     Mockito.when(mockEvent.getType()).thenReturn(
         Event.EventType.NodeDataChanged);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     verifyExistCall(2);
 
     // monitoring should be setup again after event is received
     Mockito.when(mockEvent.getType()).thenReturn(
         Event.EventType.NodeChildrenChanged);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     verifyExistCall(3);
 
     // lock node deletion when in standby mode should create znode again
     // successful znode creation enters active state and sets monitor
     Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     // enterNeutralMode not called when app is standby and leader is lost
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     // once in initial joinElection() and one now
     Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
     verifyExistCall(4);
@@ -447,19 +448,19 @@ public class TestActiveStandbyElector {
     // znode again successful znode creation enters active state and sets
     // monitor
     Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
     // another joinElection called
     Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
-    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+    elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
     verifyExistCall(5);
 
     // bad path name results in fatal error
     Mockito.when(mockEvent.getPath()).thenReturn(null);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
         "Unexpected watch error from Zookeeper");
     // fatal error means no new connection other than one from constructor
@@ -471,7 +472,9 @@ public class TestActiveStandbyElector {
 
   private void verifyExistCall(int times) {
     Mockito.verify(mockZK, Mockito.times(times)).exists(
-        ZK_LOCK_NAME, elector, elector, null);
+        Mockito.eq(ZK_LOCK_NAME), Mockito.<Watcher>any(),
+        Mockito.same(elector),
+        Mockito.same(mockZK));
   }
 
   /**
@@ -482,7 +485,7 @@ public class TestActiveStandbyElector {
     elector.joinElection(data);
 
     // make the object go into the monitoring standby state
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);
@@ -493,14 +496,14 @@ public class TestActiveStandbyElector {
     // notify node deletion
     // monitoring should be setup again after event is received
     Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
-    elector.process(mockEvent);
+    elector.processWatchEvent(mockZK, mockEvent);
     // is standby. no need to notify anything now
     Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
     // another joinElection called.
     Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
-        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
     // lost election
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     // still standby. so no need to notify again
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
@@ -523,7 +526,7 @@ public class TestActiveStandbyElector {
     elector.joinElection(data);
     // getNewZooKeeper called 2 times. once in constructor and once now
     Assert.assertEquals(2, count);
-    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+    elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
         ZK_LOCK_NAME);
     Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
     verifyExistCall(1);

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java?rev=1305509&r1=1305508&r2=1305509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java Mon Mar 26 19:31:32 2012
@@ -18,181 +18,182 @@
 
 package org.apache.hadoop.ha;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
 import org.apache.log4j.Level;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.test.ClientBase;
+import org.junit.Test;
+import org.mockito.AdditionalMatchers;
+import org.mockito.Mockito;
+
+import com.google.common.primitives.Ints;
 
 /**
  * Test for {@link ActiveStandbyElector} using real zookeeper.
  */
 public class TestActiveStandbyElectorRealZK extends ClientBase {
   static final int NUM_ELECTORS = 2;
-  static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
   
   static {
     ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(
         Level.ALL);
   }
   
-  int activeIndex = -1;
-  int standbyIndex = -1;
   static final String PARENT_DIR = "/" + UUID.randomUUID();
 
   ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS];
+  private byte[][] appDatas = new byte[NUM_ELECTORS][];
+  private ActiveStandbyElectorCallback[] cbs =
+      new ActiveStandbyElectorCallback[NUM_ELECTORS];
+  private ZooKeeperServer zkServer;
+
   
   @Override
   public void setUp() throws Exception {
     // build.test.dir is used by zookeeper
     new File(System.getProperty("build.test.dir", "build")).mkdirs();
     super.setUp();
-  }
-
-  /**
-   * The class object runs on a thread and waits for a signal to start from the 
-   * test object. On getting the signal it joins the election and thus by doing 
-   * this on multiple threads we can test simultaneous attempts at leader lock 
-   * creation. after joining the election, the object waits on a signal to exit.
-   * this signal comes when the object's elector has become a leader or there is 
-   * an unexpected fatal error. this lets another thread object to become a 
-   * leader.
-   */
-  class ThreadRunner extends TestingThread
-      implements  ActiveStandbyElectorCallback {
-    int index;
-    
-    CountDownLatch hasBecomeActive = new CountDownLatch(1);
-
-    ThreadRunner(TestContext ctx,
-        int idx) {
-      super(ctx);
-      index = idx;
-    }
-
-    @Override
-    public void doWork() throws Exception {
-      LOG.info("starting " + index);
-      // join election
-      byte[] data = new byte[1];
-      data[0] = (byte)index;
-      
-      ActiveStandbyElector elector = electors[index];
-      LOG.info("joining " + index);
-      elector.joinElection(data);
-
-      hasBecomeActive.await(30, TimeUnit.SECONDS);
-      Thread.sleep(1000);
-
-      // quit election to allow other elector to become active
-      elector.quitElection(true);
-
-      LOG.info("ending " + index);
-    }
-
-    @Override
-    public synchronized void becomeActive() {
-      reportActive(index);
-      LOG.info("active " + index);
-      hasBecomeActive.countDown();
-    }
-
-    @Override
-    public synchronized void becomeStandby() {
-      reportStandby(index);
-      LOG.info("standby " + index);
-    }
-
-    @Override
-    public synchronized void enterNeutralMode() {
-      LOG.info("neutral " + index);
-    }
-
-    @Override
-    public synchronized void notifyFatalError(String errorMessage) {
-      LOG.info("fatal " + index + " .Error message:" + errorMessage);
-      this.interrupt();
-    }
+    
+    zkServer = getServer(serverFactory);
 
-    @Override
-    public void fenceOldActive(byte[] data) {
-      LOG.info("fenceOldActive " + index);
-      // should not fence itself
-      Assert.assertTrue(index != data[0]);
+    for (int i = 0; i < NUM_ELECTORS; i++) {
+      cbs[i] =  Mockito.mock(ActiveStandbyElectorCallback.class);
+      appDatas[i] = Ints.toByteArray(i);
+      electors[i] = new ActiveStandbyElector(
+          hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, cbs[i]);
     }
   }
-
-  synchronized void reportActive(int index) {
-    if (activeIndex == -1) {
-      activeIndex = index;
-    } else {
-      // standby should become active
-      Assert.assertEquals(standbyIndex, index);
-      // old active should not become active
-      Assert.assertFalse(activeIndex == index);
+  
+  private void checkFatalsAndReset() throws Exception {
+    for (int i = 0; i < NUM_ELECTORS; i++) {
+      Mockito.verify(cbs[i], Mockito.never()).notifyFatalError(
+          Mockito.anyString());
+      Mockito.reset(cbs[i]);
     }
-    activeIndex = index;
-  }
-
-  synchronized void reportStandby(int index) {
-    // only 1 standby should be reported and it should not be the same as active
-    Assert.assertEquals(-1, standbyIndex);
-    standbyIndex = index;
-    Assert.assertFalse(activeIndex == standbyIndex);
   }
 
   /**
    * the test creates 2 electors which try to become active using a real
    * zookeeper server. It verifies that 1 becomes active and 1 becomes standby.
    * Upon becoming active the leader quits election and the test verifies that
-   * the standby now becomes active. these electors run on different threads and 
-   * callback to the test class to report active and standby where the outcome 
-   * is verified
-   * @throws Exception 
+   * the standby now becomes active.
    */
-  @Test
+  @Test(timeout=20000)
   public void testActiveStandbyTransition() throws Exception {
     LOG.info("starting test with parentDir:" + PARENT_DIR);
 
-    TestContext ctx = new TestContext();
+    assertFalse(electors[0].parentZNodeExists());
+    electors[0].ensureParentZNode();
+    assertTrue(electors[0].parentZNodeExists());
+
+    // First elector joins election, becomes active.
+    electors[0].joinElection(appDatas[0]);
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+        zkServer, PARENT_DIR, appDatas[0]);
+    Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive();
+    checkFatalsAndReset();
+
+    // Second elector joins election, becomes standby.
+    electors[1].joinElection(appDatas[1]);
+    Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeStandby();
+    checkFatalsAndReset();
+    
+    // First elector quits, second one should become active
+    electors[0].quitElection(true);
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+        zkServer, PARENT_DIR, appDatas[1]);
+    Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeActive();
+    checkFatalsAndReset();
+    
+    // First one rejoins, becomes standby, second one stays active
+    electors[0].joinElection(appDatas[0]);
+    Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeStandby();
+    checkFatalsAndReset();
     
-    for(int i = 0; i < NUM_ELECTORS; i++) {
-      LOG.info("creating " + i);
-      final ZooKeeper zk = createClient();
-      assert zk != null;
+    // Second one expires, first one becomes active
+    electors[1].preventSessionReestablishmentForTests();
+    try {
+      zkServer.closeSession(electors[1].getZKSessionIdForTests());
       
-      ThreadRunner tr = new ThreadRunner(ctx, i);
-      electors[i] = new ActiveStandbyElector(
-          "hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
-          tr) {
-        @Override
-        protected synchronized ZooKeeper getNewZooKeeper()
-            throws IOException {
-          return zk;
-        }
-      };
-      ctx.addThread(tr);
+      ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+          zkServer, PARENT_DIR, appDatas[0]);
+      Mockito.verify(cbs[1], Mockito.timeout(1000)).enterNeutralMode();
+      Mockito.verify(cbs[0], Mockito.timeout(1000)).fenceOldActive(
+          AdditionalMatchers.aryEq(appDatas[1]));
+      Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive();
+    } finally {
+      electors[1].allowSessionReestablishmentForTests();
     }
+    
+    // Second one eventually reconnects and becomes standby
+    Mockito.verify(cbs[1], Mockito.timeout(5000)).becomeStandby();
+    checkFatalsAndReset();
+    
+    // First one expires, second one should become active
+    electors[0].preventSessionReestablishmentForTests();
+    try {
+      zkServer.closeSession(electors[0].getZKSessionIdForTests());
+      
+      ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+          zkServer, PARENT_DIR, appDatas[1]);
+      Mockito.verify(cbs[0], Mockito.timeout(1000)).enterNeutralMode();
+      Mockito.verify(cbs[1], Mockito.timeout(1000)).fenceOldActive(
+          AdditionalMatchers.aryEq(appDatas[0]));
+      Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeActive();
+    } finally {
+      electors[0].allowSessionReestablishmentForTests();
+    }
+    
+    checkFatalsAndReset();
+  }
+  
+  @Test(timeout=15000)
+  public void testHandleSessionExpiration() throws Exception {
+    ActiveStandbyElectorCallback cb = cbs[0];
+    byte[] appData = appDatas[0];
+    ActiveStandbyElector elector = electors[0];
+    
+    // Let the first elector become active
+    elector.ensureParentZNode();
+    elector.joinElection(appData);
+    ZooKeeperServer zks = getServer(serverFactory);
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+        zks, PARENT_DIR, appData);
+    Mockito.verify(cb, Mockito.timeout(1000)).becomeActive();
+    checkFatalsAndReset();
+    
+    LOG.info("========================== Expiring session");
+    zks.closeSession(elector.getZKSessionIdForTests());
 
-    assertFalse(electors[0].parentZNodeExists());
-    electors[0].ensureParentZNode();
-    assertTrue(electors[0].parentZNodeExists());
+    // Should enter neutral mode when disconnected
+    Mockito.verify(cb, Mockito.timeout(1000)).enterNeutralMode();
+
+    // Should re-join the election and regain active
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+        zks, PARENT_DIR, appData);
+    Mockito.verify(cb, Mockito.timeout(1000)).becomeActive();
+    checkFatalsAndReset();
+    
+    LOG.info("========================== Quitting election");
+    elector.quitElection(false);
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+        zks, PARENT_DIR, null);
+
+    // Double check that we don't accidentally re-join the election
+    // due to receiving the "expired" event.
+    Thread.sleep(1000);
+    Mockito.verify(cb, Mockito.never()).becomeActive();
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+        zks, PARENT_DIR, null);
 
-    ctx.startThreads();
-    ctx.stop();
+    checkFatalsAndReset();
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java?rev=1305509&r1=1305508&r2=1305509&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java Mon Mar 26 19:31:32 2012
@@ -124,7 +124,7 @@ public abstract class MultithreadedTestU
      * Checks for thread exceptions, and if they've occurred
      * throws them as RuntimeExceptions in a deferred manner.
      */
-    private synchronized void checkException() throws Exception {
+    public synchronized void checkException() throws Exception {
       if (err != null) {
         throw new RuntimeException("Deferred", err);
       }