You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/24 01:05:40 UTC
svn commit: r1304676 - in
/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/
Author: todd
Date: Sat Mar 24 00:05:40 2012
New Revision: 1304676
URL: http://svn.apache.org/viewvc?rev=1304676&view=rev
Log:
HADOOP-8163. Improve ActiveStandbyElector to provide hooks for fencing old active. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Sat Mar 24 00:05:40 2012
@@ -106,6 +106,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet.
(Sanjay Radia via szetszwo)
+ HADOOP-8163. Improve ActiveStandbyElector to provide hooks for
+ fencing old active. (todd)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Sat Mar 24 00:05:40 2012
@@ -19,6 +19,7 @@
package org.apache.hadoop.ha;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@@ -37,6 +39,7 @@ import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException.Code;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
/**
*
@@ -106,6 +109,15 @@ public class ActiveStandbyElector implem
* called to notify the app about it.
*/
void notifyFatalError(String errorMessage);
+
+ /**
+ * If an old active has failed, rather than exited gracefully, then
+ * the new active may need to take some fencing actions against it
+ * before proceeding with failover.
+ *
+ * @param oldActiveData the application data provided by the prior active
+ */
+ void fenceOldActive(byte[] oldActiveData);
}
/**
@@ -113,7 +125,9 @@ public class ActiveStandbyElector implem
* classes
*/
@VisibleForTesting
- protected static final String LOCKFILENAME = "ActiveStandbyElectorLock";
+ protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
+ @VisibleForTesting
+ protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
@@ -139,6 +153,7 @@ public class ActiveStandbyElector implem
private final List<ACL> zkAcl;
private byte[] appData;
private final String zkLockFilePath;
+ private final String zkBreadCrumbPath;
private final String znodeWorkingDir;
/**
@@ -182,7 +197,8 @@ public class ActiveStandbyElector implem
zkAcl = acl;
appClient = app;
znodeWorkingDir = parentZnodeName;
- zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME;
+ zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
+ zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
// createConnection for future API calls
createConnection();
@@ -204,6 +220,7 @@ public class ActiveStandbyElector implem
*/
public synchronized void joinElection(byte[] data)
throws HadoopIllegalArgumentException {
+
LOG.debug("Attempting active election");
if (data == null) {
@@ -215,6 +232,49 @@ public class ActiveStandbyElector implem
joinElectionInternal();
}
+
+ /**
+ * @return true if the configured parent znode exists
+ */
+ public synchronized boolean parentZNodeExists()
+ throws IOException, InterruptedException {
+ Preconditions.checkState(zkClient != null);
+ try {
+ return zkClient.exists(znodeWorkingDir, false) != null;
+ } catch (KeeperException e) {
+ throw new IOException("Couldn't determine existence of znode '" +
+ znodeWorkingDir + "'", e);
+ }
+ }
+
+ /**
+ * Utility function to ensure that the configured base znode exists.
+ * This recursively creates the znode as well as all of its parents.
+ */
+ public synchronized void ensureParentZNode()
+ throws IOException, InterruptedException {
+ String pathParts[] = znodeWorkingDir.split("/");
+ Preconditions.checkArgument(pathParts.length >= 1 &&
+ "".equals(pathParts[0]),
+ "Invalid path: %s", znodeWorkingDir);
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 1; i < pathParts.length; i++) {
+ sb.append("/").append(pathParts[i]);
+ String prefixPath = sb.toString();
+ LOG.debug("Ensuring existence of " + prefixPath);
+ try {
+ createWithRetries(prefixPath, new byte[]{}, zkAcl, CreateMode.PERSISTENT);
+ } catch (KeeperException e) {
+ if (isNodeExists(e.code())) {
+ // This is OK - just ensuring existence.
+ continue;
+ } else {
+ throw new IOException("Couldn't create " + prefixPath, e);
+ }
+ }
+ }
+ }
/**
* Any service instance can drop out of the election by calling quitElection.
@@ -225,9 +285,17 @@ public class ActiveStandbyElector implem
* call joinElection(). <br/>
* This allows service instances to take themselves out of rotation for known
* impending unavailable states (e.g. long GC pause or software upgrade).
+ *
+ * @param needFence true if the underlying daemon may need to be fenced
+ * if a failover occurs due to dropping out of the election.
*/
- public synchronized void quitElection() {
+ public synchronized void quitElection(boolean needFence) {
LOG.debug("Yielding from election");
+ if (!needFence && state == State.ACTIVE) {
+ // If active is gracefully going back to standby mode, remove
+ // our permanent znode so no one fences us.
+ tryDeleteOwnBreadCrumbNode();
+ }
reset();
}
@@ -260,7 +328,7 @@ public class ActiveStandbyElector implem
return zkClient.getData(zkLockFilePath, false, stat);
} catch(KeeperException e) {
Code code = e.code();
- if (operationNodeDoesNotExist(code)) {
+ if (isNodeDoesNotExist(code)) {
// handle the commonly expected cases that make sense for us
throw new ActiveNotFoundException();
} else {
@@ -284,14 +352,14 @@ public class ActiveStandbyElector implem
}
Code code = Code.get(rc);
- if (operationSuccess(code)) {
+ if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
becomeActive();
monitorActiveStatus();
return;
}
- if (operationNodeExists(code)) {
+ if (isNodeExists(code)) {
if (createRetryCount == 0) {
// znode exists and we did not retry the operation. so a different
// instance has created it. become standby and monitor lock.
@@ -306,14 +374,14 @@ public class ActiveStandbyElector implem
}
String errorMessage = "Received create error from Zookeeper. code:"
- + code.toString();
+ + code.toString() + " for path " + path;
LOG.debug(errorMessage);
- if (operationRetry(code)) {
+ if (shouldRetry(code)) {
if (createRetryCount < NUM_RETRIES) {
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
++createRetryCount;
- createNode();
+ createLockNodeAsync();
return;
}
errorMessage = errorMessage
@@ -338,7 +406,7 @@ public class ActiveStandbyElector implem
}
Code code = Code.get(rc);
- if (operationSuccess(code)) {
+ if (isSuccess(code)) {
// the following owner check completes verification in case the lock znode
// creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
@@ -352,7 +420,7 @@ public class ActiveStandbyElector implem
return;
}
- if (operationNodeDoesNotExist(code)) {
+ if (isNodeDoesNotExist(code)) {
// the lock znode disappeared before we started monitoring it
enterNeutralMode();
joinElectionInternal();
@@ -363,10 +431,10 @@ public class ActiveStandbyElector implem
+ code.toString();
LOG.debug(errorMessage);
- if (operationRetry(code)) {
+ if (shouldRetry(code)) {
if (statRetryCount < NUM_RETRIES) {
++statRetryCount;
- monitorNode();
+ monitorLockNodeAsync();
return;
}
errorMessage = errorMessage
@@ -470,7 +538,7 @@ public class ActiveStandbyElector implem
private void monitorActiveStatus() {
LOG.debug("Monitoring active leader");
statRetryCount = 0;
- monitorNode();
+ monitorLockNodeAsync();
}
private void joinElectionInternal() {
@@ -482,7 +550,7 @@ public class ActiveStandbyElector implem
}
createRetryCount = 0;
- createNode();
+ createLockNodeAsync();
}
private void reJoinElection() {
@@ -515,7 +583,7 @@ public class ActiveStandbyElector implem
private void createConnection() throws IOException {
zkClient = getNewZooKeeper();
}
-
+
private void terminateConnection() {
if (zkClient == null) {
return;
@@ -538,12 +606,110 @@ public class ActiveStandbyElector implem
private void becomeActive() {
if (state != State.ACTIVE) {
+ try {
+ Stat oldBreadcrumbStat = fenceOldActive();
+ writeBreadCrumbNode(oldBreadcrumbStat);
+ } catch (Exception e) {
+ LOG.warn("Exception handling the winning of election", e);
+ reJoinElection();
+ return;
+ }
LOG.debug("Becoming active");
state = State.ACTIVE;
appClient.becomeActive();
}
}
+ /**
+ * Write the "ActiveBreadCrumb" node, indicating that this node may need
+ * to be fenced on failover.
+ * @param oldBreadcrumbStat the Stat returned by fenceOldActive(), or null if no breadcrumb znode existed
+ */
+ private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
+ throws KeeperException, InterruptedException {
+ LOG.info("Writing znode " + zkBreadCrumbPath +
+ " to indicate that the local node is the most recent active...");
+ if (oldBreadcrumbStat == null) {
+ // No previous active, just create the node
+ createWithRetries(zkBreadCrumbPath, appData, zkAcl,
+ CreateMode.PERSISTENT);
+ } else {
+ // There was a previous active, update the node
+ setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
+ }
+ }
+
+ /**
+ * Try to delete the "ActiveBreadCrumb" node when gracefully giving up
+ * active status.
+ * If this fails, it will simply warn, since the graceful release behavior
+ * is only an optimization.
+ */
+ private void tryDeleteOwnBreadCrumbNode() {
+ assert state == State.ACTIVE;
+ LOG.info("Deleting bread-crumb of active node...");
+
+ // Sanity check the data. This shouldn't be strictly necessary,
+ // but better to play it safe.
+ Stat stat = new Stat();
+ byte[] data = null;
+ try {
+ data = zkClient.getData(zkBreadCrumbPath, false, stat);
+
+ if (!Arrays.equals(data, appData)) {
+ throw new IllegalStateException(
+ "We thought we were active, but in fact " +
+ "the active znode had the wrong data: " +
+ StringUtils.byteToHexString(data) + " (stat=" + stat + ")");
+ }
+
+ deleteWithRetries(zkBreadCrumbPath, stat.getVersion());
+ } catch (Exception e) {
+ LOG.warn("Unable to delete our own bread-crumb of being active at " +
+ zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " +
+ "Expecting to be fenced by the next active.");
+ }
+ }
+
+ /**
+ * If there is a breadcrumb node indicating that another node may need
+ * fencing, try to fence that node.
+ * @return the Stat of the breadcrumb node that was read, or null
+ * if no breadcrumb node existed
+ */
+ private Stat fenceOldActive() throws InterruptedException, KeeperException {
+ final Stat stat = new Stat();
+ byte[] data;
+ LOG.info("Checking for any old active which needs to be fenced...");
+ try {
+ data = zkDoWithRetries(new ZKAction<byte[]>() {
+ @Override
+ public byte[] run() throws KeeperException, InterruptedException {
+ return zkClient.getData(zkBreadCrumbPath, false, stat);
+ }
+ });
+ } catch (KeeperException ke) {
+ if (isNodeDoesNotExist(ke.code())) {
+ LOG.info("No old node to fence");
+ return null;
+ }
+
+ // If we failed to read for any other reason, then likely we lost
+ // our session, or we don't have permissions, etc. In any case,
+ // we probably shouldn't become active, and failing the whole
+ // thing is the best bet.
+ throw ke;
+ }
+
+ LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
+ if (Arrays.equals(data, appData)) {
+ LOG.info("But old node has our own data, so don't need to fence it.");
+ } else {
+ appClient.fenceOldActive(data);
+ }
+ return stat;
+ }
+
private void becomeStandby() {
if (state != State.STANDBY) {
LOG.debug("Becoming standby");
@@ -560,28 +726,76 @@ public class ActiveStandbyElector implem
}
}
- private void createNode() {
+ private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this,
null);
}
- private void monitorNode() {
- zkClient.exists(zkLockFilePath, true, this, null);
+ private void monitorLockNodeAsync() {
+ zkClient.exists(zkLockFilePath, this, this, null);
+ }
+
+ private String createWithRetries(final String path, final byte[] data,
+ final List<ACL> acl, final CreateMode mode)
+ throws InterruptedException, KeeperException {
+ return zkDoWithRetries(new ZKAction<String>() {
+ public String run() throws KeeperException, InterruptedException {
+ return zkClient.create(path, data, acl, mode);
+ }
+ });
+ }
+
+ private Stat setDataWithRetries(final String path, final byte[] data,
+ final int version) throws InterruptedException, KeeperException {
+ return zkDoWithRetries(new ZKAction<Stat>() {
+ public Stat run() throws KeeperException, InterruptedException {
+ return zkClient.setData(path, data, version);
+ }
+ });
+ }
+
+ private void deleteWithRetries(final String path, final int version)
+ throws KeeperException, InterruptedException {
+ zkDoWithRetries(new ZKAction<Void>() {
+ public Void run() throws KeeperException, InterruptedException {
+ zkClient.delete(path, version);
+ return null;
+ }
+ });
+ }
+
+ private static <T> T zkDoWithRetries(ZKAction<T> action)
+ throws KeeperException, InterruptedException {
+ int retry = 0;
+ while (true) {
+ try {
+ return action.run();
+ } catch (KeeperException ke) {
+ if (shouldRetry(ke.code()) && ++retry < NUM_RETRIES) {
+ continue;
+ }
+ throw ke;
+ }
+ }
+ }
+
+ private interface ZKAction<T> {
+ T run() throws KeeperException, InterruptedException;
}
- private boolean operationSuccess(Code code) {
+ private static boolean isSuccess(Code code) {
return (code == Code.OK);
}
- private boolean operationNodeExists(Code code) {
+ private static boolean isNodeExists(Code code) {
return (code == Code.NODEEXISTS);
}
- private boolean operationNodeDoesNotExist(Code code) {
+ private static boolean isNodeDoesNotExist(Code code) {
return (code == Code.NONODE);
}
- private boolean operationRetry(Code code) {
+ private static boolean shouldRetry(Code code) {
switch (code) {
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Sat Mar 24 00:05:40 2012
@@ -42,12 +42,12 @@ import org.apache.hadoop.ha.ActiveStandb
public class TestActiveStandbyElector {
- static ZooKeeper mockZK;
- static int count;
- static ActiveStandbyElectorCallback mockApp;
- static final byte[] data = new byte[8];
+ private ZooKeeper mockZK;
+ private int count;
+ private ActiveStandbyElectorCallback mockApp;
+ private final byte[] data = new byte[8];
- ActiveStandbyElectorTester elector;
+ private ActiveStandbyElectorTester elector;
class ActiveStandbyElectorTester extends ActiveStandbyElector {
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
@@ -57,26 +57,46 @@ public class TestActiveStandbyElector {
@Override
public ZooKeeper getNewZooKeeper() {
- ++TestActiveStandbyElector.count;
- return TestActiveStandbyElector.mockZK;
+ ++count;
+ return mockZK;
}
-
}
- private static final String zkParentName = "/zookeeper";
- private static final String zkLockPathName = "/zookeeper/"
- + ActiveStandbyElector.LOCKFILENAME;
+ private static final String ZK_PARENT_NAME = "/parent/node";
+ private static final String ZK_LOCK_NAME = ZK_PARENT_NAME + "/" +
+ ActiveStandbyElector.LOCK_FILENAME;
+ private static final String ZK_BREADCRUMB_NAME = ZK_PARENT_NAME + "/" +
+ ActiveStandbyElector.BREADCRUMB_FILENAME;
@Before
public void init() throws IOException {
count = 0;
mockZK = Mockito.mock(ZooKeeper.class);
mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
- elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName,
+ elector = new ActiveStandbyElectorTester("hostPort", 1000, ZK_PARENT_NAME,
Ids.OPEN_ACL_UNSAFE, mockApp);
}
/**
+ * Set up the mock ZK to return no info for a prior active in ZK.
+ */
+ private void mockNoPriorActive() throws Exception {
+ Mockito.doThrow(new KeeperException.NoNodeException()).when(mockZK)
+ .getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(),
+ Mockito.<Stat>any());
+ }
+
+ /**
+ * Set up the mock to return info for some prior active node in ZK.
+ */
+ private void mockPriorActive(byte[] data) throws Exception {
+ Mockito.doReturn(data).when(mockZK)
+ .getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(),
+ Mockito.<Stat>any());
+ }
+
+
+ /**
* verify that joinElection checks for null data
*/
@Test(expected = HadoopIllegalArgumentException.class)
@@ -90,7 +110,7 @@ public class TestActiveStandbyElector {
@Test
public void testJoinElection() {
elector.joinElection(data);
- Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
}
@@ -99,30 +119,74 @@ public class TestActiveStandbyElector {
* started
*/
@Test
- public void testCreateNodeResultBecomeActive() {
+ public void testCreateNodeResultBecomeActive() throws Exception {
+ mockNoPriorActive();
+
elector.joinElection(data);
- elector.processResult(Code.OK.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
// monitor callback verifies the leader is ephemeral owner of lock but does
// not call becomeActive since its already active
Stat stat = new Stat();
stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
- elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
// should not call neutral mode/standby/active
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
// another joinElection not called.
- Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
// no new monitor called
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
+ }
+
+ /**
+ * Verify that, if there is a record of a prior active node, the
+ * elector asks the application to fence it before becoming active.
+ */
+ @Test
+ public void testFencesOldActive() throws Exception {
+ byte[] fakeOldActiveData = new byte[0];
+ mockPriorActive(fakeOldActiveData);
+
+ elector.joinElection(data);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ // Application fences active.
+ Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive(
+ fakeOldActiveData);
+ // Updates breadcrumb node to new data
+ Mockito.verify(mockZK, Mockito.times(1)).setData(
+ Mockito.eq(ZK_BREADCRUMB_NAME),
+ Mockito.eq(data),
+ Mockito.eq(0));
+ // Then it becomes active itself
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+ }
+
+ @Test
+ public void testQuitElectionRemovesBreadcrumbNode() throws Exception {
+ mockNoPriorActive();
+ elector.joinElection(data);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ // Writes its own active info
+ Mockito.verify(mockZK, Mockito.times(1)).create(
+ Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(data),
+ Mockito.eq(Ids.OPEN_ACL_UNSAFE),
+ Mockito.eq(CreateMode.PERSISTENT));
+ mockPriorActive(data);
+
+ elector.quitElection(false);
+
+ // Deletes its own active data
+ Mockito.verify(mockZK, Mockito.times(1)).delete(
+ Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(0));
}
/**
@@ -133,11 +197,10 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultBecomeStandby() {
elector.joinElection(data);
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
}
/**
@@ -147,10 +210,11 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultError() {
elector.joinElection(data);
- elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
- "Received create error from Zookeeper. code:APIERROR");
+ "Received create error from Zookeeper. code:APIERROR " +
+ "for path " + ZK_LOCK_NAME);
}
/**
@@ -158,42 +222,43 @@ public class TestActiveStandbyElector {
* becomes active if they match. monitoring is started.
*/
@Test
- public void testCreateNodeResultRetryBecomeActive() {
+ public void testCreateNodeResultRetryBecomeActive() throws Exception {
+ mockNoPriorActive();
+
elector.joinElection(data);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
// 4 errors results in fatalError
Mockito
.verify(mockApp, Mockito.times(1))
.notifyFatalError(
- "Received create error from Zookeeper. code:CONNECTIONLOSS. "+
+ "Received create error from Zookeeper. code:CONNECTIONLOSS " +
+ "for path " + ZK_LOCK_NAME + ". " +
"Not retrying further znode create connection errors.");
elector.joinElection(data);
// recreate connection via getNewZooKeeper
- Assert.assertEquals(2, TestActiveStandbyElector.count);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ Assert.assertEquals(2, count);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ verifyExistCall(1);
Stat stat = new Stat();
stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
- elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
- Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data,
+ verifyExistCall(1);
+ Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
}
@@ -205,20 +270,18 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultRetryBecomeStandby() {
elector.joinElection(data);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ verifyExistCall(1);
Stat stat = new Stat();
stat.setEphemeralOwner(0);
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
- elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
}
/**
@@ -230,19 +293,18 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultRetryNoNode() {
elector.joinElection(data);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
- zkLockPathName);
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
+ verifyExistCall(1);
- elector.processResult(Code.NONODE.intValue(), zkLockPathName, null,
+ elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null,
(Stat) null);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
- Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
}
@@ -251,13 +313,13 @@ public class TestActiveStandbyElector {
*/
@Test
public void testStatNodeRetry() {
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null);
- elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null);
Mockito
.verify(mockApp, Mockito.times(1))
@@ -271,7 +333,7 @@ public class TestActiveStandbyElector {
*/
@Test
public void testStatNodeError() {
- elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), zkLockPathName,
+ elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME,
null, (Stat) null);
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
@@ -282,7 +344,8 @@ public class TestActiveStandbyElector {
* verify behavior of watcher.process callback with non-node event
*/
@Test
- public void testProcessCallbackEventNone() {
+ public void testProcessCallbackEventNone() throws Exception {
+ mockNoPriorActive();
elector.joinElection(data);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
@@ -306,8 +369,7 @@ public class TestActiveStandbyElector {
Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected);
elector.process(mockEvent);
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
// session expired should enter safe mode and initiate re-election
// re-election checked via checking re-creation of new zookeeper and
@@ -318,17 +380,16 @@ public class TestActiveStandbyElector {
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// called getNewZooKeeper to create new session. first call was in
// constructor
- Assert.assertEquals(2, TestActiveStandbyElector.count);
+ Assert.assertEquals(2, count);
// once in initial joinElection and one now
- Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
// create znode success. become master and monitor
- elector.processResult(Code.OK.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
- Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(2);
// error event results in fatal error
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
@@ -343,32 +404,30 @@ public class TestActiveStandbyElector {
* verify behavior of watcher.process with node event
*/
@Test
- public void testProcessCallbackEventNode() {
+ public void testProcessCallbackEventNode() throws Exception {
+ mockNoPriorActive();
elector.joinElection(data);
// make the object go into the monitoring state
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
- Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+ Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeDataChanged);
elector.process(mockEvent);
- Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(2);
// monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeChildrenChanged);
elector.process(mockEvent);
- Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(3);
// lock node deletion when in standby mode should create znode again
// successful znode creation enters active state and sets monitor
@@ -377,13 +436,12 @@ public class TestActiveStandbyElector {
// enterNeutralMode not called when app is standby and leader is lost
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// once in initial joinElection() and one now
- Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
- elector.processResult(Code.OK.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
- Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(4);
// lock node deletion in active mode should enter neutral mode and create
// znode again successful znode creation enters active state and sets
@@ -392,13 +450,12 @@ public class TestActiveStandbyElector {
elector.process(mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// another joinElection called
- Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
- elector.processResult(Code.OK.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
- Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(5);
// bad path name results in fatal error
Mockito.when(mockEvent.getPath()).thenReturn(null);
@@ -406,11 +463,15 @@ public class TestActiveStandbyElector {
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected watch error from Zookeeper");
// fatal error means no new connection other than one from constructor
- Assert.assertEquals(1, TestActiveStandbyElector.count);
+ Assert.assertEquals(1, count);
// no new watches after fatal error
- Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(5);
+
+ }
+ private void verifyExistCall(int times) {
+ Mockito.verify(mockZK, Mockito.times(times)).exists(
+ ZK_LOCK_NAME, elector, elector, null);
}
/**
@@ -421,14 +482,13 @@ public class TestActiveStandbyElector {
elector.joinElection(data);
// make the object go into the monitoring standby state
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
- Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+ Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// notify node deletion
// monitoring should be setup again after event is received
@@ -437,16 +497,15 @@ public class TestActiveStandbyElector {
// is standby. no need to notify anything now
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// another joinElection called.
- Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+ Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
// lost election
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
// still standby. so no need to notify again
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
// monitor is set again
- Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(2);
}
/**
@@ -454,22 +513,20 @@ public class TestActiveStandbyElector {
* next call to joinElection creates new connection and performs election
*/
@Test
- public void testQuitElection() throws InterruptedException {
- elector.quitElection();
+ public void testQuitElection() throws Exception {
+ elector.quitElection(true);
Mockito.verify(mockZK, Mockito.times(1)).close();
// no watches added
- Mockito.verify(mockZK, Mockito.times(0)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(0);
byte[] data = new byte[8];
elector.joinElection(data);
// getNewZooKeeper called 2 times. once in constructor and once now
- Assert.assertEquals(2, TestActiveStandbyElector.count);
- elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
- zkLockPathName);
+ Assert.assertEquals(2, count);
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
+ ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
- Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
- elector, null);
+ verifyExistCall(1);
}
@@ -488,16 +545,16 @@ public class TestActiveStandbyElector {
// get valid active data
byte[] data = new byte[8];
Mockito.when(
- mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+ mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject())).thenReturn(data);
Assert.assertEquals(data, elector.getActiveData());
Mockito.verify(mockZK, Mockito.times(1)).getData(
- Mockito.eq(zkLockPathName), Mockito.eq(false),
+ Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject());
// active does not exist
Mockito.when(
- mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+ mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject())).thenThrow(
new KeeperException.NoNodeException());
try {
@@ -505,23 +562,65 @@ public class TestActiveStandbyElector {
Assert.fail("ActiveNotFoundException expected");
} catch(ActiveNotFoundException e) {
Mockito.verify(mockZK, Mockito.times(2)).getData(
- Mockito.eq(zkLockPathName), Mockito.eq(false),
+ Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject());
}
// error getting active data rethrows keeperexception
try {
Mockito.when(
- mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+ mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject())).thenThrow(
new KeeperException.AuthFailedException());
elector.getActiveData();
Assert.fail("KeeperException.AuthFailedException expected");
} catch(KeeperException.AuthFailedException ke) {
Mockito.verify(mockZK, Mockito.times(3)).getData(
- Mockito.eq(zkLockPathName), Mockito.eq(false),
+ Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject());
}
}
+ /**
+ * Test that ensureBaseNode() recursively creates the specified dir
+ */
+ @Test
+ public void testEnsureBaseNode() throws Exception {
+ elector.ensureParentZNode();
+ StringBuilder prefix = new StringBuilder();
+ for (String part : ZK_PARENT_NAME.split("/")) {
+ if (part.isEmpty()) continue;
+ prefix.append("/").append(part);
+ if (!"/".equals(prefix.toString())) {
+ Mockito.verify(mockZK).create(
+ Mockito.eq(prefix.toString()), Mockito.<byte[]>any(),
+ Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
+ }
+ }
+ }
+
+ /**
+ * Test for a bug encountered during development of HADOOP-8163:
+ * ensureBaseNode() should throw an exception if it has to retry
+ * more than 3 times to create any part of the path.
+ */
+ @Test
+ public void testEnsureBaseNodeFails() throws Exception {
+ Mockito.doThrow(new KeeperException.ConnectionLossException())
+ .when(mockZK).create(
+ Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
+ Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
+ try {
+ elector.ensureParentZNode();
+ Assert.fail("Did not throw!");
+ } catch (IOException ioe) {
+ if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) {
+ throw ioe;
+ }
+ }
+ // Should have tried three times
+ Mockito.verify(mockZK, Mockito.times(3)).create(
+ Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
+ Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java?rev=1304676&r1=1304675&r2=1304676&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java Sat Mar 24 00:05:40 2012
@@ -18,19 +18,24 @@
package org.apache.hadoop.ha;
+import static org.junit.Assert.*;
+
import java.io.File;
import java.io.IOException;
-import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.log4j.Level;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.test.ClientBase;
/**
@@ -39,7 +44,17 @@ import org.apache.zookeeper.test.ClientB
public class TestActiveStandbyElectorRealZK extends ClientBase {
static final int NUM_ELECTORS = 2;
static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
- static int currentClientIndex = 0;
+
+ static {
+ ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(
+ Level.ALL);
+ }
+
+ int activeIndex = -1;
+ int standbyIndex = -1;
+ static final String PARENT_DIR = "/" + UUID.randomUUID();
+
+ ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS];
@Override
public void setUp() throws Exception {
@@ -48,20 +63,6 @@ public class TestActiveStandbyElectorRea
super.setUp();
}
- class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector {
- ActiveStandbyElectorTesterRealZK(String hostPort, int timeout,
- String parent, List<ACL> acl, ActiveStandbyElectorCallback app)
- throws IOException {
- super(hostPort, timeout, parent, acl, app);
- }
-
- @Override
- public ZooKeeper getNewZooKeeper() {
- return TestActiveStandbyElectorRealZK.zkClient[
- TestActiveStandbyElectorRealZK.currentClientIndex];
- }
- }
-
/**
* The class object runs on a thread and waits for a signal to start from the
* test object. On getting the signal it joins the election and thus by doing
@@ -71,71 +72,48 @@ public class TestActiveStandbyElectorRea
* an unexpected fatal error. this lets another thread object to become a
* leader.
*/
- class ThreadRunner implements Runnable, ActiveStandbyElectorCallback {
+ class ThreadRunner extends TestingThread
+ implements ActiveStandbyElectorCallback {
int index;
- TestActiveStandbyElectorRealZK test;
- boolean wait = true;
+
+ CountDownLatch hasBecomeActive = new CountDownLatch(1);
- ThreadRunner(int i, TestActiveStandbyElectorRealZK s) {
- index = i;
- test = s;
+ ThreadRunner(TestContext ctx,
+ int idx) {
+ super(ctx);
+ index = idx;
}
@Override
- public void run() {
+ public void doWork() throws Exception {
LOG.info("starting " + index);
- while(true) {
- synchronized (test) {
- // wait for test start signal to come
- if (!test.start) {
- try {
- test.wait();
- } catch(InterruptedException e) {
- Assert.fail(e.getMessage());
- }
- } else {
- break;
- }
- }
- }
// join election
- byte[] data = new byte[8];
- ActiveStandbyElector elector = test.elector[index];
+ byte[] data = new byte[1];
+ data[0] = (byte)index;
+
+ ActiveStandbyElector elector = electors[index];
LOG.info("joining " + index);
elector.joinElection(data);
- try {
- while(true) {
- synchronized (this) {
- // wait for elector to become active/fatal error
- if (wait) {
- // wait to become active
- // wait capped at 30s to prevent hung test
- wait(30000);
- } else {
- break;
- }
- }
- }
- Thread.sleep(1000);
- // quit election to allow other elector to become active
- elector.quitElection();
- } catch(InterruptedException e) {
- Assert.fail(e.getMessage());
- }
+
+ hasBecomeActive.await(30, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ // quit election to allow other elector to become active
+ elector.quitElection(true);
+
LOG.info("ending " + index);
}
@Override
public synchronized void becomeActive() {
- test.reportActive(index);
+ reportActive(index);
LOG.info("active " + index);
- wait = false;
- notifyAll();
+ hasBecomeActive.countDown();
}
@Override
public synchronized void becomeStandby() {
- test.reportStandby(index);
+ reportStandby(index);
LOG.info("standby " + index);
}
@@ -147,19 +125,16 @@ public class TestActiveStandbyElectorRea
@Override
public synchronized void notifyFatalError(String errorMessage) {
LOG.info("fatal " + index + " .Error message:" + errorMessage);
- wait = false;
- notifyAll();
+ this.interrupt();
}
- }
-
- boolean start = false;
- int activeIndex = -1;
- int standbyIndex = -1;
- String parentDir = "/" + java.util.UUID.randomUUID().toString();
- ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS];
- ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS];
- Thread[] thread = new Thread[NUM_ELECTORS];
+ @Override
+ public void fenceOldActive(byte[] data) {
+ LOG.info("fenceOldActive " + index);
+ // should not fence itself
+ Assert.assertTrue(index != data[0]);
+ }
+ }
synchronized void reportActive(int index) {
if (activeIndex == -1) {
@@ -187,45 +162,37 @@ public class TestActiveStandbyElectorRea
* the standby now becomes active. these electors run on different threads and
* callback to the test class to report active and standby where the outcome
* is verified
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws KeeperException
+ * @throws Exception
*/
@Test
- public void testActiveStandbyTransition() throws IOException,
- InterruptedException, KeeperException {
- LOG.info("starting test with parentDir:" + parentDir);
- start = false;
- byte[] data = new byte[8];
- // create random working directory
- createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
-
- for(currentClientIndex = 0;
- currentClientIndex < NUM_ELECTORS;
- ++currentClientIndex) {
- LOG.info("creating " + currentClientIndex);
- zkClient[currentClientIndex] = createClient();
- threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex,
- this);
- elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK(
- "hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE,
- threadRunner[currentClientIndex]);
- zkClient[currentClientIndex].register(elector[currentClientIndex]);
- thread[currentClientIndex] = new Thread(threadRunner[currentClientIndex]);
- thread[currentClientIndex].start();
- }
-
- synchronized (this) {
- // signal threads to start
- LOG.info("signaling threads");
- start = true;
- notifyAll();
- }
+ public void testActiveStandbyTransition() throws Exception {
+ LOG.info("starting test with parentDir:" + PARENT_DIR);
- for(int i = 0; i < thread.length; i++) {
- thread[i].join();
+ TestContext ctx = new TestContext();
+
+ for(int i = 0; i < NUM_ELECTORS; i++) {
+ LOG.info("creating " + i);
+ final ZooKeeper zk = createClient();
+ assert zk != null;
+
+ ThreadRunner tr = new ThreadRunner(ctx, i);
+ electors[i] = new ActiveStandbyElector(
+ "hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
+ tr) {
+ @Override
+ protected synchronized ZooKeeper getNewZooKeeper()
+ throws IOException {
+ return zk;
+ }
+ };
+ ctx.addThread(tr);
}
+
+ assertFalse(electors[0].parentZNodeExists());
+ electors[0].ensureParentZNode();
+ assertTrue(electors[0].parentZNodeExists());
+
+ ctx.startThreads();
+ ctx.stop();
}
}