You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/30 22:30:48 UTC

svn commit: r1307599 - in /hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Fri Mar 30 20:30:47 2012
New Revision: 1307599

URL: http://svn.apache.org/viewvc?rev=1307599&view=rev
Log:
HADOOP-8228. Auto HA: Refactor tests and add stress tests. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
Modified:
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
    hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Fri Mar 30 20:30:47 2012
@@ -6,3 +6,4 @@ branch is merged.
 
 HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
 
+HADOOP-8228. Auto HA: Refactor tests and add stress tests. (todd)

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Fri Mar 30 20:30:47 2012
@@ -140,7 +140,7 @@ public class ActiveStandbyElector implem
 
   public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
 
-  private static final int NUM_RETRIES = 3;
+  static int NUM_RETRIES = 3;
   private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
 
   private static enum ConnectionState {
@@ -662,8 +662,12 @@ public class ActiveStandbyElector implem
   }
   
   @VisibleForTesting
-  long getZKSessionIdForTests() {
-    return zkClient.getSessionId();
+  synchronized long getZKSessionIdForTests() {
+    if (zkClient != null) {
+      return zkClient.getSessionId();
+    } else {
+      return -1;
+    }
   }
   
   @VisibleForTesting

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Fri Mar 30 20:30:47 2012
@@ -146,7 +146,12 @@ public abstract class ZKFailoverControll
     }
 
     initHM();
-    mainLoop();
+    try {
+      mainLoop();
+    } finally {
+      healthMonitor.shutdown();
+      healthMonitor.join();
+    }
     return 0;
   }
 

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Fri Mar 30 20:30:47 2012
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.security.AccessControlException;
@@ -34,6 +36,7 @@ import com.google.common.collect.Lists;
  * a mock implementation.
  */
 class DummyHAService extends HAServiceTarget {
+  public static final Log LOG = LogFactory.getLog(DummyHAService.class);
   volatile HAServiceState state;
   HAServiceProtocol proxy;
   NodeFencer fencer;
@@ -42,13 +45,21 @@ class DummyHAService extends HAServiceTa
   boolean actUnreachable = false;
   boolean failToBecomeActive;
   
+  DummySharedResource sharedResource;
+  
   static ArrayList<DummyHAService> instances = Lists.newArrayList();
   int index;
 
   DummyHAService(HAServiceState state, InetSocketAddress address) {
     this.state = state;
     this.proxy = makeMock();
-    this.fencer = Mockito.mock(NodeFencer.class);
+    try {
+      Configuration conf = new Configuration();
+      conf.set(NodeFencer.CONF_METHODS_KEY, DummyFencer.class.getName());
+      this.fencer = Mockito.spy(NodeFencer.create(conf));
+    } catch (BadFencingConfigurationException e) {
+      throw new RuntimeException(e);
+    }
     this.address = address;
     synchronized (instances) {
       instances.add(this);
@@ -56,6 +67,10 @@ class DummyHAService extends HAServiceTa
     }
   }
   
+  public void setSharedResource(DummySharedResource rsrc) {
+    this.sharedResource = rsrc;
+  }
+  
   private HAServiceProtocol makeMock() {
     return Mockito.spy(new MockHAProtocolImpl());
   }
@@ -107,7 +122,9 @@ class DummyHAService extends HAServiceTa
       if (failToBecomeActive) {
         throw new ServiceFailedException("injected failure");
       }
-    
+      if (sharedResource != null) {
+        sharedResource.take(DummyHAService.this);
+      }
       state = HAServiceState.ACTIVE;
     }
     
@@ -115,6 +132,9 @@ class DummyHAService extends HAServiceTa
     public void transitionToStandby() throws ServiceFailedException,
         AccessControlException, IOException {
       checkUnreachable();
+      if (sharedResource != null) {
+        sharedResource.release(DummyHAService.this);
+      }
       state = HAServiceState.STANDBY;
     }
     
@@ -138,4 +158,20 @@ class DummyHAService extends HAServiceTa
     public void close() throws IOException {
     }
   }
