You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/03/27 01:37:34 UTC

svn commit: r1305673 - in /hadoop/common/trunk/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/main/java/org/apache/hadoop/util/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Mon Mar 26 23:37:33 2012
New Revision: 1305673

URL: http://svn.apache.org/viewvc?rev=1305673&view=rev
Log:
HADOOP-8206. Common portion of a ZK-based failover controller. Contributed by Todd Lipcon.

Added:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
    hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Mon Mar 26 23:37:33 2012
@@ -138,6 +138,8 @@ Release 0.23.3 - UNRELEASED 
     HADOOP-7030. Add TableMapping topology implementation to read host to rack
     mapping from a file. (Patrick Angeles and tomwhite via tomwhite)
 
+    HADOOP-8206. Common portion of a ZK-based failover controller (todd)
+
   IMPROVEMENTS
 
     HADOOP-7524. Change RPC to allow multiple protocols including multuple

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Mon Mar 26 23:37:33 2012
@@ -35,6 +35,7 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.AsyncCallback.*;
@@ -135,11 +136,11 @@ public class ActiveStandbyElector implem
 
   private static final int NUM_RETRIES = 3;
 
-  private enum ConnectionState {
+  private static enum ConnectionState {
     DISCONNECTED, CONNECTED, TERMINATED
   };
 
-  private enum State {
+  static enum State {
     INIT, ACTIVE, STANDBY, NEUTRAL
   };
 
@@ -282,6 +283,32 @@ public class ActiveStandbyElector implem
     
     LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
   }
+  
+  /**
+   * Clear all of the state held within the parent ZNode.
+   * This recursively deletes everything within the znode as well as the
+   * parent znode itself. It should only be used when it's certain that
+   * no electors are currently participating in the election.
+   */
+  public synchronized void clearParentZNode()
+      throws IOException, InterruptedException {
+    try {
+      LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
+
+      zkDoWithRetries(new ZKAction<Void>() {
+        @Override
+        public Void run() throws KeeperException, InterruptedException {
+          ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
+          return null;
+        }
+      });
+    } catch (KeeperException e) {
+      throw new IOException("Couldn't clear parent znode " + znodeWorkingDir,
+          e);
+    }
+    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
+  }
+
 
   /**
    * Any service instance can drop out of the election by calling quitElection. 
@@ -592,6 +619,11 @@ public class ActiveStandbyElector implem
   long getZKSessionIdForTests() {
     return zkClient.getSessionId();
   }
+  
+  @VisibleForTesting
+  synchronized State getStateForTests() {
+    return state;
+  }
 
   private boolean reEstablishSession() {
     int connectionRetryCount = 0;

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java Mon Mar 26 23:37:33 2012
@@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFac
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.base.Preconditions;
 
@@ -40,6 +42,8 @@ public class FailoverController {
 
   private static final Log LOG = LogFactory.getLog(FailoverController.class);
 
+  private static final int GRACEFUL_FENCE_TIMEOUT = 5000;
+
   /**
    * Perform pre-failover checks on the given service we plan to
    * failover to, eg to prevent failing over to a service (eg due
@@ -96,7 +100,35 @@ public class FailoverController {
           "Got an IO exception", e);
     }
   }
-
+  
+  
+  /**
+   * Try to gracefully transition the given service to standby, returning
+   * true on success. This function is guaranteed to be "quick" -- ie it
+   * has a short timeout and no retries. Its only purpose is to avoid
+   * fencing a node that has already restarted.
+   */
+  static boolean tryGracefulFence(Configuration conf,
+      HAServiceTarget svc) {
+    HAServiceProtocol proxy = null;
+    try {
+      proxy = svc.getProxy(conf, GRACEFUL_FENCE_TIMEOUT);
+      proxy.transitionToStandby();
+      return true;
+    } catch (ServiceFailedException sfe) {
+      LOG.warn("Unable to gracefully make " + svc + " standby (" +
+          sfe.getMessage() + ")");
+    } catch (IOException ioe) {
+      LOG.warn("Unable to gracefully make " + svc +
+          " standby (unable to connect)", ioe);
+    } finally {
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+    return false;
+  }
+  
   /**
    * Failover from service 1 to service 2. If the failover fails
    * then try to failback.
@@ -118,16 +150,9 @@ public class FailoverController {
 
     // Try to make fromSvc standby
     boolean tryFence = true;
-    try {
-      HAServiceProtocolHelper.transitionToStandby(fromSvc.getProxy());
-      // We should try to fence if we failed or it was forced
-      tryFence = forceFence ? true : false;
-    } catch (ServiceFailedException sfe) {
-      LOG.warn("Unable to make " + fromSvc + " standby (" +
-          sfe.getMessage() + ")");
-    } catch (IOException ioe) {
-      LOG.warn("Unable to make " + fromSvc +
-          " standby (unable to connect)", ioe);
+    
+    if (tryGracefulFence(new Configuration(), fromSvc)) {
+      tryFence = forceFence;
     }
 
     // Fence fromSvc if it's required or forced by the user

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java Mon Mar 26 23:37:33 2012
@@ -188,7 +188,8 @@ class HealthMonitor {
         proxy.monitorHealth();
         healthy = true;
       } catch (HealthCheckFailedException e) {
-        LOG.warn("Service health check failed: " + e.getMessage());
+        LOG.warn("Service health check failed for " + targetToMonitor
+            + ": " + e.getMessage());
         enterState(State.SERVICE_UNHEALTHY);
       } catch (Throwable t) {
         LOG.warn("Transport-level exception trying to monitor health of " +

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1305673&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Mon Mar 26 23:37:33 2012
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+@InterfaceAudience.LimitedPrivate("HDFS")
+public abstract class ZKFailoverController implements Tool {
+
+  static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
+  
+  // TODO: this should be namespace-scoped
+  public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
+  private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
+  private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
+  private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
+  static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
+
+  /** Unable to format the parent znode in ZK */
+  static final int ERR_CODE_FORMAT_DENIED = 2;
+  /** The parent znode doesn't exist in ZK */
+  static final int ERR_CODE_NO_PARENT_ZNODE = 3;
+  /** Fencing is not properly configured */
+  static final int ERR_CODE_NO_FENCER = 4;
+  
+  private Configuration conf;
+
+  private HealthMonitor healthMonitor;
+  private ActiveStandbyElector elector;
+
+  private HAServiceTarget localTarget;
+
+  private String parentZnode;
+
+  private State lastHealthState = State.INITIALIZING;
+
+  /** Set if a fatal error occurs */
+  private String fatalError = null;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    localTarget = getLocalTarget();
+  }
+  
+
+  protected abstract byte[] targetToData(HAServiceTarget target);
+  protected abstract HAServiceTarget getLocalTarget();  
+  protected abstract HAServiceTarget dataToTarget(byte[] data);
+
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public int run(final String[] args) throws Exception {
+    // TODO: need to hook DFS here to find the NN keytab info, etc,
+    // similar to what DFSHAAdmin does. Annoying that this is in common.
+    try {
+      return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
+        @Override
+        public Integer run() {
+          try {
+            return doRun(args);
+          } catch (Exception t) {
+            throw new RuntimeException(t);
+          }
+        }
+      });
+    } catch (RuntimeException rte) {
+      throw (Exception)rte.getCause();
+    }
+  }
+  
+  private int doRun(String[] args)
+      throws HadoopIllegalArgumentException, IOException, InterruptedException {
+    initZK();
+    if (args.length > 0) {
+      if ("-formatZK".equals(args[0])) {
+        boolean force = false;
+        boolean interactive = true;
+        for (int i = 1; i < args.length; i++) {
+          if ("-force".equals(args[i])) {
+            force = true;
+          } else if ("-nonInteractive".equals(args[i])) {
+            interactive = false;
+          } else {
+            badArg(args[i]);
+          }
+        }
+        return formatZK(force, interactive);
+      } else {
+        badArg(args[0]);
+      }
+    }
+    
+    if (!elector.parentZNodeExists()) {
+      LOG.fatal("Unable to start failover controller. " +
+          "Parent znode does not exist.\n" +
+          "Run with -formatZK flag to initialize ZooKeeper.");
+      return ERR_CODE_NO_PARENT_ZNODE;
+    }
+
+    try {
+      localTarget.checkFencingConfigured();
+    } catch (BadFencingConfigurationException e) {
+      LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
+          "You must configure a fencing method before using automatic " +
+          "failover.", e);
+      return ERR_CODE_NO_FENCER;
+    }
+
+    initHM();
+    mainLoop();
+    return 0;
+  }
+
+  private void badArg(String arg) {
+    printUsage();
+    throw new HadoopIllegalArgumentException(
+        "Bad argument: " + arg);
+  }
+
+  private void printUsage() {
+    System.err.println("Usage: " + this.getClass().getSimpleName() +
+        " [-formatZK [-force | -nonInteractive]]");
+  }
+
+  private int formatZK(boolean force, boolean interactive)
+      throws IOException, InterruptedException {
+    if (elector.parentZNodeExists()) {
+      if (!force && (!interactive || !confirmFormat())) {
+        return ERR_CODE_FORMAT_DENIED;
+      }
+      
+      try {
+        elector.clearParentZNode();
+      } catch (IOException e) {
+        LOG.error("Unable to clear zk parent znode", e);
+        return 1;
+      }
+    }
+    
+    elector.ensureParentZNode();
+    return 0;
+  }
+
+  private boolean confirmFormat() {
+    System.err.println(
+        "===============================================\n" +
+        "The configured parent znode " + parentZnode + " already exists.\n" +
+        "Are you sure you want to clear all failover information from\n" +
+        "ZooKeeper?\n" +
+        "WARNING: Before proceeding, ensure that all HDFS services and\n" +
+        "failover controllers are stopped!\n" +
+        "===============================================");
+    try {
+      return ToolRunner.confirmPrompt("Proceed formatting " + parentZnode + "?");
+    } catch (IOException e) {
+      LOG.debug("Failed to confirm", e);
+      return false;
+    }
+  }
+
+  // ------------------------------------------
+  // Begin actual guts of failover controller
+  // ------------------------------------------
+  
+  private void initHM() {
+    healthMonitor = new HealthMonitor(conf, localTarget);
+    healthMonitor.addCallback(new HealthCallbacks());
+    healthMonitor.start();
+  }
+
+  private void initZK() throws HadoopIllegalArgumentException, IOException {
+    String zkQuorum = conf.get(ZK_QUORUM_KEY);
+    int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
+        ZK_SESSION_TIMEOUT_DEFAULT);
+    parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
+        ZK_PARENT_ZNODE_DEFAULT);
+    // TODO: need ZK ACL support in config, also maybe auth!
+    List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
+
+    Preconditions.checkArgument(zkQuorum != null,
+        "Missing required configuration '%s' for ZooKeeper quorum",
+        ZK_QUORUM_KEY);
+    Preconditions.checkArgument(zkTimeout > 0,
+        "Invalid ZK session timeout %s", zkTimeout);
+    
+
+    elector = new ActiveStandbyElector(zkQuorum,
+        zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
+  }
+  
+  private synchronized void mainLoop() throws InterruptedException {
+    while (fatalError == null) {
+      wait();
+    }
+    assert fatalError != null; // only get here on fatal
+    throw new RuntimeException(
+        "ZK Failover Controller failed: " + fatalError);
+  }
+  
+  private synchronized void fatalError(String err) {
+    LOG.fatal("Fatal error occurred:" + err);
+    fatalError = err;
+    notifyAll();
+  }
+  
+  private synchronized void becomeActive() {
+    LOG.info("Trying to make " + localTarget + " active...");
+    try {
+      localTarget.getProxy().transitionToActive();
+      LOG.info("Successfully transitioned " + localTarget +
+          " to active state");
+    } catch (Throwable t) {
+      LOG.fatal("Couldn't make " + localTarget + " active", t);
+      elector.quitElection(true);
+/*
+* TODO:
+* we need to make sure that if we get fenced and then quickly restarted,
+* none of these calls will retry across the restart boundary
+* perhaps the solution is that, whenever the nn starts, it gets a unique
+* ID, and when we start becoming active, we record it, and then any future
+* calls use the same ID
+*/
+      
+    }
+  }
+
+  private synchronized void becomeStandby() {
+    LOG.info("ZK Election indicated that " + localTarget +
+        " should become standby");
+    try {
+      localTarget.getProxy().transitionToStandby();
+      LOG.info("Successfully transitioned " + localTarget +
+          " to standby state");
+    } catch (Exception e) {
+      LOG.error("Couldn't transition " + localTarget + " to standby state",
+          e);
+      // TODO handle this. It's a likely case since we probably got fenced
+      // at the same time.
+    }
+  }
+
+  /**
+   * @return the last health state passed to the FC
+   * by the HealthMonitor.
+   */
+  @VisibleForTesting
+  State getLastHealthState() {
+    return lastHealthState;
+  }
+  
+  @VisibleForTesting
+  ActiveStandbyElector getElectorForTests() {
+    return elector;
+  }
+
+  /**
+   * Callbacks from elector
+   */
+  class ElectorCallbacks implements ActiveStandbyElectorCallback {
+    @Override
+    public void becomeActive() {
+      ZKFailoverController.this.becomeActive();
+    }
+
+    @Override
+    public void becomeStandby() {
+      ZKFailoverController.this.becomeStandby();
+    }
+
+    @Override
+    public void enterNeutralMode() {
+    }
+
+    @Override
+    public void notifyFatalError(String errorMessage) {
+      fatalError(errorMessage);
+    }
+
+    @Override
+    public void fenceOldActive(byte[] data) {
+      HAServiceTarget target = dataToTarget(data);
+      
+      LOG.info("Should fence: " + target);
+      boolean gracefulWorked =
+        FailoverController.tryGracefulFence(conf, target);
+      if (gracefulWorked) {
+        // TODO: the target may be in standby but just about to become
+        // active -- determine whether there is a race here.
+        LOG.info("Successfully transitioned " + target + " to standby " +
+            "state without fencing");
+        return;
+      }
+      
+      try {
+        target.checkFencingConfigured();
+      } catch (BadFencingConfigurationException e) {
+        LOG.error("Couldn't fence old active " + target, e);
+        // TODO: see below todo
+        throw new RuntimeException(e);
+      }
+      
+      if (!target.getFencer().fence(target)) {
+        // TODO: this will end up in some kind of tight loop,
+        // won't it? We need some kind of backoff
+        throw new RuntimeException("Unable to fence " + target);
+      }
+    }
+  }
+  
+  /**
+   * Callbacks from HealthMonitor
+   */
+  class HealthCallbacks implements HealthMonitor.Callback {
+    @Override
+    public void enteredState(HealthMonitor.State newState) {
+      LOG.info("Local service " + localTarget +
+          " entered state: " + newState);
+      switch (newState) {
+      case SERVICE_HEALTHY:
+        LOG.info("Joining master election for " + localTarget);
+        elector.joinElection(targetToData(localTarget));
+        break;
+        
+      case INITIALIZING:
+        LOG.info("Ensuring that " + localTarget + " does not " +
+            "participate in active master election");
+        elector.quitElection(false);
+        break;
+
+      case SERVICE_UNHEALTHY:
+      case SERVICE_NOT_RESPONDING:
+        LOG.info("Quitting master election for " + localTarget +
+            " and marking that fencing is necessary");
+        elector.quitElection(true);
+        break;
+        
+      case HEALTH_MONITOR_FAILED:
+        fatalError("Health monitor failed!");
+        break;
+        
+      default:
+        throw new IllegalArgumentException("Unhandled state:" + newState);
+      }
+      
+      lastHealthState = newState;
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java Mon Mar 26 23:37:33 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.util;
 
+import java.io.IOException;
 import java.io.PrintStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -92,4 +93,33 @@ public class ToolRunner {
     GenericOptionsParser.printGenericCommandUsage(out);
   }
   
+  
+  /**
+   * Prompt the user, re-asking until the answer is valid: returns true
+   * for "y" or "yes" and false for "n" or "no" (case insensitive).
+   */
+  public static boolean confirmPrompt(String prompt) throws IOException {
+    while (true) {
+      System.err.print(prompt + " (Y or N) ");
+      StringBuilder responseBuilder = new StringBuilder();
+      while (true) {
+        int c = System.in.read();
+        if (c == -1 || c == '\r' || c == '\n') {
+          break;
+        }
+        responseBuilder.append((char)c);
+      }
+  
+      String response = responseBuilder.toString();
+      if (response.equalsIgnoreCase("y") ||
+          response.equalsIgnoreCase("yes")) {
+        return true;
+      } else if (response.equalsIgnoreCase("n") ||
+          response.equalsIgnoreCase("no")) {
+        return false;
+      }
+      System.err.println("Invalid input: " + response);
+      // else ask them again
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java Mon Mar 26 23:37:33 2012
@@ -50,4 +50,15 @@ public abstract class ActiveStandbyElect
       Thread.sleep(50);
     }
   }
+
+  public static void waitForElectorState(TestContext ctx,
+      ActiveStandbyElector elector,
+      ActiveStandbyElector.State state) throws Exception { 
+    while (elector.getStateForTests() != state) {
+      if (ctx != null) {
+        ctx.checkException();
+      }
+      Thread.sleep(50);
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Mon Mar 26 23:37:33 2012
@@ -19,12 +19,15 @@ package org.apache.hadoop.ha;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.security.AccessControlException;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
 /**
  * Test-only implementation of {@link HAServiceTarget}, which returns
  * a mock implementation.
@@ -36,12 +39,20 @@ class DummyHAService extends HAServiceTa
   InetSocketAddress address;
   boolean isHealthy = true;
   boolean actUnreachable = false;
+  boolean failToBecomeActive;
+  
+  static ArrayList<DummyHAService> instances = Lists.newArrayList();
+  int index;
 
   DummyHAService(HAServiceState state, InetSocketAddress address) {
     this.state = state;
     this.proxy = makeMock();
     this.fencer = Mockito.mock(NodeFencer.class);
     this.address = address;
+    synchronized (instances) {
+      instances.add(this);
+      this.index = instances.size();
+    }
   }
   
   private HAServiceProtocol makeMock() {
@@ -59,6 +70,10 @@ class DummyHAService extends HAServiceTa
       public void transitionToActive() throws ServiceFailedException,
           AccessControlException, IOException {
         checkUnreachable();
+        if (failToBecomeActive) {
+          throw new ServiceFailedException("injected failure");
+        }
+
         state = HAServiceState.ACTIVE;
       }
 
@@ -97,7 +112,7 @@ class DummyHAService extends HAServiceTa
       throws IOException {
     return proxy;
   }
-
+  
   @Override
   public NodeFencer getFencer() {
     return fencer;
@@ -106,4 +121,13 @@ class DummyHAService extends HAServiceTa
   @Override
   public void checkFencingConfigured() throws BadFencingConfigurationException {
   }
-}
\ No newline at end of file
+  
+  @Override
+  public String toString() {
+    return "DummyHAService #" + index;
+  }
+
+  public static HAServiceTarget getInstance(int serial) {
+    return instances.get(serial - 1);
+  }
+}

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java?rev=1305673&r1=1305672&r2=1305673&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java Mon Mar 26 23:37:33 2012
@@ -156,6 +156,10 @@ public class TestNodeFencer {
     @Override
     public void checkArgs(String args) {
     }
+    
+    public static HAServiceTarget getLastFencedService() {
+      return fencedSvc;
+    }
   }
   
   /**

Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1305673&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Mon Mar 26 23:37:33 2012
@@ -0,0 +1,457 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.test.MultithreadedTestUtil;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.log4j.Level;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.primitives.Ints;
+
+public class TestZKFailoverController extends ClientBase {
+  private Configuration conf;
+  private DummyHAService svc1;
+  private DummyHAService svc2;
+  private TestContext ctx;
+  private DummyZKFCThread thr1, thr2;
+  
+  static {
+    ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
+  @Override
+  public void setUp() throws Exception {
+    // build.test.dir is used by zookeeper, so create it before super.setUp()
+    new File(System.getProperty("build.test.dir", "build")).mkdirs();
+    super.setUp();
+  }
+  
+  @Before
+  public void setupConfAndServices() {
+    conf = new Configuration();
+    conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+    // Short health-monitor check/retry/sleep intervals so tests run faster
+    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+    svc1 = new DummyHAService(HAServiceState.INITIALIZING,
+        new InetSocketAddress("svc1", 1234));
+    svc2 = new DummyHAService(HAServiceState.INITIALIZING,
+        new InetSocketAddress("svc2", 1234));
+  }
+  
+  /**
+   * Set up two services and their failover controllers. svc1 is started
+   * first, so that it enters ACTIVE state, and then svc2 is started,
+   * which enters STANDBY. Both threads are registered with ctx.
+   */
+  private void setupFCs() throws Exception {
+    // Format the base dir, should succeed
+    assertEquals(0, runFC(svc1, "-formatZK"));
+
+    ctx = new MultithreadedTestUtil.TestContext();
+    thr1 = new DummyZKFCThread(ctx, svc1);
+    ctx.addThread(thr1);
+    thr1.start();
+    
+    LOG.info("Waiting for svc1 to enter active state");
+    waitForHAState(svc1, HAServiceState.ACTIVE);
+    
+    LOG.info("Adding svc2");
+    thr2 = new DummyZKFCThread(ctx, svc2);
+    ctx.addThread(thr2);
+    thr2.start();
+    waitForHAState(svc2, HAServiceState.STANDBY);
+  }
+  
+  private void stopFCs() throws Exception {
+    // Interrupt whichever FC threads were created, then stop the context
+    for (DummyZKFCThread t : new DummyZKFCThread[] { thr1, thr2 }) {
+      if (t != null) {
+        t.interrupt();
+      }
+    }
+    if (ctx != null) {
+      ctx.stop();
+    }
+  }
+
+  /**
+   * Test that the various command lines for formatting the ZK directory
+   * function correctly.
+   */
+  @Test(timeout=15000)
+  public void testFormatZK() throws Exception {
+    // Run without formatting the base dir first: the controller
+    // should return the "no parent znode" error code
+    assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
+        runFC(svc1));
+
+    // Format the base dir, should succeed
+    assertEquals(0, runFC(svc1, "-formatZK"));
+
+    // Non-interactive re-format must be denied if already formatted
+    assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
+        runFC(svc1, "-formatZK", "-nonInteractive"));
+  
+    // Unless '-force' is on
+    assertEquals(0, runFC(svc1, "-formatZK", "-force"));
+  }
+  
+  /**
+   * Test that the ZKFC won't run if fencing is not configured for the
+   * local service (simulated by a spy whose fencing check throws).
+   */
+  @Test(timeout=15000)
+  public void testFencingMustBeConfigured() throws Exception {
+    svc1 = Mockito.spy(svc1);
+    Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
+        .when(svc1).checkFencingConfigured();
+    // Formatting should still succeed despite the bad fencing config
+    assertEquals(0, runFC(svc1, "-formatZK"));
+    // Try to run the actual FC, should fail without a fencer
+    assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
+        runFC(svc1));
+  }
+  
+  /**
+   * Test that, when the health monitor indicates bad health status,
+   * failover is triggered. Also ensures that graceful active->standby
+   * transition is used when possible, falling back to fencing when
+   * the graceful approach fails.
+   */
+  @Test(timeout=15000)
+  public void testAutoFailoverOnBadHealth() throws Exception {
+    try {
+      setupFCs();
+      
+      LOG.info("Faking svc1 unhealthy, should failover to svc2");
+      svc1.isHealthy = false;
+      LOG.info("Waiting for svc1 to enter standby state");
+      waitForHAState(svc1, HAServiceState.STANDBY);
+      waitForHAState(svc2, HAServiceState.ACTIVE);
+  
+      // Allow fencing of svc2 to succeed. Stub this before provoking the
+      // failover so the fencer is never invoked while still unstubbed.
+      Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
+      LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
+          "and fail to gracefully go to standby");
+      svc1.isHealthy = true;
+      svc2.actUnreachable = true;
+      // Should fail back to svc1 at this point
+      waitForHAState(svc1, HAServiceState.ACTIVE);
+      // and fence svc2
+      Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
+    } finally {
+      stopFCs();
+    }
+  }
+  
+  /**
+   * Test that expiring the active node's ZK session causes a failover,
+   * and that this keeps working when the whole cycle is run a second time.
+   */
+  @Test(timeout=15000)
+  public void testAutoFailoverOnLostZKSession() throws Exception {
+    try {
+      setupFCs();
+      for (int round = 1; round <= 2; round++) {
+        if (round == 2) {
+          LOG.info("======= Running test cases second time to test " +
+              "re-establishment =========");
+        }
+        // Expire svc1, it should fail over to svc2
+        expireAndVerifyFailover(thr1, thr2);
+        // Expire svc2, it should fail back to svc1
+        expireAndVerifyFailover(thr2, thr1);
+      }
+    } finally {
+      stopFCs();
+    }
+  }
+  
+  /**
+   * Expire fromThr's (active) ZK session and wait for failover to toThr.
+   */
+  private void expireAndVerifyFailover(DummyZKFCThread fromThr,
+      DummyZKFCThread toThr) throws Exception {
+    DummyZKFC expiring = fromThr.zkfc;
+    expiring.getElectorForTests().preventSessionReestablishmentForTests();
+    try {
+      expireActiveLockHolder(expiring.localTarget);
+      waitForHAState(expiring.localTarget, HAServiceState.STANDBY);
+      waitForHAState(toThr.zkfc.localTarget, HAServiceState.ACTIVE);
+    } finally {
+      expiring.getElectorForTests().allowSessionReestablishmentForTests();
+    }
+  }
+
+  /**
+   * Test that, if the standby node is unhealthy, it doesn't try to become
+   * active
+   */
+  @Test(timeout=15000)
+  public void testDontFailoverToUnhealthyNode() throws Exception {
+    try {
+      setupFCs();
+
+      // Make svc2 unhealthy, and wait for its FC to notice the bad health.
+      svc2.isHealthy = false;
+      waitForHealthState(thr2.zkfc,
+          HealthMonitor.State.SERVICE_UNHEALTHY);
+      
+      // Expire svc1, holding back its elector's session re-establishment
+      thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
+      try {
+        expireActiveLockHolder(svc1);
+
+        LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
+            " a chance to take the lock, if it is ever going to.");
+        Thread.sleep(1000);
+        
+        // Ensure that no one holds the lock.
+        waitForActiveLockHolder(null);
+        
+      } finally {
+        LOG.info("Allowing svc1's elector to re-establish its connection");
+        thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+      }
+      // svc1 should get the lock again once it is allowed to reconnect
+      waitForActiveLockHolder(svc1);
+    } finally {
+      stopFCs();
+    }
+  }
+
+  /**
+   * Test that the ZKFC successfully quits the election when it fails to
+   * become active. This allows the old node to successfully fail back.
+   */
+  @Test(timeout=15000)
+  public void testBecomingActiveFails() throws Exception {
+    try {
+      setupFCs();
+      
+      LOG.info("Making svc2 fail to become active");
+      svc2.failToBecomeActive = true;
+      
+      LOG.info("Faking svc1 unhealthy, should NOT successfully " +
+          "failover to svc2");
+      svc1.isHealthy = false;
+      waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
+      waitForActiveLockHolder(null);
+      // svc2 must have been asked (exactly once) to become active
+      Mockito.verify(svc2.proxy).transitionToActive();
+
+      waitForHAState(svc1, HAServiceState.STANDBY);
+      waitForHAState(svc2, HAServiceState.STANDBY);
+      
+      LOG.info("Faking svc1 healthy again, should go back to svc1");
+      svc1.isHealthy = true;
+      waitForHAState(svc1, HAServiceState.ACTIVE);
+      waitForHAState(svc2, HAServiceState.STANDBY);
+      waitForActiveLockHolder(svc1);
+    } finally {
+      stopFCs();
+    }
+  }
+  
+  /**
+   * Test that, when ZooKeeper fails, the system remains in its
+   * current state, without triggering any failovers, and without
+   * causing the active node to enter standby state. After ZooKeeper
+   * returns, both nodes must resume their roles on their old sessions.
+   */
+  @Test(timeout=15000)
+  public void testZooKeeperFailure() throws Exception {
+    try {
+      setupFCs();
+
+      // Record initial ZK sessions
+      long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
+      long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
+
+      LOG.info("====== Stopping ZK server");
+      stopServer();
+      waitForServerDown(hostPort, CONNECTION_TIMEOUT);
+      
+      LOG.info("====== Waiting for services to enter NEUTRAL mode");
+      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+          thr1.zkfc.getElectorForTests(),
+          ActiveStandbyElector.State.NEUTRAL);
+      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+          thr2.zkfc.getElectorForTests(),
+          ActiveStandbyElector.State.NEUTRAL);
+
+      LOG.info("====== Checking that the services didn't change HA state");
+      assertEquals(HAServiceState.ACTIVE, svc1.state);
+      assertEquals(HAServiceState.STANDBY, svc2.state);
+      
+      LOG.info("====== Restarting server");
+      startServer();
+      waitForServerUp(hostPort, CONNECTION_TIMEOUT);
+
+      // Nodes should go back to their original states, since they re-obtain
+      // the same sessions.
+      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+          thr1.zkfc.getElectorForTests(),
+          ActiveStandbyElector.State.ACTIVE);
+      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+          thr2.zkfc.getElectorForTests(),
+          ActiveStandbyElector.State.STANDBY);
+      // Check HA states didn't change: the elector states were verified
+      // just above, so look at the services' own HA state here.
+      waitForHAState(svc1, HAServiceState.ACTIVE);
+      waitForHAState(svc2, HAServiceState.STANDBY);
+
+      // Check they re-used the same sessions and didn't
+      // spuriously reconnect
+      assertEquals(session1,
+          thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
+      assertEquals(session2,
+          thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
+    } finally {
+      stopFCs();
+    }
+  }
+
+  /**
+   * Close the ZK session that owns the active lock znode. The lock data
+   * is first asserted to match the index of the expected active service.
+   * @throws NoNodeException if no service holds the lock
+   */
+  private void expireActiveLockHolder(DummyHAService expectedActive)
+      throws NoNodeException {
+    ZooKeeperServer server = getServer(serverFactory);
+    Stat lockStat = new Stat();
+    String lockPath = ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
+        ActiveStandbyElector.LOCK_FILENAME;
+    byte[] lockData = server.getZKDatabase().getData(lockPath, lockStat, null);
+    
+    assertArrayEquals(Ints.toByteArray(expectedActive.index), lockData);
+    long owner = lockStat.getEphemeralOwner();
+    LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + owner);
+    server.closeSession(owner);
+  }
+  
+  /**
+   * Poll every 50ms until the given HA service enters the given HA state.
+   */
+  private void waitForHAState(DummyHAService svc, HAServiceState state)
+      throws Exception {
+    while (svc.state != state) {
+      ctx.checkException();
+      Thread.sleep(50);
+    }
+  }
+  
+  /**
+   * Poll every 50ms until the ZKFC's last health state is the given one.
+   */
+  private void waitForHealthState(DummyZKFC zkfc, State state)
+      throws Exception {
+    while (zkfc.getLastHealthState() != state) {
+      ctx.checkException();
+      Thread.sleep(50);
+    }
+  }
+
+  /**
+   * Wait until the active lock data matches the index of the given HA
+   * service. If the passed svc is null, the expected lock data is null,
+   * i.e. this waits for there to be no active lock holder.
+   */
+  private void waitForActiveLockHolder(DummyHAService svc)
+      throws Exception {
+    ZooKeeperServer server = getServer(serverFactory);
+    byte[] expected = (svc == null) ? null : Ints.toByteArray(svc.index);
+    ActiveStandbyElectorTestUtil.waitForActiveLockData(
+        ctx, server, ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT, expected);
+  }
+
+  /** Run a fresh DummyZKFC on target with args and return its result. */
+  private int runFC(DummyHAService target, String ... args) throws Exception {
+    DummyZKFC controller = new DummyZKFC(target);
+    controller.setConf(conf);
+    return controller.run(args);
+  }
+
+  /**
+   * Test-thread which runs a ZK Failover Controller for a given dummy
+   * service, configured with the enclosing test's conf.
+   */
+  private class DummyZKFCThread extends TestingThread {
+    private final DummyZKFC zkfc;
+
+    public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
+      super(ctx);
+      this.zkfc = new DummyZKFC(svc);
+      zkfc.setConf(conf);
+    }
+
+    @Override
+    public void doWork() throws Exception {
+      try {
+        assertEquals(0, zkfc.run(new String[0]));
+      } catch (InterruptedException ie) {
+        // Interrupted by main thread (see stopFCs), that's OK.
+      }
+    }
+  }
+  
+  /** ZKFC for a DummyHAService; targets are encoded as their int index. */
+  private static class DummyZKFC extends ZKFailoverController {
+    private final DummyHAService localTarget;
+    
+    public DummyZKFC(DummyHAService localTarget) {
+      this.localTarget = localTarget;
+    }
+
+    @Override
+    protected byte[] targetToData(HAServiceTarget target) {
+      return Ints.toByteArray(((DummyHAService)target).index);
+    }
+    
+    @Override
+    protected HAServiceTarget dataToTarget(byte[] data) {
+      return DummyHAService.getInstance(Ints.fromByteArray(data));
+    }
+
+    @Override
+    protected HAServiceTarget getLocalTarget() {
+      return localTarget;
+    }
+  }
+}