Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/03/26 19:12:24 UTC

[hadoop] branch branch-2 updated: HDFS-14205. Backport HDFS-6440 to branch-2. Contributed by Chao Sun.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 7935485  HDFS-14205. Backport HDFS-6440 to branch-2. Contributed by Chao Sun.
7935485 is described below

commit 7935485eec54cfea4e599e5a33afbeb9b00e616f
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Mar 26 12:12:14 2019 -0700

    HDFS-14205. Backport HDFS-6440 to branch-2. Contributed by Chao Sun.
---
 .../org/apache/hadoop/ha/ZKFailoverController.java |  61 ++-
 .../java/org/apache/hadoop/ha/MiniZKFCCluster.java |  93 +++--
 .../apache/hadoop/ha/TestZKFailoverController.java |  32 ++
 .../bkjournal/TestBookKeeperHACheckpoints.java     |   7 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../main/java/org/apache/hadoop/hdfs/HAUtil.java   |  35 +-
 .../token/block/BlockTokenSecretManager.java       |  38 +-
 .../hdfs/server/blockmanagement/BlockManager.java  |  21 +-
 .../hdfs/server/namenode/CheckpointConf.java       |  14 +-
 .../hadoop/hdfs/server/namenode/ImageServlet.java  |  90 +++-
 .../hdfs/server/namenode/NameNodeHttpServer.java   |   5 +
 .../hdfs/server/namenode/TransferFsImage.java      |  47 ++-
 .../hdfs/server/namenode/ha/BootstrapStandby.java  |  94 +++--
 .../hdfs/server/namenode/ha/EditLogTailer.java     | 196 ++++++---
 .../server/namenode/ha/RemoteNameNodeInfo.java     | 133 ++++++
 .../server/namenode/ha/StandbyCheckpointer.java    | 182 +++++---
 .../hadoop/hdfs/tools/DFSZKFailoverController.java |  13 +
 .../src/main/resources/hdfs-default.xml            |  20 +
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     | 461 +++++++++++++--------
 .../org/apache/hadoop/hdfs/MiniDFSNNTopology.java  |  18 +-
 .../hadoop/hdfs/TestDFSUpgradeFromImage.java       |   7 +-
 .../org/apache/hadoop/hdfs/TestRollingUpgrade.java | 101 +++--
 .../hadoop/hdfs/qjournal/MiniQJMHACluster.java     |  88 ++--
 .../hdfs/security/token/block/TestBlockToken.java  |  10 +-
 .../hdfs/server/balancer/TestKeyManager.java       |   2 +-
 .../hdfs/server/namenode/TestBackupNode.java       |   2 +-
 .../hdfs/server/namenode/TestCheckpoint.java       |   3 +-
 .../hdfs/server/namenode/TestEditLogRace.java      |   2 +-
 .../server/namenode/ha/HAStressTestHarness.java    |  46 +-
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java |  60 ++-
 .../server/namenode/ha/TestBootstrapStandby.java   | 184 ++++----
 .../namenode/ha/TestBootstrapStandbyWithQJM.java   |  47 +--
 .../namenode/ha/TestDNFencingWithReplication.java  |   1 +
 .../hdfs/server/namenode/ha/TestEditLogTailer.java |  17 +-
 .../ha/TestFailoverWithBlockTokensEnabled.java     |  55 ++-
 .../server/namenode/ha/TestHAConfiguration.java    |  53 ++-
 .../server/namenode/ha/TestPipelinesFailover.java  | 110 +++--
 .../server/namenode/ha/TestRemoteNameNodeInfo.java |  61 +++
 .../server/namenode/ha/TestSeveralNameNodes.java   | 179 ++++++++
 .../server/namenode/ha/TestStandbyCheckpoints.java | 112 ++---
 .../src/test/resources/hadoop-0.23-reserved.tgz    | Bin 4558 -> 5590 bytes
 .../src/test/resources/hadoop-1-reserved.tgz       | Bin 2572 -> 3348 bytes
 .../src/test/resources/hadoop-2-reserved.tgz       | Bin 2838 -> 3465 bytes
 .../src/test/resources/hadoop-22-dfs-dir.tgz       | Bin 318180 -> 413239 bytes
 .../hadoop-hdfs/src/test/resources/hadoop1-bbw.tgz | Bin 40234 -> 43294 bytes
 .../src/test/resources/log4j.properties            |   2 +-
 46 files changed, 1947 insertions(+), 759 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
index cf65ae1..ae1a92b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -144,6 +145,7 @@ public abstract class ZKFailoverController {
       throws AccessControlException, IOException;
   protected abstract InetSocketAddress getRpcAddressToBindTo();
   protected abstract PolicyProvider getPolicyProvider();
+  protected abstract List<HAServiceTarget> getAllOtherNodes();
 
   /**
    * Return the name of a znode inside the configured parent znode in which
@@ -627,9 +629,11 @@ public abstract class ZKFailoverController {
    * Coordinate a graceful failover. This proceeds in several phases:
    * 1) Pre-flight checks: ensure that the local node is healthy, and
    * thus a candidate for failover.
-   * 2) Determine the current active node. If it is the local node, no
+   * 2a) Determine the current active node. If it is the local node, no
    * need to failover - return success.
-   * 3) Ask that node to yield from the election for a number of seconds.
+   * 2b) Get the other nodes in the cluster.
+   * 3a) Ask the other nodes to yield from the election for a number of seconds.
+   * 3b) Ask the active node to yield from the election for a number of seconds.
    * 4) Allow the normal election path to run in other threads. Wait until
    * we either become unhealthy or we see an election attempt recorded by
    * the normal code path.
@@ -659,12 +663,27 @@ public abstract class ZKFailoverController {
           "No need to failover. Returning success.");
       return;
     }
-    
-    // Phase 3: ask the old active to yield from the election.
-    LOG.info("Asking " + oldActive + " to cede its active state for " +
-        timeout + "ms");
-    ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
-    oldZkfc.cedeActive(timeout);
+
+    // Phase 2b: get the other nodes
+    List<HAServiceTarget> otherNodes = getAllOtherNodes();
+    List<ZKFCProtocol> otherZkfcs = new ArrayList<ZKFCProtocol>(otherNodes.size());
+
+    // Phase 3a: ask the other (non-active) nodes to yield from the election.
+    HAServiceTarget activeNode = null;
+    for (HAServiceTarget remote : otherNodes) {
+      // same address means same node - object (==) equality may not always hold
+      if (remote.getAddress().equals(oldActive.getAddress())) {
+        activeNode = remote;
+        continue;
+      }
+      otherZkfcs.add(cedeRemoteActive(remote, timeout));
+    }
+
+    assert
+      activeNode != null : "Active node does not match any known remote node";
+
+    // Phase 3b: ask the old active to yield
+    otherZkfcs.add(cedeRemoteActive(activeNode, timeout));
 
     // Phase 4: wait for the normal election to make the local node
     // active.
@@ -687,8 +706,10 @@ public abstract class ZKFailoverController {
     // Phase 5. At this point, we made some attempt to become active. So we
     // can tell the old active to rejoin if it wants. This allows a quick
     // fail-back if we immediately crash.
-    oldZkfc.cedeActive(-1);
-    
+    for (ZKFCProtocol zkfc : otherZkfcs) {
+      zkfc.cedeActive(-1);
+    }
+
     if (attempt.succeeded) {
       LOG.info("Successfully became active. " + attempt.status);
     } else {
@@ -699,6 +720,23 @@ public abstract class ZKFailoverController {
   }
 
   /**
+   * Ask the remote zkfc to cede its active status and wait for the specified
+   * timeout before attempting to claim leader status.
+   * @param remote node to ask
+   * @param timeout amount of time to cede
+   * @return the {@link ZKFCProtocol} used to talk to the node
+   * @throws IOException
+   */
+  private ZKFCProtocol cedeRemoteActive(HAServiceTarget remote, int timeout)
+    throws IOException {
+    LOG.info("Asking " + remote + " to cede its active state for "
+               + timeout + "ms");
+    ZKFCProtocol oldZkfc = remote.getZKFCProxy(conf, timeout);
+    oldZkfc.cedeActive(timeout);
+    return oldZkfc;
+  }
+
+  /**
    * Ensure that the local node is in a healthy state, and thus
    * eligible for graceful failover.
    * @throws ServiceFailedException if the node is unhealthy
@@ -788,7 +826,8 @@ public abstract class ZKFailoverController {
           break;
           
         default:
-          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+          throw new IllegalArgumentException("Unhandled state:"
+                                               + lastHealthState);
         }
       }
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
index 0967c1a..920c446 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -49,7 +51,7 @@ public class MiniZKFCCluster {
   private final TestContext ctx;
   private final ZooKeeperServer zks;
 
-  private DummyHAService svcs[];
+  private List<DummyHAService> svcs;
   private DummyZKFCThread thrs[];
   private Configuration conf;
   
@@ -64,38 +66,67 @@ public class MiniZKFCCluster {
     conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
     conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
-    svcs = new DummyHAService[2];
-    svcs[0] = new DummyHAService(HAServiceState.INITIALIZING,
-        new InetSocketAddress("svc1", 1234));
-    svcs[0].setSharedResource(sharedResource);
-    svcs[1] = new DummyHAService(HAServiceState.INITIALIZING,
-        new InetSocketAddress("svc2", 1234));
-    svcs[1].setSharedResource(sharedResource);
-    
+    svcs = new ArrayList<DummyHAService>(2);
+    // remove any existing instances we are keeping track of
+    DummyHAService.instances.clear();
+
+    for (int i = 0; i < 2; i++) {
+      addSvcs(svcs, i);
+    }
+
     this.ctx = new TestContext();
     this.zks = zks;
   }
-  
+
+  private void addSvcs(List<DummyHAService> svcs, int i) {
+    svcs.add(new DummyHAService(HAServiceState.INITIALIZING, new InetSocketAddress("svc" + (i + 1),
+        1234)));
+    svcs.get(i).setSharedResource(sharedResource);
+  }
+
   /**
    * Set up two services and their failover controllers. svc1 is started
    * first, so that it enters ACTIVE state, and then svc2 is started,
    * which enters STANDBY
    */
   public void start() throws Exception {
+    start(2);
+  }
+
+  /**
+   * Set up the specified number of services and their failover controllers. svc1 is
+   * started first, so that it enters ACTIVE state, and then svc2...svcN are started, each entering
+   * STANDBY.
+   * <p>
+   * Adds any extra svc needed beyond the first two before starting the rest of the cluster.
+   * @param count number of zkfcs to start
+   */
+  public void start(int count) throws Exception {
+    // set up the expected number of zkfcs, adding more if needed. This seemed the least invasive
+    // way to add the services - otherwise it's a large test rewrite or changing a lot of assumptions.
+    if (count > 2) {
+      for (int i = 2; i < count; i++) {
+        addSvcs(svcs, i);
+      }
+    }
+
     // Format the base dir, should succeed
-    thrs = new DummyZKFCThread[2];
-    thrs[0] = new DummyZKFCThread(ctx, svcs[0]);
+    thrs = new DummyZKFCThread[count];
+    thrs[0] = new DummyZKFCThread(ctx, svcs.get(0));
     assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
     ctx.addThread(thrs[0]);
     thrs[0].start();
     
     LOG.info("Waiting for svc0 to enter active state");
     waitForHAState(0, HAServiceState.ACTIVE);
-    
-    LOG.info("Adding svc1");
-    thrs[1] = new DummyZKFCThread(ctx, svcs[1]);
-    thrs[1].start();
-    waitForHAState(1, HAServiceState.STANDBY);
+
+    // add the remaining zkfc
+    for (int i = 1; i < count; i++) {
+      LOG.info("Adding svc" + i);
+      thrs[i] = new DummyZKFCThread(ctx, svcs.get(i));
+      thrs[i].start();
+      waitForHAState(i, HAServiceState.STANDBY);
+    }
   }
   
   /**
@@ -125,7 +156,7 @@ public class MiniZKFCCluster {
   }
   
   public DummyHAService getService(int i) {
-    return svcs[i];
+    return svcs.get(i);
   }
 
   public ActiveStandbyElector getElector(int i) {
@@ -137,23 +168,23 @@ public class MiniZKFCCluster {
   }
   
   public void setHealthy(int idx, boolean healthy) {
-    svcs[idx].isHealthy = healthy;
+    svcs.get(idx).isHealthy = healthy;
   }
 
   public void setFailToBecomeActive(int idx, boolean doFail) {
-    svcs[idx].failToBecomeActive = doFail;
+    svcs.get(idx).failToBecomeActive = doFail;
   }
 
   public void setFailToBecomeStandby(int idx, boolean doFail) {
-    svcs[idx].failToBecomeStandby = doFail;
+    svcs.get(idx).failToBecomeStandby = doFail;
   }
   
   public void setFailToFence(int idx, boolean doFail) {
-    svcs[idx].failToFence = doFail;
+    svcs.get(idx).failToFence = doFail;
   }
   
   public void setUnreachable(int idx, boolean unreachable) {
-    svcs[idx].actUnreachable = unreachable;
+    svcs.get(idx).actUnreachable = unreachable;
   }
 
   /**
@@ -207,7 +238,7 @@ public class MiniZKFCCluster {
     byte[] data = zks.getZKDatabase().getData(
         DummyZKFC.LOCK_ZNODE, stat, null);
     
-    assertArrayEquals(Ints.toByteArray(svcs[idx].index), data);
+    assertArrayEquals(Ints.toByteArray(svcs.get(idx).index), data);
     long session = stat.getEphemeralOwner();
     LOG.info("Expiring svc " + idx + "'s zookeeper session " + session);
     zks.closeSession(session);
@@ -221,7 +252,7 @@ public class MiniZKFCCluster {
    */
   public void waitForActiveLockHolder(Integer idx)
       throws Exception {
-    DummyHAService svc = idx == null ? null : svcs[idx];
+    DummyHAService svc = idx == null ? null : svcs.get(idx);
     ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
         DummyZKFC.SCOPED_PARENT_ZNODE,
         (idx == null) ? null : Ints.toByteArray(svc.index));
@@ -323,5 +354,17 @@ public class MiniZKFCCluster {
     protected PolicyProvider getPolicyProvider() {
       return null;
     }
+
+    @Override
+    protected List<HAServiceTarget> getAllOtherNodes() {
+      List<HAServiceTarget> services = new ArrayList<HAServiceTarget>(
+          DummyHAService.instances.size());
+      for (DummyHAService service : DummyHAService.instances) {
+        if (service != this.localTarget) {
+          services.add(service);
+        }
+      }
+      return services;
+    }
   }
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
index 597f157..cd15f28 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
@@ -581,4 +581,36 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
     return zkfc.run(args);
   }
 
+  @Test(timeout = 25000)
+  public void testGracefulFailoverMultipleZKfcs() throws Exception {
+    try {
+      cluster.start(3);
+
+      cluster.waitForActiveLockHolder(0);
+
+      // failover to first
+      cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+      cluster.waitForActiveLockHolder(1);
+
+      // failover to second
+      cluster.getService(2).getZKFCProxy(conf, 5000).gracefulFailover();
+      cluster.waitForActiveLockHolder(2);
+
+      // failover back to original
+      cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover();
+      cluster.waitForActiveLockHolder(0);
+
+      Thread.sleep(10000); // allow to quiesce
+
+      assertEquals(0, cluster.getService(0).fenceCount);
+      assertEquals(0, cluster.getService(1).fenceCount);
+      assertEquals(0, cluster.getService(2).fenceCount);
+      assertEquals(2, cluster.getService(0).activeTransitionCount);
+      assertEquals(1, cluster.getService(1).activeTransitionCount);
+      assertEquals(1, cluster.getService(2).activeTransitionCount);
+    } finally {
+      cluster.stop();
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
index 3299673..5822b43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperHACheckpoints.java
@@ -37,6 +37,10 @@ import java.util.Random;
  * using a bookkeeper journal manager as the shared directory
  */
 public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
+  // overwrite the nn count
+  static {
+    TestStandbyCheckpoints.NUM_NNS = 2;
+  }
   private static BKJMUtil bkutil = null;
   static int numBookies = 3;
   static int journalCount = 0;
@@ -69,8 +73,7 @@ public class TestBookKeeperHACheckpoints extends TestStandbyCheckpoints {
           .manageNameDfsSharedDirs(false)
           .build();
         cluster.waitActive();
-        nn0 = cluster.getNameNode(0);
-        nn1 = cluster.getNameNode(1);
+        setNNs();
         fs = HATestUtil.configureFailoverFs(cluster, conf);
 
         cluster.transitionToActive(0);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 81614e5..0185e83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -177,8 +177,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50090";
   public static final String  DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_KEY = "dfs.namenode.secondary.https-address";
   public static final String  DFS_NAMENODE_SECONDARY_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:50091";
+  public static final String  DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY = "dfs.namenode.checkpoint.check.quiet-multiplier";
+  public static final double  DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT = 1.5;
   public static final String  DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY = "dfs.namenode.checkpoint.check.period";
   public static final long    DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_DEFAULT = 60;
+  public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
+  public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
   public static final String  DFS_NAMENODE_CHECKPOINT_PERIOD_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
   public static final long    DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT = 3600;
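
For reference, the two new keys above could be set programmatically as well as in hdfs-site.xml. A minimal sketch using the default values from this patch (illustrative only, not part of the commit):

    import org.apache.hadoop.conf.Configuration;

    public class NewHAKeysExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A non-primary standby waits quiet-multiplier * checkpoint period before checkpointing itself.
        conf.setDouble("dfs.namenode.checkpoint.check.quiet-multiplier", 1.5);
        // How many times the edit log tailer retries each remote NameNode before moving to the next.
        conf.setInt("dfs.ha.tail-edits.namenode-retries", 3);
      }
    }
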
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 67fdc04..c0b1228 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -152,7 +152,7 @@ public class HAUtil {
    * @param conf the configuration of this node
    * @return the NN ID of the other node in this nameservice
    */
-  public static String getNameNodeIdOfOtherNode(Configuration conf, String nsId) {
+  public static List<String> getNameNodeIdOfOtherNodes(Configuration conf, String nsId) {
     Preconditions.checkArgument(nsId != null,
         "Could not determine namespace id. Please ensure that this " +
         "machine is one of the machines listed as a NN RPC address, " +
@@ -166,20 +166,20 @@ public class HAUtil {
         DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
             nsId),
         nsId);
-    Preconditions.checkArgument(nnIds.size() == 2,
-        "Expected exactly 2 NameNodes in namespace '%s'. " +
-        "Instead, got only %s (NN ids were '%s'",
-        nsId, nnIds.size(), Joiner.on("','").join(nnIds));
+    Preconditions.checkArgument(nnIds.size() >= 2,
+        "Expected at least 2 NameNodes in namespace '%s'. " +
+          "Instead, got only %s (NN ids were '%s')",
+          nsId, nnIds.size(), Joiner.on("','").join(nnIds));
     Preconditions.checkState(myNNId != null && !myNNId.isEmpty(),
         "Could not determine own NN ID in namespace '%s'. Please " +
         "ensure that this node is one of the machines listed as an " +
         "NN RPC address, or configure " + DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY,
         nsId);
 
-    ArrayList<String> nnSet = Lists.newArrayList(nnIds);
-    nnSet.remove(myNNId);
-    assert nnSet.size() == 1;
-    return nnSet.get(0);
+    ArrayList<String> namenodes = Lists.newArrayList(nnIds);
+    namenodes.remove(myNNId);
+    assert namenodes.size() >= 1;
+    return namenodes;
   }
 
   /**
@@ -189,20 +189,25 @@ public class HAUtil {
    * @param myConf the configuration of this node
    * @return the configuration of the other node in an HA setup
    */
-  public static Configuration getConfForOtherNode(
+  public static List<Configuration> getConfForOtherNodes(
       Configuration myConf) {
     
     String nsId = DFSUtil.getNamenodeNameServiceId(myConf);
-    String otherNn = getNameNodeIdOfOtherNode(myConf, nsId);
+    List<String> otherNodes = getNameNodeIdOfOtherNodes(myConf, nsId);
     
     // Look up the address of the active NN.
-    Configuration confForOtherNode = new Configuration(myConf);
+    List<Configuration> confs = new ArrayList<Configuration>(otherNodes.size());
+    myConf = new Configuration(myConf);
     // unset independent properties
     for (String idpKey : HA_SPECIAL_INDEPENDENT_KEYS) {
-      confForOtherNode.unset(idpKey);
+      myConf.unset(idpKey);
+    }
+    for (String nn : otherNodes) {
+      Configuration confForOtherNode = new Configuration(myConf);
+      NameNode.initializeGenericKeys(confForOtherNode, nsId, nn);
+      confs.add(confForOtherNode);
     }
-    NameNode.initializeGenericKeys(confForOtherNode, nsId, otherNn);
-    return confForOtherNode;
+    return confs;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index d54d0b6..2ad8d08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -53,17 +53,11 @@ import org.apache.hadoop.util.Timer;
 @InterfaceAudience.Private
 public class BlockTokenSecretManager extends
     SecretManager<BlockTokenIdentifier> {
-  public static final Log LOG = LogFactory
-      .getLog(BlockTokenSecretManager.class);
-  
-  // We use these in an HA setup to ensure that the pair of NNs produce block
-  // token serial numbers that are in different ranges.
-  private static final int LOW_MASK  = ~(1 << 31);
-  
+  public static final Log LOG = LogFactory.getLog(BlockTokenSecretManager.class);
+
   public static final Token<BlockTokenIdentifier> DUMMY_TOKEN = new Token<BlockTokenIdentifier>();
 
   private final boolean isMaster;
-  private int nnIndex;
   
   /**
    * keyUpdateInterval is the interval that NN updates its block keys. It should
@@ -78,7 +72,10 @@ public class BlockTokenSecretManager extends
   private final Map<Integer, BlockKey> allKeys;
   private String blockPoolId;
   private final String encryptionAlgorithm;
-  
+
+  private final int intRange;
+  private final int nnRangeStart;
+
   private final SecureRandom nonceGenerator = new SecureRandom();
 
   /**
@@ -88,14 +85,14 @@ public class BlockTokenSecretManager extends
   private Timer timer;
   /**
    * Constructor for slaves.
-   * 
+   *
    * @param keyUpdateInterval how often a new key will be generated
    * @param tokenLifetime how long an individual token is valid
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
       long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
     this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
-        encryptionAlgorithm);
+        encryptionAlgorithm, 0, 1);
   }
   
   /**
@@ -103,23 +100,25 @@ public class BlockTokenSecretManager extends
    * 
    * @param keyUpdateInterval how often a new key will be generated
    * @param tokenLifetime how long an individual token is valid
-   * @param nnIndex namenode index
+   * @param nnIndex namenode index of the namenode for which we are creating the manager
    * @param blockPoolId block pool ID
    * @param encryptionAlgorithm encryption algorithm to use
+   * @param numNNs total number of namenodes in the nameservice
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime, int nnIndex, String blockPoolId,
+      long tokenLifetime, int nnIndex, int numNNs,  String blockPoolId,
       String encryptionAlgorithm) {
-    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
-        encryptionAlgorithm);
-    Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
-    this.nnIndex = nnIndex;
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
+    Preconditions.checkArgument(nnIndex >= 0);
+    Preconditions.checkArgument(numNNs > 0);
     setSerialNo(new SecureRandom().nextInt());
     generateKeys();
   }
   
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
+    this.intRange = Integer.MAX_VALUE / numNNs;
+    this.nnRangeStart = intRange * nnIndex;
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
@@ -132,7 +131,8 @@ public class BlockTokenSecretManager extends
   
   @VisibleForTesting
   public synchronized void setSerialNo(int serialNo) {
-    this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
+    // map the serial number into this NN's range: mod by the range size, then add the range start
+    this.serialNo = (serialNo % intRange) + (nnRangeStart);
   }
   
   public void setBlockPoolId(String blockPoolId) {
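
The range-based serial numbers above replace the old single-bit nnIndex mask so that more than two NameNodes can issue non-colliding block token serial numbers. A standalone sketch of the arithmetic with illustrative values (it mirrors setSerialNo above but is not code from this patch):

    public class SerialRangeExample {
      public static void main(String[] args) {
        int numNNs = 3;                               // NameNodes in the nameservice
        int nnIndex = 1;                              // index of this NameNode
        int intRange = Integer.MAX_VALUE / numNNs;    // size of each NameNode's slice
        int nnRangeStart = intRange * nnIndex;        // start of this NameNode's slice
        int serialNo = 1234567890;
        serialNo = (serialNo % intRange) + nnRangeStart;
        // For non-negative inputs, serialNo now lies in [nnRangeStart, nnRangeStart + intRange),
        // so serial numbers generated by different NameNodes cannot collide.
        System.out.println(serialNo);
      }
    }
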
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0d4c948..ad9891a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.Time.now;
 
@@ -54,6 +55,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -508,14 +510,21 @@ public class BlockManager implements BlockStatsMXBean {
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
 
     if (isHaEnabled) {
-      String thisNnId = HAUtil.getNameNodeId(conf, nsId);
-      String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
-      return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
-          encryptionAlgorithm);
+      // figure out the index of this NN among the configured NNs
+      Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
+      String nnId = HAUtil.getNameNodeId(conf, nsId);
+      int nnIndex = 0;
+      for (String id : nnIds) {
+        if (id.equals(nnId)) {
+          break;
+        }
+        nnIndex++;
+      }
+      return new BlockTokenSecretManager(updateMin * 60 * 1000L,
+          lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
+          lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
index b1636bc..c30730b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java
@@ -44,7 +44,13 @@ public class CheckpointConf {
 
   /** The output dir for legacy OIV image */
   private final String legacyOivImageDir;
-  
+
+  /**
+   * multiplier on the checkpoint period to allow other nodes to do the checkpointing, when not the
+   * 'primary' checkpoint node
+   */
+  private double quietMultiplier;
+
   public CheckpointConf(Configuration conf) {
     checkpointCheckPeriod = conf.getLong(
         DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@@ -57,6 +63,8 @@ public class CheckpointConf {
     maxRetriesOnMergeError = conf.getInt(DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY,
                                   DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT);
     legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
+    quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
+      DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
     warnForDeprecatedConfigs(conf);
   }
   
@@ -91,4 +99,8 @@ public class CheckpointConf {
   public String getLegacyOivImageDir() {
     return legacyOivImageDir;
   }
+
+  public double getQuietPeriod() {
+    return this.checkpointPeriod * this.quietMultiplier;
+  }
 }
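
The new quiet period is simply the checkpoint period scaled by the multiplier. A small worked example using the defaults from this patch (illustrative only):

    public class QuietPeriodExample {
      public static void main(String[] args) {
        long checkpointPeriodSecs = 3600;   // dfs.namenode.checkpoint.period default
        double quietMultiplier = 1.5;       // dfs.namenode.checkpoint.check.quiet-multiplier default
        // getQuietPeriod() == checkpointPeriod * quietMultiplier: a non-primary checkpointer
        // waits this long (5400s here) before assuming the primary missed its checkpoint.
        System.out.println(checkpointPeriodSecs * quietMultiplier);
      }
    }
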
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index d8dfa54..9223abd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -30,6 +30,7 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.security.SecurityUtil;
@@ -83,6 +84,9 @@ public class ImageServlet extends HttpServlet {
   private static final String IMAGE_FILE_TYPE = "imageFile";
   private static final String IS_BOOTSTRAP_STANDBY = "bootstrapstandby";
 
+  private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
+      .<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
+
   @Override
   public void doGet(final HttpServletRequest request,
       final HttpServletResponse response) throws ServletException, IOException {
@@ -270,10 +274,12 @@ public class ImageServlet extends HttpServlet {
     }
 
     if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
-      Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
-      validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
-          .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
-          DFSUtilClient.getNNAddress(otherNnConf).getHostName()));
+      List<Configuration> otherNnConfs = HAUtil.getConfForOtherNodes(conf);
+      for (Configuration otherNnConf : otherNnConfs) {
+        validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
+                .get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
+            DFSUtilClient.getNNAddress(otherNnConf).getHostName()));
+      }
     }
 
     for (String v : validRequestors) {
@@ -442,8 +448,7 @@ public class ImageServlet extends HttpServlet {
 
   /**
    * Set the required parameters for uploading image
-   * 
-   * @param httpMethod instance of method to set the parameters
+   *
    * @param storage colon separated storageInfo string
    * @param txid txid of the image
    * @param imageFileSize size of the imagefile to be uploaded
@@ -482,12 +487,37 @@ public class ImageServlet extends HttpServlet {
 
             @Override
             public Void run() throws Exception {
+              // if it's not the active NN, then we need to notify the caller it was the wrong
+              // target (regardless of the fact that we got the image)
+              HAServiceProtocol.HAServiceState state = NameNodeHttpServer
+                  .getNameNodeStateFromContext(getServletContext());
+              if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
+                // we need a different response type here so the client can differentiate this
+                // from the failure to upload due to (1) security, or (2) other checkpoints already
+                // present
+                response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED,
+                    "Namenode " + request.getLocalAddr() + " is currently not in a state which can "
+                        + "accept uploads of new fsimages. State: " + state);
+                return null;
+              }
 
               final long txid = parsedParams.getTxId();
+              String remoteAddr = request.getRemoteAddr();
+              ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr);
 
               final NameNodeFile nnf = parsedParams.getNameNodeFile();
 
-              if (!nnImage.addToCheckpointing(txid)) {
+              // if the node is attempting to upload an older transaction, we ignore it
+              SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
+              if (larger.size() > 0) {
+                response.sendError(HttpServletResponse.SC_CONFLICT,
+                    "Another checkpointer is already in the process of uploading a" +
+                        " checkpoint made up to transaction ID " + larger.last());
+                return null;
+              }
+
+              // make sure no one else has started uploading one
+              if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
                 response.sendError(HttpServletResponse.SC_CONFLICT,
                     "Either current namenode is checkpointing or another"
                         + " checkpointer is already in the process of "
@@ -522,6 +552,10 @@ public class ImageServlet extends HttpServlet {
                   // remove some old ones.
                   nnImage.purgeOldStorage(nnf);
                 } finally {
+                  // remove the request once we've processed it, or it threw an error, so we
+                  // aren't using it either
+                  currentlyDownloadingCheckpoints.remove(imageRequest);
+
                   stream.close();
                 }
               } finally {
@@ -578,4 +612,46 @@ public class ImageServlet extends HttpServlet {
       return nnf;
     }
   }
+
+  private static class ImageUploadRequest implements Comparable<ImageUploadRequest> {
+
+    private final long txId;
+    private final String address;
+
+    public ImageUploadRequest(long txid, String remoteAddr) {
+      this.txId = txid;
+      this.address = remoteAddr;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ImageUploadRequest that = (ImageUploadRequest) o;
+
+      if (txId != that.txId) return false;
+      if (!address.equals(that.address)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (txId ^ (txId >>> 32));
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+
+    @Override public int compareTo(ImageUploadRequest other) {
+      return Long.compare(txId, other.txId);
+    }
+
+    @Override public String toString() {
+      return "ImageRequest{" +
+          "txId=" + txId +
+          ", address='" + address + '\'' +
+          '}';
+    }
+  }
 }
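
The conflict check added above relies on ImageUploadRequest's ordering by txId: tailSet returns the in-flight uploads with a txid at or above the incoming one, and a non-empty result rejects the request. A simplified, self-contained sketch with plain longs in place of ImageUploadRequest (illustrative, not code from this patch):

    import java.util.Collections;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class TailSetCheckExample {
      public static void main(String[] args) {
        SortedSet<Long> inFlight =
            Collections.synchronizedSortedSet(new TreeSet<Long>());
        inFlight.add(500L);                 // another checkpointer is uploading txid 500
        long incoming = 480L;
        // everything already in flight with txid >= 480
        SortedSet<Long> larger = inFlight.tailSet(incoming);
        boolean reject = !larger.isEmpty(); // true: a newer (or equal) upload wins
        System.out.println(reject);
      }
    }
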
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
index 84ca7e8..165e8f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
@@ -32,6 +32,7 @@ import javax.servlet.ServletContext;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -353,4 +354,8 @@ public class NameNodeHttpServer {
   public HttpServer2 getHttpServer() {
     return httpServer;
   }
+
+  public static HAServiceProtocol.HAServiceState getNameNodeStateFromContext(ServletContext context) {
+    return getNameNodeFromContext(context).getServiceState();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 9b9da24..65200fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -71,7 +71,33 @@ import org.mortbay.jetty.EofException;
  */
 @InterfaceAudience.Private
 public class TransferFsImage {
-  
+
+  public enum TransferResult {
+    SUCCESS(HttpServletResponse.SC_OK, false),
+    AUTHENTICATION_FAILURE(HttpServletResponse.SC_FORBIDDEN, true),
+    NOT_ACTIVE_NAMENODE_FAILURE(HttpServletResponse.SC_EXPECTATION_FAILED, false),
+    OLD_TRANSACTION_ID_FAILURE(HttpServletResponse.SC_CONFLICT, false),
+    UNEXPECTED_FAILURE(-1, true);
+
+    private final int response;
+    private final boolean shouldReThrowException;
+
+    private TransferResult(int response, boolean rethrow) {
+      this.response = response;
+      this.shouldReThrowException = rethrow;
+    }
+
+    public static TransferResult getResultForCode(int code) {
+      TransferResult ret = UNEXPECTED_FAILURE;
+      for (TransferResult result : TransferResult.values()) {
+        if (result.response == code) {
+          return result;
+        }
+      }
+      return ret;
+    }
+  }
+
   public final static String CONTENT_LENGTH = "Content-Length";
   public final static String FILE_LENGTH = "File-Length";
   public final static String MD5_HEADER = "X-MD5-Digest";
@@ -200,9 +226,9 @@ public class TransferFsImage {
    * @param txid the transaction ID of the image to be uploaded
    * @throws IOException if there is an I/O error
    */
-  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+  public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
       NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
-    uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
+    return uploadImageFromStorage(fsName, conf, storage, nnf, txid, null);
   }
 
   /**
@@ -217,7 +243,7 @@ public class TransferFsImage {
    * @param canceler optional canceler to check for abort of upload
    * @throws IOException if there is an I/O error or cancellation
    */
-  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+  public static TransferResult uploadImageFromStorage(URL fsName, Configuration conf,
       NNStorage storage, NameNodeFile nnf, long txid, Canceler canceler)
       throws IOException {
     URL url = new URL(fsName, ImageServlet.PATH_SPEC);
@@ -225,21 +251,18 @@ public class TransferFsImage {
     try {
       uploadImage(url, conf, storage, nnf, txid, canceler);
     } catch (HttpPutFailedException e) {
-      if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
-        // this is OK - this means that a previous attempt to upload
-        // this checkpoint succeeded even though we thought it failed.
-        LOG.info("Image upload with txid " + txid + 
-            " conflicted with a previous image upload to the " +
-            "same NameNode. Continuing...", e);
-        return;
-      } else {
+      // translate the error code to a result, which is a bit more obvious in usage
+      TransferResult result = TransferResult.getResultForCode(e.getResponseCode());
+      if (result.shouldReThrowException) {
         throw e;
       }
+      return result;
     }
     double xferSec = Math.max(
         ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
     LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
         + " in " + xferSec + " seconds");
+    return TransferResult.SUCCESS;
   }
 
   /*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index f509365..51f4c57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -23,8 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URL;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -79,10 +79,8 @@ public class BootstrapStandby implements Tool, Configurable {
   private static final Log LOG = LogFactory.getLog(BootstrapStandby.class);
   private String nsId;
   private String nnId;
-  private String otherNNId;
+  private List<RemoteNameNodeInfo> remoteNNs;
 
-  private URL otherHttpAddr;
-  private InetSocketAddress otherIpcAddr;
   private Collection<URI> dirsToFormat;
   private List<URI> editUrisToFormat;
   private List<URI> sharedEditsUris;
@@ -147,8 +145,8 @@ public class BootstrapStandby implements Tool, Configurable {
         + "\twe have enough edits already in the shared directory to start\n"
         + "\tup from the last checkpoint on the active.");
   }
-  
-  private NamenodeProtocol createNNProtocolProxy()
+
+  private NamenodeProtocol createNNProtocolProxy(InetSocketAddress otherIpcAddr)
       throws IOException {
     return NameNodeProxies.createNonHAProxy(getConf(),
         otherIpcAddr, NamenodeProtocol.class,
@@ -157,18 +155,36 @@ public class BootstrapStandby implements Tool, Configurable {
   }
   
   private int doRun() throws IOException {
-    NamenodeProtocol proxy = createNNProtocolProxy();
-    NamespaceInfo nsInfo;
-    boolean isUpgradeFinalized;
-    try {
-      nsInfo = proxy.versionRequest();
-      isUpgradeFinalized = proxy.isUpgradeFinalized();
-    } catch (IOException ioe) {
-      LOG.fatal("Unable to fetch namespace information from active NN at " +
-          otherIpcAddr + ": " + ioe.getMessage());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Full exception trace", ioe);
+    // find the active NN
+    NamenodeProtocol proxy = null;
+    NamespaceInfo nsInfo = null;
+    boolean isUpgradeFinalized = false;
+    RemoteNameNodeInfo proxyInfo = null;
+    for (int i = 0; i < remoteNNs.size(); i++) {
+      proxyInfo = remoteNNs.get(i);
+      InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
+      proxy = createNNProtocolProxy(otherIpcAddress);
+      try {
+        // Get the namespace from any active NN. If you just formatted the primary NN and are
+        // bootstrapping the other NNs from that layout, it will only contact the single NN.
+        // However, if the cluster is already running and you are adding a NN later (e.g.
+        // replacing a failed NN), then this will bootstrap from any node in the cluster.
+        nsInfo = proxy.versionRequest();
+        isUpgradeFinalized = proxy.isUpgradeFinalized();
+        break;
+      } catch (IOException ioe) {
+        LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
+            + ": " + ioe.getMessage());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Full exception trace", ioe);
+        }
       }
+    }
+
+    if (nsInfo == null) {
+      LOG.fatal(
+          "Unable to fetch namespace information from any remote NN. Possible NameNodes: "
+              + remoteNNs);
       return ERR_CODE_FAILED_CONNECT;
     }
 
@@ -183,9 +199,9 @@ public class BootstrapStandby implements Tool, Configurable {
         "=====================================================\n" +
         "About to bootstrap Standby ID " + nnId + " from:\n" +
         "           Nameservice ID: " + nsId + "\n" +
-        "        Other Namenode ID: " + otherNNId + "\n" +
-        "  Other NN's HTTP address: " + otherHttpAddr + "\n" +
-        "  Other NN's IPC  address: " + otherIpcAddr + "\n" +
+        "        Other Namenode ID: " + proxyInfo.getNameNodeID() + "\n" +
+        "  Other NN's HTTP address: " + proxyInfo.getHttpAddress() + "\n" +
+        "  Other NN's IPC  address: " + proxyInfo.getIpcAddress() + "\n" +
         "             Namespace ID: " + nsInfo.getNamespaceID() + "\n" +
         "            Block pool ID: " + nsInfo.getBlockPoolID() + "\n" +
         "               Cluster ID: " + nsInfo.getClusterID() + "\n" +
@@ -209,7 +225,7 @@ public class BootstrapStandby implements Tool, Configurable {
     }
 
     // download the fsimage from active namenode
-    int download = downloadImage(storage, proxy);
+    int download = downloadImage(storage, proxy, proxyInfo);
     if (download != 0) {
       return download;
     }
@@ -300,7 +316,7 @@ public class BootstrapStandby implements Tool, Configurable {
     }
   }
 
-  private int downloadImage(NNStorage storage, NamenodeProtocol proxy)
+  private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
       throws IOException {
     // Load the newly formatted image, using all of the directories
     // (including shared edits)
@@ -322,7 +338,7 @@ public class BootstrapStandby implements Tool, Configurable {
 
       // Download that checkpoint into our storage directories.
       MD5Hash hash = TransferFsImage.downloadImageToStorage(
-          otherHttpAddr, imageTxId, storage, true, true);
+          proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
       image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
           hash);
 
@@ -391,18 +407,26 @@ public class BootstrapStandby implements Tool, Configurable {
       throw new HadoopIllegalArgumentException(
         "Shared edits storage is not enabled for this namenode.");
     }
-    
-    Configuration otherNode = HAUtil.getConfForOtherNode(conf);
-    otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
-    otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
-    Preconditions.checkArgument(otherIpcAddr.getPort() != 0 &&
-        !otherIpcAddr.getAddress().isAnyLocalAddress(),
-        "Could not determine valid IPC address for other NameNode (%s)" +
-        ", got: %s", otherNNId, otherIpcAddr);
-
-    final String scheme = DFSUtil.getHttpClientScheme(conf);
-    otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(
-        otherIpcAddr.getHostName(), otherNode, scheme).toURL();
+
+
+    remoteNNs = RemoteNameNodeInfo.getRemoteNameNodes(conf, nsId);
+    // validate the configured NNs
+    List<RemoteNameNodeInfo> remove = new ArrayList<RemoteNameNodeInfo>(remoteNNs.size());
+    for (RemoteNameNodeInfo info : remoteNNs) {
+      InetSocketAddress address = info.getIpcAddress();
+      LOG.info("Found nn: " + info.getNameNodeID() + ", ipc: " + info.getIpcAddress());
+      if (address.getPort() == 0 || address.getAddress().isAnyLocalAddress()) {
+        LOG.error("Could not determine valid IPC address for other NameNode ("
+            + info.getNameNodeID() + ") , got: " + address);
+        remove.add(info);
+      }
+    }
+
+    // remove any invalid nns
+    remoteNNs.removeAll(remove);
+
+    // make sure we have at least one left to read
+    Preconditions.checkArgument(!remoteNNs.isEmpty(), "Could not find any valid namenodes!");
 
     dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
     editUrisToFormat = FSNamesystem.getNamespaceEditsDirs(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index e13b501..73e43b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -23,6 +23,9 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +37,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,21 +77,21 @@ import com.google.common.base.Preconditions;
 @InterfaceStability.Evolving
 public class EditLogTailer {
   public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
-  
+
   private final EditLogTailerThread tailerThread;
-  
+
   private final Configuration conf;
   private final FSNamesystem namesystem;
+  private final Iterator<RemoteNameNodeInfo> nnLookup;
   private FSEditLog editLog;
 
-  private InetSocketAddress activeAddr;
-  private NamenodeProtocol cachedActiveProxy = null;
+  private RemoteNameNodeInfo currentNN;
 
   /**
    * The last transaction ID at which an edit log roll was initiated.
    */
   private long lastRollTriggerTxId = HdfsServerConstants.INVALID_TXID;
-  
+
   /**
    * The highest transaction ID loaded by the Standby.
    */
@@ -120,32 +126,68 @@ public class EditLogTailer {
    * available to be read from.
    */
   private final long sleepTimeMs;
-  
+
+  private final int nnCount;
+  private NamenodeProtocol cachedActiveProxy = null;
+  // count of the number of NNs we have attempted in the current lookup loop
+  private int nnLoopCount = 0;
+
+  /**
+   * maximum number of retries we should give each of the remote namenodes before giving up
+   */
+  private int maxRetries;
+
   public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
     this.conf = conf;
     this.namesystem = namesystem;
     this.editLog = namesystem.getEditLog();
-    
+
     lastLoadTimeMs = monotonicNow();
 
     logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;
+    List<RemoteNameNodeInfo> nns = Collections.emptyList();
     if (logRollPeriodMs >= 0) {
-      this.activeAddr = getActiveNodeAddress();
-      Preconditions.checkArgument(activeAddr.getPort() > 0,
-          "Active NameNode must have an IPC port configured. " +
-          "Got address '%s'", activeAddr);
-      LOG.info("Will roll logs on active node at " + activeAddr + " every " +
+      try {
+        nns = RemoteNameNodeInfo.getRemoteNameNodes(conf);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Remote NameNodes not correctly configured!", e);
+      }
+
+      for (RemoteNameNodeInfo info : nns) {
+        // overwrite the socket address, if we need to
+        InetSocketAddress ipc = NameNode.getServiceAddress(info.getConfiguration(), true);
+        // sanity check the ipc address
+        Preconditions.checkArgument(ipc.getPort() > 0,
+            "Active NameNode must have an IPC port configured. " + "Got address '%s'", ipc);
+        info.setIpcAddress(ipc);
+      }
+
+      LOG.info("Will roll logs on active node every " +
           (logRollPeriodMs / 1000) + " seconds.");
     } else {
       LOG.info("Not going to trigger log rolls on active node because " +
           DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY + " is negative.");
     }
-    
+
     sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
 
+    maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
+    if (maxRetries <= 0) {
+      LOG.error("Specified a non-positive number of retries for the number of retries for the " +
+          "namenode connection when manipulating the edit log (" +
+          DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY + "), setting to default: " +
+          DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
+      maxRetries = DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT;
+    }
+
+    nnCount = nns.size();
+    // setup the iterator to endlessly loop the nns
+    this.nnLookup = Iterators.cycle(nns);
+
     rollEditsTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000;
@@ -156,30 +198,11 @@ public class EditLogTailer {
     LOG.debug("logRollPeriodMs=" + logRollPeriodMs +
         " sleepTime=" + sleepTimeMs);
   }
-  
-  private InetSocketAddress getActiveNodeAddress() {
-    Configuration activeConf = HAUtil.getConfForOtherNode(conf);
-    return NameNode.getServiceAddress(activeConf, true);
-  }
-  
-  private NamenodeProtocol getActiveNodeProxy() throws IOException {
-    if (cachedActiveProxy == null) {
-      int rpcTimeout = conf.getInt(
-          DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_KEY,
-          DFSConfigKeys.DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT);
-      NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
-          RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf,
-          rpcTimeout, Long.MAX_VALUE);
-      cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
-    }
-    assert cachedActiveProxy != null;
-    return cachedActiveProxy;
-  }
 
   public void start() {
     tailerThread.start();
   }
-  
+
   public void stop() throws IOException {
     rollEditsRpcExecutor.shutdown();
     tailerThread.setShouldRun(false);
@@ -191,12 +214,12 @@ public class EditLogTailer {
       throw new IOException(e);
     }
   }
-  
+
   @VisibleForTesting
   FSEditLog getEditLog() {
     return editLog;
   }
-  
+
   @VisibleForTesting
   public void setEditLog(FSEditLog editLog) {
     this.editLog = editLog;
@@ -223,7 +246,7 @@ public class EditLogTailer {
       }
     });
   }
-  
+
   @VisibleForTesting
   void doTailEdits() throws IOException, InterruptedException {
     // Write lock needs to be interruptible here because the 
@@ -235,7 +258,7 @@ public class EditLogTailer {
       FSImage image = namesystem.getFSImage();
 
       long lastTxnId = image.getLastAppliedTxId();
-      
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("lastTxnId: " + lastTxnId);
       }
@@ -253,7 +276,7 @@ public class EditLogTailer {
       if (LOG.isDebugEnabled()) {
         LOG.debug("edit streams to load from: " + streams.size());
       }
-      
+
       // Once we have streams to load, errors encountered are legitimate cause
       // for concern, so we don't catch them here. Simple errors reading from
       // disk are ignored.
@@ -290,19 +313,20 @@ public class EditLogTailer {
    * @return true if the configured log roll period has elapsed.
    */
   private boolean tooLongSinceLastLoad() {
-    return logRollPeriodMs >= 0 && 
+    return logRollPeriodMs >= 0 &&
       (monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
   }
 
   /**
+   * NameNodeProxy factory method.
    * @return a Callable to roll logs on remote NameNode.
    */
   @VisibleForTesting
-  Callable<Void> getRollEditsTask() {
-    return new Callable<Void>() {
+  Callable<Void> getNameNodeProxy() {
+    return new MultipleNameNodeProxy<Void>() {
       @Override
-      public Void call() throws IOException {
-        getActiveNodeProxy().rollEditLog();
+      protected Void doWork() throws IOException {
+        cachedActiveProxy.rollEditLog();
         return null;
       }
     };
@@ -316,7 +340,7 @@ public class EditLogTailer {
     LOG.info("Triggering log roll on remote NameNode");
     Future<Void> future = null;
     try {
-      future = rollEditsRpcExecutor.submit(getRollEditsTask());
+      future = rollEditsRpcExecutor.submit(getNameNodeProxy());
       future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
       lastRollTriggerTxId = lastLoadedTxnId;
     } catch (ExecutionException e) {
@@ -347,15 +371,15 @@ public class EditLogTailer {
    */
   private class EditLogTailerThread extends Thread {
     private volatile boolean shouldRun = true;
-    
+
     private EditLogTailerThread() {
       super("Edit log tailer");
     }
-    
+
     private void setShouldRun(boolean shouldRun) {
       this.shouldRun = shouldRun;
     }
-    
+
     @Override
     public void run() {
       SecurityUtil.doAsLoginUserOrFatal(
@@ -367,13 +391,13 @@ public class EditLogTailer {
           }
         });
     }
-    
+
     private void doWork() {
       while (shouldRun) {
         try {
           // There's no point in triggering a log roll if the Standby hasn't
           // read any more transactions since the last time a roll was
-          // triggered. 
+          // triggered.
           if (tooLongSinceLastLoad() &&
               lastRollTriggerTxId < lastLoadedTxnId) {
             triggerActiveLogRoll();
@@ -417,4 +441,78 @@ public class EditLogTailer {
       }
     }
   }
+
+  /**
+   * Manage the 'active namenode proxy'. This cannot just be a single proxy since we could
+   * failover across a number of NameNodes, rather than just between an active and a standby.
+   * <p>
+   * We - lazily - get a proxy to one of the configured namenodes and attempt to make the request
+   * against it. If it doesn't succeed, either because the proxy failed to be created or the request
+   * failed, we try the next NN in the list. We try this up to the configured maximum number of
+   * retries before throwing up our hands. A working proxy is retained across attempts since we
+   * expect the active NameNode to switch rarely.
+   * <p>
+   * This mechanism is <b>very bad</b> for cases where we care about being <i>fast</i>; it just
+   * blindly goes and tries namenodes.
+   */
+  private abstract class MultipleNameNodeProxy<T> implements Callable<T> {
+
+    /**
+     * Do the actual work to the remote namenode via the {@link #cachedActiveProxy}.
+     * @return the result of the work, if there is one
+     * @throws IOException if the actions done to the proxy throw an exception.
+     */
+    protected abstract T doWork() throws IOException;
+
+    public T call() throws IOException {
+      while ((cachedActiveProxy = getActiveNodeProxy()) != null) {
+        try {
+          T ret = doWork();
+          // reset the loop count on success
+          nnLoopCount = 0;
+          return ret;
+        } catch (RemoteException e) {
+          Throwable cause = e.unwrapRemoteException(StandbyException.class);
+          // if it's not a standby exception, then we need to re-throw it; something bad has happened
+          if (cause == e) {
+            throw e;
+          } else {
+            // it is a standby exception, so we try the other NN
+            LOG.warn("Failed to reach remote node: " + currentNN
+                + ", retrying with remaining remote NNs");
+            cachedActiveProxy = null;
+            // this NN isn't responding to requests, try the next one
+            nnLoopCount++;
+          }
+        }
+      }
+      throw new IOException("Cannot find any valid remote NN to service request!");
+    }
+
+    private NamenodeProtocol getActiveNodeProxy() throws IOException {
+      if (cachedActiveProxy == null) {
+        while (true) {
+          // if we have reached the max loop count, quit by returning null
+          if ((nnLoopCount / nnCount) >= maxRetries) {
+            return null;
+          }
+
+          currentNN = nnLookup.next();
+          try {
+            NamenodeProtocolPB proxy = RPC.waitForProxy(NamenodeProtocolPB.class,
+                RPC.getProtocolVersion(NamenodeProtocolPB.class), currentNN.getIpcAddress(), conf);
+            cachedActiveProxy = new NamenodeProtocolTranslatorPB(proxy);
+            break;
+          } catch (IOException e) {
+            LOG.info("Failed to reach " + currentNN, e);
+            // couldn't even reach this NN, try the next one
+            nnLoopCount++;
+          }
+        }
+      }
+      assert cachedActiveProxy != null;
+      return cachedActiveProxy;
+    }
+  }
 }
+
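
The MultipleNameNodeProxy logic above boils down to a cycle-and-retry pattern: loop endlessly
over the configured NNs, charge every failed attempt against a budget of maxRetries full passes,
and reset the count on the first success. Below is a minimal, self-contained sketch of just that
control flow; the class name, the tryConnect() stand-in and the addresses are hypothetical and
only illustrate the pattern, they are not part of the patch.

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class CyclingLookupSketch {
      public static void main(String[] args) {
        List<String> nns = Arrays.asList("nn1:8020", "nn2:8020", "nn3:8020");
        int maxRetries = 3;          // cf. dfs.ha.tail-edits.namenode-retries
        int nnCount = nns.size();
        int nnLoopCount = 0;         // failed attempts so far in this lookup loop

        Iterator<String> nnLookup = cycle(nns);
        String selected = null;
        // give up once every NN has been tried maxRetries times without success
        while ((nnLoopCount / nnCount) < maxRetries) {
          String candidate = nnLookup.next();
          if (tryConnect(candidate)) {   // stand-in for proxy creation plus the RPC itself
            selected = candidate;
            break;
          }
          nnLoopCount++;                 // a failure counts against the retry budget
        }
        System.out.println(selected == null
            ? "Cannot find any valid remote NN to service request!"
            : "Using " + selected);
      }

      // Minimal stand-in for Guava's Iterators.cycle over a non-empty list.
      private static <T> Iterator<T> cycle(final List<T> items) {
        return new Iterator<T>() {
          private int i = 0;
          public boolean hasNext() { return true; }
          public T next() { T t = items.get(i); i = (i + 1) % items.size(); return t; }
          public void remove() { throw new UnsupportedOperationException(); }
        };
      }

      // Hypothetical reachability check; pretend only nn3 currently answers.
      private static boolean tryConnect(String address) {
        return address.startsWith("nn3");
      }
    }
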
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java
new file mode 100644
index 0000000..9a51190
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RemoteNameNodeInfo.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Information about a single remote NameNode.
+ */
+public class RemoteNameNodeInfo {
+
+  public static List<RemoteNameNodeInfo> getRemoteNameNodes(Configuration conf) throws IOException {
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    return getRemoteNameNodes(conf, nsId);
+  }
+
+  public static List<RemoteNameNodeInfo> getRemoteNameNodes(Configuration conf, String nsId)
+      throws IOException {
+    // there is only a single NN configured (and no federation) so we don't have any more NNs
+    if (nsId == null) {
+      return Collections.emptyList();
+    }
+    List<Configuration> otherNodes = HAUtil.getConfForOtherNodes(conf);
+    List<RemoteNameNodeInfo> nns = new ArrayList<RemoteNameNodeInfo>();
+
+    for (Configuration otherNode : otherNodes) {
+      String otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
+      // don't do any validation here, as in some cases it can be overwritten later
+      InetSocketAddress otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
+
+      final String scheme = DFSUtil.getHttpClientScheme(conf);
+      URL otherHttpAddr = DFSUtil.getInfoServerWithDefaultHost(otherIpcAddr.getHostName(),
+          otherNode, scheme).toURL();
+
+      nns.add(new RemoteNameNodeInfo(otherNode, otherNNId, otherIpcAddr, otherHttpAddr));
+    }
+    return nns;
+  }
+
+  private final Configuration conf;
+  private final String nnId;
+  private InetSocketAddress ipcAddress;
+  private final URL httpAddress;
+
+  private RemoteNameNodeInfo(Configuration conf, String nnId, InetSocketAddress ipcAddress,
+      URL httpAddress) {
+    this.conf = conf;
+    this.nnId = nnId;
+    this.ipcAddress = ipcAddress;
+    this.httpAddress = httpAddress;
+  }
+
+  public InetSocketAddress getIpcAddress() {
+    return this.ipcAddress;
+  }
+
+  public String getNameNodeID() {
+    return this.nnId;
+  }
+
+  public URL getHttpAddress() {
+    return this.httpAddress;
+  }
+
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  public void setIpcAddress(InetSocketAddress ipc) {
+    this.ipcAddress = ipc;
+  }
+
+  @Override
+  public String toString() {
+    return "RemoteNameNodeInfo [nnId=" + nnId + ", ipcAddress=" + ipcAddress
+        + ", httpAddress=" + httpAddress + "]";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RemoteNameNodeInfo that = (RemoteNameNodeInfo) o;
+
+    if (!nnId.equals(that.nnId)) return false;
+    if (!ipcAddress.equals(that.ipcAddress)) return false;
+    // convert to the standard strings since URL.equals does address resolution, which is a
+    // blocking call and a FindBugs issue.
+    String httpString = httpAddress.toString();
+    String thatHttpString  = that.httpAddress.toString();
+    return httpString.equals(thatHttpString);
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = nnId.hashCode();
+    result = 31 * result + ipcAddress.hashCode();
+    // toString rather than hashCode b/c URL.hashCode is a blocking call.
+    result = 31 * result + httpAddress.toString().hashCode();
+    return result;
+  }
+}
\ No newline at end of file
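
For context, a sketch of how the new helper can be driven from an HA configuration. The
nameservice, NN ids and host names below are placeholders; only
RemoteNameNodeInfo.getRemoteNameNodes(Configuration) and the standard dfs.* keys are taken
from the patch, so treat this as illustrative rather than a tested recipe.

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.namenode.ha.RemoteNameNodeInfo;

    public class RemoteNameNodeLookupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2,nn3");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "host1.example.com:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "host2.example.com:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn3", "host3.example.com:8020");
        // pretend this process is nn1, so the "other" nodes are nn2 and nn3
        conf.set("dfs.ha.namenode.id", "nn1");

        List<RemoteNameNodeInfo> others = RemoteNameNodeInfo.getRemoteNameNodes(conf);
        for (RemoteNameNodeInfo other : others) {
          System.out.println(other);   // prints nnId, ipcAddress and httpAddress
        }
      }
    }
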
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index c5c2ef3..b2f2cac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -23,12 +23,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedAction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +43,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.util.Canceler;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -68,12 +67,13 @@ public class StandbyCheckpointer {
   private long lastCheckpointTime;
   private final CheckpointerThread thread;
   private final ThreadFactory uploadThreadFactory;
-  private URL activeNNAddress;
+  private List<URL> activeNNAddresses;
   private URL myNNAddress;
 
   private final Object cancelLock = new Object();
   private Canceler canceler;
-  
+  private boolean isPrimaryCheckPointer = true;
+
   // Keep track of how many checkpoints were canceled.
   // This is for use in tests.
   private static int canceledCount = 0;
@@ -100,14 +100,21 @@ public class StandbyCheckpointer {
     myNNAddress = getHttpAddress(conf);
 
     // Look up the active node's address
-    Configuration confForActive = HAUtil.getConfForOtherNode(conf);
-    activeNNAddress = getHttpAddress(confForActive);
-    
+    List<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);
+    activeNNAddresses = new ArrayList<URL>(confForActive.size());
+    for (Configuration activeConf : confForActive) {
+      URL activeNNAddress = getHttpAddress(activeConf);
+
+      // sanity check each possible active NN
+      Preconditions.checkArgument(checkAddress(activeNNAddress),
+          "Bad address for active NN: %s", activeNNAddress);
+
+      activeNNAddresses.add(activeNNAddress);
+    }
+
     // Sanity-check.
-    Preconditions.checkArgument(checkAddress(activeNNAddress),
-        "Bad address for active NN: %s", activeNNAddress);
-    Preconditions.checkArgument(checkAddress(myNNAddress),
-        "Bad address for standby NN: %s", myNNAddress);
+    Preconditions.checkArgument(checkAddress(myNNAddress), "Bad address for standby NN: %s",
+        myNNAddress);
   }
   
   private URL getHttpAddress(Configuration conf) throws IOException {
@@ -127,7 +134,7 @@ public class StandbyCheckpointer {
 
   public void start() {
     LOG.info("Starting standby checkpoint thread...\n" +
-        "Checkpointing active NN at " + activeNNAddress + "\n" +
+        "Checkpointing active NN to possible NNs: " + activeNNAddresses + "\n" +
         "Serving checkpoints at " + myNNAddress);
     thread.start();
   }
@@ -148,11 +155,10 @@ public class StandbyCheckpointer {
     thread.interrupt();
   }
 
-  private void doCheckpoint() throws InterruptedException, IOException {
+  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
     assert canceler != null;
     final long txid;
     final NameNodeFile imageType;
-    
     // Acquire cpLock to make sure no one is modifying the name system.
     // It does not need the full namesystem write lock, since the only thing
     // that modifies namesystem on standby node is edit log replaying.
@@ -161,9 +167,9 @@ public class StandbyCheckpointer {
       assert namesystem.getEditLog().isOpenForRead() :
         "Standby Checkpointer should only attempt a checkpoint when " +
         "NN is in standby mode, but the edit logs are in an unexpected state";
-      
+
       FSImage img = namesystem.getFSImage();
-      
+
       long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
       long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
       assert thisCheckpointTxId >= prevCheckpointTxId;
@@ -185,7 +191,7 @@ public class StandbyCheckpointer {
       img.saveNamespace(namesystem, imageType, canceler);
       txid = img.getStorage().getMostRecentCheckpointTxId();
       assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
-        thisCheckpointTxId + " but instead saved at txid=" + txid;
+          thisCheckpointTxId + " but instead saved at txid=" + txid;
 
       // Save the legacy OIV image, if the output dir is defined.
       String outputDir = checkpointConf.getLegacyOivImageDir();
@@ -200,31 +206,85 @@ public class StandbyCheckpointer {
     } finally {
       namesystem.cpUnlock();
     }
-    
+
+    // early exit if we shouldn't actually send the checkpoint to the ANN
+    if (!sendCheckpoint) {
+      return;
+    }
+
     // Upload the saved checkpoint back to the active
-    // Do this in a separate thread to avoid blocking transition to active
+    // Do this in a separate thread to avoid blocking transition to active, but don't allow more
+    // than the expected number of tasks to run or queue up
     // See HDFS-4816
-    ExecutorService executor =
-        Executors.newSingleThreadExecutor(uploadThreadFactory);
-    Future<Void> upload = executor.submit(new Callable<Void>() {
-      @Override
-      public Void call() throws IOException {
-        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
-            namesystem.getFSImage().getStorage(), imageType, txid, canceler);
-        return null;
+    ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
+        uploadThreadFactory);
+    // for right now, just match the upload to the nn address by convention. There is no need to
+    // directly tie them together by adding a pair class.
+    List<Future<TransferFsImage.TransferResult>> uploads =
+        new ArrayList<Future<TransferFsImage.TransferResult>>();
+    for (final URL activeNNAddress : activeNNAddresses) {
+      Future<TransferFsImage.TransferResult> upload =
+          executor.submit(new Callable<TransferFsImage.TransferResult>() {
+            @Override
+            public TransferFsImage.TransferResult call() throws IOException {
+              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
+                  .getFSImage().getStorage(), imageType, txid, canceler);
+            }
+          });
+      uploads.add(upload);
+    }
+    InterruptedException ie = null;
+    IOException ioe = null;
+    int i = 0;
+    boolean success = false;
+    for (; i < uploads.size(); i++) {
+      Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+      try {
+        // TODO should there be some smarts here about retrying nodes that are not the active NN?
+        if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
+          success = true;
+          //avoid getting the rest of the results - we don't care since we had a successful upload
+          break;
+        }
+
+      } catch (ExecutionException e) {
+        ioe = new IOException("Exception during image upload: " + e.getMessage(),
+            e.getCause());
+        break;
+      } catch (InterruptedException e) {
+        ie = e;
+        break;
+      }
+    }
+
+    // we are primary if we successfully updated the ANN
+    this.isPrimaryCheckPointer = success;
+
+    // cleaner than copying code for multiple catch statements and better than catching all
+    // exceptions, so we just handle the ones we expect.
+    if (ie != null || ioe != null) {
+
+      // cancel the rest of the tasks, and close the pool
+      for (; i < uploads.size(); i++) {
+        Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+        // The background thread may be blocked waiting in the throttler, so
+        // interrupt it.
+        upload.cancel(true);
+      }
+
+      // shutdown so we interrupt anything running and don't start anything new
+      executor.shutdownNow();
+      // this is a good bit longer than the thread timeout, just to make sure all the threads
+      // that are not doing any work also stop
+      executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+
+      // re-throw the exception we got, since one of these two must be non-null
+      if (ie != null) {
+        throw ie;
+      } else if (ioe != null) {
+        throw ioe;
       }
-    });
-    executor.shutdown();
-    try {
-      upload.get();
-    } catch (InterruptedException e) {
-      // The background thread may be blocked waiting in the throttler, so
-      // interrupt it.
-      upload.cancel(true);
-      throw e;
-    } catch (ExecutionException e) {
-      throw new IOException("Exception during image upload: " + e.getMessage(),
-          e.getCause());
     }
   }
   
@@ -327,8 +387,10 @@ public class StandbyCheckpointer {
           final long now = monotonicNow();
           final long uncheckpointed = countUncheckpointedTxns();
           final long secsSinceLast = (now - lastCheckpointTime) / 1000;
-          
+
+          // if we need a rollback checkpoint, always attempt to checkpoint
           boolean needCheckpoint = needRollbackCheckpoint;
+
           if (needCheckpoint) {
             LOG.info("Triggering a rollback fsimage for rolling upgrade.");
           } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
@@ -343,19 +405,23 @@ public class StandbyCheckpointer {
                 "exceeds the configured interval " + checkpointConf.getPeriod());
             needCheckpoint = true;
           }
-          
-          synchronized (cancelLock) {
-            if (now < preventCheckpointsUntil) {
-              LOG.info("But skipping this checkpoint since we are about to failover!");
-              canceledCount++;
-              continue;
-            }
-            assert canceler == null;
-            canceler = new Canceler();
-          }
-          
+
           if (needCheckpoint) {
-            doCheckpoint();
+            synchronized (cancelLock) {
+              if (now < preventCheckpointsUntil) {
+                LOG.info("But skipping this checkpoint since we are about to failover!");
+                canceledCount++;
+                continue;
+              }
+              assert canceler == null;
+              canceler = new Canceler();
+            }
+
+            // on all nodes, we build the checkpoint. However, we only ship the checkpoint if we
+            // have a rollback request, are the primary checkpointer, or are outside the quiet period.
+            boolean sendRequest = isPrimaryCheckPointer || secsSinceLast >= checkpointConf.getQuietPeriod();
+            doCheckpoint(sendRequest);
+
             // reset needRollbackCheckpoint to false only when we finish a ckpt
             // for rollback image
             if (needRollbackCheckpoint
@@ -384,7 +450,7 @@ public class StandbyCheckpointer {
   }
 
   @VisibleForTesting
-  URL getActiveNNAddress() {
-    return activeNNAddress;
+  List<URL> getActiveNNAddresses() {
+    return activeNNAddresses;
   }
 }
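
The upload loop above fans the freshly saved image out to every candidate active NN and stops at
the first SUCCESS, which also marks this node as the primary checkpointer for the next round. A
stripped-down, hypothetical sketch of that "submit to all, take the first success, cancel the
rest" pattern follows; the Result enum and the simulated uploads stand in for
TransferFsImage.uploadImageFromStorage and are not part of the patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class FirstSuccessUploadSketch {
      enum Result { SUCCESS, NOT_ACTIVE }

      public static void main(String[] args) throws Exception {
        List<String> targets = Arrays.asList("nn1", "nn2", "nn3");
        // bounded pool and queue: never more tasks than candidate NNs
        ExecutorService executor = new ThreadPoolExecutor(0, targets.size(), 100,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(targets.size()));

        List<Future<Result>> uploads = new ArrayList<Future<Result>>();
        for (final String target : targets) {
          uploads.add(executor.submit(new Callable<Result>() {
            public Result call() {
              // simulated upload: only the active NN ("nn2" here) accepts the image
              return "nn2".equals(target) ? Result.SUCCESS : Result.NOT_ACTIVE;
            }
          }));
        }

        boolean success = false;
        int i = 0;
        for (; i < uploads.size(); i++) {
          if (uploads.get(i).get() == Result.SUCCESS) {
            success = true;   // this node is the primary checkpointer next round
            break;            // don't wait for the remaining uploads
          }
        }
        for (; i < uploads.size(); i++) {
          uploads.get(i).cancel(true);   // cancel whatever is still outstanding
        }
        executor.shutdownNow();
        System.out.println("primary checkpointer: " + success);
      }
    }
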
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
index 7749164..e0a4f70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -263,4 +265,15 @@ public class DFSZKFailoverController extends ZKFailoverController {
     return isThreadDumpCaptured;
   }
 
+  @Override
+  public List<HAServiceTarget> getAllOtherNodes() {
+    String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+    List<String> otherNn = HAUtil.getNameNodeIdOfOtherNodes(conf, nsId);
+
+    List<HAServiceTarget> targets = new ArrayList<HAServiceTarget>(otherNn.size());
+    for (String nnId : otherNn) {
+      targets.add(new NNHAServiceTarget(conf, nsId, nnId));
+    }
+    return targets;
+  }
 }
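
getAllOtherNodes() simply turns every other NN id of the local nameservice into an
HAServiceTarget so the ZKFC can consider more than one peer during failover. A hypothetical
configuration-level sketch of the enumeration it relies on (all values are placeholders; the
helper methods are the ones referenced in the diff above):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.HAUtil;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class OtherNodesSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2,nn3");
        conf.set("dfs.ha.namenode.id", "nn1");   // this ZKFC guards nn1
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "host1.example.com:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "host2.example.com:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn3", "host3.example.com:8020");

        String nsId = DFSUtil.getNamenodeNameServiceId(conf);
        // expected to yield [nn2, nn3]: every NN in the nameservice except the local one
        List<String> otherIds = HAUtil.getNameNodeIdOfOtherNodes(conf, nsId);
        System.out.println(otherIds);
      }
    }
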
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0ce3bd9..3a08d6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1150,6 +1150,18 @@
 </property>
 
 <property>
+  <name>dfs.namenode.checkpoint.check.quiet-multiplier</name>
+  <value>1.5</value>
+  <description>
+    Used to calculate the amount of time between retries when in the 'quiet' period
+    for creating checkpoints (i.e. the active namenode already has an up-to-date image
+    from another checkpointer). We wait this multiplier of dfs.namenode.checkpoint.check.period
+    before retrying the checkpoint, because another node is likely already managing the
+    checkpoints; this saves the bandwidth of transferring checkpoints that would not be used.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.num.checkpoints.retained</name>
   <value>2</value>
   <description>The number of image checkpoint files (fsimage_*) that will be retained by
@@ -1565,6 +1577,14 @@
 </property>
 
 <property>
+  <name>dfs.ha.tail-edits.namenode-retries</name>
+  <value>3</value>
+  <description>
+    Number of retries to use when contacting the namenode while tailing the edit log.
+  </description>
+</property>
+
+<property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>false</value>
   <description>
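
As a quick illustration of how the two new keys interact, here is a hypothetical snippet that
reads them with their defaults and derives the quiet period per the description above. The
actual constant names and derivation live in DFSConfigKeys and CheckpointConf and should be
checked there; this is only a sketch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class CheckpointQuietPeriodSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();

        // how often the standby wakes up to check whether a checkpoint is needed (seconds)
        long checkPeriodSecs = conf.getLong("dfs.namenode.checkpoint.check.period", 60);
        // multiplier applied while another node appears to be handling checkpoints
        double quietMultiplier =
            conf.getDouble("dfs.namenode.checkpoint.check.quiet-multiplier", 1.5);
        // retries used by the edit log tailer before giving up on all remote NNs
        int tailRetries = conf.getInt("dfs.ha.tail-edits.namenode-retries", 3);

        // per the description above, a non-primary checkpointer only ships its image
        // once roughly check.period * quiet-multiplier has passed
        long quietPeriodSecs = (long) (checkPeriodSecs * quietMultiplier);
        System.out.println("quiet period ~" + quietPeriodSecs + "s, tail retries " + tailRetries);
      }
    }
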
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 63348de..d30c2bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -64,6 +64,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
 import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -488,7 +490,7 @@ public class MiniDFSCluster implements AutoCloseable {
     final int numNameNodes = builder.nnTopology.countNameNodes();
     LOG.info("starting cluster: numNameNodes=" + numNameNodes
         + ", numDataNodes=" + builder.numDataNodes);
-    nameNodes = new NameNodeInfo[numNameNodes];
+
     this.storagesPerDatanode = builder.storagesPerDatanode;
 
     // Duplicate the storageType setting for each DN.
@@ -559,7 +561,7 @@ public class MiniDFSCluster implements AutoCloseable {
   }
 
   private Configuration conf;
-  private NameNodeInfo[] nameNodes;
+  private Multimap<String, NameNodeInfo> namenodes = ArrayListMultimap.create();
   protected int numDataNodes;
   protected final ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
@@ -585,10 +587,10 @@ public class MiniDFSCluster implements AutoCloseable {
    * Stores the information related to a namenode in the cluster
    */
   public static class NameNodeInfo {
-    final NameNode nameNode;
-    final Configuration conf;
-    final String nameserviceId;
-    final String nnId;
+    public NameNode nameNode;
+    Configuration conf;
+    String nameserviceId;
+    String nnId;
     StartupOption startOpt;
     NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
         StartupOption startOpt, Configuration conf) {
@@ -617,7 +619,6 @@ public class MiniDFSCluster implements AutoCloseable {
    * without a name node (ie when the name node is started elsewhere).
    */
   public MiniDFSCluster() {
-    nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
     storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     synchronized (MiniDFSCluster.class) {
       instanceId = instanceCount++;
@@ -792,7 +793,6 @@ public class MiniDFSCluster implements AutoCloseable {
                         StartupOption operation,
                         String[] racks, String hosts[],
                         long[] simulatedCapacities) throws IOException {
-    this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
     this.storagesPerDatanode = DEFAULT_STORAGES_PER_DATANODE;
     initMiniDFSCluster(conf, numDataNodes, null, format,
                        manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
@@ -883,7 +883,7 @@ public class MiniDFSCluster implements AutoCloseable {
         createNameNodesAndSetConf(
             nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
             enableManagedDfsDirsRedundancy,
-            format, startOpt, clusterId, conf);
+            format, startOpt, clusterId);
       } catch (IOException ioe) {
         LOG.error("IOE creating namenodes. Permissions dump:\n" +
             createPermissionsDiagnosisString(data_dir), ioe);
@@ -915,9 +915,9 @@ public class MiniDFSCluster implements AutoCloseable {
       }
     }
 
-    for (NameNodeInfo nn : nameNodes) {
+    for (NameNodeInfo nn : namenodes.values()) {
       Configuration nnConf = nn.conf;
-      for (NameNodeInfo nnInfo : nameNodes) {
+      for (NameNodeInfo nnInfo : namenodes.values()) {
         if (nn.equals(nnInfo)) {
           continue;
         }
@@ -975,7 +975,125 @@ public class MiniDFSCluster implements AutoCloseable {
   private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
       boolean manageNameDfsDirs, boolean manageNameDfsSharedDirs,
       boolean enableManagedDfsDirsRedundancy, boolean format,
+      StartupOption operation, String clusterId) throws IOException {
+    // do the basic namenode configuration
+    configureNameNodes(nnTopology, federation, conf);
+
+    int nnCounter = 0;
+    int nsCounter = 0;
+    // configure each NS independently
+    for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
+      configureNameService(nameservice, nsCounter++, manageNameDfsSharedDirs,
+          manageNameDfsDirs, enableManagedDfsDirsRedundancy,
+          format, operation, clusterId, nnCounter);
+      nnCounter += nameservice.getNNs().size();
+    }
+  }
+
+  /**
+   * Do the rest of the NN configuration for things like shared edits,
+   * as well as directory formatting, etc. for a single nameservice.
+   * @param nnCounter the number of namenodes already configured/started. Also acts as
+   *                  the <i>index</i> of the next NN to start (since indices start at 0).
+   * @throws IOException
+   */
+  private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCounter,
+      boolean manageNameDfsSharedDirs, boolean manageNameDfsDirs, boolean
+      enableManagedDfsDirsRedundancy, boolean format,
       StartupOption operation, String clusterId,
+      final int nnCounter) throws IOException{
+    String nsId = nameservice.getId();
+    String lastDefaultFileSystem = null;
+
+    // If HA is enabled on this nameservice, enumerate all the namenodes
+    // in the configuration. Also need to set a shared edits dir
+    int numNNs = nameservice.getNNs().size();
+    if (numNNs > 1 && manageNameDfsSharedDirs) {
+      URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter + numNNs - 1);
+      conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
+      // Clean out the shared edits dir completely, including all subdirectories.
+      FileUtil.fullyDelete(new File(sharedEditsUri));
+    }
+
+    // Now format first NN and copy the storage directory from that node to the others.
+    int nnIndex = nnCounter;
+    Collection<URI> prevNNDirs = null;
+    for (NNConf nn : nameservice.getNNs()) {
+      initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
+          manageNameDfsDirs,  nnIndex);
+      Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
+      if (format) {
+        // delete the existing namespaces
+        for (URI nameDirUri : namespaceDirs) {
+          File nameDir = new File(nameDirUri);
+          if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
+            throw new IOException("Could not fully delete " + nameDir);
+          }
+        }
+
+        // delete the checkpoint directories, if they exist
+        Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
+            .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
+        for (URI checkpointDirUri : checkpointDirs) {
+          File checkpointDir = new File(checkpointDirUri);
+          if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
+            throw new IOException("Could not fully delete " + checkpointDir);
+          }
+        }
+      }
+
+      boolean formatThisOne = format;
+      // if this is not the first NN
+      if (nnIndex++ > nnCounter && format) {
+        // Don't format the second, third, etc NN in an HA setup - that
+        // would result in it having a different clusterID,
+        // block pool ID, etc. Instead, copy the name dirs
+        // from the previous one.
+        formatThisOne = false;
+        assert (null != prevNNDirs);
+        copyNameDirs(prevNNDirs, namespaceDirs, conf);
+      }
+
+      if (formatThisOne) {
+        // Allow overriding clusterID for specific NNs to test
+        // misconfiguration.
+        if (nn.getClusterId() == null) {
+          StartupOption.FORMAT.setClusterId(clusterId);
+        } else {
+          StartupOption.FORMAT.setClusterId(nn.getClusterId());
+        }
+        DFSTestUtil.formatNameNode(conf);
+      }
+      prevNNDirs = namespaceDirs;
+    }
+
+    // create all the namenodes in the namespace
+    nnIndex = nnCounter;
+    for (NNConf nn : nameservice.getNNs()) {
+      Configuration hdfsConf = new Configuration(conf);
+      initNameNodeConf(hdfsConf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
+          enableManagedDfsDirsRedundancy, nnIndex++);
+      createNameNode(hdfsConf, false, operation,
+          clusterId, nsId, nn.getNnId());
+
+      // Record the last namenode uri
+      lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
+    }
+    if (!federation && lastDefaultFileSystem != null) {
+      // Set the default file system to the actual bind address of NN.
+      conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
+    }
+  }
+
+  /**
+   * Do the basic NN configuration for the topology. Does not configure things like the shared
+   * edits directories.
+   * @param nnTopology the namenode topology to configure
+   * @param federation whether federation is enabled for the cluster
+   * @param conf the configuration to update with the topology settings
+   * @throws IOException
+   */
+  public static void configureNameNodes(MiniDFSNNTopology nnTopology, boolean federation,
       Configuration conf) throws IOException {
     Preconditions.checkArgument(nnTopology.countNameNodes() > 0,
         "empty NN topology: no namenodes specified!");
@@ -988,22 +1106,21 @@ public class MiniDFSCluster implements AutoCloseable {
       // NN is started.
       conf.set(FS_DEFAULT_NAME_KEY, "hdfs://127.0.0.1:" + onlyNN.getIpcPort());
     }
-    
+
     List<String> allNsIds = Lists.newArrayList();
     for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
       if (nameservice.getId() != null) {
         allNsIds.add(nameservice.getId());
       }
     }
+
     if (!allNsIds.isEmpty()) {
       conf.set(DFS_NAMESERVICES, Joiner.on(",").join(allNsIds));
     }
-    
-    int nnCounter = 0;
+
     for (MiniDFSNNTopology.NSConf nameservice : nnTopology.getNameservices()) {
       String nsId = nameservice.getId();
-      String lastDefaultFileSystem = null;
-      
+
       Preconditions.checkArgument(
           !federation || nsId != null,
           "if there is more than one NS, they must have names");
@@ -1022,83 +1139,10 @@ public class MiniDFSCluster implements AutoCloseable {
       // If HA is enabled on this nameservice, enumerate all the namenodes
       // in the configuration. Also need to set a shared edits dir
       if (nnIds.size() > 1) {
-        conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()),
-            Joiner.on(",").join(nnIds));
-        if (manageNameDfsSharedDirs) {
-          URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1); 
-          conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
-          // Clean out the shared edits dir completely, including all subdirectories.
-          FileUtil.fullyDelete(new File(sharedEditsUri));
-        }
-      }
-
-      // Now format first NN and copy the storage directory from that node to the others.
-      int i = 0;
-      Collection<URI> prevNNDirs = null;
-      int nnCounterForFormat = nnCounter;
-      for (NNConf nn : nameservice.getNNs()) {
-        initNameNodeConf(conf, nsId, nn.getNnId(), manageNameDfsDirs,
-            enableManagedDfsDirsRedundancy, nnCounterForFormat);
-        Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
-        if (format) {
-          for (URI nameDirUri : namespaceDirs) {
-            File nameDir = new File(nameDirUri);
-            if (nameDir.exists() && !FileUtil.fullyDelete(nameDir)) {
-              throw new IOException("Could not fully delete " + nameDir);
-            }
-          }
-          Collection<URI> checkpointDirs = Util.stringCollectionAsURIs(conf
-              .getTrimmedStringCollection(DFS_NAMENODE_CHECKPOINT_DIR_KEY));
-          for (URI checkpointDirUri : checkpointDirs) {
-            File checkpointDir = new File(checkpointDirUri);
-            if (checkpointDir.exists() && !FileUtil.fullyDelete(checkpointDir)) {
-              throw new IOException("Could not fully delete " + checkpointDir);
-            }
-          }
-        }
-        
-        boolean formatThisOne = format;
-        if (format && i++ > 0) {
-          // Don't format the second NN in an HA setup - that
-          // would result in it having a different clusterID,
-          // block pool ID, etc. Instead, copy the name dirs
-          // from the first one.
-          formatThisOne = false;
-          assert (null != prevNNDirs);
-          copyNameDirs(prevNNDirs, namespaceDirs, conf);
-        }
-        
-        nnCounterForFormat++;
-        if (formatThisOne) {
-          // Allow overriding clusterID for specific NNs to test
-          // misconfiguration.
-          if (nn.getClusterId() == null) {
-            StartupOption.FORMAT.setClusterId(clusterId);
-          } else {
-            StartupOption.FORMAT.setClusterId(nn.getClusterId());
-          }
-          DFSTestUtil.formatNameNode(conf);
-        }
-        prevNNDirs = namespaceDirs;
-      }
-
-      // Start all Namenodes
-      for (NNConf nn : nameservice.getNNs()) {
-        Configuration hdfsConf = new Configuration(conf);
-        initNameNodeConf(hdfsConf, nsId, nn.getNnId(), manageNameDfsDirs,
-            enableManagedDfsDirsRedundancy, nnCounter);
-        createNameNode(nnCounter, hdfsConf, numDataNodes, false, operation,
-            clusterId, nsId, nn.getNnId());
-        // Record the last namenode uri
-        lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
-        nnCounter++;
-      }
-      if (!federation && lastDefaultFileSystem != null) {
-        // Set the default file system to the actual bind address of NN.
-        conf.set(FS_DEFAULT_NAME_KEY, lastDefaultFileSystem);
+        conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, nameservice.getId()), Joiner
+            .on(",").join(nnIds));
       }
     }
-
   }
   
   public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
@@ -1112,39 +1156,92 @@ public class MiniDFSCluster implements AutoCloseable {
   }
   
   public NameNodeInfo[] getNameNodeInfos() {
-    return this.nameNodes;
+    return this.namenodes.values().toArray(new NameNodeInfo[0]);
   }
 
-  private void initNameNodeConf(Configuration conf,
-      String nameserviceId, String nnId,
-      boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy,
-      int nnIndex) throws IOException {
+  /**
+   * @param nsIndex index of the namespace id to check
+   * @return all the namenodes bound to the given namespace index
+   */
+  public NameNodeInfo[] getNameNodeInfos(int nsIndex) {
+    int i = 0;
+    for (String ns : this.namenodes.keys()) {
+      if (i++ == nsIndex) {
+        return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @param nameservice id of nameservice to read
+   * @return all the namenodes bound to the given nameservice
+   */
+  public NameNodeInfo[] getNameNodeInfos(String nameservice) {
+    for (String ns : this.namenodes.keys()) {
+      if (nameservice.equals(ns)) {
+        return this.namenodes.get(ns).toArray(new NameNodeInfo[0]);
+      }
+    }
+    return null;
+  }
+
+
+  private void initNameNodeConf(Configuration conf, String nameserviceId, int nsIndex, String nnId,
+      boolean manageNameDfsDirs, boolean enableManagedDfsDirsRedundancy, int nnIndex)
+      throws IOException {
     if (nameserviceId != null) {
       conf.set(DFS_NAMESERVICE_ID, nameserviceId);
     }
     if (nnId != null) {
       conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
     }
-    
     if (manageNameDfsDirs) {
       if (enableManagedDfsDirsRedundancy) {
-        conf.set(DFS_NAMENODE_NAME_DIR_KEY,
-            fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
-            fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
-        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
-            fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
-            fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+        File[] files = getNameNodeDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
+        files = getCheckpointDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]) + "," + fileAsURI(files[1]));
       } else {
-        conf.set(DFS_NAMENODE_NAME_DIR_KEY,
-            fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1))).
-              toString());
-        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY,
-            fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1))).
-              toString());
+        File[] files = getNameNodeDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_NAME_DIR_KEY, fileAsURI(files[0]).toString());
+        files = getCheckpointDirectory(nsIndex, nnIndex);
+        conf.set(DFS_NAMENODE_CHECKPOINT_DIR_KEY, fileAsURI(files[0]).toString());
       }
     }
   }
 
+  private File[] getNameNodeDirectory(int nameserviceIndex, int nnIndex) {
+    return getNameNodeDirectory(base_dir, nameserviceIndex, nnIndex);
+  }
+
+  public static File[] getNameNodeDirectory(String base_dir, int nsIndex, int nnIndex) {
+    return getNameNodeDirectory(new File(base_dir), nsIndex, nnIndex);
+  }
+
+  public static File[] getNameNodeDirectory(File base_dir, int nsIndex, int nnIndex) {
+    File[] files = new File[2];
+    files[0] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 1));
+    files[1] = new File(base_dir, "name-" + nsIndex + "-" + (2 * nnIndex + 2));
+    return files;
+  }
+
+  public File[] getCheckpointDirectory(int nsIndex, int nnIndex) {
+    return getCheckpointDirectory(base_dir, nsIndex, nnIndex);
+  }
+
+  public static File[] getCheckpointDirectory(String base_dir, int nsIndex, int nnIndex) {
+    return getCheckpointDirectory(new File(base_dir), nsIndex, nnIndex);
+  }
+
+  public static File[] getCheckpointDirectory(File base_dir, int nsIndex, int nnIndex) {
+    File[] files = new File[2];
+    files[0] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 1));
+    files[1] = new File(base_dir, "namesecondary-" + nsIndex + "-" + (2 * nnIndex + 2));
+    return files;
+  }
+
+
   public static void copyNameDirs(Collection<URI> srcDirs, Collection<URI> dstDirs,
       Configuration dstConf) throws IOException {
     URI srcDir = Lists.newArrayList(srcDirs).get(0);
@@ -1197,10 +1294,8 @@ public class MiniDFSCluster implements AutoCloseable {
     return args;
   }
   
-  private void createNameNode(int nnIndex, Configuration hdfsConf,
-      int numDataNodes, boolean format, StartupOption operation,
-      String clusterId, String nameserviceId,
-      String nnId)
+  private void createNameNode(Configuration hdfsConf, boolean format, StartupOption operation,
+      String clusterId, String nameserviceId, String nnId)
       throws IOException {
     // Format and clean out DataNode directories
     if (format) {
@@ -1237,8 +1332,9 @@ public class MiniDFSCluster implements AutoCloseable {
     copyKeys(hdfsConf, conf, nameserviceId, nnId);
     DFSUtil.setGenericConf(hdfsConf, nameserviceId, nnId,
         DFS_NAMENODE_HTTP_ADDRESS_KEY);
-    nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
+    NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
         operation, hdfsConf);
+    namenodes.put(nameserviceId, info);
   }
 
   /**
@@ -1254,7 +1350,7 @@ public class MiniDFSCluster implements AutoCloseable {
    */
   public URI getURI(int nnIndex) {
     String hostPort =
-        nameNodes[nnIndex].nameNode.getNameNodeAddressHostPortString();
+        getNN(nnIndex).nameNode.getNameNodeAddressHostPortString();
     URI uri = null;
     try {
       uri = new URI("hdfs://" + hostPort);
@@ -1272,9 +1368,21 @@ public class MiniDFSCluster implements AutoCloseable {
    * @return Configuration of for the given namenode
    */
   public Configuration getConfiguration(int nnIndex) {
-    return nameNodes[nnIndex].conf;
+    return getNN(nnIndex).conf;
+  }
+
+  private NameNodeInfo getNN(int nnIndex) {
+    int count = 0;
+    for (NameNodeInfo nn : namenodes.values()) {
+      if (count == nnIndex) {
+        return nn;
+      }
+      count++;
+    }
+    return null;
   }
 
+
   /**
    * wait for the given namenode to get out of safemode.
    */
@@ -1685,7 +1793,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * @throws Exception
    */
   public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
-    finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
+    finalizeNamenode(getNN(nnIndex).nameNode, getNN(nnIndex).conf);
   }
 
   /**
@@ -1696,7 +1804,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * @throws IllegalStateException if the Namenode is not running.
    */
   public void finalizeCluster(Configuration conf) throws Exception {
-    for (NameNodeInfo nnInfo : nameNodes) {
+    for (NameNodeInfo nnInfo : namenodes.values()) {
       if (nnInfo == null) {
         throw new IllegalStateException("Attempting to finalize "
             + "Namenode but it is not running");
@@ -1704,9 +1812,9 @@ public class MiniDFSCluster implements AutoCloseable {
       finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
     }
   }
-  
+
   public int getNumNameNodes() {
-    return nameNodes.length;
+    return namenodes.size();
   }
   
   /**
@@ -1736,7 +1844,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * Gets the NameNode for the index.  May be null.
    */
   public NameNode getNameNode(int nnIndex) {
-    return nameNodes[nnIndex].nameNode;
+    return getNN(nnIndex).nameNode;
   }
   
   /**
@@ -1745,11 +1853,11 @@ public class MiniDFSCluster implements AutoCloseable {
    */
   public FSNamesystem getNamesystem() {
     checkSingleNameNode();
-    return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
+    return NameNodeAdapter.getNamesystem(getNN(0).nameNode);
   }
-  
+
   public FSNamesystem getNamesystem(int nnIndex) {
-    return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
+    return NameNodeAdapter.getNamesystem(getNN(nnIndex).nameNode);
   }
 
   /**
@@ -1811,14 +1919,14 @@ public class MiniDFSCluster implements AutoCloseable {
    * caller supplied port is not necessarily the actual port used.
    */     
   public int getNameNodePort(int nnIndex) {
-    return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
+    return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
   }
 
   /**
    * @return the service rpc port used by the NameNode at the given index.
    */     
   public int getNameNodeServicePort(int nnIndex) {
-    return nameNodes[nnIndex].nameNode.getServiceRpcAddress().getPort();
+    return getNN(nnIndex).nameNode.getServiceRpcAddress().getPort();
   }
     
   /**
@@ -1859,7 +1967,7 @@ public class MiniDFSCluster implements AutoCloseable {
       fileSystems.clear();
     }
     shutdownDataNodes();
-    for (NameNodeInfo nnInfo : nameNodes) {
+    for (NameNodeInfo nnInfo : namenodes.values()) {
       if (nnInfo == null) continue;
       stopAndJoinNameNode(nnInfo.nameNode);
     }
@@ -1897,7 +2005,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * Shutdown all the namenodes.
    */
   public synchronized void shutdownNameNodes() {
-    for (int i = 0; i < nameNodes.length; i++) {
+    for (int i = 0; i < namenodes.size(); i++) {
       shutdownNameNode(i);
     }
   }
@@ -1906,11 +2014,13 @@ public class MiniDFSCluster implements AutoCloseable {
    * Shutdown the namenode at a given index.
    */
   public synchronized void shutdownNameNode(int nnIndex) {
-    NameNode nn = nameNodes[nnIndex].nameNode;
+    NameNodeInfo info = getNN(nnIndex);
+    NameNode nn = info.nameNode;
     if (nn != null) {
       stopAndJoinNameNode(nn);
-      Configuration conf = nameNodes[nnIndex].conf;
-      nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
+      info.nnId = null;
+      info.nameNode = null;
+      info.nameserviceId = null;
     }
   }
 
@@ -1931,7 +2041,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * Restart all namenodes.
    */
   public synchronized void restartNameNodes() throws IOException {
-    for (int i = 0; i < nameNodes.length; i++) {
+    for (int i = 0; i < namenodes.size(); i++) {
       restartNameNode(i, false);
     }
     waitActive();
@@ -1967,19 +2077,19 @@ public class MiniDFSCluster implements AutoCloseable {
    */
   public synchronized void restartNameNode(int nnIndex, boolean waitActive,
       String... args) throws IOException {
-    String nameserviceId = nameNodes[nnIndex].nameserviceId;
-    String nnId = nameNodes[nnIndex].nnId;
-    StartupOption startOpt = nameNodes[nnIndex].startOpt;
-    Configuration conf = nameNodes[nnIndex].conf;
+    NameNodeInfo info = getNN(nnIndex);
+    StartupOption startOpt = info.startOpt;
+
     shutdownNameNode(nnIndex);
     if (args.length != 0) {
       startOpt = null;
     } else {
       args = createArgs(startOpt);
     }
-    NameNode nn = NameNode.createNameNode(args, conf);
-    nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
-        conf);
+
+    NameNode nn = NameNode.createNameNode(args, info.conf);
+    info.nameNode = nn;
+    info.setStartOpt(startOpt);
     if (waitActive) {
       waitClusterUp();
       LOG.info("Restarted the namenode");
@@ -2343,7 +2453,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * or if waiting for safe mode is disabled.
    */
   public boolean isNameNodeUp(int nnIndex) {
-    NameNode nameNode = nameNodes[nnIndex].nameNode;
+    NameNode nameNode = getNN(nnIndex).nameNode;
     if (nameNode == null) {
       return false;
     }
@@ -2361,7 +2471,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * Returns true if all the NameNodes are running and is out of Safe Mode.
    */
   public boolean isClusterUp() {
-    for (int index = 0; index < nameNodes.length; index++) {
+    for (int index = 0; index < namenodes.size(); index++) {
       if (!isNameNodeUp(index)) {
         return false;
       }
@@ -2391,15 +2501,13 @@ public class MiniDFSCluster implements AutoCloseable {
     checkSingleNameNode();
     return getFileSystem(0);
   }
-  
+
   /**
    * Get a client handle to the DFS cluster for the namenode at given index.
    */
   public DistributedFileSystem getFileSystem(int nnIndex) throws IOException {
-    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(
-        getURI(nnIndex), nameNodes[nnIndex].conf);
-    fileSystems.add(dfs);
-    return dfs;
+    return (DistributedFileSystem) addFileSystem(FileSystem.get(getURI(nnIndex),
+        getNN(nnIndex).conf));
   }
 
   /**
@@ -2407,17 +2515,20 @@ public class MiniDFSCluster implements AutoCloseable {
    * This simulating different threads working on different FileSystem instances.
    */
   public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
-    FileSystem dfs = FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
-    fileSystems.add(dfs);
-    return dfs;
+    return addFileSystem(FileSystem.newInstance(getURI(nnIndex), getNN(nnIndex).conf));
   }
-  
+
+  private <T extends FileSystem> T addFileSystem(T fs) {
+    fileSystems.add(fs);
+    return fs;
+  }
+
   /**
    * @return a http URL
    */
   public String getHttpUri(int nnIndex) {
     return "http://"
-        + nameNodes[nnIndex].conf
+        + getNN(nnIndex).conf
             .get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
   }
   
@@ -2426,7 +2537,7 @@ public class MiniDFSCluster implements AutoCloseable {
    */
   public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException {
     String uri = "hftp://"
-        + nameNodes[nnIndex].conf
+        + getNN(nnIndex).conf
             .get(DFS_NAMENODE_HTTP_ADDRESS_KEY);
     try {
       return (HftpFileSystem)FileSystem.get(new URI(uri), conf);
@@ -2455,14 +2566,14 @@ public class MiniDFSCluster implements AutoCloseable {
    * Get the directories where the namenode stores its image.
    */
   public Collection<URI> getNameDirs(int nnIndex) {
-    return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
+    return FSNamesystem.getNamespaceDirs(getNN(nnIndex).conf);
   }
 
   /**
    * Get the directories where the namenode stores its edits.
    */
   public Collection<URI> getNameEditsDirs(int nnIndex) throws IOException {
-    return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
+    return FSNamesystem.getNamespaceEditsDirs(getNN(nnIndex).conf);
   }
   
   public void transitionToActive(int nnIndex) throws IOException,
@@ -2503,11 +2614,12 @@ public class MiniDFSCluster implements AutoCloseable {
 
   /** Wait until the given namenode gets registration from all the datanodes */
   public void waitActive(int nnIndex) throws IOException {
-    if (nameNodes.length == 0 || nameNodes[nnIndex] == null
-        || nameNodes[nnIndex].nameNode == null) {
+    if (namenodes.size() == 0 || getNN(nnIndex) == null || getNN(nnIndex).nameNode == null) {
       return;
     }
-    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
+
+    NameNodeInfo info = getNN(nnIndex);
+    InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
     assert addr.getPort() != 0;
     DFSClient client = new DFSClient(addr, conf);
 
@@ -2526,8 +2638,8 @@ public class MiniDFSCluster implements AutoCloseable {
   /** Wait until the given namenode gets first block reports from all the datanodes */
   public void waitFirstBRCompleted(int nnIndex, int timeout) throws
       IOException, TimeoutException, InterruptedException {
-    if (nameNodes.length == 0 || nameNodes[nnIndex] == null
-        || nameNodes[nnIndex].nameNode == null) {
+    if (namenodes.size() == 0 || getNN(nnIndex) == null
+        || getNN(nnIndex).nameNode == null) {
       return;
     }
 
@@ -2552,7 +2664,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * Wait until the cluster is active and running.
    */
   public void waitActive() throws IOException {
-    for (int index = 0; index < nameNodes.length; index++) {
+    for (int index = 0; index < namenodes.size(); index++) {
       int failedCount = 0;
       while (true) {
         try {
@@ -2572,7 +2684,14 @@ public class MiniDFSCluster implements AutoCloseable {
     }
     LOG.info("Cluster is active");
   }
-  
+
+  public void printNNs() {
+    for (int i = 0; i < namenodes.size(); i++) {
+      LOG.info("Have namenode " + i + ", info:" + getNN(i));
+      LOG.info(" has namenode: " + getNN(i).nameNode);
+    }
+  }
+
   private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
       InetSocketAddress addr) {
     // If a datanode failed to start, then do not wait
@@ -3020,7 +3139,7 @@ public class MiniDFSCluster implements AutoCloseable {
    * namenode
    */
   private void checkSingleNameNode() {
-    if (nameNodes.length != 1) {
+    if (namenodes.size() != 1) {
       throw new IllegalArgumentException("Namenode index is needed");
     }
   }
@@ -3036,13 +3155,9 @@ public class MiniDFSCluster implements AutoCloseable {
     if(!federation)
       throw new IOException("cannot add namenode to non-federated cluster");
 
-    int nnIndex = nameNodes.length;
-    int numNameNodes = nameNodes.length + 1;
-    NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
-    System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
-    nameNodes = newlist;
-    String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1);
-    
+    int nameServiceIndex = namenodes.keys().size();
+    String nameserviceId = NAMESERVICE_ID_PREFIX + (namenodes.keys().size() + 1);
+
     String nameserviceIds = conf.get(DFS_NAMESERVICES);
     nameserviceIds += "," + nameserviceId;
     conf.set(DFS_NAMESERVICES, nameserviceIds);
@@ -3050,9 +3165,11 @@ public class MiniDFSCluster implements AutoCloseable {
     String nnId = null;
     initNameNodeAddress(conf, nameserviceId,
         new NNConf(nnId).setIpcPort(namenodePort));
-    initNameNodeConf(conf, nameserviceId, nnId, true, true, nnIndex);
-    createNameNode(nnIndex, conf, numDataNodes, true, null, null,
-        nameserviceId, nnId);
+    // figure out the current number of NNs
+    NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
+    int nnIndex = infos == null ? 0 : infos.length;
+    initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
+    createNameNode(conf, true, null, null, nameserviceId, nnId);
 
     // Refresh datanodes with the newly started namenode
     for (DataNodeProperties dn : dataNodes) {
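
As a quick aside on the per-NameNode accessors reworked above (getFileSystem(int), getHttpUri(int), waitActive() and the new printNNs()), here is a minimal sketch of how a test can address each NameNode of a multi-NN MiniDFSCluster individually. The three-NN count, the LOG handle and the surrounding test method are assumptions, not part of this patch:

    // Each index maps to one NameNode; the returned handle is registered with the
    // cluster through addFileSystem(), so it is cleaned up together with the cluster.
    for (int i = 0; i < 3; i++) {
      DistributedFileSystem fs = cluster.getFileSystem(i);
      LOG.info("NN " + i + " serves " + fs.getUri() + ", http: " + cluster.getHttpUri(i));
    }
    cluster.waitActive();  // block until every NN has registrations from all datanodes
    cluster.printNNs();    // new helper: logs the NameNodeInfo for each NN index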
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
index a99e9c3..b9786a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSNNTopology.java
@@ -56,10 +56,20 @@ public class MiniDFSNNTopology {
    * Set up an HA topology with a single HA nameservice.
    */
   public static MiniDFSNNTopology simpleHATopology() {
-    return new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf("minidfs-ns")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1"))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+    return simpleHATopology(2);
+  }
+
+  /**
+   * Set up an HA topology with a single HA nameservice.
+   * @param nnCount number of namenodes to use with the nameservice
+   */
+  public static MiniDFSNNTopology simpleHATopology(int nnCount) {
+    MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("minidfs-ns");
+    for (int i = 1; i <= nnCount; i++) {
+      nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i));
+    }
+    MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice);
+    return topology;
   }
 
   /**
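
To illustrate the new overload, a minimal sketch of building a three-NameNode HA mini cluster with it; teardown and imports are elided, and nothing beyond simpleHATopology(int) is meant as new API here:

    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology(3))  // nn1, nn2, nn3 under "minidfs-ns"
        .numDataNodes(1)
        .build();
    cluster.waitActive();
    cluster.transitionToActive(0);  // the remaining NNs stay in standby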
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
index 4ae24a6..a2fe746 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 import org.junit.Test;
 
-import static org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
 import static org.junit.Assert.*;
 
 /**
@@ -310,12 +309,12 @@ public class TestDFSUpgradeFromImage {
     unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);
     
     // Overwrite the md5 stored in the VERSION files
-    File baseDir = new File(MiniDFSCluster.getBaseDirectory());
+    File[] nnDirs = MiniDFSCluster.getNameNodeDirectory(MiniDFSCluster.getBaseDirectory(), 0, 0);
     FSImageTestUtil.corruptVersionFile(
-        new File(baseDir, "name1/current/VERSION"),
+        new File(nnDirs[0], "current/VERSION"),
         "imageMD5Digest", "22222222222222222222222222222222");
     FSImageTestUtil.corruptVersionFile(
-        new File(baseDir, "name2/current/VERSION"),
+        new File(nnDirs[1], "current/VERSION"),
         "imageMD5Digest", "22222222222222222222222222222222");
     
     // Attach our own log appender so we can verify output
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index ef99411..749ec96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
+import static org.junit.Assert.assertTrue;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -80,7 +81,7 @@ public class TestRollingUpgrade {
    */
   @Test
   public void testDFSAdminRollingUpgradeCommands() throws Exception {
-    // start a cluster 
+    // start a cluster
     final Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = null;
     try {
@@ -113,7 +114,7 @@ public class TestRollingUpgrade {
         checkMxBean();
 
         dfs.mkdirs(bar);
-        
+
         //finalize rolling upgrade
         runCmd(dfsadmin, true, "-rollingUpgrade", "finalize");
         // RollingUpgradeInfo should be null after finalization, both via
@@ -163,7 +164,7 @@ public class TestRollingUpgrade {
     String nnDirPrefix = MiniDFSCluster.getBaseDirectory() + "/nn/";
     final File nn1Dir = new File(nnDirPrefix + "image1");
     final File nn2Dir = new File(nnDirPrefix + "image2");
-    
+
     LOG.info("nn1Dir=" + nn1Dir);
     LOG.info("nn2Dir=" + nn2Dir);
 
@@ -207,9 +208,9 @@ public class TestRollingUpgrade {
 
       final RollingUpgradeInfo info1;
       {
-        final DistributedFileSystem dfs = cluster.getFileSystem(); 
+        final DistributedFileSystem dfs = cluster.getFileSystem();
         dfs.mkdirs(foo);
-  
+
         //start rolling upgrade
         dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
         info1 = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
@@ -351,7 +352,7 @@ public class TestRollingUpgrade {
       if(cluster != null) cluster.shutdown();
     }
   }
-  
+
   private static void startRollingUpgrade(Path foo, Path bar,
       Path file, byte[] data,
       MiniDFSCluster cluster) throws IOException {
@@ -373,7 +374,7 @@ public class TestRollingUpgrade {
     TestFileTruncate.checkBlockRecovery(file, dfs);
     AppendTestUtil.checkFullFile(dfs, file, newLength, data);
   }
-  
+
   private static void rollbackRollingUpgrade(Path foo, Path bar,
       Path file, byte[] data,
       MiniDFSCluster cluster) throws IOException {
@@ -419,22 +420,33 @@ public class TestRollingUpgrade {
     }
   }
 
-  @Test (timeout = 300000)
+  @Test(timeout = 300000)
   public void testFinalize() throws Exception {
+    testFinalize(2);
+  }
+
+  @Test(timeout = 300000)
+  public void testFinalizeWithMultipleNN() throws Exception {
+    testFinalize(3);
+  }
+
+  private void testFinalize(int nnCount) throws Exception {
     final Configuration conf = new HdfsConfiguration();
     MiniQJMHACluster cluster = null;
     final Path foo = new Path("/foo");
     final Path bar = new Path("/bar");
 
     try {
-      cluster = new MiniQJMHACluster.Builder(conf).build();
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
-      // let NN1 tail editlog every 1s
-      dfsCluster.getConfiguration(1).setInt(
-          DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
-      dfsCluster.restartNameNode(1);
+      // let the other NNs tail the edit log every 1s
+      for (int i = 1; i < nnCount; i++) {
+        dfsCluster.getConfiguration(i).setInt(
+            DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      }
+      dfsCluster.restartNameNodes();
 
       dfsCluster.transitionToActive(0);
       DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
@@ -472,17 +484,29 @@ public class TestRollingUpgrade {
 
   @Test (timeout = 300000)
   public void testQuery() throws Exception {
+    testQuery(2);
+  }
+
+  @Test (timeout = 300000)
+  public void testQueryWithMultipleNN() throws Exception {
+    testQuery(3);
+  }
+
+  private void testQuery(int nnCount) throws Exception{
     final Configuration conf = new Configuration();
     MiniQJMHACluster cluster = null;
     try {
-      cluster = new MiniQJMHACluster.Builder(conf).build();
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
       dfsCluster.transitionToActive(0);
       DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
 
-      dfsCluster.shutdownNameNode(1);
+      // shutdown other NNs
+      for (int i = 1; i < nnCount; i++) {
+        dfsCluster.shutdownNameNode(i);
+      }
 
       // start rolling upgrade
       RollingUpgradeInfo info = dfs
@@ -492,13 +516,16 @@ public class TestRollingUpgrade {
       info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
       Assert.assertFalse(info.createdRollbackImages());
 
-      dfsCluster.restartNameNode(1);
-
+      // restart other NNs
+      for (int i = 1; i < nnCount; i++) {
+        dfsCluster.restartNameNode(i);
+      }
+      // check that one of the other NNs has created the rollback image and uploaded it
       queryForPreparation(dfs);
 
       // The NN should have a copy of the fsimage in case of rollbacks.
       Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
-          .hasRollbackFSImage());
+              .hasRollbackFSImage());
     } finally {
       if (cluster != null) {
         cluster.shutdown();
@@ -534,6 +561,15 @@ public class TestRollingUpgrade {
 
   @Test(timeout = 300000)
   public void testCheckpoint() throws IOException, InterruptedException {
+    testCheckpoint(2);
+  }
+
+  @Test(timeout = 300000)
+  public void testCheckpointWithMultipleNN() throws IOException, InterruptedException {
+    testCheckpoint(3);
+  }
+
+  public void testCheckpoint(int nnCount) throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
@@ -542,7 +578,7 @@ public class TestRollingUpgrade {
     final Path foo = new Path("/foo");
 
     try {
-      cluster = new MiniQJMHACluster.Builder(conf).build();
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
@@ -560,16 +596,9 @@ public class TestRollingUpgrade {
       long txid = dfs.rollEdits();
       Assert.assertTrue(txid > 0);
 
-      int retries = 0;
-      while (++retries < 5) {
-        NNStorage storage = dfsCluster.getNamesystem(1).getFSImage()
-            .getStorage();
-        if (storage.getFsImageName(txid - 1) != null) {
-          return;
-        }
-        Thread.sleep(1000);
+      for (int i = 1; i < nnCount; i++) {
+        verifyNNCheckpoint(dfsCluster, txid, i);
       }
-      Assert.fail("new checkpoint does not exist");
 
     } finally {
       if (cluster != null) {
@@ -578,6 +607,22 @@ public class TestRollingUpgrade {
     }
   }
 
+  /**
+   * Verify that the namenode at the given index has an FSImage with a TxId up to txid-1
+   */
+  private void verifyNNCheckpoint(MiniDFSCluster dfsCluster, long txid, int nnIndex) throws InterruptedException {
+    int retries = 0;
+    while (++retries < 5) {
+      NNStorage storage = dfsCluster.getNamesystem(nnIndex).getFSImage()
+              .getStorage();
+      if (storage.getFsImageName(txid - 1) != null) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    Assert.fail("new checkpoint does not exist");
+  }
+
   static void queryForPreparation(DistributedFileSystem dfs) throws IOException,
       InterruptedException {
     RollingUpgradeInfo info;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index 8f4b3e3..20c398a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -17,40 +17,36 @@
  */
 package org.apache.hadoop.hdfs.qjournal;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-
-import java.io.IOException;
-import java.net.BindException;
-import java.net.URI;
-import java.util.Random;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 
 public class MiniQJMHACluster {
   private MiniDFSCluster cluster;
   private MiniJournalCluster journalCluster;
   private final Configuration conf;
   private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class);
-  
+
   public static final String NAMESERVICE = "ns1";
-  private static final String NN1 = "nn1";
-  private static final String NN2 = "nn2";
   private static final Random RANDOM = new Random();
 
   public static class Builder {
     private final Configuration conf;
     private StartupOption startOpt = null;
+    private int numNNs = 2;
     private final MiniDFSCluster.Builder dfsBuilder;
     private boolean forceRemoteEditsOnly = false;
     private String baseDir;
@@ -65,7 +61,7 @@ public class MiniQJMHACluster {
     public MiniDFSCluster.Builder getDfsBuilder() {
       return dfsBuilder;
     }
-    
+
     public MiniQJMHACluster build() throws IOException {
       return new MiniQJMHACluster(this);
     }
@@ -83,15 +79,25 @@ public class MiniQJMHACluster {
       this.forceRemoteEditsOnly = val;
       return this;
     }
+
+    public Builder setNumNameNodes(int nns) {
+      this.numNNs = nns;
+      return this;
+    }
+  }
+
+  public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) {
+    MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE);
+    for (int i = 0; i < nns; i++) {
+      nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setIpcPort(startingPort++)
+          .setHttpPort(startingPort++));
+    }
+
+    return new MiniDFSNNTopology().addNameservice(nameservice);
   }
-  
+
   public static MiniDFSNNTopology createDefaultTopology(int basePort) {
-    return new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
-        new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort)
-            .setHttpPort(basePort + 1)).addNN(
-        new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2)
-            .setHttpPort(basePort + 3)));
+    return createDefaultTopology(2, basePort);
   }
 
   private MiniQJMHACluster(Builder builder) throws IOException {
@@ -109,10 +115,10 @@ public class MiniQJMHACluster {
         journalCluster.waitActive();
         URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
 
-        // start cluster with 2 NameNodes
-        MiniDFSNNTopology topology = createDefaultTopology(basePort);
+        // start cluster with specified NameNodes
+        MiniDFSNNTopology topology = createDefaultTopology(builder.numNNs, basePort);
 
-        initHAConf(journalURI, builder, basePort);
+        initHAConf(journalURI, builder, builder.numNNs, basePort);
 
         // First start up the NNs just to format the namespace. The MinIDFSCluster
         // has no way to just format the NameNodes without also starting them.
@@ -125,8 +131,9 @@ public class MiniQJMHACluster {
         Configuration confNN0 = cluster.getConfiguration(0);
         NameNode.initializeSharedEdits(confNN0, true);
 
-        cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
-        cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+        for (MiniDFSCluster.NameNodeInfo nn : cluster.getNameNodeInfos()) {
+          nn.setStartOpt(builder.startOpt);
+        }
 
         // restart the cluster
         cluster.restartNameNodes();
@@ -143,7 +150,7 @@ public class MiniQJMHACluster {
     }
   }
 
-  private Configuration initHAConf(URI journalURI, Builder builder,
+  private Configuration initHAConf(URI journalURI, Builder builder, int numNNs,
       int basePort) {
     conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
         journalURI.toString());
@@ -153,26 +160,23 @@ public class MiniQJMHACluster {
           journalURI.toString());
     }
 
-    String address1 = "127.0.0.1:" + basePort;
-    String address2 = "127.0.0.1:" + (basePort + 2);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        NAMESERVICE, NN1), address1);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        NAMESERVICE, NN2), address2);
-    conf.set(DFSConfigKeys.DFS_NAMESERVICES, NAMESERVICE);
-    conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NAMESERVICE),
-        NN1 + "," + NN2);
-    conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + NAMESERVICE,
-        ConfiguredFailoverProxyProvider.class.getName());
-    conf.set("fs.defaultFS", "hdfs://" + NAMESERVICE);
-    
+    List<String> nns = new ArrayList<String>(numNNs);
+    int port = basePort;
+    for (int i = 0; i < numNNs; i++) {
+      nns.add("127.0.0.1:" + port);
+      // increment by 2 each time to account for the http port in the config setting
+      port += 2;
+    }
+
+    // use standard failover configurations
+    HATestUtil.setFailoverConfigurations(conf, NAMESERVICE, nns);
     return conf;
   }
 
   public MiniDFSCluster getDfsCluster() {
     return cluster;
   }
-  
+
   public MiniJournalCluster getJournalCluster() {
     return journalCluster;
   }
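
For reference, a hedged sketch of the builder with the new setNumNameNodes knob, mirroring how the updated TestRollingUpgrade cases above use it (the try/finally teardown is omitted):

    Configuration conf = new HdfsConfiguration();
    MiniQJMHACluster qjmCluster = new MiniQJMHACluster.Builder(conf)
        .setNumNameNodes(3)            // the default remains 2
        .build();
    MiniDFSCluster dfsCluster = qjmCluster.getDfsCluster();
    dfsCluster.waitActive();
    dfsCluster.transitionToActive(0);
    DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
    // ... exercise dfs ...
    qjmCluster.shutdown();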
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
index 58e0836..55e9d30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
@@ -162,7 +162,7 @@ public class TestBlockToken {
   public void testWritable() throws Exception {
     TestWritable.testWritable(new BlockTokenIdentifier());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     TestWritable.testWritable(generateTokenId(sm, block1,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)));
     TestWritable.testWritable(generateTokenId(sm, block2,
@@ -201,7 +201,7 @@ public class TestBlockToken {
   @Test
   public void testBlockTokenSecretManager() throws Exception {
     BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
         blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
     ExportedBlockKeys keys = masterHandler.exportKeys();
@@ -244,7 +244,7 @@ public class TestBlockToken {
     UserGroupInformation.setConfiguration(conf);
     
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
 
@@ -283,7 +283,7 @@ public class TestBlockToken {
     
     Assume.assumeTrue(FD_DIR.exists());
     BlockTokenSecretManager sm = new BlockTokenSecretManager(
-        blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+        blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
     Token<BlockTokenIdentifier> token = sm.generateToken(block3,
         EnumSet.allOf(BlockTokenIdentifier.AccessMode.class));
 
@@ -352,7 +352,7 @@ public class TestBlockToken {
     for (int i = 0; i < 10; i++) {
       String bpid = Integer.toString(i);
       BlockTokenSecretManager masterHandler = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, 0, "fake-pool", null);
+          blockKeyUpdateInterval, blockTokenLifetime, 0, 1, "fake-pool", null);
       BlockTokenSecretManager slaveHandler = new BlockTokenSecretManager(
           blockKeyUpdateInterval, blockTokenLifetime, "fake-pool", null);
       bpMgr.addBlockPool(bpid, slaveHandler);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
index 8f1f42e..b804a48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestKeyManager.java
@@ -54,7 +54,7 @@ public class TestKeyManager {
     final String blockPoolId = "bp-foo";
     FakeTimer fakeTimer = new FakeTimer();
     BlockTokenSecretManager btsm = new BlockTokenSecretManager(
-        keyUpdateInterval, tokenLifeTime, 0, blockPoolId, null);
+        keyUpdateInterval, tokenLifeTime, 0, 1, blockPoolId, null);
     Whitebox.setInternalState(btsm, "timer", fakeTimer);
 
     // When KeyManager asks for block keys, return them from btsm directly
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
index 1a70795..08eff0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
@@ -377,7 +377,7 @@ public class TestBackupNode {
       if(fileSys != null) fileSys.close();
       if(cluster != null) cluster.shutdown();
     }
-    File nnCurDir = new File(BASE_DIR, "name1/current/");
+    File nnCurDir = new File(MiniDFSCluster.getNameNodeDirectory(BASE_DIR, 0, 0)[0], "current/");
     File bnCurDir = new File(getBackupNodeDir(op, 1), "/current/");
 
     FSImageTestUtil.assertParallelFilesAreIdentical(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 7c16f14..9a4f6db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -1433,7 +1433,8 @@ public class TestCheckpoint {
       //
       secondary = startSecondaryNameNode(conf);
 
-      File secondaryDir = new File(MiniDFSCluster.getBaseDirectory(), "namesecondary1");
+      File secondaryDir = MiniDFSCluster.getCheckpointDirectory(MiniDFSCluster.getBaseDirectory(),
+        0, 0)[0];
       File secondaryCurrent = new File(secondaryDir, "current");
 
       long expectedTxIdToDownload = cluster.getNameNode().getFSImage()
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index 6f2f6cd..962c1c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -92,7 +92,7 @@ public class TestEditLogRace {
     TestEditLogRace.useAsyncEditLog = useAsyncEditLog;
   }
 
-  private static final String NAME_DIR = MiniDFSCluster.getBaseDirectory() + "name1";
+  private static final String NAME_DIR = MiniDFSCluster.getBaseDirectory() + "name-0-1";
 
   private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
index 5b72901..a736d27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java
@@ -42,7 +42,8 @@ public class HAStressTestHarness {
   private MiniDFSCluster cluster;
   static final int BLOCK_SIZE = 1024;
   final TestContext testCtx = new TestContext();
-  
+  private int nns = 2;
+
   public HAStressTestHarness() {
     conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -55,11 +56,19 @@ public class HAStressTestHarness {
   }
 
   /**
+   * Set the number of namenodes that should be run. This must be set before calling
+   * {@link #startCluster()}
+   */
+  public void setNumberOfNameNodes(int nns) {
+    this.nns = nns;
+  }
+
+  /**
    * Start and return the MiniDFSCluster.
    */
   public MiniDFSCluster startCluster() throws IOException {
     cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .nnTopology(MiniDFSNNTopology.simpleHATopology(nns))
       .numDataNodes(3)
       .build();
     return cluster;
@@ -99,28 +108,27 @@ public class HAStressTestHarness {
   }
 
   /**
-   * Add a thread which periodically triggers failover back and forth between
-   * the two namenodes.
+   * Add a thread which periodically triggers failover back and forth between the namenodes.
    */
   public void addFailoverThread(final int msBetweenFailovers) {
     testCtx.addThread(new RepeatingTestThread(testCtx) {
-      
       @Override
       public void doAnAction() throws Exception {
-        System.err.println("==============================\n" +
-            "Failing over from 0->1\n" +
-            "==================================");
-        cluster.transitionToStandby(0);
-        cluster.transitionToActive(1);
-        
-        Thread.sleep(msBetweenFailovers);
-        System.err.println("==============================\n" +
-            "Failing over from 1->0\n" +
-            "==================================");
-
-        cluster.transitionToStandby(1);
-        cluster.transitionToActive(0);
-        Thread.sleep(msBetweenFailovers);
+        // fail over from one namenode to the next, all the way back to the original NN
+        for (int i = 0; i < nns; i++) {
+          // next node, mod nns so we wrap to the 0th NN on the last iteration
+          int next = (i + 1) % nns;
+          System.err.println("==============================\n"
+              + "[Starting] Failing over from " + i + "->" + next + "\n"
+              + "==============================");
+          cluster.transitionToStandby(i);
+          cluster.transitionToActive(next);
+          System.err.println("==============================\n"
+              + "[Completed] Failing over from " + i + "->" + next + ". Sleeping for "+
+              (msBetweenFailovers/1000) +"sec \n"
+              + "==============================");
+          Thread.sleep(msBetweenFailovers);
+        }
       }
     });
   }
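
A short usage sketch of the reworked harness follows; only setNumberOfNameNodes(int), startCluster() and addFailoverThread(int) come from the code above, the rest (workload threads, the 5s interval) is assumed test scaffolding:

    HAStressTestHarness harness = new HAStressTestHarness();
    harness.setNumberOfNameNodes(3);   // must be called before startCluster()
    MiniDFSCluster cluster = harness.startCluster();
    cluster.waitActive();
    cluster.transitionToActive(0);
    harness.addFailoverThread(5000);   // cycles the active NN 0 -> 1 -> 2 -> 0, pausing 5s per hop
    // add workload threads via harness.testCtx and let them run before shutting down
    cluster.shutdown();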
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index f504f36..169bbee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -24,9 +24,14 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -68,12 +73,11 @@ public abstract class HATestUtil {
    */
   public static void waitForStandbyToCatchUp(NameNode active,
       NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
-    
     long activeTxId = active.getNamesystem().getFSImage().getEditLog()
       .getLastWrittenTxId();
-    
+
     active.getRpcServer().rollEditLog();
-    
+
     long start = Time.now();
     while (Time.now() - start < TestEditLogTailer.NN_LAG_TIMEOUT) {
       long nn2HighestTxId = standby.getNamesystem().getFSImage()
@@ -169,34 +173,52 @@ public abstract class HATestUtil {
   /** Sets the required configurations for performing failover.  */
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
       Configuration conf, String logicalName, int nsIndex) {
-    InetSocketAddress nnAddr1 = cluster.getNameNode(2 * nsIndex).getNameNodeAddress();
-    InetSocketAddress nnAddr2 = cluster.getNameNode(2 * nsIndex + 1).getNameNodeAddress();
-    setFailoverConfigurations(conf, logicalName, nnAddr1, nnAddr2);
+    MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
+    List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
+    for (MiniDFSCluster.NameNodeInfo nn : nns) {
+      nnAddresses.add(nn.nameNode.getNameNodeAddress());
+    }
+    setFailoverConfigurations(conf, logicalName, nnAddresses);
+  }
+
+  public static void setFailoverConfigurations(Configuration conf, String logicalName,
+      InetSocketAddress ... nnAddresses){
+    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses));
   }
 
   /**
    * Sets the required configurations for performing failover
    */
   public static void setFailoverConfigurations(Configuration conf,
-      String logicalName, InetSocketAddress nnAddr1,
-      InetSocketAddress nnAddr2) {
-    String nameNodeId1 = "nn1";
-    String nameNodeId2 = "nn2";
-    String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
-    String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        logicalName, nameNodeId1), address1);
-    conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
-        logicalName, nameNodeId2), address2);
-    
+      String logicalName, List<InetSocketAddress> nnAddresses) {
+    setFailoverConfigurations(conf, logicalName,
+        Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
+
+          // transform the inet address to a simple string
+          @Override
+          public String apply(InetSocketAddress addr) {
+            return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
+          }
+        }));
+  }
+
+  public static void setFailoverConfigurations(Configuration conf, String logicalName,
+      Iterable<String> nnAddresses) {
+    List<String> nnids = new ArrayList<String>();
+    int i = 0;
+    for (String address : nnAddresses) {
+      String nnId = "nn" + (i + 1);
+      nnids.add(nnId);
+      conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, logicalName, nnId), address);
+      i++;
+    }
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, logicalName);
     conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, logicalName),
-        nameNodeId1 + "," + nameNodeId2);
+        Joiner.on(',').join(nnids));
     conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
         ConfiguredFailoverProxyProvider.class.getName());
     conf.set("fs.defaultFS", "hdfs://" + logicalName);
   }
-  
 
   public static String getLogicalHostname(MiniDFSCluster cluster) {
     return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
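
To round out the HATestUtil changes, a minimal sketch of the new list/varargs form; the logical name "myha" and the port numbers are placeholders, everything else comes from the methods above:

    Configuration clientConf = new Configuration();
    InetSocketAddress nn1 = new InetSocketAddress("127.0.0.1", 8020);
    InetSocketAddress nn2 = new InetSocketAddress("127.0.0.1", 8022);
    InetSocketAddress nn3 = new InetSocketAddress("127.0.0.1", 8024);
    HATestUtil.setFailoverConfigurations(clientConf, "myha", nn1, nn2, nn3);
    // clientConf now carries fs.defaultFS=hdfs://myha, dfs.ha.namenodes.myha=nn1,nn2,nn3,
    // one dfs.namenode.rpc-address.myha.nnX entry per address, and
    // ConfiguredFailoverProxyProvider as the failover proxy provider for "myha".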
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
index 9dfad12..fd49bf8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
@@ -52,30 +52,40 @@ import com.google.common.collect.ImmutableList;
 
 public class TestBootstrapStandby {
   private static final Log LOG = LogFactory.getLog(TestBootstrapStandby.class);
-  
+
+  private static final int maxNNCount = 3;
+  private static final int STARTING_PORT = 20000;
+
   private MiniDFSCluster cluster;
   private NameNode nn0;
-  
+
   @Before
   public void setupCluster() throws IOException {
     Configuration conf = new Configuration();
 
-    MiniDFSNNTopology topology = new MiniDFSNNTopology()
-      .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
-        .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(20001))
-        .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(20002)));
-    
+    // duplicates MiniQJMHACluster#createDefaultTopology, but we don't want to introduce a cross-
+    // dependency or munge too much code just to share it cleanly
+    MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf("ns1");
+    for (int i = 0; i < maxNNCount; i++) {
+      nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setHttpPort(STARTING_PORT + i + 1));
+    }
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(nameservice);
+
     cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(topology)
-      .numDataNodes(0)
-      .build();
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
     cluster.waitActive();
-    
+
     nn0 = cluster.getNameNode(0);
     cluster.transitionToActive(0);
-    cluster.shutdownNameNode(1);
+    // shutdown the other NNs
+    for (int i = 1; i < maxNNCount; i++) {
+      cluster.shutdownNameNode(i);
+    }
   }
-  
+
   @After
   public void shutdownCluster() {
     if (cluster != null) {
@@ -83,7 +93,7 @@ public class TestBootstrapStandby {
       cluster = null;
     }
   }
-  
+
   /**
    * Test for the base success case. The primary NN
    * hasn't made any checkpoints, and we copy the fsimage_0
@@ -92,32 +102,33 @@ public class TestBootstrapStandby {
   @Test
   public void testSuccessfulBaseCase() throws Exception {
     removeStandbyNameDirs();
-    
-    try {
-      cluster.restartNameNode(1);
-      fail("Did not throw");
-    } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains(
-          "storage directory does not exist or is not accessible",
-          ioe);
+
+    // skip the first NN; it's up
+    for (int index = 1; index < maxNNCount; index++) {
+      try {
+        cluster.restartNameNode(index);
+        fail("Did not throw");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "storage directory does not exist or is not accessible", ioe);
+      }
+
+      int expectedCheckpointTxId = (int)NameNodeAdapter.getNamesystem(nn0)
+          .getFSImage().getMostRecentCheckpointTxId();
+
+      int rc = BootstrapStandby.run(new String[] { "-nonInteractive" },
+          cluster.getConfiguration(index));
+      assertEquals(0, rc);
+
+      // Should have copied over the namespace from the active
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, index,
+          ImmutableList.of(expectedCheckpointTxId));
     }
-    int expectedCheckpointTxId = (int)NameNodeAdapter.getNamesystem(nn0)
-        .getFSImage().getMostRecentCheckpointTxId();
-    
-    int rc = BootstrapStandby.run(
-        new String[]{"-nonInteractive"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the active
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of(expectedCheckpointTxId));
-    FSImageTestUtil.assertNNFilesMatch(cluster);
 
-    // We should now be able to start the standby successfully.
-    cluster.restartNameNode(1);
+    // We should now be able to start the standbys successfully.
+    restartNameNodesFromIndex(1);
   }
-  
+
   /**
    * Test for downloading a checkpoint made at a later checkpoint
    * from the active.
@@ -132,24 +143,24 @@ public class TestBootstrapStandby {
     NameNodeAdapter.saveNamespace(nn0);
     NameNodeAdapter.leaveSafeMode(nn0);
     long expectedCheckpointTxId = NameNodeAdapter.getNamesystem(nn0)
-      .getFSImage().getMostRecentCheckpointTxId();
+        .getFSImage().getMostRecentCheckpointTxId();
     assertEquals(6, expectedCheckpointTxId);
 
     // advance the current txid
     cluster.getFileSystem(0).create(new Path("/test_txid"), (short)1).close();
 
     // obtain the content of seen_txid
-    URI editsUri = cluster.getSharedEditsDir(0, 1);
+    URI editsUri = cluster.getSharedEditsDir(0, maxNNCount - 1);
     long seen_txid_shared = FSImageTestUtil.getStorageTxId(nn0, editsUri);
 
-    int rc = BootstrapStandby.run(
-        new String[]{"-force"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the active
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of((int)expectedCheckpointTxId));
+    for (int i = 1; i < maxNNCount; i++) {
+      assertEquals(0, forceBootstrap(i));
+
+      // Should have copied over the namespace from the active
+      LOG.info("Checking namenode: " + i);
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, i,
+          ImmutableList.of((int) expectedCheckpointTxId));
+    }
     FSImageTestUtil.assertNNFilesMatch(cluster);
 
     // Make sure the seen_txid was not modified by the standby
@@ -157,7 +168,7 @@ public class TestBootstrapStandby {
         FSImageTestUtil.getStorageTxId(nn0, editsUri));
 
     // We should now be able to start the standby successfully.
-    cluster.restartNameNode(1);
+    restartNameNodesFromIndex(1);
   }
 
   /**
@@ -167,36 +178,40 @@ public class TestBootstrapStandby {
   @Test
   public void testSharedEditsMissingLogs() throws Exception {
     removeStandbyNameDirs();
-    
+
     CheckpointSignature sig = nn0.getRpcServer().rollEditLog();
     assertEquals(3, sig.getCurSegmentTxId());
-    
+
     // Should have created edits_1-2 in shared edits dir
-    URI editsUri = cluster.getSharedEditsDir(0, 1);
+    URI editsUri = cluster.getSharedEditsDir(0, maxNNCount - 1);
     File editsDir = new File(editsUri);
-    File editsSegment = new File(new File(editsDir, "current"),
+    File currentDir = new File(editsDir, "current");
+    File editsSegment = new File(currentDir,
         NNStorage.getFinalizedEditsFileName(1, 2));
     GenericTestUtils.assertExists(editsSegment);
+    GenericTestUtils.assertExists(currentDir);
 
     // Delete the segment.
     assertTrue(editsSegment.delete());
-    
+
     // Trying to bootstrap standby should now fail since the edit
     // logs aren't available in the shared dir.
     LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
         LogFactory.getLog(BootstrapStandby.class));
     try {
-      int rc = BootstrapStandby.run(
-          new String[]{"-force"},
-          cluster.getConfiguration(1));
-      assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, rc);
+      assertEquals(BootstrapStandby.ERR_CODE_LOGS_UNAVAILABLE, forceBootstrap(1));
     } finally {
       logs.stopCapturing();
     }
     GenericTestUtils.assertMatches(logs.getOutput(),
         "FATAL.*Unable to read transaction ids 1-3 from the configured shared");
   }
-  
+
+  /**
+   * Show that bootstrapping will fail on a given NameNode if its directories already exist. It is
+   * not run across all the NNs because it tests state that is local to each node.
+   * @throws Exception on unexpected failure
+   */
   @Test
   public void testStandbyDirsAlreadyExist() throws Exception {
     // Should not pass since standby dirs exist, force not given
@@ -206,12 +221,9 @@ public class TestBootstrapStandby {
     assertEquals(BootstrapStandby.ERR_CODE_ALREADY_FORMATTED, rc);
 
     // Should pass with -force
-    rc = BootstrapStandby.run(
-        new String[]{"-force"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
+    assertEquals(0, forceBootstrap(1));
   }
-  
+
   /**
    * Test that, even if the other node is not active, we are able
    * to bootstrap standby from it.
@@ -219,10 +231,7 @@ public class TestBootstrapStandby {
   @Test(timeout=30000)
   public void testOtherNodeNotActive() throws Exception {
     cluster.transitionToStandby(0);
-    int rc = BootstrapStandby.run(
-        new String[]{"-force"},
-        cluster.getConfiguration(1));
-    assertEquals(0, rc);
+    assertSuccessfulBootstrapFromIndex(1);
   }
 
   /**
@@ -329,11 +338,40 @@ public class TestBootstrapStandby {
   }
 
   private void removeStandbyNameDirs() {
-    for (URI u : cluster.getNameDirs(1)) {
-      assertTrue(u.getScheme().equals("file"));
-      File dir = new File(u.getPath());
-      LOG.info("Removing standby dir " + dir);
-      assertTrue(FileUtil.fullyDelete(dir));
+    for (int i = 1; i < maxNNCount; i++) {
+      for (URI u : cluster.getNameDirs(i)) {
+        assertTrue(u.getScheme().equals("file"));
+        File dir = new File(u.getPath());
+        LOG.info("Removing standby dir " + dir);
+        assertTrue(FileUtil.fullyDelete(dir));
+      }
+    }
+  }
+
+  private void restartNameNodesFromIndex(int start) throws IOException {
+    for (int i = start; i < maxNNCount; i++) {
+      // We should now be able to start the standby successfully.
+      cluster.restartNameNode(i, false);
+    }
+
+    cluster.waitClusterUp();
+    cluster.waitActive();
+  }
+
+  /**
+   * Force bootstrapping on a namenode
+   * @param i index of the namenode to attempt
+   * @return exit code
+   * @throws Exception on unexpected failure
+   */
+  private int forceBootstrap(int i) throws Exception {
+    return BootstrapStandby.run(new String[] { "-force" },
+        cluster.getConfiguration(i));
+  }
+
+  private void assertSuccessfulBootstrapFromIndex(int start) throws Exception {
+    for (int i = start; i < maxNNCount; i++) {
+      assertEquals(0, forceBootstrap(i));
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
index 6c8c200..10194ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandbyWithQJM.java
@@ -52,7 +52,8 @@ public class TestBootstrapStandbyWithQJM {
 
   private MiniDFSCluster cluster;
   private MiniJournalCluster jCluster;
-  
+  private int nnCount = 3;
+
   @Before
   public void setup() throws Exception {
     Configuration conf = new Configuration();
@@ -62,7 +63,8 @@ public class TestBootstrapStandbyWithQJM {
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
         0);
 
-    MiniQJMHACluster miniQjmHaCluster = new MiniQJMHACluster.Builder(conf).build();
+    MiniQJMHACluster miniQjmHaCluster =
+        new MiniQJMHACluster.Builder(conf).setNumNameNodes(nnCount).build();
     cluster = miniQjmHaCluster.getDfsCluster();
     jCluster = miniQjmHaCluster.getJournalCluster();
     
@@ -92,18 +94,7 @@ public class TestBootstrapStandbyWithQJM {
   public void testBootstrapStandbyWithStandbyNN() throws Exception {
     // make the first NN in standby state
     cluster.transitionToStandby(0);
-    Configuration confNN1 = cluster.getConfiguration(1);
-    
-    // shut down nn1
-    cluster.shutdownNameNode(1);
-    
-    int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the standby
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of(0));
-    FSImageTestUtil.assertNNFilesMatch(cluster);
+    bootstrapStandbys();
   }
   
   /** BootstrapStandby when the existing NN is active */
@@ -111,17 +102,23 @@ public class TestBootstrapStandbyWithQJM {
   public void testBootstrapStandbyWithActiveNN() throws Exception {
     // make the first NN in active state
     cluster.transitionToActive(0);
-    Configuration confNN1 = cluster.getConfiguration(1);
-    
-    // shut down nn1
-    cluster.shutdownNameNode(1);
-    
-    int rc = BootstrapStandby.run(new String[] { "-force" }, confNN1);
-    assertEquals(0, rc);
-    
-    // Should have copied over the namespace from the standby
-    FSImageTestUtil.assertNNHasCheckpoints(cluster, 1,
-        ImmutableList.of(0));
+    bootstrapStandbys();
+  }
+
+  private void bootstrapStandbys() throws Exception {
+    // shut down and bootstrap all the other NNs, except the first (start at 1, not 0)
+    for (int i = 1; i < nnCount; i++) {
+      Configuration otherNNConf = cluster.getConfiguration(i);
+
+      // shut down other nn
+      cluster.shutdownNameNode(i);
+
+      int rc = BootstrapStandby.run(new String[] { "-force" }, otherNNConf);
+      assertEquals(0, rc);
+
+      // Should have copied over the namespace from the standby
+      FSImageTestUtil.assertNNHasCheckpoints(cluster, i, ImmutableList.of(0));
+    }
     FSImageTestUtil.assertNNFilesMatch(cluster);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
index e458ac7..e4fe230 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java
@@ -104,6 +104,7 @@ public class TestDNFencingWithReplication {
   @Test
   public void testFencingStress() throws Exception {
     HAStressTestHarness harness = new HAStressTestHarness();
+    harness.setNumberOfNameNodes(3);
     harness.conf.setInt(
         DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
     harness.conf.setInt(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index dec4506..3e69e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -151,7 +151,12 @@ public class TestEditLogTailer {
   public void testNN1TriggersLogRolls() throws Exception {
     testStandbyTriggersLogRolls(1);
   }
-  
+
+  @Test
+  public void testNN2TriggersLogRolls() throws Exception {
+    testStandbyTriggersLogRolls(2);
+  }
+
   private static void testStandbyTriggersLogRolls(int activeIndex)
       throws Exception {
     Configuration conf = getConf();
@@ -163,13 +168,15 @@ public class TestEditLogTailer {
     for (int i = 0; i < 5; i++) {
       try {
         // Have to specify IPC ports so the NNs can talk to each other.
-        int[] ports = ServerSocketUtil.getPorts(2);
+        int[] ports = ServerSocketUtil.getPorts(3);
         MiniDFSNNTopology topology = new MiniDFSNNTopology()
             .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
                 .addNN(new MiniDFSNNTopology.NNConf("nn1")
                     .setIpcPort(ports[0]))
                 .addNN(new MiniDFSNNTopology.NNConf("nn2")
-                    .setIpcPort(ports[1])));
+                    .setIpcPort(ports[1]))
+                .addNN(new MiniDFSNNTopology.NNConf("nn3")
+                    .setIpcPort(ports[2])));
 
         cluster = new MiniDFSCluster.Builder(conf)
           .nnTopology(topology)
@@ -198,7 +205,7 @@ public class TestEditLogTailer {
   
   private static void waitForLogRollInSharedDir(MiniDFSCluster cluster,
       long startTxId) throws Exception {
-    URI sharedUri = cluster.getSharedEditsDir(0, 1);
+    URI sharedUri = cluster.getSharedEditsDir(0, 2);
     File sharedDir = new File(sharedUri.getPath(), "current");
     final File expectedInProgressLog =
         new File(sharedDir, NNStorage.getInProgressEditsFileName(startTxId));
@@ -237,7 +244,7 @@ public class TestEditLogTailer {
       final AtomicInteger flag = new AtomicInteger(0);
 
       // Return a slow roll edit process.
-      when(tailer.getRollEditsTask()).thenReturn(
+      when(tailer.getNameNodeProxy()).thenReturn(
           new Callable<Void>() {
             @Override
             public Void call() throws Exception {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
index 43a94a1..43ab69d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailoverWithBlockTokensEnabled.java
@@ -56,10 +56,11 @@ public class TestFailoverWithBlockTokensEnabled {
   
   private static final Path TEST_PATH = new Path("/test-path");
   private static final String TEST_DATA = "very important text";
-  
+  private static final int numNNs = 3;
+
   private Configuration conf;
   private MiniDFSCluster cluster;
-  
+
   @Before
   public void startCluster() throws IOException {
     conf = new Configuration();
@@ -67,7 +68,7 @@ public class TestFailoverWithBlockTokensEnabled {
     // Set short retry timeouts so this test runs faster
     conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
     cluster = new MiniDFSCluster.Builder(conf)
-        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .nnTopology(MiniDFSNNTopology.simpleHATopology(numNNs))
         .numDataNodes(1)
         .build();
   }
@@ -79,33 +80,41 @@ public class TestFailoverWithBlockTokensEnabled {
       cluster = null;
     }
   }
-  
+
   @Test
   public void ensureSerialNumbersNeverOverlap() {
     BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager()
         .getBlockTokenSecretManager();
     BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager()
         .getBlockTokenSecretManager();
-    
-    btsm1.setSerialNo(0);
-    btsm2.setSerialNo(0);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-    
-    btsm1.setSerialNo(Integer.MAX_VALUE);
-    btsm2.setSerialNo(Integer.MAX_VALUE);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-    
-    btsm1.setSerialNo(Integer.MIN_VALUE);
-    btsm2.setSerialNo(Integer.MIN_VALUE);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
-    
-    btsm1.setSerialNo(Integer.MAX_VALUE / 2);
-    btsm2.setSerialNo(Integer.MAX_VALUE / 2);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    BlockTokenSecretManager btsm3 = cluster.getNamesystem(2).getBlockManager()
+        .getBlockTokenSecretManager();
+
+    setAndCheckSerialNumber(0, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MAX_VALUE, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MIN_VALUE, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MAX_VALUE / 2, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MIN_VALUE / 2, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MAX_VALUE / 3, btsm1, btsm2, btsm3);
+    setAndCheckSerialNumber(Integer.MIN_VALUE / 3, btsm1, btsm2, btsm3);
+  }
+
+  private void setAndCheckSerialNumber(int serialNumber, BlockTokenSecretManager... btsms) {
+    for (BlockTokenSecretManager btsm : btsms) {
+      btsm.setSerialNo(serialNumber);
+    }
 
-    btsm1.setSerialNo(Integer.MIN_VALUE / 2);
-    btsm2.setSerialNo(Integer.MIN_VALUE / 2);
-    assertFalse(btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
+    for (int i = 0; i < btsms.length; i++) {
+      for (int j = 0; j < btsms.length; j++) {
+        if (j == i) {
+          continue;
+        }
+        int first = btsms[i].getSerialNoForTesting();
+        int second = btsms[j].getSerialNoForTesting();
+        assertFalse("Overlap found for set serial number (" + serialNumber + ") is " + i + ": "
+            + first + " == " + j + ": " + second, first == second);
+      }
+    }
   }
   
   @Test
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAConfiguration.java
index 90ccd86..43f0b85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAConfiguration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAConfiguration.java
@@ -25,10 +25,13 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
 import java.util.Collection;
 
+import com.google.common.base.Joiner;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -61,19 +64,23 @@ public class TestHAConfiguration {
     }
   }
 
-  private Configuration getHAConf(String nsId, String host1, String host2) {
+  private Configuration getHAConf(String nsId, String ... hosts) {
     Configuration conf = new Configuration();
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, nsId);
-    conf.set(DFSUtil.addKeySuffixes(
-        DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, nsId),
-        "nn1,nn2");    
     conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+
+    String[] nnids = new String[hosts.length];
+    for (int i = 0; i < hosts.length; i++) {
+      String nnid = "nn" + (i + 1);
+      nnids[i] = nnid;
+      conf.set(DFSUtil.addKeySuffixes(
+              DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnid),
+          hosts[i] + ":12345");
+    }
+
     conf.set(DFSUtil.addKeySuffixes(
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, "nn1"),
-        host1 + ":12345");
-    conf.set(DFSUtil.addKeySuffixes(
-        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, "nn2"),
-        host2 + ":12345");
+            DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, nsId),
+        Joiner.on(',').join(nnids));
     return conf;
   }
 
@@ -90,11 +97,28 @@ public class TestHAConfiguration {
     // 0.0.0.0, it should substitute the address from the RPC configuration
     // above.
     StandbyCheckpointer checkpointer = new StandbyCheckpointer(conf, fsn);
-    assertEquals(new URL("http", "1.2.3.2",
-        DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, ""),
-        checkpointer.getActiveNNAddress());
+    assertAddressMatches("1.2.3.2", checkpointer.getActiveNNAddresses().get(0));
+
+    // test when there are three NNs
+    // Use non-local addresses to avoid host address matching
+    conf = getHAConf("ns1", "1.2.3.1", "1.2.3.2", "1.2.3.3");
+
+    // This is done by the NN before the StandbyCheckpointer is created
+    NameNode.initializeGenericKeys(conf, "ns1", "nn1");
+
+    checkpointer = new StandbyCheckpointer(conf, fsn);
+    assertEquals("Got an unexpected number of possible active NNs", 2, checkpointer
+        .getActiveNNAddresses().size());
+    assertEquals(new URL("http", "1.2.3.2", DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, ""),
+        checkpointer.getActiveNNAddresses().get(0));
+    assertAddressMatches("1.2.3.2", checkpointer.getActiveNNAddresses().get(0));
+    assertAddressMatches("1.2.3.3", checkpointer.getActiveNNAddresses().get(1));
   }
-  
+
+  private void assertAddressMatches(String address, URL url) throws MalformedURLException {
+    assertEquals(new URL("http", address, DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, ""), url);
+  }
+
   /**
    * Tests that the namenode edits dirs and shared edits dirs are gotten with
    * duplicates removed
@@ -143,7 +167,8 @@ public class TestHAConfiguration {
     conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns1");
     NameNode.initializeGenericKeys(conf, "ns1", "nn1");
 
-    Configuration nn2Conf = HAUtil.getConfForOtherNode(conf);
+    List<Configuration> others = HAUtil.getConfForOtherNodes(conf);
+    Configuration nn2Conf = others.get(0);
     assertEquals(nn2Conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY), "nn2");
     assertTrue(!conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY).equals(
         nn2Conf.get(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
index dbc264e..0826686 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -83,24 +84,33 @@ public class TestPipelinesFailover {
   
   private static final int STRESS_NUM_THREADS = 25;
   private static final int STRESS_RUNTIME = 40000;
-  
+
+  private static final int NN_COUNT = 3;
+  private static final long FAILOVER_SEED = System.currentTimeMillis();
+  private static final Random failoverRandom = new Random(FAILOVER_SEED);
+  static {
+    // log the failover seed so we can reproduce the test exactly
+    LOG.info("Using random seed: " + FAILOVER_SEED
+        + " for selecting active target NN during failover");
+  }
+
   enum TestScenario {
     GRACEFUL_FAILOVER {
       @Override
-      void run(MiniDFSCluster cluster) throws IOException {
-        cluster.transitionToStandby(0);
-        cluster.transitionToActive(1);
+      void run(MiniDFSCluster cluster, int previousActive, int activeIndex) throws IOException {
+        cluster.transitionToStandby(previousActive);
+        cluster.transitionToActive(activeIndex);
       }
     },
     ORIGINAL_ACTIVE_CRASHED {
       @Override
-      void run(MiniDFSCluster cluster) throws IOException {
-        cluster.restartNameNode(0);
-        cluster.transitionToActive(1);
+      void run(MiniDFSCluster cluster, int previousActive, int activeIndex) throws IOException {
+        cluster.restartNameNode(previousActive);
+        cluster.transitionToActive(activeIndex);
       }
     };
 
-    abstract void run(MiniDFSCluster cluster) throws IOException;
+    abstract void run(MiniDFSCluster cluster, int previousActive, int activeIndex) throws IOException;
   }
   
   enum MethodToTestIdempotence {
@@ -137,10 +147,7 @@ public class TestPipelinesFailover {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
     
     FSDataOutputStream stm = null;
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
-      .numDataNodes(3)
-      .build();
+    MiniDFSCluster cluster = newMiniCluster(conf, 3);
     try {
       int sizeWritten = 0;
       
@@ -159,15 +166,15 @@ public class TestPipelinesFailover {
       // Make sure all of the blocks are written out before failover.
       stm.hflush();
 
-      LOG.info("Failing over to NN 1");
-      scenario.run(cluster);
+      LOG.info("Failing over to another NN");
+      int activeIndex = failover(cluster, scenario);
 
       // NOTE: explicitly do *not* make any further metadata calls
       // to the NN here. The next IPC call should be to allocate the next
       // block. Any other call would notice the failover and not test
       // idempotence of the operation (HDFS-3031)
       
-      FSNamesystem ns1 = cluster.getNameNode(1).getNamesystem();
+      FSNamesystem ns1 = cluster.getNameNode(activeIndex).getNamesystem();
       BlockManagerTestUtil.updateState(ns1.getBlockManager());
       assertEquals(0, ns1.getPendingReplicationBlocks());
       assertEquals(0, ns1.getCorruptReplicaBlocks());
@@ -215,10 +222,7 @@ public class TestPipelinesFailover {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     
     FSDataOutputStream stm = null;
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
-      .numDataNodes(5)
-      .build();
+    MiniDFSCluster cluster = newMiniCluster(conf, 5);
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
@@ -234,8 +238,7 @@ public class TestPipelinesFailover {
       // Make sure all the blocks are written before failover
       stm.hflush();
 
-      LOG.info("Failing over to NN 1");
-      scenario.run(cluster);
+      int nextActive = failover(cluster, scenario);
 
       assertTrue(fs.exists(TEST_PATH));
       
@@ -244,9 +247,9 @@ public class TestPipelinesFailover {
       // write another block and a half
       AppendTestUtil.write(stm, BLOCK_AND_A_HALF, BLOCK_AND_A_HALF);
       stm.hflush();
-      
-      LOG.info("Failing back to NN 0");
-      cluster.transitionToStandby(1);
+
+      LOG.info("Failing back from NN " + nextActive + " to NN 0");
+      cluster.transitionToStandby(nextActive);
       cluster.transitionToActive(0);
       
       cluster.stopDataNode(1);
@@ -264,7 +267,7 @@ public class TestPipelinesFailover {
       cluster.shutdown();
     }
   }
-  
+
   /**
    * Tests lease recovery if a client crashes. This approximates the
    * use case of HBase WALs being recovered after a NN failover.
@@ -277,10 +280,7 @@ public class TestPipelinesFailover {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
 
     FSDataOutputStream stm = null;
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
-      .numDataNodes(3)
-      .build();
+    final MiniDFSCluster cluster = newMiniCluster(conf, 3);
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
@@ -332,10 +332,7 @@ public class TestPipelinesFailover {
     conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     
     FSDataOutputStream stm = null;
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-      .nnTopology(MiniDFSNNTopology.simpleHATopology())
-      .numDataNodes(3)
-      .build();
+    final MiniDFSCluster cluster = newMiniCluster(conf, 3);
     try {
       cluster.waitActive();
       cluster.transitionToActive(0);
@@ -409,7 +406,20 @@ public class TestPipelinesFailover {
       cluster.shutdown();
     }
   }
-  
+
+  /**
+   * Create a MiniCluster with the specified base configuration and the specified number of
+   * DataNodes. Helper method to ensure that we use the same number of NNs across all the tests.
+   * @return mini cluster ready to use
+   * @throws IOException cluster cannot be started
+   */
+  private MiniDFSCluster newMiniCluster(Configuration conf, int dnCount) throws IOException {
+    return new MiniDFSCluster.Builder(conf)
+             .nnTopology(MiniDFSNNTopology.simpleHATopology(NN_COUNT))
+             .numDataNodes(dnCount)
+             .build();
+  }
+
   /**
    * Stress test for pipeline/lease recovery. Starts a number of
    * threads, each of which creates a file and has another client
@@ -486,6 +496,38 @@ public class TestPipelinesFailover {
   }
 
   /**
+   * Fail-over using the given scenario, assuming NN0 is currently active
+   * @param cluster cluster on which to run the scenario
+   * @param scenario failure scenario to run
+   * @return the index of the new active NN
+   * @throws IOException
+   */
+  private int failover(MiniDFSCluster cluster, TestScenario scenario) throws IOException {
+    return failover(cluster, scenario, 0);
+  }
+
+  /**
+   * Do a fail-over with the given scenario.
+   * @param cluster cluster on which to run the scenario
+   * @param scenario failure scenario to run
+   * @param activeIndex index of the currently active node
+   * @throws IOException on failure
+   * @return the index of the new active NN
+   */
+  private int failover(MiniDFSCluster cluster, TestScenario scenario, int activeIndex)
+      throws IOException {
+    // get index of the next node that should be active, ensuring it's not the same as the currently
+    // active node
+    int nextActive = failoverRandom.nextInt(NN_COUNT);
+    if (nextActive == activeIndex) {
+      nextActive = (nextActive + 1) % NN_COUNT;
+    }
+    LOG.info("Failing over to a standby NN:" + nextActive + " from NN " + activeIndex);
+    scenario.run(cluster, activeIndex, nextActive);
+    return nextActive;
+  }
+
+  /**
    * Test thread which creates a file, has another fake user recover
    * the lease on the file, and then ensures that the file's contents
    * are properly readable. If any of these steps fails, propagates
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRemoteNameNodeInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRemoteNameNodeInfo.java
new file mode 100644
index 0000000..cb2a4fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRemoteNameNodeInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that we correctly obtain remote namenode information
+ */
+public class TestRemoteNameNodeInfo {
+
+  @Test
+  public void testParseMultipleNameNodes() throws Exception {
+    // start with an empty configuration
+    Configuration conf = new Configuration(false);
+
+    // add in keys for each of the NNs
+    String nameservice = "ns1";
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf(nameservice)
+            .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002))
+            .addNN(new MiniDFSNNTopology.NNConf("nn3").setIpcPort(10003)));
+
+    // add the configurations of the NNs to the passed conf, so we can parse it back out
+    MiniDFSCluster.configureNameNodes(topology, false, conf);
+
+    // set the 'local' one as nn1
+    conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+
+    List<RemoteNameNodeInfo> nns = RemoteNameNodeInfo.getRemoteNameNodes(conf);
+
+    // make sure it matches when we pass in the nameservice
+    List<RemoteNameNodeInfo> nns2 = RemoteNameNodeInfo.getRemoteNameNodes(conf,
+        nameservice);
+    assertEquals(nns, nns2);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestSeveralNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestSeveralNameNodes.java
new file mode 100644
index 0000000..dbe8070
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestSeveralNameNodes.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.junit.Test;
+
+/**
+ * Test that we can start and run several namenodes on the same minicluster.
+ */
+public class TestSeveralNameNodes {
+
+  private static final Log LOG = LogFactory.getLog(TestSeveralNameNodes.class);
+
+  /** ms between NN failovers */
+  private static final int TIME_BETWEEN_FAILOVERS = 200;
+  private static final int NUM_NAMENODES = 3;
+  private static final int NUM_THREADS = 3;
+  private static final int LIST_LENGTH = 50;
+  /** total test runtime in ms */
+  private static final long RUNTIME = 100000;
+
+  @Test
+  public void testCircularLinkedListWrites() throws Exception {
+    HAStressTestHarness harness = new HAStressTestHarness();
+    // setup the harness
+    harness.setNumberOfNameNodes(NUM_NAMENODES);
+    harness.addFailoverThread(TIME_BETWEEN_FAILOVERS);
+
+    final MiniDFSCluster cluster = harness.startCluster();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      // set up a circular writer
+      FileSystem fs = harness.getFailoverFs();
+      TestContext context = harness.testCtx;
+      List<CircularWriter> writers = new ArrayList<CircularWriter>();
+      for (int i = 0; i < NUM_THREADS; i++) {
+        Path p = new Path("/test-" + i);
+        fs.mkdirs(p);
+        CircularWriter writer = new CircularWriter(context, LIST_LENGTH, fs, p);
+        writers.add(writer);
+        context.addThread(writer);
+      }
+      harness.startThreads();
+
+      // wait for all the writer threads to finish, or until we exceed the runtime
+      long start = System.currentTimeMillis();
+      while ((System.currentTimeMillis() - start) < RUNTIME) {
+        for (int i = 0; i < writers.size(); i++) {
+          CircularWriter writer = writers.get(i);
+          // remove the writer from the ones to check
+          if (writer.done.await(10, TimeUnit.MILLISECONDS)) {
+            writers.remove(i--);
+          }
+        }
+      }
+      assertEquals(
+          "Some writers didn't complete in expected runtime! Current writer state:"
+              + writers, 0,
+          writers.size());
+
+      harness.stopThreads();
+    } finally {
+      System.err.println("===========================\n\n\n\n");
+      harness.shutdown();
+    }
+  }
+
+  private static class CircularWriter extends RepeatingTestThread {
+
+    private final int maxLength;
+    private final Path dir;
+    private final FileSystem fs;
+    private int currentListIndex = 0;
+    private CountDownLatch done = new CountDownLatch(1);
+
+    public CircularWriter(TestContext context, int listLength, FileSystem fs,
+        Path parentDir) {
+      super(context);
+      this.fs = fs;
+      this.maxLength = listLength;
+      this.dir = parentDir;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder("Circular Writer:\n");
+      builder.append("\t directory: " + dir + "\n");
+      builder.append("\t target length: " + maxLength + "\n");
+      // might be a little racy, but we just want a close count
+      builder.append("\t current item: " + currentListIndex + "\n");
+      builder.append("\t done: " + (done.getCount() == 0) + "\n");
+      return builder.toString();
+    }
+
+    @Override
+    public void doAnAction() throws Exception {
+      if (currentListIndex == maxLength) {
+        checkList();
+        this.stopTestThread();
+        done.countDown();
+      } else {
+        writeList();
+      }
+    }
+
+    /**
+     * Make sure we can traverse the entire linked list
+     */
+    private void checkList() throws IOException {
+      for (int i = 0; i < maxLength; i++) {
+        Path nextFile = getNextFile(i);
+        if (!fs.exists(nextFile)) {
+          throw new RuntimeException("Next file " + nextFile
+              + " for list does not exist!");
+        }
+        // read the next file name
+        FSDataInputStream in = fs.open(nextFile);
+        nextFile = getNextFile(in.read());
+        in.close();
+      }
+
+    }
+
+    private void cleanup() throws IOException {
+      if (!fs.delete(dir, true)) {
+        throw new RuntimeException("Didn't correctly delete " + dir);
+      }
+      if (!fs.mkdirs(dir)) {
+        throw new RuntimeException("Didn't correctly make directory " + dir);
+      }
+    }
+
+    private void writeList() throws IOException {
+      Path nextPath = getNextFile(currentListIndex++);
+      LOG.info("Writing next file: " + nextPath);
+      FSDataOutputStream file = fs.create(nextPath);
+      file.write(currentListIndex);
+      file.close();
+    }
+
+    private Path getNextFile(int i) {
+      return new Path(dir, Integer.toString(i));
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index f245755..af75e42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -65,8 +65,9 @@ import static org.junit.Assert.*;
 
 public class TestStandbyCheckpoints {
   private static final int NUM_DIRS_IN_LOG = 200000;
+  protected static int NUM_NNS = 3;
   protected MiniDFSCluster cluster;
-  protected NameNode nn0, nn1;
+  protected NameNode[] nns = new NameNode[NUM_NNS];
   protected FileSystem fs;
   private final Random random = new Random();
   protected File tmpOivImgDir;
@@ -90,7 +91,8 @@ public class TestStandbyCheckpoints {
         MiniDFSNNTopology topology = new MiniDFSNNTopology()
             .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
                 .addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(basePort))
-                .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1)));
+                .addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(basePort + 1))
+                .addNN(new MiniDFSNNTopology.NNConf("nn3").setHttpPort(basePort + 2)));
 
         cluster = new MiniDFSCluster.Builder(conf)
             .nnTopology(topology)
@@ -98,8 +100,8 @@ public class TestStandbyCheckpoints {
             .build();
         cluster.waitActive();
 
-        nn0 = cluster.getNameNode(0);
-        nn1 = cluster.getNameNode(1);
+        setNNs();
+
         fs = HATestUtil.configureFailoverFs(cluster, conf);
 
         cluster.transitionToActive(0);
@@ -112,6 +114,12 @@ public class TestStandbyCheckpoints {
     }
   }
 
+  protected void setNNs() {
+    for (int i = 0; i < NUM_NNS; i++) {
+      nns[i] = cluster.getNameNode(i);
+    }
+  }
+
   protected Configuration setupCommonConfig() {
     tmpOivImgDir = GenericTestUtils.getTestDir("TestStandbyCheckpoints");
     tmpOivImgDir.mkdirs();
@@ -144,10 +152,10 @@ public class TestStandbyCheckpoints {
 
   @Test(timeout = 300000)
   public void testSBNCheckpoints() throws Exception {
-    JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nn1);
-    
+    JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(nns[1]);
+
     doEdits(0, 10);
-    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+    HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
     // Once the standby catches up, it should notice that it needs to
     // do a checkpoint and save one to its local directories.
     HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
@@ -155,10 +163,9 @@ public class TestStandbyCheckpoints {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        if(tmpOivImgDir.list().length > 0) {
+        if (tmpOivImgDir.list().length > 0) {
           return true;
-        }
-        else {
+        } else {
           return false;
         }
       }
@@ -188,7 +195,7 @@ public class TestStandbyCheckpoints {
     cluster.getConfiguration(0).set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
         existingDir + "," + Util.fileAsURI(nameDir).toString());
     cluster.restartNameNode(0);
-    nn0 = cluster.getNameNode(0);
+    nns[0] = cluster.getNameNode(0);
     cluster.transitionToActive(0);
 
     // "current" is created, but current/VERSION isn't.
@@ -199,7 +206,7 @@ public class TestStandbyCheckpoints {
 
     // Trigger a checkpointing and upload.
     doEdits(0, 10);
-    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+    HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
 
     // The version file will be created if a checkpoint is uploaded.
     // Wait for it to happen up to 10 seconds.
@@ -235,9 +242,9 @@ public class TestStandbyCheckpoints {
     HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
     HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
     
-    assertEquals(12, nn0.getNamesystem().getFSImage()
+    assertEquals(12, nns[0].getNamesystem().getFSImage()
         .getMostRecentCheckpointTxId());
-    assertEquals(12, nn1.getNamesystem().getFSImage()
+    assertEquals(12, nns[1].getNamesystem().getFSImage()
         .getMostRecentCheckpointTxId());
     
     List<File> dirs = Lists.newArrayList();
@@ -260,17 +267,17 @@ public class TestStandbyCheckpoints {
     cluster.getConfiguration(1).setInt(
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
     cluster.restartNameNode(1);
-    nn1 = cluster.getNameNode(1);
- 
-    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
-    
+    nns[1] = cluster.getNameNode(1);
+
+    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
+
     // We shouldn't save any checkpoints at txid=0
     Thread.sleep(1000);
     Mockito.verify(spyImage1, Mockito.never())
       .saveNamespace((FSNamesystem) Mockito.anyObject());
  
     // Roll the primary and wait for the standby to catch up
-    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+    HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
     Thread.sleep(2000);
     
     // We should make exactly one checkpoint at this new txid. 
@@ -305,7 +312,7 @@ public class TestStandbyCheckpoints {
     cluster.getConfiguration(1).setInt(
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 0);
     cluster.restartNameNode(1);
-    nn1 = cluster.getNameNode(1);
+    nns[1] = cluster.getNameNode(1);
 
     cluster.transitionToActive(0);    
     
@@ -335,31 +342,42 @@ public class TestStandbyCheckpoints {
         DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);
 
     // don't compress, we want a big image
-    cluster.getConfiguration(0).setBoolean(
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
-    cluster.getConfiguration(1).setBoolean(
-        DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+    for (int i = 0; i < NUM_NNS; i++) {
+      cluster.getConfiguration(i).setBoolean(
+          DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
+    }
+
     // Throttle SBN upload to make it hang during upload to ANN
-    cluster.getConfiguration(1).setLong(
-        DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
-    cluster.restartNameNode(0);
-    cluster.restartNameNode(1);
-    nn0 = cluster.getNameNode(0);
-    nn1 = cluster.getNameNode(1);
+    for (int i = 1; i < NUM_NNS; i++) {
+      cluster.getConfiguration(i).setLong(
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
+    }
+    for (int i = 0; i < NUM_NNS; i++) {
+      cluster.restartNameNode(i);
+    }
+
+    // update references to each of the nns
+    setNNs();
 
     cluster.transitionToActive(0);
 
     doEdits(0, 100);
-    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
-    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(104));
+
+    for (int i = 1; i < NUM_NNS; i++) {
+      HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
+      HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
+    }
+
     cluster.transitionToStandby(0);
     cluster.transitionToActive(1);
 
+
     // Wait to make sure background TransferFsImageUpload thread was cancelled.
     // This needs to be done before the next test in the suite starts, so that a
     // file descriptor is not held open during the next cluster init.
     cluster.shutdown();
     cluster = null;
+
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
@@ -376,7 +394,7 @@ public class TestStandbyCheckpoints {
     }, 1000, 30000);
 
     // Assert that former active did not accept the canceled checkpoint file.
-    assertEquals(0, nn0.getFSImage().getMostRecentCheckpointTxId());
+    assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
   }
   
   /**
@@ -388,7 +406,7 @@ public class TestStandbyCheckpoints {
   public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
     
     // Set it up so that we know when the SBN checkpoint starts and ends.
-    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
+    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
     DelayAnswer answerer = new DelayAnswer(LOG);
     Mockito.doAnswer(answerer).when(spyImage1)
         .saveNamespace(Mockito.any(FSNamesystem.class),
@@ -396,7 +414,7 @@ public class TestStandbyCheckpoints {
 
     // Perform some edits and wait for a checkpoint to start on the SBN.
     doEdits(0, 1000);
-    nn0.getRpcServer().rollEditLog();
+    nns[0].getRpcServer().rollEditLog();
     answerer.waitForCall();
     assertTrue("SBN is not performing checkpoint but it should be.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
@@ -406,7 +424,7 @@ public class TestStandbyCheckpoints {
     ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
     try {
       // Perform an RPC to the SBN and make sure it throws a StandbyException.
-      nn1.getRpcServer().getFileInfo("/");
+      nns[1].getRpcServer().getFileInfo("/");
       fail("Should have thrown StandbyException, but instead succeeded.");
     } catch (StandbyException se) {
       GenericTestUtils.assertExceptionContains("is not supported", se);
@@ -433,7 +451,7 @@ public class TestStandbyCheckpoints {
   public void testReadsAllowedDuringCheckpoint() throws Exception {
     
     // Set it up so that we know when the SBN checkpoint starts and ends.
-    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
+    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nns[1]);
     DelayAnswer answerer = new DelayAnswer(LOG);
     Mockito.doAnswer(answerer).when(spyImage1)
         .saveNamespace(Mockito.any(FSNamesystem.class),
@@ -442,7 +460,7 @@ public class TestStandbyCheckpoints {
     
     // Perform some edits and wait for a checkpoint to start on the SBN.
     doEdits(0, 1000);
-    nn0.getRpcServer().rollEditLog();
+    nns[0].getRpcServer().rollEditLog();
     answerer.waitForCall();
     assertTrue("SBN is not performing checkpoint but it should be.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
@@ -456,7 +474,7 @@ public class TestStandbyCheckpoints {
       @Override
       public void run() {
         try {
-          nn1.getRpcServer().restoreFailedStorage("false");
+          nns[1].getRpcServer().restoreFailedStorage("false");
         } catch (IOException e) {
           e.printStackTrace();
         }
@@ -466,16 +484,16 @@ public class TestStandbyCheckpoints {
     
     // Make sure that our thread is waiting for the lock.
     ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
-    
-    assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
-    assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
-    assertTrue(nn1.getNamesystem().getCpLockForTests().hasQueuedThreads());
-    
+
+    assertFalse(nns[1].getNamesystem().getFsLockForTests().hasQueuedThreads());
+    assertFalse(nns[1].getNamesystem().getFsLockForTests().isWriteLocked());
+    assertTrue(nns[1].getNamesystem().getCpLockForTests().hasQueuedThreads());
+
     // Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
     // be taken.
     String pageContents = DFSTestUtil.urlGet(new URL("http://" +
-        nn1.getHttpAddress().getHostName() + ":" +
-        nn1.getHttpAddress().getPort() + "/jmx"));
+        nns[1].getHttpAddress().getHostName() + ":" +
+        nns[1].getHttpAddress().getPort() + "/jmx"));
     assertTrue(pageContents.contains("NumLiveDataNodes"));
     
     // Make sure that the checkpoint is still going on, implying that the client
@@ -500,7 +518,7 @@ public class TestStandbyCheckpoints {
     FileUtil.fullyDelete(tmpOivImgDir);
 
     doEdits(0, 10);
-    HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
+    HATestUtil.waitForStandbyToCatchUp(nns[0], nns[1]);
     // Once the standby catches up, it should notice that it needs to
     // do a checkpoint and save one to its local directories.
     HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-0.23-reserved.tgz b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-0.23-reserved.tgz
index 0f53f2a..abc7bbd 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-0.23-reserved.tgz and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-0.23-reserved.tgz differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-1-reserved.tgz b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-1-reserved.tgz
index 737ad2d..b3f8b9d 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-1-reserved.tgz and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-1-reserved.tgz differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-2-reserved.tgz b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-2-reserved.tgz
index 3cb2ee6..2256fba 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-2-reserved.tgz and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-2-reserved.tgz differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-22-dfs-dir.tgz b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-22-dfs-dir.tgz
index b69741c..c4959b4 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-22-dfs-dir.tgz and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-22-dfs-dir.tgz differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop1-bbw.tgz b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop1-bbw.tgz
index 2574f8b..e7d3fbd 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop1-bbw.tgz and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop1-bbw.tgz differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties
index 45d91cd..7378846 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@ log4j.rootLogger=info,stdout
 log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
 
 #
 # NameNode metrics logging.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org