Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/14 22:12:13 UTC

[12/37] hadoop git commit: HDFS-6184. Capture NN's thread dump when it fails over. Contributed by Ming Ma.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2463666e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2463666e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2463666e

Branch: refs/heads/HDFS-7240
Commit: 2463666ecb553dbde1b8c540a21ad3d599239acf
Parents: f24452d
Author: Akira Ajisaka <aa...@apache.org>
Authored: Wed May 13 11:37:22 2015 +0900
Committer: Akira Ajisaka <aa...@apache.org>
Committed: Wed May 13 11:37:22 2015 +0900

----------------------------------------------------------------------
 .../apache/hadoop/ha/ZKFailoverController.java  |   5 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +-
 .../hdfs/tools/DFSZKFailoverController.java     |  60 +++++
 .../src/main/resources/hdfs-default.xml         |  11 +
 .../ha/TestDFSZKFailoverController.java         | 226 -----------------
 .../hdfs/tools/TestDFSZKFailoverController.java | 243 +++++++++++++++++++
 7 files changed, 322 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
index 9eb1ff8..788d48e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
@@ -844,12 +844,11 @@ public abstract class ZKFailoverController {
    * @return the last health state passed to the FC
    * by the HealthMonitor.
    */
-  @VisibleForTesting
-  synchronized State getLastHealthState() {
+  protected synchronized State getLastHealthState() {
     return lastHealthState;
   }
 
-  private synchronized void setLastHealthState(HealthMonitor.State newState) {
+  protected synchronized void setLastHealthState(HealthMonitor.State newState) {
     LOG.info("Local service " + localTarget +
         " entered state: " + newState);
     lastHealthState = newState;
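
The relaxed visibility turns setLastHealthState() into an overridable
hook: a subclass can now observe every health-state transition while the
base class keeps its own bookkeeping. A minimal sketch of the pattern,
using simplified stand-in classes rather than the real
org.apache.hadoop.ha types (the actual override is in
DFSZKFailoverController, further down in this diff):

    // Simplified stand-ins for illustration only; these are not the real
    // org.apache.hadoop.ha classes.
    abstract class BaseFailoverController {
      private String lastHealthState = "SERVICE_HEALTHY";

      protected synchronized String getLastHealthState() {
        return lastHealthState;
      }

      // Previously private; protected lets a subclass see each transition.
      protected synchronized void setLastHealthState(String newState) {
        lastHealthState = newState;
      }
    }

    class ThreadDumpingController extends BaseFailoverController {
      @Override
      protected synchronized void setLastHealthState(String newState) {
        super.setLastHealthState(newState);  // keep the base bookkeeping
        if ("SERVICE_NOT_RESPONDING".equals(newState)) {
          System.out.println("would capture a thread dump here");
        }
      }

      public static void main(String[] args) {
        new ThreadDumpingController()
            .setLastHealthState("SERVICE_NOT_RESPONDING");
      }
    }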

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cd477af..135b50c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -546,6 +546,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8255. Rename getBlockReplication to getPreferredBlockReplication.
     (Contributed by Zhe Zhang)
 
+    HDFS-6184. Capture NN's thread dump when it fails over.
+    (Ming Ma via aajisaka)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4356b9b..f8e9f3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -543,7 +543,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
   public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
   public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
-  
+  public static final String DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY = "dfs.ha.zkfc.nn.http.timeout.ms";
+  public static final int DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY_DEFAULT = 20000;
+
   // Security-related configs
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
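
The new key is read through the standard Configuration API. A short
sketch of reading it, and of disabling the capture by setting it to zero
(hypothetical class name; assumes hadoop-common and hadoop-hdfs on the
classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class ZkfcHttpTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Falls back to the 20000 ms default when the key is not set
        // in hdfs-site.xml.
        int timeout = conf.getInt(
            DFSConfigKeys.DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY,
            DFSConfigKeys.DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY_DEFAULT);
        System.out.println("ZKFC NN HTTP timeout: " + timeout + " ms");
        // Setting the key to 0 turns the thread-dump capture off.
        conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY, 0);
      }
    }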

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
index 4e256a2..f125a27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
@@ -20,15 +20,20 @@ package org.apache.hadoop.hdfs.tools;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
+import java.net.URL;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceTarget;
+import org.apache.hadoop.ha.HealthMonitor;
 import org.apache.hadoop.ha.ZKFailoverController;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -37,6 +42,7 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
@@ -57,6 +63,9 @@ public class DFSZKFailoverController extends ZKFailoverController {
   /* the same as superclass's localTarget, but with the more specific NN type */
   private final NNHAServiceTarget localNNTarget;
 
+  // This is used only for unit tests
+  private boolean isThreadDumpCaptured = false;
+
   @Override
   protected HAServiceTarget dataToTarget(byte[] data) {
     ActiveNodeInfo proto;
@@ -201,4 +210,55 @@ public class DFSZKFailoverController extends ZKFailoverController {
     LOG.warn(msg);
     throw new AccessControlException(msg);
   }
+
+  /**
+   * Capture the local NN's thread dump and write it to the ZKFC's log.
+   */
+  private void getLocalNNThreadDump() {
+    isThreadDumpCaptured = false;
+    // We use the same timeout value for both connection establishment
+    // timeout and read timeout.
+    int httpTimeOut = conf.getInt(
+        DFSConfigKeys.DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_HA_ZKFC_NN_HTTP_TIMEOUT_KEY_DEFAULT);
+    if (httpTimeOut == 0) {
+      // If timeout value is set to zero, the feature is turned off.
+      return;
+    }
+    try {
+      String stacksUrl = DFSUtil.getInfoServer(localNNTarget.getAddress(),
+          conf, DFSUtil.getHttpClientScheme(conf)) + "/stacks";
+      URL url = new URL(stacksUrl);
+      HttpURLConnection conn = (HttpURLConnection)url.openConnection();
+      conn.setReadTimeout(httpTimeOut);
+      conn.setConnectTimeout(httpTimeOut);
+      conn.connect();
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
+      StringBuilder localNNThreadDumpContent =
+          new StringBuilder("-- Local NN thread dump -- \n");
+      localNNThreadDumpContent.append(out);
+      localNNThreadDumpContent.append("\n -- Local NN thread dump -- ");
+      LOG.info(localNNThreadDumpContent);
+      isThreadDumpCaptured = true;
+    } catch (IOException e) {
+      LOG.warn("Can't get local NN thread dump due to " + e.getMessage());
+    }
+  }
+
+  @Override
+  protected synchronized void setLastHealthState(HealthMonitor.State newState) {
+    super.setLastHealthState(newState);
+    // Capture local NN thread dump when the target NN health state changes.
+    if (getLastHealthState() == HealthMonitor.State.SERVICE_NOT_RESPONDING ||
+        getLastHealthState() == HealthMonitor.State.SERVICE_UNHEALTHY) {
+      getLocalNNThreadDump();
+    }
+  }
+
+  @VisibleForTesting
+  boolean isThreadDumpCaptured() {
+    return isThreadDumpCaptured;
+  }
+
 }
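
The /stacks path used above is the stock Hadoop HTTP servlet that prints
a thread dump of the serving daemon, so the same capture can be
reproduced by hand against a running NameNode. A standalone sketch (the
NameNode web address below is a placeholder; substitute your own, and
use https if your http policy requires it):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class NNStacksFetcher {
      public static void main(String[] args) throws Exception {
        // Placeholder address; 50070 was the default NN http port in 2.x.
        URL url = new URL("http://nn1.example.com:50070/stacks");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setConnectTimeout(20000);  // mirror the new default timeout
        conn.setReadTimeout(20000);
        try (BufferedReader in = new BufferedReader(
            new InputStreamReader(conn.getInputStream()))) {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);
          }
        } finally {
          conn.disconnect();
        }
      }
    }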

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fe1d1de..5d1d670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2334,4 +2334,15 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.ha.zkfc.nn.http.timeout.ms</name>
+  <value>20000</value>
+  <description>
+    The HTTP connection and read timeout value (in milliseconds) used when
+    the DFS ZKFC tries to get the local NN thread dump after the local NN
+    becomes SERVICE_NOT_RESPONDING or SERVICE_UNHEALTHY.
+    If set to zero, the DFS ZKFC does not capture the local NN thread dump.
+  </description>
+</property>
+
 </configuration>
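
To change the timeout from the default, the property goes in
hdfs-site.xml on the hosts running the ZKFC; for example, a sketch with
an assumed 5-second value:

    <property>
      <name>dfs.ha.zkfc.nn.http.timeout.ms</name>
      <value>5000</value>
    </property>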

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
deleted file mode 100644
index bcbd543..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode.ha;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ha.ClientBaseWithFixes;
-import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
-import org.apache.hadoop.ha.HealthMonitor;
-import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
-import org.apache.hadoop.ha.ZKFCTestUtil;
-import org.apache.hadoop.ha.ZKFailoverController;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
-import org.apache.hadoop.hdfs.tools.DFSZKFailoverController;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.base.Supplier;
-
-public class TestDFSZKFailoverController extends ClientBaseWithFixes {
-  private Configuration conf;
-  private MiniDFSCluster cluster;
-  private TestContext ctx;
-  private ZKFCThread thr1, thr2;
-  private FileSystem fs;
-
-  static {
-    // Make tests run faster by avoiding fsync()
-    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
-  }
-  
-  @Before
-  public void setup() throws Exception {
-    conf = new Configuration();
-    // Specify the quorum per-nameservice, to ensure that these configs
-    // can be nameservice-scoped.
-    conf.set(ZKFailoverController.ZK_QUORUM_KEY + ".ns1", hostPort);
-    conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY,
-        AlwaysSucceedFencer.class.getName());
-    conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
-
-    // Turn off IPC client caching, so that the suite can handle
-    // the restart of the daemons between test cases.
-    conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
-        0);
-    
-    conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1", 10023);
-    conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2", 10024);
-
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-    .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10021))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10022)));
-    cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(topology)
-        .numDataNodes(0)
-        .build();
-    cluster.waitActive();
-
-    ctx = new TestContext();
-    ctx.addThread(thr1 = new ZKFCThread(ctx, 0));
-    assertEquals(0, thr1.zkfc.run(new String[]{"-formatZK"}));
-
-    thr1.start();
-    waitForHAState(0, HAServiceState.ACTIVE);
-    
-    ctx.addThread(thr2 = new ZKFCThread(ctx, 1));
-    thr2.start();
-    
-    // Wait for the ZKFCs to fully start up
-    ZKFCTestUtil.waitForHealthState(thr1.zkfc,
-        HealthMonitor.State.SERVICE_HEALTHY, ctx);
-    ZKFCTestUtil.waitForHealthState(thr2.zkfc,
-        HealthMonitor.State.SERVICE_HEALTHY, ctx);
-    
-    fs = HATestUtil.configureFailoverFs(cluster, conf);
-  }
-  
-  @After
-  public void shutdown() throws Exception {
-    cluster.shutdown();
-    
-    if (thr1 != null) {
-      thr1.interrupt();
-    }
-    if (thr2 != null) {
-      thr2.interrupt();
-    }
-    if (ctx != null) {
-      ctx.stop();
-    }
-  }
-  
-  /**
-   * Test that automatic failover is triggered by shutting the
-   * active NN down.
-   */
-  @Test(timeout=60000)
-  public void testFailoverAndBackOnNNShutdown() throws Exception {
-    Path p1 = new Path("/dir1");
-    Path p2 = new Path("/dir2");
-    
-    // Write some data on the first NN
-    fs.mkdirs(p1);
-    // Shut it down, causing automatic failover
-    cluster.shutdownNameNode(0);
-    // Data should still exist. Write some on the new NN
-    assertTrue(fs.exists(p1));
-    fs.mkdirs(p2);
-    assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
-        thr1.zkfc.getLocalTarget().getAddress());
-    
-    // Start the first node back up
-    cluster.restartNameNode(0);
-    // This should have no effect -- the new node should be STANDBY.
-    waitForHAState(0, HAServiceState.STANDBY);
-    assertTrue(fs.exists(p1));
-    assertTrue(fs.exists(p2));
-    // Shut down the second node, which should failback to the first
-    cluster.shutdownNameNode(1);
-    waitForHAState(0, HAServiceState.ACTIVE);
-
-    // First node should see what was written on the second node while it was down.
-    assertTrue(fs.exists(p1));
-    assertTrue(fs.exists(p2));
-    assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
-        thr2.zkfc.getLocalTarget().getAddress());
-  }
-  
-  @Test(timeout=30000)
-  public void testManualFailover() throws Exception {
-    thr2.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
-    waitForHAState(0, HAServiceState.STANDBY);
-    waitForHAState(1, HAServiceState.ACTIVE);
-
-    thr1.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
-    waitForHAState(0, HAServiceState.ACTIVE);
-    waitForHAState(1, HAServiceState.STANDBY);
-  }
-  
-  @Test(timeout=30000)
-  public void testManualFailoverWithDFSHAAdmin() throws Exception {
-    DFSHAAdmin tool = new DFSHAAdmin();
-    tool.setConf(conf);
-    assertEquals(0, 
-        tool.run(new String[]{"-failover", "nn1", "nn2"}));
-    waitForHAState(0, HAServiceState.STANDBY);
-    waitForHAState(1, HAServiceState.ACTIVE);
-    assertEquals(0,
-        tool.run(new String[]{"-failover", "nn2", "nn1"}));
-    waitForHAState(0, HAServiceState.ACTIVE);
-    waitForHAState(1, HAServiceState.STANDBY);
-  }
-  
-  private void waitForHAState(int nnidx, final HAServiceState state)
-      throws TimeoutException, InterruptedException {
-    final NameNode nn = cluster.getNameNode(nnidx);
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          return nn.getRpcServer().getServiceStatus().getState() == state;
-        } catch (Exception e) {
-          e.printStackTrace();
-          return false;
-        }
-      }
-    }, 50, 15000);
-  }
-
-  /**
-   * Test-thread which runs a ZK Failover Controller corresponding
-   * to a given NameNode in the minicluster.
-   */
-  private class ZKFCThread extends TestingThread {
-    private final DFSZKFailoverController zkfc;
-
-    public ZKFCThread(TestContext ctx, int idx) {
-      super(ctx);
-      this.zkfc = DFSZKFailoverController.create(
-          cluster.getConfiguration(idx));
-    }
-
-    @Override
-    public void doWork() throws Exception {
-      try {
-        assertEquals(0, zkfc.run(new String[0]));
-      } catch (InterruptedException ie) {
-        // Interrupted by main thread, that's OK.
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2463666e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java
new file mode 100644
index 0000000..3e1c96f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSZKFailoverController.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthMonitor;
+import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
+import org.apache.hadoop.ha.ZKFCTestUtil;
+import org.apache.hadoop.ha.ZKFailoverController;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+import org.mockito.Mockito;
+
+public class TestDFSZKFailoverController extends ClientBaseWithFixes {
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private TestContext ctx;
+  private ZKFCThread thr1, thr2;
+  private FileSystem fs;
+
+  static {
+    // Make tests run faster by avoiding fsync()
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+  
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    // Specify the quorum per-nameservice, to ensure that these configs
+    // can be nameservice-scoped.
+    conf.set(ZKFailoverController.ZK_QUORUM_KEY + ".ns1", hostPort);
+    conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY,
+        AlwaysSucceedFencer.class.getName());
+    conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+
+    // Turn off IPC client caching, so that the suite can handle
+    // the restart of the daemons between test cases.
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+    
+    conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1", 10023);
+    conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2", 10024);
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+    .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+        .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10021))
+        .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10022)));
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+
+    ctx = new TestContext();
+    ctx.addThread(thr1 = new ZKFCThread(ctx, 0));
+    assertEquals(0, thr1.zkfc.run(new String[]{"-formatZK"}));
+
+    thr1.start();
+    waitForHAState(0, HAServiceState.ACTIVE);
+    
+    ctx.addThread(thr2 = new ZKFCThread(ctx, 1));
+    thr2.start();
+    
+    // Wait for the ZKFCs to fully start up
+    ZKFCTestUtil.waitForHealthState(thr1.zkfc,
+        HealthMonitor.State.SERVICE_HEALTHY, ctx);
+    ZKFCTestUtil.waitForHealthState(thr2.zkfc,
+        HealthMonitor.State.SERVICE_HEALTHY, ctx);
+    
+    fs = HATestUtil.configureFailoverFs(cluster, conf);
+  }
+  
+  @After
+  public void shutdown() throws Exception {
+    cluster.shutdown();
+    
+    if (thr1 != null) {
+      thr1.interrupt();
+    }
+    if (thr2 != null) {
+      thr2.interrupt();
+    }
+    if (ctx != null) {
+      ctx.stop();
+    }
+  }
+
+  /**
+   * Test that a thread dump is captured after the NN health state changes.
+   */
+  @Test(timeout=60000)
+  public void testThreadDumpCaptureAfterNNStateChange() throws Exception {
+    NameNodeResourceChecker mockResourceChecker = Mockito.mock(
+        NameNodeResourceChecker.class);
+    Mockito.doReturn(false).when(mockResourceChecker).hasAvailableDiskSpace();
+    cluster.getNameNode(0).getNamesystem()
+        .setNNResourceChecker(mockResourceChecker);
+    waitForHAState(0, HAServiceState.STANDBY);
+    while (!thr1.zkfc.isThreadDumpCaptured()) {
+      Thread.sleep(1000);
+    }
+  }
+
+  /**
+   * Test that automatic failover is triggered by shutting the
+   * active NN down.
+   */
+  @Test(timeout=60000)
+  public void testFailoverAndBackOnNNShutdown() throws Exception {
+    Path p1 = new Path("/dir1");
+    Path p2 = new Path("/dir2");
+
+    // Write some data on the first NN
+    fs.mkdirs(p1);
+    // Shut it down, causing automatic failover
+    cluster.shutdownNameNode(0);
+    // Data should still exist. Write some on the new NN
+    assertTrue(fs.exists(p1));
+    fs.mkdirs(p2);
+    assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
+        thr1.zkfc.getLocalTarget().getAddress());
+    
+    // Start the first node back up
+    cluster.restartNameNode(0);
+    // This should have no effect -- the new node should be STANDBY.
+    waitForHAState(0, HAServiceState.STANDBY);
+    assertTrue(fs.exists(p1));
+    assertTrue(fs.exists(p2));
+    // Shut down the second node, which should failback to the first
+    cluster.shutdownNameNode(1);
+    waitForHAState(0, HAServiceState.ACTIVE);
+
+    // First node should see what was written on the second node while it was down.
+    assertTrue(fs.exists(p1));
+    assertTrue(fs.exists(p2));
+    assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
+        thr2.zkfc.getLocalTarget().getAddress());
+  }
+  
+  @Test(timeout=30000)
+  public void testManualFailover() throws Exception {
+    thr2.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
+    waitForHAState(0, HAServiceState.STANDBY);
+    waitForHAState(1, HAServiceState.ACTIVE);
+
+    thr1.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
+    waitForHAState(0, HAServiceState.ACTIVE);
+    waitForHAState(1, HAServiceState.STANDBY);
+  }
+  
+  @Test(timeout=30000)
+  public void testManualFailoverWithDFSHAAdmin() throws Exception {
+    DFSHAAdmin tool = new DFSHAAdmin();
+    tool.setConf(conf);
+    assertEquals(0, 
+        tool.run(new String[]{"-failover", "nn1", "nn2"}));
+    waitForHAState(0, HAServiceState.STANDBY);
+    waitForHAState(1, HAServiceState.ACTIVE);
+    assertEquals(0,
+        tool.run(new String[]{"-failover", "nn2", "nn1"}));
+    waitForHAState(0, HAServiceState.ACTIVE);
+    waitForHAState(1, HAServiceState.STANDBY);
+  }
+
+  private void waitForHAState(int nnidx, final HAServiceState state)
+      throws TimeoutException, InterruptedException {
+    final NameNode nn = cluster.getNameNode(nnidx);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return nn.getRpcServer().getServiceStatus().getState() == state;
+        } catch (Exception e) {
+          e.printStackTrace();
+          return false;
+        }
+      }
+    }, 50, 15000);
+  }
+
+  /**
+   * Test-thread which runs a ZK Failover Controller corresponding
+   * to a given NameNode in the minicluster.
+   */
+  private class ZKFCThread extends TestingThread {
+    private final DFSZKFailoverController zkfc;
+
+    public ZKFCThread(TestContext ctx, int idx) {
+      super(ctx);
+      this.zkfc = DFSZKFailoverController.create(
+          cluster.getConfiguration(idx));
+    }
+
+    @Override
+    public void doWork() throws Exception {
+      try {
+        assertEquals(0, zkfc.run(new String[0]));
+      } catch (InterruptedException ie) {
+        // Interrupted by main thread, that's OK.
+      }
+    }
+  }
+
+}
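
As a design note, the polling loop in
testThreadDumpCaptureAfterNNStateChange could equally be written with the
GenericTestUtils.waitFor helper this file already imports and uses in
waitForHAState; a sketch of that variant:

    // Equivalent to the while/sleep loop, with an explicit 60 s bound.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return thr1.zkfc.isThreadDumpCaptured();
      }
    }, 1000, 60000);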