You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/27 01:37:34 UTC
svn commit: r1305673 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ha/ src/main/java/org/apache/hadoop/util/
src/test/java/org/apache/hadoop/ha/
Author: todd
Date: Mon Mar 26 23:37:33 2012
New Revision: 1305673
URL: http://svn.apache.org/viewvc?rev=1305673&view=rev
Log:
HADOOP-8206. Common portion of a ZK-based failover controller. Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Mon Mar 26 23:37:33 2012
@@ -138,6 +138,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-7030. Add TableMapping topology implementation to read host to rack
mapping from a file. (Patrick Angeles and tomwhite via tomwhite)
+ HADOOP-8206. Common portion of a ZK-based failover controller (todd)
+
IMPROVEMENTS
HADOOP-7524. Change RPC to allow multiple protocols including multuple
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Mon Mar 26 23:37:33 2012
@@ -35,6 +35,7 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.AsyncCallback.*;
@@ -135,11 +136,11 @@ public class ActiveStandbyElector implem
private static final int NUM_RETRIES = 3;
- private enum ConnectionState {
+ private static enum ConnectionState {
DISCONNECTED, CONNECTED, TERMINATED
};
- private enum State {
+ static enum State {
INIT, ACTIVE, STANDBY, NEUTRAL
};
@@ -282,6 +283,32 @@ public class ActiveStandbyElector implem
LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
}
+
+  /**
+   * Remove the election's parent znode together with everything stored
+   * beneath it. The delete is issued through zkDoWithRetries and any
+   * ZooKeeper failure is rethrown as an {@link IOException}.
+   * Only call this when it is certain that no electors are currently
+   * participating in the election.
+   *
+   * @throws IOException if ZooKeeper reports an error during the delete
+   * @throws InterruptedException if the delete is interrupted
+   */
+  public synchronized void clearParentZNode()
+      throws IOException, InterruptedException {
+    LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
+
+    // The unit of work that gets (re)submitted to ZooKeeper.
+    final ZKAction<Void> deleteTree = new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
+        return null;
+      }
+    };
+
+    try {
+      zkDoWithRetries(deleteTree);
+    } catch (KeeperException ke) {
+      throw new IOException("Couldn't clear parent znode " + znodeWorkingDir,
+          ke);
+    }
+    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
+  }
+
/**
* Any service instance can drop out of the election by calling quitElection.
@@ -592,6 +619,11 @@ public class ActiveStandbyElector implem
long getZKSessionIdForTests() {
return zkClient.getSessionId();
}
+
+ /**
+ * Test-only accessor for the elector's current {@code state} field.
+ * The read is performed while holding this elector's monitor.
+ *
+ * @return the current value of {@code state}
+ */
+ @VisibleForTesting
+ synchronized State getStateForTests() {
+ return state;
+ }
private boolean reEstablishSession() {
int connectionRetryCount = 0;
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java Mon Mar 26 23:37:33 2012
@@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ipc.RPC;
import com.google.common.base.Preconditions;
@@ -40,6 +42,8 @@ public class FailoverController {
private static final Log LOG = LogFactory.getLog(FailoverController.class);
+ private static final int GRACEFUL_FENCE_TIMEOUT = 5000;
+
/**
* Perform pre-failover checks on the given service we plan to
* failover to, eg to prevent failing over to a service (eg due
@@ -96,7 +100,35 @@ public class FailoverController {
"Got an IO exception", e);
}
}
-
+
+
+ /**
+ * Try to gracefully fence the given service by asking it, over RPC, to
+ * transition to standby. The proxy is created with
+ * {@code GRACEFUL_FENCE_TIMEOUT} so that the attempt is "quick".
+ * Its only purpose is to avoid fencing a node that
+ * has already restarted.
+ * NOTE(review): the earlier wording also promised "no retries"; that is
+ * not visible in this method and depends on the proxy -- confirm.
+ *
+ * @param conf configuration handed to {@code svc.getProxy}
+ * @param svc the service that should be moved to standby
+ * @return true if the transitionToStandby call returned normally; false
+ * if it failed with a ServiceFailedException or an IOException (both are
+ * logged at WARN and not rethrown)
+ */
+ static boolean tryGracefulFence(Configuration conf,
+ HAServiceTarget svc) {
+ HAServiceProtocol proxy = null;
+ try {
+ proxy = svc.getProxy(conf, GRACEFUL_FENCE_TIMEOUT);
+ proxy.transitionToStandby();
+ return true;
+ } catch (ServiceFailedException sfe) {
+ LOG.warn("Unable to gracefully make " + svc + " standby (" +
+ sfe.getMessage() + ")");
+ } catch (IOException ioe) {
+ LOG.warn("Unable to gracefully make " + svc +
+ " standby (unable to connect)", ioe);
+ } finally {
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+ return false;
+ }
+
/**
* Failover from service 1 to service 2. If the failover fails
* then try to failback.
@@ -118,16 +150,9 @@ public class FailoverController {
// Try to make fromSvc standby
boolean tryFence = true;
- try {
- HAServiceProtocolHelper.transitionToStandby(fromSvc.getProxy());
- // We should try to fence if we failed or it was forced
- tryFence = forceFence ? true : false;
- } catch (ServiceFailedException sfe) {
- LOG.warn("Unable to make " + fromSvc + " standby (" +
- sfe.getMessage() + ")");
- } catch (IOException ioe) {
- LOG.warn("Unable to make " + fromSvc +
- " standby (unable to connect)", ioe);
+
+ if (tryGracefulFence(new Configuration(), fromSvc)) {
+ tryFence = forceFence;
}
// Fence fromSvc if it's required or forced by the user
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Mon Mar 26 23:37:33 2012
@@ -188,7 +188,8 @@ class HealthMonitor {
proxy.monitorHealth();
healthy = true;
} catch (HealthCheckFailedException e) {
- LOG.warn("Service health check failed: " + e.getMessage());
+ LOG.warn("Service health check failed for " + targetToMonitor
+ + ": " + e.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1305673&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Mon Mar 26 23:37:33 2012
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.LimitedPrivate("HDFS")
+public abstract class ZKFailoverController implements Tool {
+
+ static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
+
+ // TODO: this should be namespace-scoped
+ public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
+ private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
+ private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
+ private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
+ static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
+
+ /** Unable to format the parent znode in ZK */
+ static final int ERR_CODE_FORMAT_DENIED = 2;
+ /** The parent znode doesn't exist in ZK */
+ static final int ERR_CODE_NO_PARENT_ZNODE = 3;
+ /** Fencing is not properly configured */
+ static final int ERR_CODE_NO_FENCER = 4;
+
+ private Configuration conf;
+
+ private HealthMonitor healthMonitor;
+ private ActiveStandbyElector elector;
+
+ private HAServiceTarget localTarget;
+
+ private String parentZnode;
+
+  /**
+   * Most recent state reported through the HealthMonitor callback.
+   * Declared volatile because it is written by the callback and read by
+   * {@link #getLastHealthState()} without holding any lock, potentially
+   * from different threads.
+   */
+  private volatile State lastHealthState = State.INITIALIZING;
+
+ /** Set if a fatal error occurs */
+ private String fatalError = null;
+
+ /**
+ * Store the configuration and resolve the local service target by
+ * calling the subclass-provided {@link #getLocalTarget()}.
+ * NOTE(review): getLocalTarget() is invoked on every call, so a subclass
+ * presumably must be able to answer it as soon as setConf runs.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ localTarget = getLocalTarget();
+ }
+
+
+ /**
+ * Encode a target as the byte[] that this controller passes to
+ * {@code elector.joinElection} when the local service is healthy.
+ */
+ protected abstract byte[] targetToData(HAServiceTarget target);
+ /**
+ * @return the target for the local service; it is health-monitored and
+ * is the service asked to become active or standby
+ */
+ protected abstract HAServiceTarget getLocalTarget();
+ /**
+ * Decode the byte[] received in the elector's fenceOldActive callback
+ * back into a target. Presumably the inverse of
+ * {@link #targetToData(HAServiceTarget)} -- confirm in subclasses.
+ */
+ protected abstract HAServiceTarget dataToTarget(byte[] data);
+
+
+ /** @return the configuration last passed to {@link #setConf} */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+  /**
+   * Tool entry point. Executes {@link #doRun(String[])} through
+   * {@link SecurityUtil#doAsLoginUserOrFatal}. Because
+   * {@link PrivilegedAction#run()} cannot throw checked exceptions, an
+   * exception from doRun is tunnelled out inside a RuntimeException and
+   * unwrapped again here, so the caller sees the original exception.
+   *
+   * @param args command line arguments
+   * @return the exit code produced by doRun
+   * @throws Exception the exception thrown by doRun, or the
+   *     RuntimeException itself when it carries no Exception cause
+   */
+  @Override
+  public int run(final String[] args) throws Exception {
+    // TODO: need to hook DFS here to find the NN keytab info, etc,
+    // similar to what DFSHAAdmin does. Annoying that this is in common.
+    try {
+      return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            return doRun(args);
+          } catch (Exception t) {
+            throw new RuntimeException(t);
+          }
+        }
+      });
+    } catch (RuntimeException rte) {
+      Throwable cause = rte.getCause();
+      if (cause instanceof Exception) {
+        throw (Exception) cause;
+      }
+      // A RuntimeException that did not come from the wrapper above may
+      // have no cause (or a non-Exception cause). Blindly casting would
+      // turn it into a NullPointerException / ClassCastException and hide
+      // the real failure, so propagate it unchanged.
+      throw rte;
+    }
+  }
+
+ /**
+ * Body of {@link #run(String[])}. Creates the elector, handles the
+ * optional -formatZK command, and otherwise verifies the parent znode
+ * and fencing configuration, starts health monitoring and blocks in
+ * {@link #mainLoop()}.
+ *
+ * @param args command line arguments; only "-formatZK", optionally
+ * followed by "-force" and/or "-nonInteractive", is accepted
+ * @return the result of formatZK when formatting was requested,
+ * ERR_CODE_NO_PARENT_ZNODE if the parent znode does not exist, or
+ * ERR_CODE_NO_FENCER if fencing is not configured
+ * @throws HadoopIllegalArgumentException for an unrecognized argument
+ */
+ private int doRun(String[] args)
+ throws HadoopIllegalArgumentException, IOException, InterruptedException {
+ initZK();
+ if (args.length > 0) {
+ if ("-formatZK".equals(args[0])) {
+ boolean force = false;
+ boolean interactive = true;
+ for (int i = 1; i < args.length; i++) {
+ if ("-force".equals(args[i])) {
+ force = true;
+ } else if ("-nonInteractive".equals(args[i])) {
+ interactive = false;
+ } else {
+ badArg(args[i]);
+ }
+ }
+ return formatZK(force, interactive);
+ } else {
+ badArg(args[0]);
+ }
+ }
+
+ if (!elector.parentZNodeExists()) {
+ LOG.fatal("Unable to start failover controller. " +
+ "Parent znode does not exist.\n" +
+ "Run with -formatZK flag to initialize ZooKeeper.");
+ return ERR_CODE_NO_PARENT_ZNODE;
+ }
+
+ try {
+ localTarget.checkFencingConfigured();
+ } catch (BadFencingConfigurationException e) {
+ LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
+ "You must configure a fencing method before using automatic " +
+ "failover.", e);
+ return ERR_CODE_NO_FENCER;
+ }
+
+ initHM();
+ /* mainLoop() only ends by throwing, so the return below is not
+ reached in practice. */
+ mainLoop();
+ return 0;
+ }
+
+ /**
+ * Print the usage line and reject the argument. Always throws.
+ *
+ * @param arg the offending command line argument
+ * @throws HadoopIllegalArgumentException always
+ */
+ private void badArg(String arg) {
+ printUsage();
+ throw new HadoopIllegalArgumentException(
+ "Bad argument: " + arg);
+ }
+
+ /**
+ * Write the one-line usage summary to stderr, using the simple name of
+ * the concrete subclass as the command name.
+ */
+ private void printUsage() {
+ System.err.println("Usage: " + this.getClass().getSimpleName() +
+ " [-formatZK [-force | -nonInteractive]]");
+ }
+
+  /**
+   * Create the parent znode used for the election, first clearing an
+   * existing one if that has been approved.
+   *
+   * @param force clear an existing znode without asking
+   * @param interactive whether the user may be prompted for confirmation
+   * @return 0 on success, ERR_CODE_FORMAT_DENIED if an existing znode was
+   *     not approved for removal, or 1 if clearing it failed
+   */
+  private int formatZK(boolean force, boolean interactive)
+      throws IOException, InterruptedException {
+    if (elector.parentZNodeExists()) {
+      // A forced run never prompts; a non-interactive run can never approve.
+      boolean approved = force || (interactive && confirmFormat());
+      if (!approved) {
+        return ERR_CODE_FORMAT_DENIED;
+      }
+
+      try {
+        elector.clearParentZNode();
+      } catch (IOException ioe) {
+        LOG.error("Unable to clear zk parent znode", ioe);
+        return 1;
+      }
+    }
+
+    elector.ensureParentZNode();
+    return 0;
+  }
+
+  /**
+   * Warn on stderr that the parent znode already exists and ask the user
+   * whether it may be cleared.
+   *
+   * @return true only if the user confirmed; false if the user declined
+   *     or the prompt failed with an IOException
+   */
+  private boolean confirmFormat() {
+    StringBuilder banner = new StringBuilder();
+    banner.append("===============================================\n");
+    banner.append("The configured parent znode ").append(parentZnode)
+        .append(" already exists.\n");
+    banner.append("Are you sure you want to clear all failover information from\n");
+    banner.append("ZooKeeper?\n");
+    banner.append("WARNING: Before proceeding, ensure that all HDFS services and\n");
+    banner.append("failover controllers are stopped!\n");
+    banner.append("===============================================");
+    System.err.println(banner.toString());
+
+    boolean confirmed;
+    try {
+      confirmed =
+          ToolRunner.confirmPrompt("Proceed formatting " + parentZnode + "?");
+    } catch (IOException ioe) {
+      LOG.debug("Failed to confirm", ioe);
+      confirmed = false;
+    }
+    return confirmed;
+  }
+
+ // ------------------------------------------
+ // Begin actual guts of failover controller
+ // ------------------------------------------
+
+ /**
+ * Create the HealthMonitor for the local target, register a
+ * {@link HealthCallbacks} instance on it and start it.
+ */
+ private void initHM() {
+ healthMonitor = new HealthMonitor(conf, localTarget);
+ healthMonitor.addCallback(new HealthCallbacks());
+ healthMonitor.start();
+ }
+
+ /**
+ * Read the ZooKeeper quorum, session timeout and parent znode from the
+ * configuration, validate them, and construct the elector with an
+ * {@link ElectorCallbacks} instance. The ACL list is currently fixed to
+ * Ids.OPEN_ACL_UNSAFE (see the TODO below).
+ *
+ * @throws IllegalArgumentException (from Preconditions.checkArgument) if
+ * the quorum is not configured or the session timeout is not positive
+ */
+ private void initZK() throws HadoopIllegalArgumentException, IOException {
+ String zkQuorum = conf.get(ZK_QUORUM_KEY);
+ int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
+ ZK_SESSION_TIMEOUT_DEFAULT);
+ parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
+ ZK_PARENT_ZNODE_DEFAULT);
+ // TODO: need ZK ACL support in config, also maybe auth!
+ List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
+
+ Preconditions.checkArgument(zkQuorum != null,
+ "Missing required configuration '%s' for ZooKeeper quorum",
+ ZK_QUORUM_KEY);
+ Preconditions.checkArgument(zkTimeout > 0,
+ "Invalid ZK session timeout %s", zkTimeout);
+
+
+ elector = new ActiveStandbyElector(zkQuorum,
+ zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
+ }
+
+ /**
+ * Block the calling thread on this object's monitor until
+ * {@code fatalError} has been set, then fail. This method never returns
+ * normally: it ends either with the RuntimeException below or with an
+ * InterruptedException from wait().
+ *
+ * @throws InterruptedException if interrupted while waiting
+ */
+ private synchronized void mainLoop() throws InterruptedException {
+ while (fatalError == null) {
+ wait();
+ }
+ assert fatalError != null; // only get here on fatal
+ throw new RuntimeException(
+ "ZK Failover Controller failed: " + fatalError);
+ }
+
+ /**
+ * Log and record a fatal error, then wake every thread waiting on this
+ * object's monitor so that mainLoop() can observe it.
+ *
+ * @param err description of the failure
+ */
+ private synchronized void fatalError(String err) {
+ LOG.fatal("Fatal error occurred:" + err);
+ fatalError = err;
+ notifyAll();
+ }
+
+ /**
+ * Ask the local target to transition to active. Any Throwable from the
+ * attempt is logged at FATAL and answered by leaving the election with
+ * {@code quitElection(true)}; nothing is rethrown to the caller.
+ * NOTE(review): the health callbacks in this class log the {@code true}
+ * argument as "marking that fencing is necessary" -- confirm against
+ * ActiveStandbyElector.
+ */
+ private synchronized void becomeActive() {
+ LOG.info("Trying to make " + localTarget + " active...");
+ try {
+ localTarget.getProxy().transitionToActive();
+ LOG.info("Successfully transitioned " + localTarget +
+ " to active state");
+ } catch (Throwable t) {
+ LOG.fatal("Couldn't make " + localTarget + " active", t);
+ elector.quitElection(true);
+/*
+* TODO:
+* we need to make sure that if we get fenced and then quickly restarted,
+* none of these calls will retry across the restart boundary
+* perhaps the solution is that, whenever the nn starts, it gets a unique
+* ID, and when we start becoming active, we record it, and then any future
+* calls use the same ID
+*/
+
+ }
+ }
+
+ /**
+ * Ask the local target to transition to standby. A failure is only
+ * logged at ERROR; it is neither rethrown nor otherwise acted upon yet
+ * (see the TODO in the catch block).
+ */
+ private synchronized void becomeStandby() {
+ LOG.info("ZK Election indicated that " + localTarget +
+ " should become standby");
+ try {
+ localTarget.getProxy().transitionToStandby();
+ LOG.info("Successfully transitioned " + localTarget +
+ " to standby state");
+ } catch (Exception e) {
+ LOG.error("Couldn't transition " + localTarget + " to standby state",
+ e);
+ // TODO handle this. It's a likely case since we probably got fenced
+ // at the same time.
+ }
+ }
+
+ /**
+ * Test-only accessor. The value is updated at the end of each
+ * HealthMonitor callback that completes without throwing, and starts
+ * out as INITIALIZING.
+ *
+ * @return the last health state passed to the FC
+ * by the HealthMonitor.
+ */
+ @VisibleForTesting
+ State getLastHealthState() {
+ return lastHealthState;
+ }
+
+ /**
+ * Test-only accessor.
+ *
+ * @return the elector created by initZK(), or null if initZK() has not
+ * run yet
+ */
+ @VisibleForTesting
+ ActiveStandbyElector getElectorForTests() {
+ return elector;
+ }
+
+ /**
+ * Callbacks from elector. State changes are forwarded to the enclosing
+ * controller; fencing of a previous active is carried out here.
+ */
+ class ElectorCallbacks implements ActiveStandbyElectorCallback {
+ /** Delegates to the enclosing controller's becomeActive(). */
+ @Override
+ public void becomeActive() {
+ ZKFailoverController.this.becomeActive();
+ }
+
+ /** Delegates to the enclosing controller's becomeStandby(). */
+ @Override
+ public void becomeStandby() {
+ ZKFailoverController.this.becomeStandby();
+ }
+
+ /** Intentionally a no-op in this controller. */
+ @Override
+ public void enterNeutralMode() {
+ }
+
+ /** Records the error on the controller, which wakes mainLoop(). */
+ @Override
+ public void notifyFatalError(String errorMessage) {
+ fatalError(errorMessage);
+ }
+
+ /**
+ * Fence the previous active described by {@code data}. First tries
+ * FailoverController.tryGracefulFence; only if that fails is the
+ * target's configured fencer used.
+ *
+ * @param data encoded target, decoded with dataToTarget
+ * @throws RuntimeException if fencing is not configured for the target
+ * or the fencer reports failure
+ */
+ @Override
+ public void fenceOldActive(byte[] data) {
+ HAServiceTarget target = dataToTarget(data);
+
+ LOG.info("Should fence: " + target);
+ boolean gracefulWorked =
+ FailoverController.tryGracefulFence(conf, target);
+ if (gracefulWorked) {
+ // It's possible that it's in standby but just about to go into active,
+ // no? Is there some race here?
+ LOG.info("Successfully transitioned " + target + " to standby " +
+ "state without fencing");
+ return;
+ }
+
+ try {
+ target.checkFencingConfigured();
+ } catch (BadFencingConfigurationException e) {
+ LOG.error("Couldn't fence old active " + target, e);
+ // TODO: see below todo
+ throw new RuntimeException(e);
+ }
+
+ if (!target.getFencer().fence(target)) {
+ // TODO: this will end up in some kind of tight loop,
+ // won't it? We need some kind of backoff
+ throw new RuntimeException("Unable to fence " + target);
+ }
+ }
+ }
+
+ /**
+ * Callbacks from HealthMonitor. Translates each reported health state
+ * into joining or quitting the election, or into a fatal error.
+ */
+ class HealthCallbacks implements HealthMonitor.Callback {
+ /**
+ * React to a new health state of the local service.
+ * SERVICE_HEALTHY joins the election; INITIALIZING quits it with
+ * quitElection(false); SERVICE_UNHEALTHY and SERVICE_NOT_RESPONDING
+ * quit it with quitElection(true); HEALTH_MONITOR_FAILED raises a
+ * fatal error. lastHealthState is updated only after the state was
+ * handled, so it is left unchanged if the switch throws.
+ *
+ * @throws IllegalArgumentException for a state not listed above
+ */
+ @Override
+ public void enteredState(HealthMonitor.State newState) {
+ LOG.info("Local service " + localTarget +
+ " entered state: " + newState);
+ switch (newState) {
+ case SERVICE_HEALTHY:
+ LOG.info("Joining master election for " + localTarget);
+ elector.joinElection(targetToData(localTarget));
+ break;
+
+ case INITIALIZING:
+ LOG.info("Ensuring that " + localTarget + " does not " +
+ "participate in active master election");
+ elector.quitElection(false);
+ break;
+
+ case SERVICE_UNHEALTHY:
+ case SERVICE_NOT_RESPONDING:
+ LOG.info("Quitting master election for " + localTarget +
+ " and marking that fencing is necessary");
+ elector.quitElection(true);
+ break;
+
+ case HEALTH_MONITOR_FAILED:
+ fatalError("Health monitor failed!");
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unhandled state:" + newState);
+ }
+
+ lastHealthState = newState;
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java Mon Mar 26 23:37:33 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.util;
+import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -92,4 +93,33 @@ public class ToolRunner {
GenericOptionsParser.printGenericCommandUsage(out);
}
+
+  /**
+   * Print out a prompt to the user, and return true if the user
+   * responds with "y" or "yes". (case insensitive)
+   * A response of "n" or "no" returns false; anything else is reported
+   * as invalid and the prompt is repeated.
+   * If standard input reaches end-of-stream before a valid response has
+   * been read, no further answer can arrive, so the prompt is treated
+   * as declined instead of being repeated forever.
+   *
+   * @param prompt text shown before " (Y or N) "
+   * @return true if confirmed; false if declined or input ended
+   * @throws IOException if reading from standard input fails
+   */
+  public static boolean confirmPrompt(String prompt) throws IOException {
+    while (true) {
+      System.err.print(prompt + " (Y or N) ");
+      StringBuilder responseBuilder = new StringBuilder();
+      boolean reachedEof = false;
+      while (true) {
+        int c = System.in.read();
+        if (c == -1) {
+          reachedEof = true;
+          break;
+        }
+        if (c == '\r' || c == '\n') {
+          break;
+        }
+        responseBuilder.append((char)c);
+      }
+
+      String response = responseBuilder.toString();
+      if (response.equalsIgnoreCase("y") ||
+          response.equalsIgnoreCase("yes")) {
+        return true;
+      } else if (response.equalsIgnoreCase("n") ||
+          response.equalsIgnoreCase("no")) {
+        return false;
+      }
+      if (reachedEof) {
+        // Every further read() would return -1 again; asking once more
+        // would spin in a tight loop.
+        System.err.println("No valid response before end of input; assuming N");
+        return false;
+      }
+      System.err.println("Invalid input: " + response);
+      // else ask them again
+    }
+  }
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java Mon Mar 26 23:37:33 2012
@@ -50,4 +50,15 @@ public abstract class ActiveStandbyElect
Thread.sleep(50);
}
}
+
+ /**
+ * Poll, sleeping 50ms between checks, until the elector's state (as
+ * returned by getStateForTests()) equals the expected state. There is
+ * no timeout of its own: the loop only ends when the state matches or
+ * an exception is thrown.
+ *
+ * @param ctx optional test context; when non-null its checkException()
+ * is called on every iteration
+ * @param elector the elector to observe
+ * @param state the state to wait for
+ */
+ public static void waitForElectorState(TestContext ctx,
+ ActiveStandbyElector elector,
+ ActiveStandbyElector.State state) throws Exception {
+ while (elector.getStateForTests() != state) {
+ if (ctx != null) {
+ ctx.checkException();
+ }
+ Thread.sleep(50);
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Mon Mar 26 23:37:33 2012
@@ -19,12 +19,15 @@ package org.apache.hadoop.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.security.AccessControlException;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
/**
* Test-only implementation of {@link HAServiceTarget}, which returns
* a mock implementation.
@@ -36,12 +39,20 @@ class DummyHAService extends HAServiceTa
InetSocketAddress address;
boolean isHealthy = true;
boolean actUnreachable = false;
+ boolean failToBecomeActive;
+
+ static ArrayList<DummyHAService> instances = Lists.newArrayList();
+ int index;
DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state;
this.proxy = makeMock();
this.fencer = Mockito.mock(NodeFencer.class);
this.address = address;
+ synchronized (instances) {
+ instances.add(this);
+ this.index = instances.size();
+ }
}
private HAServiceProtocol makeMock() {
@@ -59,6 +70,10 @@ class DummyHAService extends HAServiceTa
public void transitionToActive() throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
+ if (failToBecomeActive) {
+ throw new ServiceFailedException("injected failure");
+ }
+
state = HAServiceState.ACTIVE;
}
@@ -97,7 +112,7 @@ class DummyHAService extends HAServiceTa
throws IOException {
return proxy;
}
-
+
@Override
public NodeFencer getFencer() {
return fencer;
@@ -106,4 +121,13 @@ class DummyHAService extends HAServiceTa
@Override
public void checkFencingConfigured() throws BadFencingConfigurationException {
}
-}
\ No newline at end of file
+
+ /**
+ * @return "DummyHAService #" followed by this instance's index, which
+ * the constructor sets to the size of the instance list after adding
+ * this instance (so it is 1-based)
+ */
+ @Override
+ public String toString() {
+ return "DummyHAService #" + index;
+ }
+
+  /**
+   * Look up a previously constructed service by its 1-based serial
+   * number, i.e. the number shown by {@link #toString()}.
+   * The read takes the same lock the constructor holds while appending,
+   * because the backing ArrayList is not safe for concurrent access.
+   *
+   * @param serial 1-based index of the instance
+   * @return the instance registered under that serial
+   * @throws IndexOutOfBoundsException if no such instance exists
+   */
+  public static HAServiceTarget getInstance(int serial) {
+    synchronized (instances) {
+      return instances.get(serial - 1);
+    }
+  }
+}
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java Mon Mar 26 23:37:33 2012
@@ -156,6 +156,10 @@ public class TestNodeFencer {
@Override
public void checkArgs(String args) {
}
+
+ /**
+ * @return the current value of the static {@code fencedSvc} field --
+ * presumably the service most recently handed to this test fencer;
+ * the assignment is outside this hunk, so confirm there
+ */
+ public static HAServiceTarget getLastFencedService() {
+ return fencedSvc;
+ }
}
/**
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1305673&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Mon Mar 26 23:37:33 2012
@@ -0,0 +1,457 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.log4j.Level;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.primitives.Ints;
+
+public class TestZKFailoverController extends ClientBase {
+ /** Configuration applied to every failover controller built by this test. */
+ private Configuration conf;
+ /** First dummy service. */
+ private DummyHAService svc1;
+ /** Second dummy service. */
+ private DummyHAService svc2;
+ /** Context the controller threads are created with; set by setupFCs(). */
+ private TestContext ctx;
+ /** Thread running the failover controller for svc1. */
+ private DummyZKFCThread thr1;
+ /** Thread running the failover controller for svc2. */
+ private DummyZKFCThread thr2;
+
+ static {
+   // Raise the elector's logger to the most verbose level.
+   Log4JLogger electorLog = (Log4JLogger) ActiveStandbyElector.LOG;
+   electorLog.getLogger().setLevel(Level.ALL);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ // build.test.dir is used by zookeeper
+ new File(System.getProperty("build.test.dir", "build")).mkdirs();
+ super.setUp();
+ }
+
+ @Before
+ public void setupConfAndServices() {
+ conf = new Configuration();
+ conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+ // Fast check interval so tests run faster
+ conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+ conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+ conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+ svc1 = new DummyHAService(HAServiceState.INITIALIZING,
+ new InetSocketAddress("svc1", 1234));
+ svc2 = new DummyHAService(HAServiceState.INITIALIZING,
+ new InetSocketAddress("svc2", 1234));
+ }
+
+ /**
+  * Set up two services and their failover controllers. svc1 is started
+  * first, so that it enters ACTIVE state, and then svc2 is started,
+  * which enters STANDBY. Both controller threads are registered with the
+  * test context before they are started.
+  *
+  * @throws Exception if formatting fails, a controller thread reports an
+  *         error to the context, or the wait is interrupted
+  */
+ private void setupFCs() throws Exception {
+   // Format the base dir, should succeed
+   assertEquals(0, runFC(svc1, "-formatZK"));
+
+   ctx = new MultithreadedTestUtil.TestContext();
+   thr1 = new DummyZKFCThread(ctx, svc1);
+   ctx.addThread(thr1);
+   thr1.start();
+
+   LOG.info("Waiting for svc1 to enter active state");
+   waitForHAState(svc1, HAServiceState.ACTIVE);
+
+   LOG.info("Adding svc2");
+   thr2 = new DummyZKFCThread(ctx, svc2);
+   // Register thr2 exactly like thr1 so that context-level operations
+   // (such as ctx.stop() in stopFCs) cover both controller threads.
+   ctx.addThread(thr2);
+   thr2.start();
+   waitForHAState(svc2, HAServiceState.STANDBY);
+ }
+
+ /**
+  * Interrupts whichever controller threads have been created and then
+  * stops the test context, if there is one.
+  */
+ private void stopFCs() throws Exception {
+   for (DummyZKFCThread thr : new DummyZKFCThread[] { thr1, thr2 }) {
+     if (thr != null) {
+       thr.interrupt();
+     }
+   }
+   if (ctx == null) {
+     return;
+   }
+   ctx.stop();
+ }
+
+ /**
+ * Test that the various command lines for formatting the ZK directory
+ * function correctly.
+ */
+ @Test(timeout=15000)
+ public void testFormatZK() throws Exception {
+ // Run without formatting the base dir,
+ // should barf
+ assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
+ runFC(svc1));
+
+ // Format the base dir, should succeed
+ assertEquals(0, runFC(svc1, "-formatZK"));
+
+ // Should fail to format if already formatted
+ assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
+ runFC(svc1, "-formatZK", "-nonInteractive"));
+
+ // Unless '-force' is on
+ assertEquals(0, runFC(svc1, "-formatZK", "-force"));
+ }
+
+ /**
+ * Test that the ZKFC won't run if fencing is not configured for the
+ * local service.
+ */
+ @Test(timeout=15000)
+ public void testFencingMustBeConfigured() throws Exception {
+ svc1 = Mockito.spy(svc1);
+ Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
+ .when(svc1).checkFencingConfigured();
+ // Format the base dir, should succeed
+ assertEquals(0, runFC(svc1, "-formatZK"));
+ // Try to run the actual FC, should fail without a fencer
+ assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
+ runFC(svc1));
+ }
+
+ /**
+  * Checks failover driven by health status: marking svc1 unhealthy must
+  * move the active role to svc2. Afterwards svc2 is made unreachable, so
+  * the fail-back to svc1 has to go through the fencer, which is verified
+  * to have been invoked for svc2.
+  */
+ @Test(timeout=15000)
+ public void testAutoFailoverOnBadHealth() throws Exception {
+   try {
+     setupFCs();
+
+     // Phase 1: svc1 turns unhealthy, svc2 takes over.
+     LOG.info("Faking svc1 unhealthy, should failover to svc2");
+     svc1.isHealthy = false;
+     LOG.info("Waiting for svc1 to enter standby state");
+     waitForHAState(svc1, HAServiceState.STANDBY);
+     waitForHAState(svc2, HAServiceState.ACTIVE);
+
+     // Phase 2: svc1 recovers while svc2 stops responding.
+     LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
+         "and fail to gracefully go to standby");
+     svc1.isHealthy = true;
+     svc2.actUnreachable = true;
+
+     // Let the fencer report success when asked to fence svc2.
+     Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
+
+     // svc1 is expected to become active again ...
+     waitForHAState(svc1, HAServiceState.ACTIVE);
+     // ... and svc2 must have been fenced on the way.
+     Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
+   } finally {
+     stopFCs();
+   }
+ }
+
+ /**
+  * Expires the active controller's ZK session back and forth between the
+  * two services, twice over, verifying the failover each time.
+  */
+ @Test(timeout=15000)
+ public void testAutoFailoverOnLostZKSession() throws Exception {
+   try {
+     setupFCs();
+
+     for (int pass = 0; pass < 2; pass++) {
+       if (pass == 1) {
+         LOG.info("======= Running test cases second time to test " +
+             "re-establishment =========");
+       }
+       // Expire svc1, it should fail over to svc2
+       expireAndVerifyFailover(thr1, thr2);
+
+       // Expire svc2, it should fail back to svc1
+       expireAndVerifyFailover(thr2, thr1);
+     }
+   } finally {
+     stopFCs();
+   }
+ }
+
+ /**
+  * Expires the ZK session behind the service of {@code fromThr} while its
+  * elector is prevented from re-establishing a session, and waits until
+  * that service is STANDBY and the service of {@code toThr} is ACTIVE.
+  * Session re-establishment is allowed again before returning.
+  */
+ private void expireAndVerifyFailover(DummyZKFCThread fromThr,
+     DummyZKFCThread toThr) throws Exception {
+   DummyHAService oldActive = fromThr.zkfc.localTarget;
+   DummyHAService newActive = toThr.zkfc.localTarget;
+
+   fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
+   try {
+     expireActiveLockHolder(oldActive);
+
+     waitForHAState(oldActive, HAServiceState.STANDBY);
+     waitForHAState(newActive, HAServiceState.ACTIVE);
+   } finally {
+     fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+   }
+ }
+
+ /**
+  * Verifies that an unhealthy standby does not take the lock: after svc1's
+  * session is expired nobody holds the lock, and once svc1's elector may
+  * reconnect it is svc1 that holds it again.
+  */
+ @Test(timeout=15000)
+ public void testDontFailoverToUnhealthyNode() throws Exception {
+   try {
+     setupFCs();
+
+     // Make svc2 unhealthy, and wait for its FC to notice the bad health.
+     svc2.isHealthy = false;
+     waitForHealthState(thr2.zkfc, State.SERVICE_UNHEALTHY);
+
+     // Expire svc1 while keeping its elector from reconnecting.
+     thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
+     try {
+       expireActiveLockHolder(svc1);
+
+       LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
+           " a chance to take the lock, if it is ever going to.");
+       Thread.sleep(1000);
+
+       // The lock must still be free.
+       waitForActiveLockHolder(null);
+     } finally {
+       LOG.info("Allowing svc1's elector to re-establish its connection");
+       thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+     }
+
+     // svc1 should get the lock again
+     waitForActiveLockHolder(svc1);
+   } finally {
+     stopFCs();
+   }
+ }
+
+ /**
+  * Verifies the behaviour when the standby cannot become active: svc2 is
+  * asked to transition but fails, nobody ends up holding the lock, and
+  * once svc1 is healthy again it becomes active and holds the lock.
+  */
+ @Test(timeout=15000)
+ public void testBecomingActiveFails() throws Exception {
+   try {
+     setupFCs();
+
+     LOG.info("Making svc2 fail to become active");
+     svc2.failToBecomeActive = true;
+
+     LOG.info("Faking svc1 unhealthy, should NOT successfully " +
+         "failover to svc2");
+     svc1.isHealthy = false;
+     waitForHealthState(thr1.zkfc, HealthMonitor.State.SERVICE_UNHEALTHY);
+     waitForActiveLockHolder(null);
+
+     // svc2 must have been asked to become active.
+     Mockito.verify(svc2.proxy).transitionToActive();
+
+     // Neither service is active at this point.
+     waitForHAState(svc1, HAServiceState.STANDBY);
+     waitForHAState(svc2, HAServiceState.STANDBY);
+
+     LOG.info("Faking svc1 healthy again, should go back to svc1");
+     svc1.isHealthy = true;
+     waitForHAState(svc1, HAServiceState.ACTIVE);
+     waitForHAState(svc2, HAServiceState.STANDBY);
+     waitForActiveLockHolder(svc1);
+   } finally {
+     stopFCs();
+   }
+ }
+
+ /**
+  * Test that, when ZooKeeper fails, the system remains in its
+  * current state, without triggering any failovers, and without
+  * causing the active node to enter standby state. After the server is
+  * restarted, the elector states, the HA states of both services and the
+  * ZK session ids are all checked against their values from before the
+  * outage.
+  */
+ @Test(timeout=15000)
+ public void testZooKeeperFailure() throws Exception {
+   try {
+     setupFCs();
+
+     // Record initial ZK sessions
+     long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
+     long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
+
+     LOG.info("====== Stopping ZK server");
+     stopServer();
+     waitForServerDown(hostPort, CONNECTION_TIMEOUT);
+
+     LOG.info("====== Waiting for services to enter NEUTRAL mode");
+     ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+         thr1.zkfc.getElectorForTests(),
+         ActiveStandbyElector.State.NEUTRAL);
+     ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+         thr2.zkfc.getElectorForTests(),
+         ActiveStandbyElector.State.NEUTRAL);
+
+     LOG.info("====== Checking that the services didn't change HA state");
+     assertEquals(HAServiceState.ACTIVE, svc1.state);
+     assertEquals(HAServiceState.STANDBY, svc2.state);
+
+     LOG.info("====== Restarting server");
+     startServer();
+     waitForServerUp(hostPort, CONNECTION_TIMEOUT);
+
+     // Nodes should go back to their original states, since they re-obtain
+     // the same sessions.
+     ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+         thr1.zkfc.getElectorForTests(),
+         ActiveStandbyElector.State.ACTIVE);
+     ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+         thr2.zkfc.getElectorForTests(),
+         ActiveStandbyElector.State.STANDBY);
+     // Check HA states didn't change. This looks at the services
+     // themselves rather than re-checking the elector states a second time.
+     waitForHAState(svc1, HAServiceState.ACTIVE);
+     waitForHAState(svc2, HAServiceState.STANDBY);
+     // Check they re-used the same sessions and didn't spuriously reconnect
+     assertEquals(session1,
+         thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
+     assertEquals(session2,
+         thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
+   } finally {
+     stopFCs();
+   }
+ }
+
+ /**
+  * Closes the ZK session that owns the active lock znode. The data stored
+  * in the lock znode is first asserted to be the encoded index of
+  * {@code expectedActive}.
+  * @throws NoNodeException if no service holds the lock
+  */
+ private void expireActiveLockHolder(DummyHAService expectedActive)
+     throws NoNodeException {
+   String lockPath = ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
+       ActiveStandbyElector.LOCK_FILENAME;
+
+   ZooKeeperServer zks = getServer(serverFactory);
+   Stat lockStat = new Stat();
+   byte[] lockData = zks.getZKDatabase().getData(lockPath, lockStat, null);
+
+   byte[] expectedData = Ints.toByteArray(expectedActive.index);
+   assertArrayEquals(expectedData, lockData);
+
+   // The ephemeral owner of the lock znode is the session to expire.
+   long session = lockStat.getEphemeralOwner();
+   LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
+   zks.closeSession(session);
+ }
+
+ /**
+  * Polls every 50ms until {@code svc.state} equals {@code state},
+  * rethrowing any error recorded in the test context while waiting.
+  */
+ private void waitForHAState(DummyHAService svc, HAServiceState state)
+     throws Exception {
+   for (;;) {
+     if (svc.state == state) {
+       return;
+     }
+     ctx.checkException();
+     Thread.sleep(50);
+   }
+ }
+
+ /**
+  * Polls every 50ms until the controller's last reported health state
+  * equals {@code state}, rethrowing any error recorded in the test
+  * context while waiting.
+  */
+ private void waitForHealthState(DummyZKFC zkfc, State state)
+     throws Exception {
+   for (;;) {
+     if (zkfc.getLastHealthState() == state) {
+       return;
+     }
+     ctx.checkException();
+     Thread.sleep(50);
+   }
+ }
+
+ /**
+  * Waits until the active lock carries the encoded index of {@code svc}.
+  * Passing {@code null} waits for the lock data to be {@code null}
+  * instead, i.e. for there to be no active lock holder.
+  */
+ private void waitForActiveLockHolder(DummyHAService svc)
+     throws Exception {
+   ZooKeeperServer zks = getServer(serverFactory);
+
+   byte[] expectedLockData;
+   if (svc == null) {
+     expectedLockData = null;
+   } else {
+     expectedLockData = Ints.toByteArray(svc.index);
+   }
+
+   ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
+       ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT, expectedLockData);
+ }
+
+
+ private int runFC(DummyHAService target, String ... args) throws Exception {
+ DummyZKFC zkfc = new DummyZKFC(target);
+ zkfc.setConf(conf);
+ return zkfc.run(args);
+ }
+
+ /**
+ * Test-thread which runs a ZK Failover Controller corresponding
+ * to a given dummy service.
+ */
+ private class DummyZKFCThread extends TestingThread {
+ private final DummyZKFC zkfc;
+
+ public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
+ super(ctx);
+ this.zkfc = new DummyZKFC(svc);
+ zkfc.setConf(conf);
+ }
+
+ @Override
+ public void doWork() throws Exception {
+ try {
+ assertEquals(0, zkfc.run(new String[0]));
+ } catch (InterruptedException ie) {
+ // Interrupted by main thread, that's OK.
+ }
+ }
+ }
+
+ private static class DummyZKFC extends ZKFailoverController {
+ private final DummyHAService localTarget;
+
+ public DummyZKFC(DummyHAService localTarget) {
+ this.localTarget = localTarget;
+ }
+
+ @Override
+ protected byte[] targetToData(HAServiceTarget target) {
+ return Ints.toByteArray(((DummyHAService)target).index);
+ }
+
+ @Override
+ protected HAServiceTarget dataToTarget(byte[] data) {
+ int index = Ints.fromByteArray(data);
+ return DummyHAService.getInstance(index);
+ }
+
+ @Override
+ protected HAServiceTarget getLocalTarget() {
+ return localTarget;
+ }
+ }
+}