+  
+  public static class DummyFencer implements FenceMethod {
+
+    public void checkArgs(String args) throws BadFencingConfigurationException {
+    }
+
+    @Override
+    public boolean tryFence(HAServiceTarget target, String args)
+        throws BadFencingConfigurationException {
+      LOG.info("tryFence(" + target + ")");
+      DummyHAService svc = (DummyHAService)target;
+      svc.sharedResource.release(svc);
+      return true;
+    }
+  }
+
 }

Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java?rev=1307599&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java Fri Mar 30 20:30:47 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import org.junit.Assert;
+
+/**
+ * A fake shared resource, for use in automatic failover testing.
+ * This simulates a real shared resource like a shared edit log.
+ * When the {@link DummyHAService} instances change state or get
+ * fenced, they notify the shared resource, which asserts that
+ * we never have two HA services who think they're holding the
+ * resource at the same time.
+ */
+public class DummySharedResource {
+  private DummyHAService holder = null;
+  private int violations = 0;
+  
+  public synchronized void take(DummyHAService newHolder) {
+    if (holder == null || holder == newHolder) {
+      holder = newHolder;
+    } else {
+      violations++;
+      throw new IllegalStateException("already held by: " + holder);
+    }
+  }
+  
+  public synchronized void release(DummyHAService oldHolder) {
+    if (holder == oldHolder) {
+      holder = null;
+    }
+  }
+  
+  public synchronized void assertNoViolations() {
+    Assert.assertEquals(0, violations);
+  }
+}

Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java?rev=1307599&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java Fri Mar 30 20:30:47 2012
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+/**
+ * Harness for starting two dummy ZK FailoverControllers, associated with
+ * DummyHAServices. This harness starts two such ZKFCs, designated by
+ * indexes 0 and 1, and provides utilities for building tests around them.
+ */
+public class MiniZKFCCluster {
+  private final TestContext ctx;
+  private final ZooKeeperServer zks;
+
+  private DummyHAService svcs[];
+  private DummyZKFCThread thrs[];
+  private Configuration conf;
+  
+  private DummySharedResource sharedResource = new DummySharedResource();
+  
+  private static final Log LOG = LogFactory.getLog(MiniZKFCCluster.class);
+  
+  public MiniZKFCCluster(Configuration conf, ZooKeeperServer zks) {
+    this.conf = conf;
+    // Fast check interval so tests run faster
+    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+    svcs = new DummyHAService[2];
+    svcs[0] = new DummyHAService(HAServiceState.INITIALIZING,
+        new InetSocketAddress("svc1", 1234));
+    svcs[0].setSharedResource(sharedResource);
+    svcs[1] = new DummyHAService(HAServiceState.INITIALIZING,
+        new InetSocketAddress("svc2", 1234));
+    svcs[1].setSharedResource(sharedResource);
+    
+    this.ctx = new TestContext();
+    this.zks = zks;
+  }
+  
+  /**
+   * Set up two services and their failover controllers. svc0 is started
+   * first, so that it enters ACTIVE state, and then svc1 is started,
+   * which enters STANDBY.
+   */
+  public void start() throws Exception {
+    // Format the base dir, should succeed
+    thrs = new DummyZKFCThread[2];
+    thrs[0] = new DummyZKFCThread(ctx, svcs[0]);
+    assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
+    ctx.addThread(thrs[0]);
+    thrs[0].start();
+    
+    LOG.info("Waiting for svc0 to enter active state");
+    waitForHAState(0, HAServiceState.ACTIVE);
+    
+    LOG.info("Adding svc1");
+    thrs[1] = new DummyZKFCThread(ctx, svcs[1]);
+    thrs[1].start();
+    waitForHAState(1, HAServiceState.STANDBY);
+  }
+  
+  /**
+   * Stop the services.
+   * @throws Exception if either of the services had encountered a fatal error
+   */
+  public void stop() throws Exception {
+    for (DummyZKFCThread thr : thrs) {
+      if (thr != null) {
+        thr.interrupt();
+      }
+    }
+    if (ctx != null) {
+      ctx.stop();
+    }
+    sharedResource.assertNoViolations();
+  }
+
+  /**
+   * @return the TestContext implementation used internally. This allows more
+   * threads to be added to the context, etc.
+   */
+  public TestContext getTestContext() {
+    return ctx;
+  }
+  
+  public DummyHAService getService(int i) {
+    return svcs[i];
+  }
+
+  public ActiveStandbyElector getElector(int i) {
+    return thrs[i].zkfc.getElectorForTests();
+  }
+  
+  public void setHealthy(int idx, boolean healthy) {
+    svcs[idx].isHealthy = healthy;
+  }
+
+  public void setFailToBecomeActive(int idx, boolean doFail) {
+    svcs[idx].failToBecomeActive = doFail;
+  }
+  
+  public void setUnreachable(int idx, boolean unreachable) {
+    svcs[idx].actUnreachable = unreachable;
+  }
+
+  /**
+   * Wait for the given HA service to enter the given HA state.
+   */
+  public void waitForHAState(int idx, HAServiceState state)
+      throws Exception {
+    DummyHAService svc = getService(idx);
+    while (svc.state != state) {
+      ctx.checkException();
+      Thread.sleep(50);
+    }
+  }
+  
+  /**
+   * Wait for the ZKFC to be notified of a change in health state.
+   */
+  public void waitForHealthState(int idx, State state)
+      throws Exception {
+    ZKFailoverController zkfc = thrs[idx].zkfc; 
+    while (zkfc.getLastHealthState() != state) {
+      ctx.checkException();
+      Thread.sleep(50);
+    }
+  }
+
+  /**
+   * Wait for the given elector to enter the given elector state.
+   * @param idx the service index (0 or 1)
+   * @param state the state to wait for
+   * @throws Exception if it times out, or an exception occurs on one
+   * of the ZKFC threads while waiting.
+   */
+  public void waitForElectorState(int idx,
+      ActiveStandbyElector.State state) throws Exception {
+    ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+        getElector(idx), state);
+  }
+
+  
+
+  /**
+   * Expire the ZK session of the given service. This requires
+   * (and asserts) that the given service be the current active.
+   * @throws NoNodeException if no service holds the lock
+   */
+  public void expireActiveLockHolder(int idx)
+      throws NoNodeException {
+    Stat stat = new Stat();
+    byte[] data = zks.getZKDatabase().getData(
+        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
+        ActiveStandbyElector.LOCK_FILENAME, stat, null);
+    
+    assertArrayEquals(Ints.toByteArray(svcs[idx].index), data);
+    long session = stat.getEphemeralOwner();
+    LOG.info("Expiring svc " + idx + "'s zookeeper session " + session);
+    zks.closeSession(session);
+  }
+  
+
+  /**
+   * Wait for the given HA service to become the active lock holder.
+   * If the passed idx is null, waits for there to be no active
+   * lock holder.
+   */
+  public void waitForActiveLockHolder(Integer idx)
+      throws Exception {
+    DummyHAService svc = idx == null ? null : svcs[idx];
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
+        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
+        (idx == null) ? null : Ints.toByteArray(svc.index));
+  }
+  
+
+  /**
+   * Expires the ZK session associated with service 'fromIdx', and waits
+   * until service 'toIdx' takes over.
+   * @throws Exception if the target service does not become active
+   */
+  public void expireAndVerifyFailover(int fromIdx, int toIdx)
+      throws Exception {
+    Preconditions.checkArgument(fromIdx != toIdx);
+    
+    getElector(fromIdx).preventSessionReestablishmentForTests();
+    try {
+      expireActiveLockHolder(fromIdx);
+      
+      waitForHAState(fromIdx, HAServiceState.STANDBY);
+      waitForHAState(toIdx, HAServiceState.ACTIVE);
+    } finally {
+      getElector(fromIdx).allowSessionReestablishmentForTests();
+    }
+  }
+
+  /**
+   * Test-thread which runs a ZK Failover Controller corresponding
+   * to a given dummy service.
+   */
+  private class DummyZKFCThread extends TestingThread {
+    private final DummyZKFC zkfc;
+
+    public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
+      super(ctx);
+      this.zkfc = new DummyZKFC(svc);
+      zkfc.setConf(conf);
+    }
+
+    @Override
+    public void doWork() throws Exception {
+      try {
+        assertEquals(0, zkfc.run(new String[0]));
+      } catch (InterruptedException ie) {
+        // Interrupted by main thread, that's OK.
+      }
+    }
+  }
+  
+  static class DummyZKFC extends ZKFailoverController {
+    private final DummyHAService localTarget;
+    
+    public DummyZKFC(DummyHAService localTarget) {
+      this.localTarget = localTarget;
+    }
+
+    @Override
+    protected byte[] targetToData(HAServiceTarget target) {
+      return Ints.toByteArray(((DummyHAService)target).index);
+    }
+    
+    @Override
+    protected HAServiceTarget dataToTarget(byte[] data) {
+      int index = Ints.fromByteArray(data);
+      return DummyHAService.getInstance(index);
+    }
+
+    @Override
+    protected HAServiceTarget getLocalTarget() {
+      return localTarget;
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Fri Mar 30 20:30:47 2012
@@ -17,36 +17,24 @@
  */
 package org.apache.hadoop.ha;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HealthMonitor.State;
-import org.apache.hadoop.test.MultithreadedTestUtil;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
 import org.apache.log4j.Level;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.primitives.Ints;
-
 public class TestZKFailoverController extends ClientBase {
   private Configuration conf;
-  private DummyHAService svc1;
-  private DummyHAService svc2;
-  private TestContext ctx;
-  private DummyZKFCThread thr1, thr2;
+  private MiniZKFCCluster cluster;
   
   static {
     ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
@@ -63,49 +51,7 @@ public class TestZKFailoverController ex
   public void setupConfAndServices() {
     conf = new Configuration();
     conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
-    // Fast check interval so tests run faster
-    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
-    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
-    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
-    svc1 = new DummyHAService(HAServiceState.INITIALIZING,
-        new InetSocketAddress("svc1", 1234));
-    svc2 = new DummyHAService(HAServiceState.INITIALIZING,
-        new InetSocketAddress("svc2", 1234));
-  }
-  
-  /**
-   * Set up two services and their failover controllers. svc1 is started
-   * first, so that it enters ACTIVE state, and then svc2 is started,
-   * which enters STANDBY
-   */
-  private void setupFCs() throws Exception {
-    // Format the base dir, should succeed
-    assertEquals(0, runFC(svc1, "-formatZK"));
-
-    ctx = new MultithreadedTestUtil.TestContext();
-    thr1 = new DummyZKFCThread(ctx, svc1);
-    ctx.addThread(thr1);
-    thr1.start();
-    
-    LOG.info("Waiting for svc1 to enter active state");
-    waitForHAState(svc1, HAServiceState.ACTIVE);
-    
-    LOG.info("Adding svc2");
-    thr2 = new DummyZKFCThread(ctx, svc2);
-    thr2.start();
-    waitForHAState(svc2, HAServiceState.STANDBY);
-  }
-  
-  private void stopFCs() throws Exception {
-    if (thr1 != null) {
-      thr1.interrupt();
-    }
-    if (thr2 != null) {
-      thr2.interrupt();
-    }
-    if (ctx != null) {
-      ctx.stop();
-    }
+    this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
   }
 
   /**
@@ -114,20 +60,21 @@ public class TestZKFailoverController ex
    */
   @Test(timeout=15000)
   public void testFormatZK() throws Exception {
+    DummyHAService svc = cluster.getService(1);
     // Run without formatting the base dir,
     // should barf
     assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
-        runFC(svc1));
+        runFC(svc));
 
     // Format the base dir, should succeed
-    assertEquals(0, runFC(svc1, "-formatZK"));
+    assertEquals(0, runFC(svc, "-formatZK"));
 
     // Should fail to format if already formatted
     assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
-        runFC(svc1, "-formatZK", "-nonInteractive"));
+        runFC(svc, "-formatZK", "-nonInteractive"));
   
     // Unless '-force' is on
-    assertEquals(0, runFC(svc1, "-formatZK", "-force"));
+    assertEquals(0, runFC(svc, "-formatZK", "-force"));
   }
   
   /**
@@ -136,14 +83,14 @@ public class TestZKFailoverController ex
    */
   @Test(timeout=15000)
   public void testFencingMustBeConfigured() throws Exception {
-    svc1 = Mockito.spy(svc1);
+    DummyHAService svc = Mockito.spy(cluster.getService(0));
     Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
-        .when(svc1).checkFencingConfigured();
+        .when(svc).checkFencingConfigured();
     // Format the base dir, should succeed
-    assertEquals(0, runFC(svc1, "-formatZK"));
+    assertEquals(0, runFC(svc, "-formatZK"));
     // Try to run the actual FC, should fail without a fencer
     assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
-        runFC(svc1));
+        runFC(svc));
   }
   
   /**
@@ -155,66 +102,50 @@ public class TestZKFailoverController ex
   @Test(timeout=15000)
   public void testAutoFailoverOnBadHealth() throws Exception {
     try {
-      setupFCs();
+      cluster.start();
+      DummyHAService svc1 = cluster.getService(1);
       
-      LOG.info("Faking svc1 unhealthy, should failover to svc2");
-      svc1.isHealthy = false;
-      LOG.info("Waiting for svc1 to enter standby state");
-      waitForHAState(svc1, HAServiceState.STANDBY);
-      waitForHAState(svc2, HAServiceState.ACTIVE);
+      LOG.info("Faking svc0 unhealthy, should failover to svc1");
+      cluster.setHealthy(0, false);
+      
+      LOG.info("Waiting for svc0 to enter standby state");
+      cluster.waitForHAState(0, HAServiceState.STANDBY);
+      cluster.waitForHAState(1, HAServiceState.ACTIVE);
   
-      LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
+      LOG.info("Allowing svc0 to be healthy again, making svc1 unreachable " +
           "and fail to gracefully go to standby");
-      svc1.isHealthy = true;
-      svc2.actUnreachable = true;
-      
-      // Allow fencing to succeed
-      Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
-      // Should fail back to svc1 at this point
-      waitForHAState(svc1, HAServiceState.ACTIVE);
-      // and fence svc2
-      Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
+      cluster.setUnreachable(1, true);
+      cluster.setHealthy(0, true);
+ 
+      // Should fail back to svc0 at this point
+      cluster.waitForHAState(0, HAServiceState.ACTIVE);
+      // and fence svc1
+      Mockito.verify(svc1.fencer).fence(Mockito.same(svc1));
     } finally {
-      stopFCs();
+      cluster.stop();
     }
   }
   
   @Test(timeout=15000)
   public void testAutoFailoverOnLostZKSession() throws Exception {
     try {
-      setupFCs();
+      cluster.start();
 
-      // Expire svc1, it should fail over to svc2
-      expireAndVerifyFailover(thr1, thr2);
+      // Expire svc0, it should fail over to svc1
+      cluster.expireAndVerifyFailover(0, 1);
       
-      // Expire svc2, it should fail back to svc1
-      expireAndVerifyFailover(thr2, thr1);
+      // Expire svc1, it should fail back to svc0
+      cluster.expireAndVerifyFailover(1, 0);
       
       LOG.info("======= Running test cases second time to test " +
           "re-establishment =========");
-      // Expire svc1, it should fail over to svc2
-      expireAndVerifyFailover(thr1, thr2);
+      // Expire svc0, it should fail over to svc1
+      cluster.expireAndVerifyFailover(0, 1);
       
-      // Expire svc2, it should fail back to svc1
-      expireAndVerifyFailover(thr2, thr1);
+      // Expire svc1, it should fail back to svc0
+      cluster.expireAndVerifyFailover(1, 0);
     } finally {
-      stopFCs();
-    }
-  }
-  
-  private void expireAndVerifyFailover(DummyZKFCThread fromThr,
-      DummyZKFCThread toThr) throws Exception {
-    DummyHAService fromSvc = fromThr.zkfc.localTarget;
-    DummyHAService toSvc = toThr.zkfc.localTarget;
-    
-    fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
-    try {
-      expireActiveLockHolder(fromSvc);
-      
-      waitForHAState(fromSvc, HAServiceState.STANDBY);
-      waitForHAState(toSvc, HAServiceState.ACTIVE);
-    } finally {
-      fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+      cluster.stop();
     }
   }
 
@@ -225,33 +156,32 @@ public class TestZKFailoverController ex
   @Test(timeout=15000)
   public void testDontFailoverToUnhealthyNode() throws Exception {
     try {
-      setupFCs();
+      cluster.start();
 
-      // Make svc2 unhealthy, and wait for its FC to notice the bad health.
-      svc2.isHealthy = false;
-      waitForHealthState(thr2.zkfc,
-          HealthMonitor.State.SERVICE_UNHEALTHY);
+      // Make svc1 unhealthy, and wait for its FC to notice the bad health.
+      cluster.setHealthy(1, false);
+      cluster.waitForHealthState(1, HealthMonitor.State.SERVICE_UNHEALTHY);
       
-      // Expire svc1
-      thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
+      // Expire svc0
+      cluster.getElector(0).preventSessionReestablishmentForTests();
       try {
-        expireActiveLockHolder(svc1);
+        cluster.expireActiveLockHolder(0);
 
-        LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
+        LOG.info("Expired svc0's ZK session. Waiting a second to give svc1" +
             " a chance to take the lock, if it is ever going to.");
         Thread.sleep(1000);
         
         // Ensure that no one holds the lock.
-        waitForActiveLockHolder(null);
+        cluster.waitForActiveLockHolder(null);
         
       } finally {
-        LOG.info("Allowing svc1's elector to re-establish its connection");
-        thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+        LOG.info("Allowing svc0's elector to re-establish its connection");
+        cluster.getElector(0).allowSessionReestablishmentForTests();
       }
-      // svc1 should get the lock again
-      waitForActiveLockHolder(svc1);
+      // svc0 should get the lock again
+      cluster.waitForActiveLockHolder(0);
     } finally {
-      stopFCs();
+      cluster.stop();
     }
   }
 
@@ -262,36 +192,38 @@ public class TestZKFailoverController ex
   @Test(timeout=15000)
   public void testBecomingActiveFails() throws Exception {
     try {
-      setupFCs();
+      cluster.start();
+      DummyHAService svc1 = cluster.getService(1);
       
-      LOG.info("Making svc2 fail to become active");
-      svc2.failToBecomeActive = true;
+      LOG.info("Making svc1 fail to become active");
+      cluster.setFailToBecomeActive(1, true);
       
-      LOG.info("Faking svc1 unhealthy, should NOT successfully " +
-          "failover to svc2");
-      svc1.isHealthy = false;
-      waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
-      waitForActiveLockHolder(null);
+      LOG.info("Faking svc0 unhealthy, should NOT successfully " +
+          "failover to svc1");
+      cluster.setHealthy(0, false);
+      cluster.waitForHealthState(0, State.SERVICE_UNHEALTHY);
+      cluster.waitForActiveLockHolder(null);
 
-      Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce())
+      
+      Mockito.verify(svc1.proxy, Mockito.timeout(2000).atLeastOnce())
         .transitionToActive();
 
-      waitForHAState(svc1, HAServiceState.STANDBY);
-      waitForHAState(svc2, HAServiceState.STANDBY);
+      cluster.waitForHAState(0, HAServiceState.STANDBY);
+      cluster.waitForHAState(1, HAServiceState.STANDBY);
       
-      LOG.info("Faking svc1 healthy again, should go back to svc1");
-      svc1.isHealthy = true;
-      waitForHAState(svc1, HAServiceState.ACTIVE);
-      waitForHAState(svc2, HAServiceState.STANDBY);
-      waitForActiveLockHolder(svc1);
+      LOG.info("Faking svc0 healthy again, should go back to svc0");
+      cluster.setHealthy(0, true);
+      cluster.waitForHAState(0, HAServiceState.ACTIVE);
+      cluster.waitForHAState(1, HAServiceState.STANDBY);
+      cluster.waitForActiveLockHolder(0);
       
-      // Ensure that we can fail back to thr2 once it it is able
+      // Ensure that we can fail back to svc1 once it is able
       // to become active (e.g the admin has restarted it)
-      LOG.info("Allowing svc2 to become active, expiring svc1");
-      svc2.failToBecomeActive = false;
-      expireAndVerifyFailover(thr1, thr2);
+      LOG.info("Allowing svc1 to become active, expiring svc0");
+      svc1.failToBecomeActive = false;
+      cluster.expireAndVerifyFailover(0, 1);
     } finally {
-      stopFCs();
+      cluster.stop();
     }
   }
   
@@ -303,27 +235,25 @@ public class TestZKFailoverController ex
   @Test(timeout=15000)
   public void testZooKeeperFailure() throws Exception {
     try {
-      setupFCs();
+      cluster.start();
 
       // Record initial ZK sessions
-      long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
-      long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
+      long session0 = cluster.getElector(0).getZKSessionIdForTests();
+      long session1 = cluster.getElector(1).getZKSessionIdForTests();
 
       LOG.info("====== Stopping ZK server");
       stopServer();
       waitForServerDown(hostPort, CONNECTION_TIMEOUT);
       
       LOG.info("====== Waiting for services to enter NEUTRAL mode");
-      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
-          thr1.zkfc.getElectorForTests(),
+      cluster.waitForElectorState(0,
           ActiveStandbyElector.State.NEUTRAL);
-      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
-          thr2.zkfc.getElectorForTests(),
+      cluster.waitForElectorState(1,
           ActiveStandbyElector.State.NEUTRAL);
 
       LOG.info("====== Checking that the services didn't change HA state");
-      assertEquals(HAServiceState.ACTIVE, svc1.state);
-      assertEquals(HAServiceState.STANDBY, svc2.state);
+      assertEquals(HAServiceState.ACTIVE, cluster.getService(0).state);
+      assertEquals(HAServiceState.STANDBY, cluster.getService(1).state);
       
       LOG.info("====== Restarting server");
       startServer();
@@ -331,134 +261,26 @@ public class TestZKFailoverController ex
 
       // Nodes should go back to their original states, since they re-obtain
       // the same sessions.
-      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
-          thr1.zkfc.getElectorForTests(),
-          ActiveStandbyElector.State.ACTIVE);
-      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
-          thr2.zkfc.getElectorForTests(),
-          ActiveStandbyElector.State.STANDBY);
+      cluster.waitForElectorState(0, ActiveStandbyElector.State.ACTIVE);
+      cluster.waitForElectorState(1, ActiveStandbyElector.State.STANDBY);
       // Check HA states didn't change.
-      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
-          thr1.zkfc.getElectorForTests(),
-          ActiveStandbyElector.State.ACTIVE);
-      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
-          thr2.zkfc.getElectorForTests(),
-          ActiveStandbyElector.State.STANDBY);
+      cluster.waitForHAState(0, HAServiceState.ACTIVE);
+      cluster.waitForHAState(1, HAServiceState.STANDBY);
+
       // Check they re-used the same sessions and didn't spuriously reconnect
+      assertEquals(session0,
+          cluster.getElector(0).getZKSessionIdForTests());
       assertEquals(session1,
-          thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
-      assertEquals(session2,
-          thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
+          cluster.getElector(1).getZKSessionIdForTests());
     } finally {
-      stopFCs();
-    }
-  }
-
-  /**
-   * Expire the ZK session of the given service. This requires
-   * (and asserts) that the given service be the current active.
-   * @throws NoNodeException if no service holds the lock
-   */
-  private void expireActiveLockHolder(DummyHAService expectedActive)
-      throws NoNodeException {
-    ZooKeeperServer zks = getServer(serverFactory);
-    Stat stat = new Stat();
-    byte[] data = zks.getZKDatabase().getData(
-        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
-        ActiveStandbyElector.LOCK_FILENAME, stat, null);
-    
-    assertArrayEquals(Ints.toByteArray(expectedActive.index), data);
-    long session = stat.getEphemeralOwner();
-    LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
-    zks.closeSession(session);
-  }
-  
-  /**
-   * Wait for the given HA service to enter the given HA state.
-   */
-  private void waitForHAState(DummyHAService svc, HAServiceState state)
-      throws Exception {
-    while (svc.state != state) {
-      ctx.checkException();
-      Thread.sleep(50);
-    }
-  }
-  
-  /**
-   * Wait for the ZKFC to be notified of a change in health state.
-   */
-  private void waitForHealthState(DummyZKFC zkfc, State state)
-      throws Exception {
-    while (zkfc.getLastHealthState() != state) {
-      ctx.checkException();
-      Thread.sleep(50);
+      cluster.stop();
     }
   }
 
