You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/30 22:30:48 UTC
svn commit: r1307599 - in
/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/
Author: todd
Date: Fri Mar 30 20:30:47 2012
New Revision: 1307599
URL: http://svn.apache.org/viewvc?rev=1307599&view=rev
Log:
HADOOP-8228. Auto HA: Refactor tests and add stress tests. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
Modified:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Fri Mar 30 20:30:47 2012
@@ -6,3 +6,4 @@ branch is merged.
HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
+HADOOP-8228. Auto HA: Refactor tests and add stress tests. (todd)
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Fri Mar 30 20:30:47 2012
@@ -140,7 +140,7 @@ public class ActiveStandbyElector implem
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
- private static final int NUM_RETRIES = 3;
+ static int NUM_RETRIES = 3;
private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
private static enum ConnectionState {
@@ -662,8 +662,12 @@ public class ActiveStandbyElector implem
}
@VisibleForTesting
- long getZKSessionIdForTests() {
- return zkClient.getSessionId();
+ synchronized long getZKSessionIdForTests() {
+ if (zkClient != null) {
+ return zkClient.getSessionId();
+ } else {
+ return -1;
+ }
}
@VisibleForTesting
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Fri Mar 30 20:30:47 2012
@@ -146,7 +146,12 @@ public abstract class ZKFailoverControll
}
initHM();
- mainLoop();
+ try {
+ mainLoop();
+ } finally {
+ healthMonitor.shutdown();
+ healthMonitor.join();
+ }
return 0;
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Fri Mar 30 20:30:47 2012
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.security.AccessControlException;
@@ -34,6 +36,7 @@ import com.google.common.collect.Lists;
* a mock implementation.
*/
class DummyHAService extends HAServiceTarget {
+ public static final Log LOG = LogFactory.getLog(DummyHAService.class);
volatile HAServiceState state;
HAServiceProtocol proxy;
NodeFencer fencer;
@@ -42,13 +45,21 @@ class DummyHAService extends HAServiceTa
boolean actUnreachable = false;
boolean failToBecomeActive;
+ DummySharedResource sharedResource;
+
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state;
this.proxy = makeMock();
- this.fencer = Mockito.mock(NodeFencer.class);
+ try {
+ Configuration conf = new Configuration();
+ conf.set(NodeFencer.CONF_METHODS_KEY, DummyFencer.class.getName());
+ this.fencer = Mockito.spy(NodeFencer.create(conf));
+ } catch (BadFencingConfigurationException e) {
+ throw new RuntimeException(e);
+ }
this.address = address;
synchronized (instances) {
instances.add(this);
@@ -56,6 +67,10 @@ class DummyHAService extends HAServiceTa
}
}
+ /**
+ * Attach the shared resource that this service reports to. While one is
+ * set, the dummy protocol calls {@code take()} on it before the state is
+ * set to ACTIVE and {@code release()} before the state is set to STANDBY;
+ * both notifications are skipped while the field is {@code null}.
+ *
+ * @param rsrc the shared resource to notify
+ */
+ public void setSharedResource(DummySharedResource rsrc) {
+ this.sharedResource = rsrc;
+ }
+
private HAServiceProtocol makeMock() {
return Mockito.spy(new MockHAProtocolImpl());
}
@@ -107,7 +122,9 @@ class DummyHAService extends HAServiceTa
if (failToBecomeActive) {
throw new ServiceFailedException("injected failure");
}
-
+ if (sharedResource != null) {
+ sharedResource.take(DummyHAService.this);
+ }
state = HAServiceState.ACTIVE;
}
@@ -115,6 +132,9 @@ class DummyHAService extends HAServiceTa
public void transitionToStandby() throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
+ if (sharedResource != null) {
+ sharedResource.release(DummyHAService.this);
+ }
state = HAServiceState.STANDBY;
}
@@ -138,4 +158,20 @@ class DummyHAService extends HAServiceTa
public void close() throws IOException {
}
}
+
+  /**
+   * Fence method that the DummyHAService constructor configures through
+   * {@code NodeFencer.CONF_METHODS_KEY}. Fencing always reports success and
+   * makes the fenced service give up the shared resource, if it has one.
+   */
+  public static class DummyFencer implements FenceMethod {
+
+    /** Accepts any argument string; nothing is validated. */
+    public void checkArgs(String args) throws BadFencingConfigurationException {
+    }
+
+    /**
+     * Log the fencing attempt and release the target's shared resource.
+     *
+     * @param target the service to fence; must be a DummyHAService
+     * @param args unused
+     * @return always true
+     */
+    @Override
+    public boolean tryFence(HAServiceTarget target, String args)
+        throws BadFencingConfigurationException {
+      LOG.info("tryFence(" + target + ")");
+      DummyHAService svc = (DummyHAService)target;
+      // The shared resource is optional (the state transitions also
+      // null-check it), so fencing a service without one must not NPE.
+      if (svc.sharedResource != null) {
+        svc.sharedResource.release(svc);
+      }
+      return true;
+    }
+  }
+
}
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java?rev=1307599&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java Fri Mar 30 20:30:47 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import org.junit.Assert;
+
+/**
+ * A fake shared resource, for use in automatic failover testing.
+ * This simulates a real shared resource like a shared edit log.
+ * When the {@link DummyHAService} instances change state or get
+ * fenced, they notify the shared resource, which asserts that
+ * we never have two HA services who think they're holding the
+ * resource at the same time.
+ */
+public class DummySharedResource {
+ private DummyHAService holder = null;
+ private int violations = 0;
+
+ public synchronized void take(DummyHAService newHolder) {
+ if (holder == null || holder == newHolder) {
+ holder = newHolder;
+ } else {
+ violations++;
+ throw new IllegalStateException("already held by: " + holder);
+ }
+ }
+
+ public synchronized void release(DummyHAService oldHolder) {
+ if (holder == oldHolder) {
+ holder = null;
+ }
+ }
+
+ public synchronized void assertNoViolations() {
+ Assert.assertEquals(0, violations);
+ }
+}
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java?rev=1307599&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java Fri Mar 30 20:30:47 2012
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+/**
+ * Harness for starting two dummy ZK FailoverControllers, associated with
+ * DummyHAServices. This harness starts two such ZKFCs, designated by
+ * indexes 0 and 1, and provides utilities for building tests around them.
+ */
+public class MiniZKFCCluster {
+ private final TestContext ctx;
+ private final ZooKeeperServer zks;
+
+ private DummyHAService svcs[];
+ private DummyZKFCThread thrs[];
+ private Configuration conf;
+
+ private DummySharedResource sharedResource = new DummySharedResource();
+
+ private static final Log LOG = LogFactory.getLog(MiniZKFCCluster.class);
+
+  /**
+   * Create the harness: two dummy services in INITIALIZING state that share
+   * one DummySharedResource. Nothing is started until {@link #start()}.
+   *
+   * @param conf configuration handed to the failover controllers; the three
+   *        health-monitor interval keys are overwritten with 50 on it
+   * @param zks the ZooKeeper server the helpers inspect and expire sessions on
+   */
+  public MiniZKFCCluster(Configuration conf, ZooKeeperServer zks) {
+    this.zks = zks;
+    this.ctx = new TestContext();
+    this.conf = conf;
+
+    // Fast check interval so tests run faster
+    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+
+    // Creation order matters: the first service becomes index 0.
+    DummyHAService first = new DummyHAService(HAServiceState.INITIALIZING,
+        new InetSocketAddress("svc1", 1234));
+    first.setSharedResource(sharedResource);
+    DummyHAService second = new DummyHAService(HAServiceState.INITIALIZING,
+        new InetSocketAddress("svc2", 1234));
+    second.setSharedResource(sharedResource);
+    svcs = new DummyHAService[] { first, second };
+  }
+
+ /**
+ * Set up two services and their failover controllers. svc0 is started
+ * first, so that it enters ACTIVE state, and then svc1 is started,
+ * which enters STANDBY.
+ *
+ * @throws Exception if formatting the base znode does not return 0, or
+ * if the test context reports an exception while waiting for a state
+ */
+ public void start() throws Exception {
+ // Format the base dir, should succeed
+ thrs = new DummyZKFCThread[2];
+ thrs[0] = new DummyZKFCThread(ctx, svcs[0]);
+ assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
+ ctx.addThread(thrs[0]);
+ thrs[0].start();
+
+ LOG.info("Waiting for svc0 to enter active state");
+ waitForHAState(0, HAServiceState.ACTIVE);
+
+ LOG.info("Adding svc1");
+ // NOTE(review): unlike thrs[0], this thread is not registered through
+ // ctx.addThread() -- confirm that is intentional.
+ thrs[1] = new DummyZKFCThread(ctx, svcs[1]);
+ thrs[1].start();
+ waitForHAState(1, HAServiceState.STANDBY);
+ }
+
+  /**
+   * Stop the services: interrupt every controller thread that was created,
+   * stop the test context, and finally check that the shared resource never
+   * saw two holders at once. Safe to call when {@link #start()} was never
+   * invoked or failed part-way.
+   *
+   * @throws Exception if either of the services had encountered a fatal
+   *         error, as reported by the test context
+   */
+  public void stop() throws Exception {
+    // thrs is only allocated by start(); tolerate stop() without start()
+    // so that a cleanup call cannot fail with a NullPointerException.
+    if (thrs != null) {
+      for (DummyZKFCThread thr : thrs) {
+        if (thr != null) {
+          thr.interrupt();
+        }
+      }
+    }
+    // ctx is final and always assigned by the constructor.
+    ctx.stop();
+    sharedResource.assertNoViolations();
+  }
+
+ /**
+ * Expose the test context shared by the controller threads.
+ *
+ * @return the TestContext implementation used internally. This allows more
+ * threads to be added to the context, etc.
+ */
+ public TestContext getTestContext() {
+ return ctx;
+ }
+
+ /**
+ * @param i the service index (0 or 1)
+ * @return the dummy HA service created for that index
+ */
+ public DummyHAService getService(int i) {
+ return svcs[i];
+ }
+
+ /**
+ * Look up the elector of the failover controller for the given service.
+ * The controller threads are only created by start(), so this cannot be
+ * used before then.
+ *
+ * @param i the service index (0 or 1)
+ * @return the elector exposed by that controller for tests
+ */
+ public ActiveStandbyElector getElector(int i) {
+ return thrs[i].zkfc.getElectorForTests();
+ }
+
+ /**
+ * Set the isHealthy flag of the given dummy service.
+ * NOTE(review): presumably picked up by the health monitor on a later
+ * check -- confirm in DummyHAService.
+ *
+ * @param idx the service index (0 or 1)
+ * @param healthy the new value of the flag
+ */
+ public void setHealthy(int idx, boolean healthy) {
+ svcs[idx].isHealthy = healthy;
+ }
+
+ /**
+ * Set the failToBecomeActive flag of the given dummy service. While the
+ * flag is set, the dummy service throws a ServiceFailedException
+ * ("injected failure") instead of setting its state to ACTIVE.
+ *
+ * @param idx the service index (0 or 1)
+ * @param doFail the new value of the flag
+ */
+ public void setFailToBecomeActive(int idx, boolean doFail) {
+ svcs[idx].failToBecomeActive = doFail;
+ }
+
+ /**
+ * Set the actUnreachable flag of the given dummy service.
+ * NOTE(review): presumably makes the service's checkUnreachable() calls
+ * fail -- confirm in DummyHAService.
+ *
+ * @param idx the service index (0 or 1)
+ * @param unreachable the new value of the flag
+ */
+ public void setUnreachable(int idx, boolean unreachable) {
+ svcs[idx].actUnreachable = unreachable;
+ }
+
+  /**
+   * Block until the given HA service reports the given HA state, polling
+   * every 50 ms. The loop itself has no timeout; between polls it asks the
+   * test context to check for exceptions.
+   *
+   * @param idx the service index (0 or 1)
+   * @param state the HA state to wait for
+   */
+  public void waitForHAState(int idx, HAServiceState state)
+      throws Exception {
+    final DummyHAService target = getService(idx);
+    while (true) {
+      if (target.state == state) {
+        return;
+      }
+      ctx.checkException();
+      Thread.sleep(50);
+    }
+  }
+
+  /**
+   * Block until the failover controller of the given service reports the
+   * given value as its last health state, polling every 50 ms. The loop
+   * itself has no timeout; between polls it asks the test context to check
+   * for exceptions.
+   *
+   * @param idx the service index (0 or 1)
+   * @param state the health state to wait for
+   */
+  public void waitForHealthState(int idx, State state)
+      throws Exception {
+    final ZKFailoverController controller = thrs[idx].zkfc;
+    while (true) {
+      if (controller.getLastHealthState() == state) {
+        return;
+      }
+      ctx.checkException();
+      Thread.sleep(50);
+    }
+  }
+
+ /**
+ * Wait for the given elector to enter the given elector state.
+ * @param idx the service index (0 or 1)
+ * @param state the state to wait for
+ * @throws Exception if it times out, or an exception occurs on one
+ * of the ZKFC threads while waiting.
+ */
+ public void waitForElectorState(int idx,
+ ActiveStandbyElector.State state) throws Exception {
+ ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+ getElector(idx), state);
+ }
+
+
+
+  /**
+   * Expire the ZK session that owns the active lock znode. The lock data is
+   * read straight from the server's database and must match the index of
+   * the given service, i.e. that service has to be the current active.
+   *
+   * @param idx the service index (0 or 1) expected to hold the lock
+   * @throws NoNodeException if no service holds the lock
+   */
+  public void expireActiveLockHolder(int idx)
+      throws NoNodeException {
+    String lockPath = ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
+        ActiveStandbyElector.LOCK_FILENAME;
+    Stat lockStat = new Stat();
+    byte[] lockData = zks.getZKDatabase().getData(lockPath, lockStat, null);
+
+    byte[] expectedData = Ints.toByteArray(svcs[idx].index);
+    assertArrayEquals(expectedData, lockData);
+
+    // The lock znode's ephemeral owner is the session to close.
+    long session = lockStat.getEphemeralOwner();
+    LOG.info("Expiring svc " + idx + "'s zookeeper session " + session);
+    zks.closeSession(session);
+  }
+
+
+ /**
+ * Wait for the given HA service to become the active lock holder.
+ * If the passed svc is null, waits for there to be no active
+ * lock holder.
+ */
+ public void waitForActiveLockHolder(Integer idx)
+ throws Exception {
+ DummyHAService svc = idx == null ? null : svcs[idx];
+ ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
+ ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
+ (idx == null) ? null : Ints.toByteArray(svc.index));
+ }
+
+
+ /**
+ * Expires the ZK session associated with service 'fromIdx', and waits
+ * until service 'fromIdx' is STANDBY and service 'toIdx' is ACTIVE.
+ * Session re-establishment is blocked on the 'fromIdx' elector for the
+ * duration and re-allowed afterwards, even on failure.
+ *
+ * @param fromIdx index of the service expected to hold the lock now
+ * @param toIdx index of the service expected to take over; must differ
+ * from fromIdx
+ * @throws Exception if the lock is not held by 'fromIdx', or if the test
+ * context reports an exception while waiting
+ */
+ public void expireAndVerifyFailover(int fromIdx, int toIdx)
+ throws Exception {
+ Preconditions.checkArgument(fromIdx != toIdx);
+
+ getElector(fromIdx).preventSessionReestablishmentForTests();
+ try {
+ expireActiveLockHolder(fromIdx);
+
+ waitForHAState(fromIdx, HAServiceState.STANDBY);
+ waitForHAState(toIdx, HAServiceState.ACTIVE);
+ } finally {
+ // Always undo the block so later steps can reconnect this elector.
+ getElector(fromIdx).allowSessionReestablishmentForTests();
+ }
+ }
+
+ /**
+ * Test-thread which runs a ZK Failover Controller corresponding
+ * to a given dummy service.
+ */
+ private class DummyZKFCThread extends TestingThread {
+ private final DummyZKFC zkfc;
+
+ public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
+ super(ctx);
+ this.zkfc = new DummyZKFC(svc);
+ zkfc.setConf(conf);
+ }
+
+ @Override
+ public void doWork() throws Exception {
+ try {
+ assertEquals(0, zkfc.run(new String[0]));
+ } catch (InterruptedException ie) {
+ // Interrupted by main thread, that's OK.
+ }
+ }
+ }
+
+ static class DummyZKFC extends ZKFailoverController {
+ private final DummyHAService localTarget;
+
+ public DummyZKFC(DummyHAService localTarget) {
+ this.localTarget = localTarget;
+ }
+
+ @Override
+ protected byte[] targetToData(HAServiceTarget target) {
+ return Ints.toByteArray(((DummyHAService)target).index);
+ }
+
+ @Override
+ protected HAServiceTarget dataToTarget(byte[] data) {
+ int index = Ints.fromByteArray(data);
+ return DummyHAService.getInstance(index);
+ }
+
+ @Override
+ protected HAServiceTarget getLocalTarget() {
+ return localTarget;
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1307599&r1=1307598&r2=1307599&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Fri Mar 30 20:30:47 2012
@@ -17,36 +17,24 @@
*/
package org.apache.hadoop.ha;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.io.File;
-import java.net.InetSocketAddress;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
-import org.apache.hadoop.test.MultithreadedTestUtil;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
import org.apache.log4j.Level;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import com.google.common.primitives.Ints;
-
public class TestZKFailoverController extends ClientBase {
private Configuration conf;
- private DummyHAService svc1;
- private DummyHAService svc2;
- private TestContext ctx;
- private DummyZKFCThread thr1, thr2;
+ private MiniZKFCCluster cluster;
static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
@@ -63,49 +51,7 @@ public class TestZKFailoverController ex
public void setupConfAndServices() {
conf = new Configuration();
conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
- // Fast check interval so tests run faster
- conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
- conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
- conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
- svc1 = new DummyHAService(HAServiceState.INITIALIZING,
- new InetSocketAddress("svc1", 1234));
- svc2 = new DummyHAService(HAServiceState.INITIALIZING,
- new InetSocketAddress("svc2", 1234));
- }
-
- /**
- * Set up two services and their failover controllers. svc1 is started
- * first, so that it enters ACTIVE state, and then svc2 is started,
- * which enters STANDBY
- */
- private void setupFCs() throws Exception {
- // Format the base dir, should succeed
- assertEquals(0, runFC(svc1, "-formatZK"));
-
- ctx = new MultithreadedTestUtil.TestContext();
- thr1 = new DummyZKFCThread(ctx, svc1);
- ctx.addThread(thr1);
- thr1.start();
-
- LOG.info("Waiting for svc1 to enter active state");
- waitForHAState(svc1, HAServiceState.ACTIVE);
-
- LOG.info("Adding svc2");
- thr2 = new DummyZKFCThread(ctx, svc2);
- thr2.start();
- waitForHAState(svc2, HAServiceState.STANDBY);
- }
-
- private void stopFCs() throws Exception {
- if (thr1 != null) {
- thr1.interrupt();
- }
- if (thr2 != null) {
- thr2.interrupt();
- }
- if (ctx != null) {
- ctx.stop();
- }
+ this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
}
/**
@@ -114,20 +60,21 @@ public class TestZKFailoverController ex
*/
@Test(timeout=15000)
public void testFormatZK() throws Exception {
+ DummyHAService svc = cluster.getService(1);
// Run without formatting the base dir,
// should barf
assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
- runFC(svc1));
+ runFC(svc));
// Format the base dir, should succeed
- assertEquals(0, runFC(svc1, "-formatZK"));
+ assertEquals(0, runFC(svc, "-formatZK"));
// Should fail to format if already formatted
assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
- runFC(svc1, "-formatZK", "-nonInteractive"));
+ runFC(svc, "-formatZK", "-nonInteractive"));
// Unless '-force' is on
- assertEquals(0, runFC(svc1, "-formatZK", "-force"));
+ assertEquals(0, runFC(svc, "-formatZK", "-force"));
}
/**
@@ -136,14 +83,14 @@ public class TestZKFailoverController ex
*/
@Test(timeout=15000)
public void testFencingMustBeConfigured() throws Exception {
- svc1 = Mockito.spy(svc1);
+ DummyHAService svc = Mockito.spy(cluster.getService(0));
Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
- .when(svc1).checkFencingConfigured();
+ .when(svc).checkFencingConfigured();
// Format the base dir, should succeed
- assertEquals(0, runFC(svc1, "-formatZK"));
+ assertEquals(0, runFC(svc, "-formatZK"));
// Try to run the actual FC, should fail without a fencer
assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
- runFC(svc1));
+ runFC(svc));
}
/**
@@ -155,66 +102,50 @@ public class TestZKFailoverController ex
@Test(timeout=15000)
public void testAutoFailoverOnBadHealth() throws Exception {
try {
- setupFCs();
+ cluster.start();
+ DummyHAService svc1 = cluster.getService(1);
- LOG.info("Faking svc1 unhealthy, should failover to svc2");
- svc1.isHealthy = false;
- LOG.info("Waiting for svc1 to enter standby state");
- waitForHAState(svc1, HAServiceState.STANDBY);
- waitForHAState(svc2, HAServiceState.ACTIVE);
+ LOG.info("Faking svc0 unhealthy, should failover to svc1");
+ cluster.setHealthy(0, false);
+
+ LOG.info("Waiting for svc0 to enter standby state");
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
- LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
+ LOG.info("Allowing svc0 to be healthy again, making svc1 unreachable " +
"and fail to gracefully go to standby");
- svc1.isHealthy = true;
- svc2.actUnreachable = true;
-
- // Allow fencing to succeed
- Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
- // Should fail back to svc1 at this point
- waitForHAState(svc1, HAServiceState.ACTIVE);
- // and fence svc2
- Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
+ cluster.setUnreachable(1, true);
+ cluster.setHealthy(0, true);
+
+ // Should fail back to svc0 at this point
+ cluster.waitForHAState(0, HAServiceState.ACTIVE);
+ // and fence svc1
+ Mockito.verify(svc1.fencer).fence(Mockito.same(svc1));
} finally {
- stopFCs();
+ cluster.stop();
}
}
@Test(timeout=15000)
public void testAutoFailoverOnLostZKSession() throws Exception {
try {
- setupFCs();
+ cluster.start();
- // Expire svc1, it should fail over to svc2
- expireAndVerifyFailover(thr1, thr2);
+ // Expire svc0, it should fail over to svc1
+ cluster.expireAndVerifyFailover(0, 1);
- // Expire svc2, it should fail back to svc1
- expireAndVerifyFailover(thr2, thr1);
+ // Expire svc1, it should fail back to svc0
+ cluster.expireAndVerifyFailover(1, 0);
LOG.info("======= Running test cases second time to test " +
"re-establishment =========");
- // Expire svc1, it should fail over to svc2
- expireAndVerifyFailover(thr1, thr2);
+ // Expire svc0, it should fail over to svc1
+ cluster.expireAndVerifyFailover(0, 1);
- // Expire svc2, it should fail back to svc1
- expireAndVerifyFailover(thr2, thr1);
+ // Expire svc1, it should fail back to svc0
+ cluster.expireAndVerifyFailover(1, 0);
} finally {
- stopFCs();
- }
- }
-
- private void expireAndVerifyFailover(DummyZKFCThread fromThr,
- DummyZKFCThread toThr) throws Exception {
- DummyHAService fromSvc = fromThr.zkfc.localTarget;
- DummyHAService toSvc = toThr.zkfc.localTarget;
-
- fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
- try {
- expireActiveLockHolder(fromSvc);
-
- waitForHAState(fromSvc, HAServiceState.STANDBY);
- waitForHAState(toSvc, HAServiceState.ACTIVE);
- } finally {
- fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+ cluster.stop();
}
}
@@ -225,33 +156,32 @@ public class TestZKFailoverController ex
@Test(timeout=15000)
public void testDontFailoverToUnhealthyNode() throws Exception {
try {
- setupFCs();
+ cluster.start();
- // Make svc2 unhealthy, and wait for its FC to notice the bad health.
- svc2.isHealthy = false;
- waitForHealthState(thr2.zkfc,
- HealthMonitor.State.SERVICE_UNHEALTHY);
+ // Make svc1 unhealthy, and wait for its FC to notice the bad health.
+ cluster.setHealthy(1, false);
+ cluster.waitForHealthState(1, HealthMonitor.State.SERVICE_UNHEALTHY);
- // Expire svc1
- thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
+ // Expire svc0
+ cluster.getElector(0).preventSessionReestablishmentForTests();
try {
- expireActiveLockHolder(svc1);
+ cluster.expireActiveLockHolder(0);
- LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
+ LOG.info("Expired svc0's ZK session. Waiting a second to give svc1" +
" a chance to take the lock, if it is ever going to.");
Thread.sleep(1000);
// Ensure that no one holds the lock.
- waitForActiveLockHolder(null);
+ cluster.waitForActiveLockHolder(null);
} finally {
- LOG.info("Allowing svc1's elector to re-establish its connection");
- thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+ LOG.info("Allowing svc0's elector to re-establish its connection");
+ cluster.getElector(0).allowSessionReestablishmentForTests();
}
- // svc1 should get the lock again
- waitForActiveLockHolder(svc1);
+ // svc0 should get the lock again
+ cluster.waitForActiveLockHolder(0);
} finally {
- stopFCs();
+ cluster.stop();
}
}
@@ -262,36 +192,38 @@ public class TestZKFailoverController ex
@Test(timeout=15000)
public void testBecomingActiveFails() throws Exception {
try {
- setupFCs();
+ cluster.start();
+ DummyHAService svc1 = cluster.getService(1);
- LOG.info("Making svc2 fail to become active");
- svc2.failToBecomeActive = true;
+ LOG.info("Making svc1 fail to become active");
+ cluster.setFailToBecomeActive(1, true);
- LOG.info("Faking svc1 unhealthy, should NOT successfully " +
- "failover to svc2");
- svc1.isHealthy = false;
- waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
- waitForActiveLockHolder(null);
+ LOG.info("Faking svc0 unhealthy, should NOT successfully " +
+ "failover to svc1");
+ cluster.setHealthy(0, false);
+ cluster.waitForHealthState(0, State.SERVICE_UNHEALTHY);
+ cluster.waitForActiveLockHolder(null);
- Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce())
+
+ Mockito.verify(svc1.proxy, Mockito.timeout(2000).atLeastOnce())
.transitionToActive();
- waitForHAState(svc1, HAServiceState.STANDBY);
- waitForHAState(svc2, HAServiceState.STANDBY);
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.waitForHAState(1, HAServiceState.STANDBY);
- LOG.info("Faking svc1 healthy again, should go back to svc1");
- svc1.isHealthy = true;
- waitForHAState(svc1, HAServiceState.ACTIVE);
- waitForHAState(svc2, HAServiceState.STANDBY);
- waitForActiveLockHolder(svc1);
+ LOG.info("Faking svc0 healthy again, should go back to svc0");
+ cluster.setHealthy(0, true);
+ cluster.waitForHAState(0, HAServiceState.ACTIVE);
+ cluster.waitForHAState(1, HAServiceState.STANDBY);
+ cluster.waitForActiveLockHolder(0);
- // Ensure that we can fail back to thr2 once it it is able
+ // Ensure that we can fail back to svc1 once it is able
// to become active (e.g the admin has restarted it)
- LOG.info("Allowing svc2 to become active, expiring svc1");
- svc2.failToBecomeActive = false;
- expireAndVerifyFailover(thr1, thr2);
+ LOG.info("Allowing svc1 to become active, expiring svc0");
+ svc1.failToBecomeActive = false;
+ cluster.expireAndVerifyFailover(0, 1);
} finally {
- stopFCs();
+ cluster.stop();
}
}
@@ -303,27 +235,25 @@ public class TestZKFailoverController ex
@Test(timeout=15000)
public void testZooKeeperFailure() throws Exception {
try {
- setupFCs();
+ cluster.start();
// Record initial ZK sessions
- long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
- long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
+ long session0 = cluster.getElector(0).getZKSessionIdForTests();
+ long session1 = cluster.getElector(1).getZKSessionIdForTests();
LOG.info("====== Stopping ZK server");
stopServer();
waitForServerDown(hostPort, CONNECTION_TIMEOUT);
LOG.info("====== Waiting for services to enter NEUTRAL mode");
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr1.zkfc.getElectorForTests(),
+ cluster.waitForElectorState(0,
ActiveStandbyElector.State.NEUTRAL);
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr2.zkfc.getElectorForTests(),
+ cluster.waitForElectorState(1,
ActiveStandbyElector.State.NEUTRAL);
LOG.info("====== Checking that the services didn't change HA state");
- assertEquals(HAServiceState.ACTIVE, svc1.state);
- assertEquals(HAServiceState.STANDBY, svc2.state);
+ assertEquals(HAServiceState.ACTIVE, cluster.getService(0).state);
+ assertEquals(HAServiceState.STANDBY, cluster.getService(1).state);
LOG.info("====== Restarting server");
startServer();
@@ -331,134 +261,26 @@ public class TestZKFailoverController ex
// Nodes should go back to their original states, since they re-obtain
// the same sessions.
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr1.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.ACTIVE);
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr2.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.STANDBY);
+ cluster.waitForElectorState(0, ActiveStandbyElector.State.ACTIVE);
+ cluster.waitForElectorState(1, ActiveStandbyElector.State.STANDBY);
// Check HA states didn't change.
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr1.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.ACTIVE);
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr2.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.STANDBY);
+ cluster.waitForHAState(0, HAServiceState.ACTIVE);
+ cluster.waitForHAState(1, HAServiceState.STANDBY);
+
// Check they re-used the same sessions and didn't spuriously reconnect
+ assertEquals(session0,
+ cluster.getElector(0).getZKSessionIdForTests());
assertEquals(session1,
- thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
- assertEquals(session2,
- thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
+ cluster.getElector(1).getZKSessionIdForTests());
} finally {
- stopFCs();
- }
- }
-
- /**
- * Expire the ZK session of the given service. This requires
- * (and asserts) that the given service be the current active.
- * @throws NoNodeException if no service holds the lock
- */
- private void expireActiveLockHolder(DummyHAService expectedActive)
- throws NoNodeException {
- ZooKeeperServer zks = getServer(serverFactory);
- Stat stat = new Stat();
- byte[] data = zks.getZKDatabase().getData(
- ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
- ActiveStandbyElector.LOCK_FILENAME, stat, null);
-
- assertArrayEquals(Ints.toByteArray(expectedActive.index), data);
- long session = stat.getEphemeralOwner();
- LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
- zks.closeSession(session);
- }
-
- /**
- * Wait for the given HA service to enter the given HA state.
- */
- private void waitForHAState(DummyHAService svc, HAServiceState state)
- throws Exception {
- while (svc.state != state) {
- ctx.checkException();
- Thread.sleep(50);
- }
- }
-
- /**
- * Wait for the ZKFC to be notified of a change in health state.
- */
- private void waitForHealthState(DummyZKFC zkfc, State state)
- throws Exception {
- while (zkfc.getLastHealthState() != state) {
- ctx.checkException();
- Thread.sleep(50);
+ cluster.stop();
}
}
- /**
- * Wait for the given HA service to become the active lock holder.
- * If the passed svc is null, waits for there to be no active
- * lock holder.
- */
- private void waitForActiveLockHolder(DummyHAService svc)
- throws Exception {
- ZooKeeperServer zks = getServer(serverFactory);
- ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
- ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
- (svc == null) ? null : Ints.toByteArray(svc.index));
- }
-
-
private int runFC(DummyHAService target, String ... args) throws Exception {
DummyZKFC zkfc = new DummyZKFC(target);
zkfc.setConf(conf);
return zkfc.run(args);
}
- /**
- * Test-thread which runs a ZK Failover Controller corresponding
- * to a given dummy service.
- */
- private class DummyZKFCThread extends TestingThread {
- private final DummyZKFC zkfc;
-
- public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
- super(ctx);
- this.zkfc = new DummyZKFC(svc);
- zkfc.setConf(conf);
- }
-
- @Override
- public void doWork() throws Exception {
- try {
- assertEquals(0, zkfc.run(new String[0]));
- } catch (InterruptedException ie) {
- // Interrupted by main thread, that's OK.
- }
- }
- }
-
- private static class DummyZKFC extends ZKFailoverController {
- private final DummyHAService localTarget;
-
- public DummyZKFC(DummyHAService localTarget) {
- this.localTarget = localTarget;
- }
-
- @Override
- protected byte[] targetToData(HAServiceTarget target) {
- return Ints.toByteArray(((DummyHAService)target).index);
- }
-
- @Override
- protected HAServiceTarget dataToTarget(byte[] data) {
- int index = Ints.fromByteArray(data);
- return DummyHAService.getInstance(index);
- }
-
- @Override
- protected HAServiceTarget getLocalTarget() {
- return localTarget;
- }
- }
}
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java?rev=1307599&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java Fri Mar 30 20:30:47 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.File;
+import java.util.Random;
+import java.util.Set;
+
+import javax.management.ObjectName;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.JMXEnv;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Stress test for ZKFailoverController.
+ * Starts multiple ZKFCs for dummy services, and then performs many automatic
+ * failovers. While doing so, ensures that a fake "shared resource"
+ * (simulating the shared edits dir) is only owned by one service at a time.
+ */
+public class TestZKFailoverControllerStress extends ClientBase {
+
+ private static final int STRESS_RUNTIME_SECS = 30;
+ private static final int EXTRA_TIMEOUT_SECS = 10;
+
+ private Configuration conf;
+ private MiniZKFCCluster cluster;
+
+ @Override
+ public void setUp() throws Exception {
+ // build.test.dir is used by zookeeper
+ new File(System.getProperty("build.test.dir", "build")).mkdirs();
+ super.setUp();
+ }
+
+ @Before
+ public void setupConfAndServices() throws Exception {
+ conf = new Configuration();
+ conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+ this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
+ cluster.start();
+ }
+
+ @After
+ public void stopCluster() throws Exception {
+ cluster.stop();
+ }
+
+ /**
+ * ZK seems to have a bug when we muck with its sessions
+ * behind its back, causing disconnects, etc. This bug
+ * ends up leaving JMX beans around at the end of the test,
+ * and ClientBase's teardown method will throw an exception
+ * if it finds JMX beans leaked. So, clear them out here
+ * to work around the ZK bug. See ZOOKEEPER-1438.
+ */
+ @After
+ public void clearZKJMX() throws Exception {
+ Set<ObjectName> names = JMXEnv.ensureAll();
+ for (ObjectName n : names) {
+ JMXEnv.conn().unregisterMBean(n);
+ }
+ }
+
+ /**
+ * Simply fail back and forth between two services for the
+ * configured amount of time, via expiring their ZK sessions.
+ */
+ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+ public void testExpireBackAndForth() throws Exception {
+ long st = System.currentTimeMillis();
+ long runFor = STRESS_RUNTIME_SECS * 1000;
+
+ int i = 0;
+ while (System.currentTimeMillis() - st < runFor) {
+ // flip flop the services back and forth
+ int from = i % 2;
+ int to = (i + 1) % 2;
+
+ // Expire one service, it should fail over to the other
+ LOG.info("Failing over via expiration from " + from + " to " + to);
+ cluster.expireAndVerifyFailover(from, to);
+
+ i++;
+ }
+ }
+
+ /**
+ * Randomly expire the ZK sessions of the two ZKFCs. This differs
+ * from the above test in that it is not a controlled failover -
+ * we just do random expirations and expect neither one to ever
+ * generate fatal exceptions.
+ */
+ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+ public void testRandomExpirations() throws Exception {
+ long st = System.currentTimeMillis();
+ long runFor = STRESS_RUNTIME_SECS * 1000;
+
+ Random r = new Random();
+ while (System.currentTimeMillis() - st < runFor) {
+ cluster.getTestContext().checkException();
+ int targetIdx = r.nextInt(2);
+ ActiveStandbyElector target = cluster.getElector(targetIdx);
+ long sessId = target.getZKSessionIdForTests();
+ if (sessId != -1) {
+ LOG.info(String.format("Expiring session %x for svc %d",
+ sessId, targetIdx));
+ getServer(serverFactory).closeSession(sessId);
+ }
+ Thread.sleep(r.nextInt(300));
+ }
+ }
+
+ /**
+ * Have the services fail their health checks half the time,
+ * causing the master role to bounce back and forth in the
+ * cluster. Meanwhile, cause ZK to disconnect clients every
+ * 50ms, to trigger the retry code and failures to become active.
+ */
+ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+ public void testRandomHealthAndDisconnects() throws Exception {
+ long runFor = STRESS_RUNTIME_SECS * 1000;
+ Mockito.doAnswer(new RandomlyThrow(0))
+ .when(cluster.getService(0).proxy).monitorHealth();
+ Mockito.doAnswer(new RandomlyThrow(1))
+ .when(cluster.getService(1).proxy).monitorHealth();
+ ActiveStandbyElector.NUM_RETRIES = 100;
+
+ long st = System.currentTimeMillis();
+ while (System.currentTimeMillis() - st < runFor) {
+ cluster.getTestContext().checkException();
+ serverFactory.closeAll();
+ Thread.sleep(50);
+ }
+ }
+
+
+ /**
+ * Randomly throw an exception half the time the method is called
+ */
+ @SuppressWarnings("rawtypes")
+ private static class RandomlyThrow implements Answer {
+ private Random r = new Random();
+ private final int svcIdx;
+ public RandomlyThrow(int svcIdx) {
+ this.svcIdx = svcIdx;
+ }
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (r.nextBoolean()) {
+ LOG.info("Throwing an exception for svc " + svcIdx);
+ throw new HealthCheckFailedException("random failure");
+ }
+ return invocation.callRealMethod();
+ }
+ }
+}