-  /**
-   * Wait for the given HA service to become the active lock holder.
-   * If the passed svc is null, waits for there to be no active
-   * lock holder.
-   */
-  private void waitForActiveLockHolder(DummyHAService svc)
-      throws Exception {
-    ZooKeeperServer zks = getServer(serverFactory);
-    ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
-        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
-        (svc == null) ? null : Ints.toByteArray(svc.index));
-  }
-
-
   private int runFC(DummyHAService target, String ... args) throws Exception {
     DummyZKFC zkfc = new DummyZKFC(target);
     zkfc.setConf(conf);
     return zkfc.run(args);
   }
 
-  /**
-   * Test-thread which runs a ZK Failover Controller corresponding
-   * to a given dummy service.
-   */
-  private class DummyZKFCThread extends TestingThread {
-    private final DummyZKFC zkfc;
-
-    public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
-      super(ctx);
-      this.zkfc = new DummyZKFC(svc);
-      zkfc.setConf(conf);
-    }
-
-    @Override
-    public void doWork() throws Exception {
-      try {
-        assertEquals(0, zkfc.run(new String[0]));
-      } catch (InterruptedException ie) {
-        // Interrupted by main thread, that's OK.
-      }
-    }
-  }
-  
-  private static class DummyZKFC extends ZKFailoverController {
-    private final DummyHAService localTarget;
-    
-    public DummyZKFC(DummyHAService localTarget) {
-      this.localTarget = localTarget;
-    }
-
-    @Override
-    protected byte[] targetToData(HAServiceTarget target) {
-      return Ints.toByteArray(((DummyHAService)target).index);
-    }
-    
-    @Override
-    protected HAServiceTarget dataToTarget(byte[] data) {
-      int index = Ints.fromByteArray(data);
-      return DummyHAService.getInstance(index);
-    }
-
-    @Override
-    protected HAServiceTarget getLocalTarget() {
-      return localTarget;
-    }
-  }
 }

Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java?rev=1307599&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java Fri Mar 30 20:30:47 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.File;
+import java.util.Random;
+import java.util.Set;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.JMXEnv;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Stress test for ZKFailoverController.
+ * Starts multiple ZKFCs for dummy services, and then performs many automatic
+ * failovers. While doing so, ensures that a fake "shared resource"
+ * (simulating the shared edits dir) is only owned by one service at a time. 
+ */
+public class TestZKFailoverControllerStress extends ClientBase {
+  
+  private static final int STRESS_RUNTIME_SECS = 30;
+  private static final int EXTRA_TIMEOUT_SECS = 10;
+  
+  private Configuration conf;
+  private MiniZKFCCluster cluster;
+
+  @Override
+  public void setUp() throws Exception {
+    // build.test.dir is used by zookeeper
+    new File(System.getProperty("build.test.dir", "build")).mkdirs();
+    super.setUp();
+  }
+  
+  @Before
+  public void setupConfAndServices() throws Exception {
+    conf = new Configuration();
+    conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+    this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
+    cluster.start();
+  }
+  
+  @After
+  public void stopCluster() throws Exception {
+    cluster.stop();
+  }
+
+  /**
+   * ZK seems to have a bug when we muck with its sessions
+   * behind its back, causing disconnects, etc. This bug
+   * ends up leaving JMX beans around at the end of the test,
+   * and ClientBase's teardown method will throw an exception
+   * if it finds JMX beans leaked. So, clear them out here
+   * to work around the ZK bug. See ZOOKEEPER-1438.
+   */
+  @After
+  public void clearZKJMX() throws Exception {
+    Set<ObjectName> names = JMXEnv.ensureAll();
+    for (ObjectName n : names) {
+      JMXEnv.conn().unregisterMBean(n);
+    }
+  }
+
+  /**
+   * Simply fail back and forth between two services for the
+   * configured amount of time, via expiring their ZK sessions.
+   */
+  @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+  public void testExpireBackAndForth() throws Exception {
+    long st = System.currentTimeMillis();
+    long runFor = STRESS_RUNTIME_SECS * 1000;
+
+    int i = 0;
+    while (System.currentTimeMillis() - st < runFor) {
+      // flip flop the services back and forth
+      int from = i % 2;
+      int to = (i + 1) % 2;
+
+      // Expire one service, it should fail over to the other
+      LOG.info("Failing over via expiration from " + from + " to " + to);
+      cluster.expireAndVerifyFailover(from, to);
+
+      i++;
+    }
+  }
+  
+  /**
+   * Randomly expire the ZK sessions of the two ZKFCs. This differs
+   * from the above test in that it is not a controlled failover -
+   * we just do random expirations and expect neither one to ever
+   * generate fatal exceptions.
+   */
+  @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+  public void testRandomExpirations() throws Exception {
+    long st = System.currentTimeMillis();
+    long runFor = STRESS_RUNTIME_SECS * 1000;
+
+    Random r = new Random();
+    while (System.currentTimeMillis() - st < runFor) {
+      cluster.getTestContext().checkException();
+      int targetIdx = r.nextInt(2);
+      ActiveStandbyElector target = cluster.getElector(targetIdx);
+      long sessId = target.getZKSessionIdForTests();
+      if (sessId != -1) {
+        LOG.info(String.format("Expiring session %x for svc %d",
+            sessId, targetIdx));
+        getServer(serverFactory).closeSession(sessId);
+      }
+      Thread.sleep(r.nextInt(300));
+    }
+  }
+  
+  /**
+   * Have the services fail their health checks half the time,
+   * causing the master role to bounce back and forth in the
+   * cluster. Meanwhile, cause ZK to disconnect clients every
+   * 50ms, to trigger the retry code and failures to become active.
+   */
+  @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+  public void testRandomHealthAndDisconnects() throws Exception {
+    long runFor = STRESS_RUNTIME_SECS * 1000;
+    Mockito.doAnswer(new RandomlyThrow(0))
+        .when(cluster.getService(0).proxy).monitorHealth();
+    Mockito.doAnswer(new RandomlyThrow(1))
+        .when(cluster.getService(1).proxy).monitorHealth();
+    ActiveStandbyElector.NUM_RETRIES = 100;
+    
+    long st = System.currentTimeMillis();
+    while (System.currentTimeMillis() - st < runFor) {
+      cluster.getTestContext().checkException();
+      serverFactory.closeAll();
+      Thread.sleep(50);
+    }
+  }
+  
+  
+  /**
+   * Randomly throw an exception half the time the method is called
+   */
+  @SuppressWarnings("rawtypes")
+  private static class RandomlyThrow implements Answer {
+    private Random r = new Random();
+    private final int svcIdx;
+    public RandomlyThrow(int svcIdx) {
+      this.svcIdx = svcIdx;
+    }
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      if (r.nextBoolean()) {
+        LOG.info("Throwing an exception for svc " + svcIdx);
+        throw new HealthCheckFailedException("random failure");
+      }
+      return invocation.callRealMethod();
+    }
+  }
+}