You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by su...@apache.org on 2011/04/29 20:16:38 UTC

svn commit: r1097905 [9/14] - in /hadoop/hdfs/trunk: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/ja...

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Apr 29 18:16:32 2011
@@ -50,7 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -238,7 +238,7 @@ public class DFSTestUtil {
   public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
       Path file, int blockNo) throws IOException {
     DFSClient client = new DFSClient(new InetSocketAddress("localhost",
-        cluster.getNameNodePort()), cluster.getConfiguration());
+        cluster.getNameNodePort()), cluster.getConfiguration(0));
     LocatedBlocks blocks;
     try {
        blocks = client.getNamenode().getBlockLocations(
@@ -254,7 +254,7 @@ public class DFSTestUtil {
    * the requested number of racks, with the requested number of
    * replicas, and the requested number of replicas still needed.
    */
-  public static void waitForReplication(MiniDFSCluster cluster, Block b,
+  public static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
       int racks, int replicas, int neededReplicas)
       throws IOException, TimeoutException, InterruptedException {
     int curRacks = 0;
@@ -265,7 +265,7 @@ public class DFSTestUtil {
 
     do {
       Thread.sleep(1000);
-      int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b);
+      int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b.getLocalBlock());
       curRacks = r[0];
       curReplicas = r[1];
       curNeededReplicas = r[2];
@@ -288,11 +288,11 @@ public class DFSTestUtil {
    * given block in the file contains the given number of corrupt replicas.
    */
   public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
-      Path file, Block b, int corruptRepls)
+      Path file, ExtendedBlock b, int corruptRepls)
       throws IOException, TimeoutException {
     int count = 0;
     final int ATTEMPTS = 20;
-    int repls = ns.numCorruptReplicas(b);
+    int repls = ns.numCorruptReplicas(b.getLocalBlock());
     while (repls != corruptRepls && count < ATTEMPTS) {
       try {
         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
@@ -301,7 +301,7 @@ public class DFSTestUtil {
         // Swallow exceptions
       }
       System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
-      repls = ns.numCorruptReplicas(b);
+      repls = ns.numCorruptReplicas(b.getLocalBlock());
       count++;
     }
     if (count == ATTEMPTS) {
@@ -342,11 +342,11 @@ public class DFSTestUtil {
    * Returns the index of the first datanode which has a copy
    * of the given block, or -1 if no such datanode exists.
    */
-  public static int firstDnWithBlock(MiniDFSCluster cluster, Block b)
+  public static int firstDnWithBlock(MiniDFSCluster cluster, ExtendedBlock b)
       throws IOException {
     int numDatanodes = cluster.getDataNodes().size();
     for (int i = 0; i < numDatanodes; i++) {
-      String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+      String blockContent = cluster.readBlockOnDataNode(i, b);
       if (blockContent != null) {
         return i;
       }
@@ -405,7 +405,7 @@ public class DFSTestUtil {
     files = null;
   }
   
-  public static Block getFirstBlock(FileSystem fs, Path path) throws IOException {
+  public static ExtendedBlock getFirstBlock(FileSystem fs, Path path) throws IOException {
     DFSDataInputStream in = 
       (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
     in.readByte();
@@ -553,7 +553,7 @@ public class DFSTestUtil {
   }
 
   /** For {@link TestTransferRbw} */
-  public static DataTransferProtocol.Status transferRbw(final Block b, 
+  public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
     assertEquals(2, datanodes.length);
     final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Apr 29 18:16:32 2011
@@ -36,8 +36,6 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,15 +44,20 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -65,6 +68,7 @@ import org.apache.hadoop.security.Refres
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -73,16 +77,18 @@ import org.apache.hadoop.util.ToolRunner
  * The data directories for non-simulated DFS are under the testing directory.
  * For simulated data nodes, no underlying fs storage is used.
  */
-@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "Hive", "MapReduce", "Pig"})
-@InterfaceStability.Unstable
 public class MiniDFSCluster {
 
+  private static final String NAMESERVICE_ID_PREFIX = "nameserviceId";
+  private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
+
   /**
    * Class to construct instances of MiniDFSClusters with specific options.
    */
   public static class Builder {
     private int nameNodePort = 0;
     private final Configuration conf;
+    private int numNameNodes = 1;
     private int numDataNodes = 1;
     private boolean format = true;
     private boolean manageNameDfsDirs = true;
@@ -91,15 +97,25 @@ public class MiniDFSCluster {
     private String[] racks = null; 
     private String [] hosts = null;
     private long [] simulatedCapacities = null;
-    // wait until namenode has left safe mode?
+    private String clusterId = null;
     private boolean waitSafeMode = true;
     private boolean setupHostsFile = false;
+    private boolean federation = false;
     
     public Builder(Configuration conf) {
       this.conf = conf;
     }
     
     /**
+     * default false - non federated cluster
+     * @param val
+     * @return Builder object
+     */
+    public Builder federation (boolean val){
+      this.federation = val;
+      return this;
+    }
+    /**
      * Default: 0
      */
     public Builder nameNodePort(int val) {
@@ -110,6 +126,14 @@ public class MiniDFSCluster {
     /**
      * Default: 1
      */
+    public Builder numNameNodes(int val) {
+      this.numNameNodes = val;
+      return this;
+    }
+
+    /**
+     * Default: 1
+     */
     public Builder numDataNodes(int val) {
       this.numDataNodes = val;
       return this;
@@ -180,6 +204,14 @@ public class MiniDFSCluster {
     }
     
     /**
+     * Default: null
+     */
+    public Builder clusterId(String cid) {
+      this.clusterId = cid;
+      return this;
+    }
+
+    /**
      * Default: false
      * When true the hosts file/include file for the cluster is setup
      */
@@ -200,6 +232,12 @@ public class MiniDFSCluster {
    * Used by builder to create and return an instance of MiniDFSCluster
    */
   private MiniDFSCluster(Builder builder) throws IOException {
+    LOG.info("starting cluster with " + builder.numNameNodes + " namenodes.");
+    nameNodes = new NameNodeInfo[builder.numNameNodes];
+    // try to determine if in federation mode
+    if(builder.numNameNodes > 1)
+      builder.federation = true;
+      
     initMiniDFSCluster(builder.nameNodePort,
                        builder.conf,
                        builder.numDataNodes,
@@ -210,8 +248,10 @@ public class MiniDFSCluster {
                        builder.racks,
                        builder.hosts,
                        builder.simulatedCapacities,
+                       builder.clusterId,
                        builder.waitSafeMode,
-                       builder.setupHostsFile);
+                       builder.setupHostsFile,
+                       builder.federation);
   }
   
   public class DataNodeProperties {
@@ -225,27 +265,35 @@ public class MiniDFSCluster {
       this.dnArgs = args;
     }
   }
-  private static final Log LOG = LogFactory.getLog(MiniDFSCluster.class);
 
-  private URI myUri = null;
   private Configuration conf;
-  private NameNode nameNode;
+  private NameNodeInfo[] nameNodes;
   private int numDataNodes;
   private ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
-
+  private boolean federation = false; 
   private boolean waitSafeMode = true;
   
-  public final static String FINALIZED_DIR_NAME = "/current/finalized/";
-  
+  /**
+   * Stores the information related to a namenode in the cluster
+   */
+  static class NameNodeInfo {
+    final NameNode nameNode;
+    final Configuration conf;
+    NameNodeInfo(NameNode nn, Configuration conf) {
+      this.nameNode = nn;
+      this.conf = conf;
+    }
+  }
   
   /**
    * This null constructor is used only when wishing to start a data node cluster
    * without a name node (ie when the name node is started elsewhere).
    */
   public MiniDFSCluster() {
+    nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
   }
   
   /**
@@ -406,21 +454,22 @@ public class MiniDFSCluster {
                         StartupOption operation,
                         String[] racks, String hosts[],
                         long[] simulatedCapacities) throws IOException {
-    initMiniDFSCluster(nameNodePort, conf, numDataNodes, format,
+    this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
+    initMiniDFSCluster(nameNodePort, conf, 1, format,
         manageNameDfsDirs, manageDataDfsDirs, operation, racks, hosts,
-        simulatedCapacities, true, false);
+        simulatedCapacities, null, true, false, false);
   }
 
   private void initMiniDFSCluster(int nameNodePort, Configuration conf,
       int numDataNodes, boolean format, boolean manageNameDfsDirs,
       boolean manageDataDfsDirs, StartupOption operation, String[] racks,
-      String[] hosts, long[] simulatedCapacities, boolean waitSafeMode,
-      boolean setupHostsFile)
-    throws IOException {
+      String[] hosts, long[] simulatedCapacities, String clusterId,
+      boolean waitSafeMode, boolean setupHostsFile, boolean federation) 
+  throws IOException {
     this.conf = conf;
     base_dir = new File(getBaseDirectory());
     data_dir = new File(base_dir, "data");
-
+    this.federation = federation;
     this.waitSafeMode = waitSafeMode;
     
     // use alternate RPC engine if spec'd
@@ -444,73 +493,179 @@ public class MiniDFSCluster {
       conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
                       false);
     }
-
-    // Setup the NameNode configuration
-    FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
-    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");  
-    if (manageNameDfsDirs) {
-      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
-          fileAsURI(new File(base_dir, "name1"))+","+
-          fileAsURI(new File(base_dir, "name2")));
-      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
-          fileAsURI(new File(base_dir, "namesecondary1"))+","+
-          fileAsURI(new File(base_dir, "namesecondary2")));
-    }
     
     int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
+    conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
+                   StaticMapping.class, DNSToSwitchMapping.class);
+    
+    Collection<String> nameserviceIds = DFSUtil.getNameServiceIds(conf);
+    if(nameserviceIds.size() > 1)  
+      federation = true;
+  
+    if (!federation) {
+      conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "127.0.0.1:" + nameNodePort);
+      conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");
+      NameNode nn = createNameNode(0, conf, numDataNodes, manageNameDfsDirs,
+          format, operation, clusterId);
+      nameNodes[0] = new NameNodeInfo(nn, conf);
+      FileSystem.setDefaultUri(conf, getURI(0));
+    } else {
+      if (nameserviceIds.isEmpty()) {
+        for (int i = 0; i < nameNodes.length; i++) {
+          nameserviceIds.add(NAMESERVICE_ID_PREFIX + i);
+        }
+      }
+      initFederationConf(conf, nameserviceIds, numDataNodes, nameNodePort);
+      createFederationNamenodes(conf, nameserviceIds, manageNameDfsDirs, format,
+          operation, clusterId);
+    }
     
-    // Format and clean out DataNode directories
     if (format) {
       if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
         throw new IOException("Cannot remove data directory: " + data_dir);
       }
-      NameNode.format(conf); 
     }
     
-    // Start the NameNode
-    String[] args = (operation == null ||
-                     operation == StartupOption.FORMAT ||
-                     operation == StartupOption.REGULAR) ?
-      new String[] {} : new String[] {operation.getName()};
-    conf.setClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
-                   StaticMapping.class, DNSToSwitchMapping.class);
-    nameNode = NameNode.createNameNode(args, conf);
-    
     // Start the DataNodes
     startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
         hosts, simulatedCapacities, setupHostsFile);
     waitClusterUp();
-
     //make sure ProxyUsers uses the latest conf
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
   }
   
+  /** Initialize configuration for federated cluster */
+  private static void initFederationConf(Configuration conf,
+      Collection<String> nameserviceIds, int numDataNodes, int nnPort) {
+    String nameserviceIdList = "";
+    for (String nameserviceId : nameserviceIds) {
+      // Create comma separated list of nameserviceIds
+      if (nameserviceIdList.length() > 0) {
+        nameserviceIdList += ",";
+      }
+      nameserviceIdList += nameserviceId;
+      initFederatedNamenodeAddress(conf, nameserviceId, nnPort);
+      nnPort = nnPort == 0 ? 0 : nnPort + 2;
+    }
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIdList);
+  }
+
+  /* For federated namenode initialize the address:port */
+  private static void initFederatedNamenodeAddress(Configuration conf,
+      String nameserviceId, int nnPort) {
+    // Set nameserviceId specific key
+    String key = DFSUtil.getNameServiceIdKey(
+        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId);
+    conf.set(key, "127.0.0.1:0");
+
+    key = DFSUtil.getNameServiceIdKey(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId);
+    conf.set(key, "127.0.0.1:" + nnPort);
+  }
+  
+  private void createFederationNamenodes(Configuration conf,
+      Collection<String> nameserviceIds, boolean manageNameDfsDirs,
+      boolean format, StartupOption operation, String clusterId)
+      throws IOException {
+    // Create namenodes in the cluster
+    int nnCounter = 0;
+    for (String nameserviceId : nameserviceIds) {
+      createFederatedNameNode(nnCounter++, conf, numDataNodes, manageNameDfsDirs,
+          format, operation, clusterId, nameserviceId);
+    }
+  }
+  
+  private NameNode createNameNode(int nnIndex, Configuration conf,
+      int numDataNodes, boolean manageNameDfsDirs, boolean format,
+      StartupOption operation, String clusterId)
+      throws IOException {
+    if (manageNameDfsDirs) {
+      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          fileAsURI(new File(base_dir, "name" + (2*nnIndex + 1)))+","+
+          fileAsURI(new File(base_dir, "name" + (2*nnIndex + 2))));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+          fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 1)))+","+
+          fileAsURI(new File(base_dir, "namesecondary" + (2*nnIndex + 2))));
+    }
+    
+    // Format and clean out DataNode directories
+    if (format) {
+      GenericTestUtils.formatNamenode(conf);
+    }
+    if (operation == StartupOption.UPGRADE){
+      operation.setClusterId(clusterId);
+    }
+    
+    // Start the NameNode
+    String[] args = (operation == null ||
+                     operation == StartupOption.FORMAT ||
+                     operation == StartupOption.REGULAR) ?
+      new String[] {} : new String[] {operation.getName()};
+    return NameNode.createNameNode(args, conf);
+  }
+  
+  private void createFederatedNameNode(int nnIndex, Configuration conf,
+      int numDataNodes, boolean manageNameDfsDirs, boolean format,
+      StartupOption operation, String clusterId, String nameserviceId)
+      throws IOException {
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID, nameserviceId);
+    NameNode nn = createNameNode(nnIndex, conf, numDataNodes, manageNameDfsDirs,
+        format, operation, clusterId);
+    conf.set(DFSUtil.getNameServiceIdKey(
+        DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId), NameNode
+        .getHostPortString(nn.getNameNodeAddress()));
+    conf.set(DFSUtil.getNameServiceIdKey(
+        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nameserviceId), NameNode
+        .getHostPortString(nn.getHttpAddress()));
+    DFSUtil.setGenericConf(conf, nameserviceId, 
+        DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    nameNodes[nnIndex] = new NameNodeInfo(nn, new Configuration(conf));
+  }
+
   private void setRpcEngine(Configuration conf, Class<?> protocol, Class<?> engine) {
     conf.setClass("rpc.engine."+protocol.getName(), engine, Object.class);
   }
 
   /**
-   * 
-   * @return URI of this MiniDFSCluster
+   * @return URI of the given namenode in MiniDFSCluster
    */
-  public URI getURI() {
-    return myUri;
+  public URI getURI(int nnIndex) {
+    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getNameNodeAddress();
+    String hostPort = NameNode.getHostPortString(addr);
+    URI uri = null;
+    try {
+      uri = new URI("hdfs://" + hostPort);
+    } catch (URISyntaxException e) {
+      NameNode.LOG.warn("unexpected URISyntaxException: " + e );
+    }
+    return uri;
   }
 
   /**
-   * Get configuration.
-   * @return Configuration of this MiniDFSCluster
+   * @return Configuration of for the given namenode
    */
-  public Configuration getConfiguration() {
-    return conf;
+  public Configuration getConfiguration(int nnIndex) {
+    return nameNodes[nnIndex].conf;
   }
 
   /**
-   * wait for the cluster to get out of 
-   * safemode.
+   * wait for the given namenode to get out of safemode.
+   */
+  public void waitNameNodeUp(int nnIndex) {
+    while (!isNameNodeUp(nnIndex)) {
+      try {
+        LOG.warn("Waiting for namenode at " + nnIndex + " to start...");
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+  
+  /**
+   * wait for the cluster to get out of safemode.
    */
   public void waitClusterUp() {
     if (numDataNodes > 0) {
@@ -593,18 +748,6 @@ public class MiniDFSCluster {
     // If minicluster's name node is null assume that the conf has been
     // set with the right address:port of the name node.
     //
-    if (nameNode != null) { // set conf from the name node
-      InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); 
-      int nameNodePort = nnAddr.getPort(); 
-      try {
-	  myUri = new URI("hdfs://"+ nnAddr.getHostName() + ":" +
-			  Integer.toString(nameNodePort));
-      } catch (URISyntaxException e) {
-	  throw new IOException("Couldn't parse own URI", e);
-      }
-      FileSystem.setDefaultUri(conf, myUri);
-    }
-
     if (racks != null && numDataNodes > racks.length ) {
       throw new IllegalArgumentException( "The length of racks [" + racks.length
           + "] is less than the number of datanodes [" + numDataNodes + "].");
@@ -615,7 +758,6 @@ public class MiniDFSCluster {
     }
     //Generate some hostnames if required
     if (racks != null && hosts == null) {
-      System.out.println("Generating host names for datanodes");
       hosts = new String[numDataNodes];
       for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
         hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
@@ -639,8 +781,8 @@ public class MiniDFSCluster {
       // Set up datanode address
       setupDatanodeAddress(dnConf, setupHostsFile);
       if (manageDfsDirs) {
-        File dir1 = new File(data_dir, "data"+(2*i+1));
-        File dir2 = new File(data_dir, "data"+(2*i+2));
+        File dir1 = getStorageDir(i, 0);
+        File dir2 = getStorageDir(i, 1);
         dir1.mkdirs();
         dir2.mkdirs();
         if (!dir1.isDirectory() || !dir2.isDirectory()) { 
@@ -652,7 +794,7 @@ public class MiniDFSCluster {
         conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
       }
       if (simulatedCapacities != null) {
-        dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
+        dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }
@@ -689,7 +831,7 @@ public class MiniDFSCluster {
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                   racks[i-curDatanodesNum]);
       }
-      DataNode.runDatanodeDaemon(dn);
+      dn.runDatanodeDaemon();
       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
     }
     curDatanodesNum += numDataNodes;
@@ -753,6 +895,30 @@ public class MiniDFSCluster {
                    simulatedCapacities, false);
     
   }
+
+  /**
+   * Finalize the namenode. Block pools corresponding to the namenode are
+   * finalized on the datanode.
+   */
+  private void finalizeNamenode(NameNode nn, Configuration conf) throws Exception {
+    if (nn == null) {
+      throw new IllegalStateException("Attempting to finalize "
+                                      + "Namenode but it is not running");
+    }
+    ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+  }
+  
+  /**
+   * Finalize cluster for the namenode at the given index 
+   * @see MiniDFSCluster#finalizeCluster(Configuration)
+   * @param nnIndex
+   * @param conf
+   * @throws Exception
+   */
+  public void finalizeCluster(int nnIndex, Configuration conf) throws Exception {
+    finalizeNamenode(nameNodes[nnIndex].nameNode, nameNodes[nnIndex].conf);
+  }
+
   /**
    * If the NameNode is running, attempt to finalize a previous upgrade.
    * When this method return, the NameNode should be finalized, but
@@ -761,18 +927,32 @@ public class MiniDFSCluster {
    * @throws IllegalStateException if the Namenode is not running.
    */
   public void finalizeCluster(Configuration conf) throws Exception {
-    if (nameNode == null) {
-      throw new IllegalStateException("Attempting to finalize "
-                                      + "Namenode but it is not running");
+    for (NameNodeInfo nnInfo : nameNodes) {
+      if (nnInfo == null) {
+        throw new IllegalStateException("Attempting to finalize "
+            + "Namenode but it is not running");
+      }
+      finalizeNamenode(nnInfo.nameNode, nnInfo.conf);
     }
-    ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+  }
+  
+  public int getNumNameNodes() {
+    return nameNodes.length;
   }
   
   /**
    * Gets the started NameNode.  May be null.
    */
   public NameNode getNameNode() {
-    return nameNode;
+    checkSingleNameNode();
+    return getNameNode(0);
+  }
+  
+  /**
+   * Gets the NameNode for the index.  May be null.
+   */
+  public NameNode getNameNode(int nnIndex) {
+    return nameNodes[nnIndex].nameNode;
   }
   
   /**
@@ -780,7 +960,12 @@ public class MiniDFSCluster {
    * @return {@link FSNamesystem} object.
    */
   public FSNamesystem getNamesystem() {
-    return NameNodeAdapter.getNamesystem(nameNode);
+    checkSingleNameNode();
+    return NameNodeAdapter.getNamesystem(nameNodes[0].nameNode);
+  }
+  
+  public FSNamesystem getNamesystem(int nnIndex) {
+    return NameNodeAdapter.getNamesystem(nameNodes[nnIndex].nameNode);
   }
 
   /**
@@ -808,21 +993,34 @@ public class MiniDFSCluster {
   /**
    * Gets the rpc port used by the NameNode, because the caller 
    * supplied port is not necessarily the actual port used.
+   * Assumption: cluster has a single namenode
    */     
   public int getNameNodePort() {
-    return nameNode.getNameNodeAddress().getPort();
+    checkSingleNameNode();
+    return getNameNodePort(0);
   }
     
   /**
-   * Shut down the servers that are up.
+   * Gets the rpc port used by the NameNode at the given index, because the
+   * caller supplied port is not necessarily the actual port used.
+   */     
+  public int getNameNodePort(int nnIndex) {
+    return nameNodes[nnIndex].nameNode.getNameNodeAddress().getPort();
+  }
+    
+  /**
+   * Shutdown all the nodes in the cluster.
    */
   public void shutdown() {
     System.out.println("Shutting down the Mini HDFS Cluster");
     shutdownDataNodes();
-    if (nameNode != null) {
-      nameNode.stop();
-      nameNode.join();
-      nameNode = null;
+    for (NameNodeInfo nnInfo : nameNodes) {
+      NameNode nameNode = nnInfo.nameNode;
+      if (nameNode != null) {
+        nameNode.stop();
+        nameNode.join();
+        nameNode = null;
+      }
     }
   }
   
@@ -840,23 +1038,36 @@ public class MiniDFSCluster {
   }
 
   /**
-   * Shutdown namenode.
+   * Shutdown all the namenodes.
    */
-  public synchronized void shutdownNameNode() {
-    if (nameNode != null) {
+  public synchronized void shutdownNameNodes() {
+    for (int i = 0; i < nameNodes.length; i++) {
+      shutdownNameNode(i);
+    }
+  }
+  
+  /**
+   * Shutdown the namenode at a given index.
+   */
+  public synchronized void shutdownNameNode(int nnIndex) {
+    NameNode nn = nameNodes[nnIndex].nameNode;
+    if (nn != null) {
       System.out.println("Shutting down the namenode");
-      nameNode.stop();
-      nameNode.join();
-      nameNode = null;
+      nn.stop();
+      nn.join();
+      Configuration conf = nameNodes[nnIndex].conf;
+      nameNodes[nnIndex] = new NameNodeInfo(null, conf);
     }
   }
 
   /**
-   * Restart namenode.
+   * Restart namenode at a given index.
    */
-  public synchronized void restartNameNode() throws IOException {
-    shutdownNameNode();
-    nameNode = NameNode.createNameNode(new String[] {}, conf);
+  public synchronized void restartNameNode(int nnIndex) throws IOException {
+    Configuration conf = nameNodes[nnIndex].conf;
+    shutdownNameNode(nnIndex);
+    NameNode nn = NameNode.createNameNode(new String[] {}, conf);
+    nameNodes[nnIndex] = new NameNodeInfo(nn, conf);
     waitClusterUp();
     System.out.println("Restarted the namenode");
     int failedCount = 0;
@@ -884,76 +1095,62 @@ public class MiniDFSCluster {
    * @param The index of the datanode
    * @param The name of the block
    * @throws IOException on error accessing the file for the given block
-   * @return The contents of the block file, null if none found
    */
-  public String readBlockOnDataNode(int i, String blockName) throws IOException {
-    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
-
-    // Each datanode has multiple data dirs, check each
-    for (int dn = i*2; dn < i*2+2; dn++) {
-      File dataDir = new File(getBaseDirectory() + "data");
-      File blockFile = new File(dataDir,
-          "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
-      if (blockFile.exists()) {
-        return DFSTestUtil.readFile(blockFile);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Corrupt a block on all datanodes.
-   *
-   * @param The name of the block
-   * @throws IOException on error accessing the given block.
-   * @return The number of block files corrupted.
-   */  
-  public int corruptBlockOnDataNodes(String blockName) throws IOException {
+  int corruptBlockOnDataNodes(ExtendedBlock block) throws IOException{
     int blocksCorrupted = 0;
-    for (int i=0; i < dataNodes.size(); i++) {
-      if (corruptReplica(blockName, i)) {
+    File[] blockFiles = getAllBlockFiles(block);
+    for (File f : blockFiles) {
+      if (corruptBlock(f)) {
         blocksCorrupted++;
       }
     }
     return blocksCorrupted;
   }
 
+  public String readBlockOnDataNode(int i, ExtendedBlock block)
+      throws IOException {
+    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+    File blockFile = getBlockFile(i, block);
+    if (blockFile != null && blockFile.exists()) {
+      return DFSTestUtil.readFile(blockFile);
+    }
+    return null;
+  }
+
   /**
    * Corrupt a block on a particular datanode.
    *
-   * @param The index of the datanode
-   * @param The name of the block
+   * @param i index of the datanode
+   * @param blk name of the block
    * @throws IOException on error accessing the given block or if
    * the contents of the block (on the same datanode) differ.
    * @return true if a replica was corrupted, false otherwise
+   * Types: delete, write bad data, truncate
    */
-  public boolean corruptReplica(String blockName, int i) throws IOException {
-    Random random = new Random();
-    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
-    int filesCorrupted = 0;
+  public static boolean corruptReplica(int i, ExtendedBlock blk)
+      throws IOException {
+    File blockFile = getBlockFile(i, blk);
+    return corruptBlock(blockFile);
+  }
 
-    // Each datanode has multiple data dirs, check each
-    for (int dn = i*2; dn < i*2+2; dn++) {
-      File dataDir = new File(getBaseDirectory() + "data");
-      File blockFile = new File(dataDir,
-          "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
-
-      // Corrupt the replica by writing some bytes into a random offset
-      if (blockFile.exists()) { 
-        System.out.println("Corrupting " + blockFile);
-        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        FileChannel channel = raFile.getChannel();
-        String badString = "BADBAD";
-        int rand = random.nextInt((int)channel.size()/2);
-        raFile.seek(rand);
-        raFile.write(badString.getBytes());
-        raFile.close();
-        filesCorrupted++;
-      }
-    }
-    assert filesCorrupted == 0 || filesCorrupted == 1
-      : "Unexpected # block files";
-    return filesCorrupted == 1;
+  /*
+   * Corrupt a block on a particular datanode
+   */
+  public static boolean corruptBlock(File blockFile) throws IOException {
+    if (blockFile == null || !blockFile.exists()) {
+      return false;
+    }
+    // Corrupt replica by writing random bytes into replica
+    Random random = new Random();
+    RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+    FileChannel channel = raFile.getChannel();
+    String badString = "BADBAD";
+    int rand = random.nextInt((int)channel.size()/2);
+    raFile.seek(rand);
+    raFile.write(badString.getBytes());
+    raFile.close();
+    LOG.warn("Corrupting the block " + blockFile);
+    return true;
   }
 
   /*
@@ -966,7 +1163,7 @@ public class MiniDFSCluster {
     DataNodeProperties dnprop = dataNodes.remove(i);
     DataNode dn = dnprop.datanode;
     System.out.println("MiniDFSCluster Stopping DataNode " + 
-                       dn.dnRegistration.getName() +
+                       dn.getMachineName() +
                        " from a total of " + (dataNodes.size() + 1) + 
                        " datanodes.");
     dn.shutdown();
@@ -981,7 +1178,12 @@ public class MiniDFSCluster {
     int i;
     for (i = 0; i < dataNodes.size(); i++) {
       DataNode dn = dataNodes.get(i).datanode;
-      if (dn.dnRegistration.getName().equals(name)) {
+      // get BP registration
+      DatanodeRegistration dnR = 
+        DataNodeTestUtils.getDNRegistrationByMachineName(dn, name);
+      LOG.info("for name=" + name + " found bp=" + dnR + 
+          "; with dnMn=" + dn.getMachineName());
+      if(dnR != null) {
         break;
       }
     }
@@ -1065,7 +1267,8 @@ public class MiniDFSCluster {
    * Returns true if the NameNode is running and is out of Safe Mode
    * or if waiting for safe mode is disabled.
    */
-  public boolean isClusterUp() {
+  public boolean isNameNodeUp(int nnIndex) {
+    NameNode nameNode = nameNodes[nnIndex].nameNode;
     if (nameNode == null) {
       return false;
     }
@@ -1076,6 +1279,18 @@ public class MiniDFSCluster {
     }
     return isUp;
   }
+
+  /**
+   * Returns true if all the NameNodes are running and is out of Safe Mode.
+   */
+  public boolean isClusterUp() {
+    for (int index = 0; index < nameNodes.length; index++) {
+      if (!isNameNodeUp(index)) {
+        return false;
+      }
+    }
+    return true;
+  }
   
   /**
    * Returns true if there is at least one DataNode running.
@@ -1084,33 +1299,55 @@ public class MiniDFSCluster {
     if (dataNodes == null || dataNodes.size() == 0) {
       return false;
     }
-    return true;
+    for (DataNodeProperties dn : dataNodes) {
+      if (dn.datanode.isDatanodeUp()) {
+        return true;
+      }
+    }
+    return false;
   }
   
   /**
-   * Get a client handle to the DFS cluster.
+   * Get a client handle to the DFS cluster with a single namenode.
    */
   public FileSystem getFileSystem() throws IOException {
-    return FileSystem.get(conf);
+    checkSingleNameNode();
+    return getFileSystem(0);
   }
   
+  /**
+   * Get a client handle to the DFS cluster for the namenode at given index.
+   */
+  public FileSystem getFileSystem(int nnIndex) throws IOException {
+    return FileSystem.get(getURI(nnIndex), nameNodes[nnIndex].conf);
+  }
 
   /**
    * Get another FileSystem instance that is different from FileSystem.get(conf).
    * This simulating different threads working on different FileSystem instances.
    */
-  public FileSystem getNewFileSystemInstance() throws IOException {
-    return FileSystem.newInstance(conf);
+  public FileSystem getNewFileSystemInstance(int nnIndex) throws IOException {
+    return FileSystem.newInstance(getURI(nnIndex), nameNodes[nnIndex].conf);
+  }
+  
+  /**
+   * @return a http URL
+   */
+  public String getHttpUri(int nnIndex) throws IOException {
+    return "http://"
+        + nameNodes[nnIndex].conf
+            .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
   }
   
   /**
    * @return a {@link HftpFileSystem} object.
    */
-  public HftpFileSystem getHftpFileSystem() throws IOException {
-    final String str = "hftp://"
-        + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+  public HftpFileSystem getHftpFileSystem(int nnIndex) throws IOException {
+    String uri = "hftp://"
+        + nameNodes[nnIndex].conf
+            .get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     try {
-      return (HftpFileSystem)FileSystem.get(new URI(str), conf); 
+      return (HftpFileSystem)FileSystem.get(new URI(uri), conf);
     } catch (URISyntaxException e) {
       throw new IOException(e);
     }
@@ -1120,14 +1357,14 @@ public class MiniDFSCluster {
    *  @return a {@link HftpFileSystem} object as specified user. 
    */
   public HftpFileSystem getHftpFileSystemAs(final String username,
-      final Configuration conf, final String... groups
-      ) throws IOException, InterruptedException {
+      final Configuration conf, final int nnIndex, final String... groups)
+      throws IOException, InterruptedException {
     final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
         username, groups);
     return ugi.doAs(new PrivilegedExceptionAction<HftpFileSystem>() {
       @Override
       public HftpFileSystem run() throws Exception {
-        return getHftpFileSystem();
+        return getHftpFileSystem(nnIndex);
       }
     });
   }
@@ -1135,31 +1372,29 @@ public class MiniDFSCluster {
   /**
    * Get the directories where the namenode stores its image.
    */
-  public Collection<URI> getNameDirs() {
-    return FSNamesystem.getNamespaceDirs(conf);
+  public Collection<URI> getNameDirs(int nnIndex) {
+    return FSNamesystem.getNamespaceDirs(nameNodes[nnIndex].conf);
   }
 
   /**
    * Get the directories where the namenode stores its edits.
    */
-  public Collection<URI> getNameEditsDirs() {
-    return FSNamesystem.getNamespaceEditsDirs(conf);
+  public Collection<URI> getNameEditsDirs(int nnIndex) {
+    return FSNamesystem.getNamespaceEditsDirs(nameNodes[nnIndex].conf);
   }
 
-  /**
-   * Wait until the cluster is active and running.
-   */
-  public void waitActive() throws IOException {
-    if (nameNode == null) {
+  /** Wait until the given namenode gets registration from all the datanodes */
+  public void waitActive(int nnIndex) throws IOException {
+    if (nameNodes.length == 0 || nameNodes[nnIndex] == null) {
       return;
     }
-    InetSocketAddress addr = new InetSocketAddress("localhost",
-                                                   getNameNodePort());
+    InetSocketAddress addr = nameNodes[nnIndex].nameNode.getServiceRpcAddress();
     DFSClient client = new DFSClient(addr, conf);
 
-    // make sure all datanodes have registered and sent heartbeat
-    while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
+    // ensure all datanodes have registered and sent heartbeat to the namenode
+    while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE), addr)) {
       try {
+        LOG.info("Waiting for cluster to become active");
         Thread.sleep(100);
       } catch (InterruptedException e) {
       }
@@ -1168,10 +1403,39 @@ public class MiniDFSCluster {
     client.close();
   }
   
-  private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+  /**
+   * Wait until the cluster is active and running.
+   */
+  public void waitActive() throws IOException {
+    for (int index = 0; index < nameNodes.length; index++) {
+      waitActive(index);
+    }
+  }
+  
+  private synchronized boolean shouldWait(DatanodeInfo[] dnInfo,
+      InetSocketAddress addr) {
+    // If a datanode failed to start, then do not wait
+    for (DataNodeProperties dn : dataNodes) {
+      // the datanode thread communicating with the namenode should be alive
+      if (!dn.datanode.isBPServiceAlive(addr)) {
+        LOG.warn("BPOfferService failed to start in datanode " + dn.datanode
+            + " for namenode at " + addr);
+        return false;
+      }
+    }
+    
+    // Wait for expected number of datanodes to start
     if (dnInfo.length != numDataNodes) {
       return true;
     }
+    
+    // if one of the data nodes is not fully started, continue to wait
+    for (DataNodeProperties dn : dataNodes) {
+      if (!dn.datanode.isDatanodeFullyStarted()) {
+        return true;
+      }
+    }
+    
     // make sure all datanodes have sent first heartbeat to namenode,
     // using (capacity == 0) as proxy.
     for (DatanodeInfo dn : dnInfo) {
@@ -1179,6 +1443,13 @@ public class MiniDFSCluster {
         return true;
       }
     }
+    
+    // If datanode dataset is not initialized then wait
+    for (DataNodeProperties dn : dataNodes) {
+      if (dn.datanode.data == null) {
+        return true;
+      }
+    }
     return false;
   }
 
@@ -1195,11 +1466,12 @@ public class MiniDFSCluster {
    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
    * @return the block report for the specified data node
    */
-  public Iterable<Block> getBlockReport(int dataNodeIndex) {
+  public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) {
     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
-    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
+    return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport(
+        bpid);
   }
   
   
@@ -1208,11 +1480,11 @@ public class MiniDFSCluster {
    * @return block reports from all data nodes
    *    BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
    */
-  public Iterable<Block>[] getAllBlockReports() {
+  public Iterable<Block>[] getAllBlockReports(String bpid) {
     int numDataNodes = dataNodes.size();
     Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
     for (int i = 0; i < numDataNodes; ++i) {
-     result[i] = getBlockReport(i);
+     result[i] = getBlockReport(bpid, i);
     }
     return result;
   }
@@ -1235,11 +1507,30 @@ public class MiniDFSCluster {
     if (!(dataSet instanceof SimulatedFSDataset)) {
       throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
     }
+    String bpid = getNamesystem().getBlockPoolId();
     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
-    sdataset.injectBlocks(blocksToInject);
-    dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
+    sdataset.injectBlocks(bpid, blocksToInject);
+    dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
   }
-  
+
+  /**
+   * Multiple-NameNode version of {@link #injectBlocks(Iterable[])}.
+   */
+  public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
+      Iterable<Block> blocksToInject) throws IOException {
+    if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
+      throw new IndexOutOfBoundsException();
+    }
+    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
+    if (!(dataSet instanceof SimulatedFSDataset)) {
+      throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
+    }
+    String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
+    SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
+    sdataset.injectBlocks(bpid, blocksToInject);
+    dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
+  }
+
   /**
    * This method is valid only if the data nodes have simulated data
    * @param blocksToInject - blocksToInject[] is indexed in the same order as the list 
@@ -1249,7 +1540,8 @@ public class MiniDFSCluster {
    *             if any of blocks already exist in the data nodes
    *             Note the rest of the blocks are not injected.
    */
-  public void injectBlocks(Iterable<Block>[] blocksToInject) throws IOException {
+  public void injectBlocks(Iterable<Block>[] blocksToInject)
+      throws IOException {
     if (blocksToInject.length >  dataNodes.size()) {
       throw new IndexOutOfBoundsException();
     }
@@ -1288,6 +1580,163 @@ public class MiniDFSCluster {
   public static String getBaseDirectory() {
     return System.getProperty("test.build.data", "build/test/data") + "/dfs/";
   }
+
+  /**
+   * Get a storage directory for a datanode. There are two storage directories
+   * per datanode:
+   * <ol>
+   * <li><base directory>/data/data<2*dnIndex + 1></li>
+   * <li><base directory>/data/data<2*dnIndex + 2></li>
+   * </ol>
+   * 
+   * @param dnIndex datanode index (starts from 0)
+   * @param dirIndex directory index (0 or 1). Index 0 provides access to the
+   *          first storage directory. Index 1 provides access to the second
+   *          storage directory.
+   * @return Storage directory
+   */
+  public static File getStorageDir(int dnIndex, int dirIndex) {
+    return new File(getBaseDirectory() + "data/data" + (2*dnIndex + 1 + dirIndex));
+  }
+  
+  /**
+   * Get current directory corresponding to the datanode
+   * @param storageDir
+   * @return current directory
+   */
+  public static String getDNCurrentDir(File storageDir) {
+    return storageDir + "/" + Storage.STORAGE_DIR_CURRENT + "/";
+  }
+  
+  /**
+   * Get directory corresponding to block pool directory in the datanode
+   * @param storageDir
+   * @return current directory
+   */
+  public static String getBPDir(File storageDir, String bpid) {
+    return getDNCurrentDir(storageDir) + bpid + "/";
+  }
+  /**
+   * Get directory relative to block pool directory in the datanode
+   * @param storageDir
+   * @return current directory
+   */
+  public static String getBPDir(File storageDir, String bpid, String dirName) {
+    return getBPDir(storageDir, bpid) + dirName + "/";
+  }
+  
+  /**
+   * Get finalized directory for a block pool
+   * @param storageDir storage directory
+   * @param bpid Block pool Id
+   * @return finalized directory for a block pool
+   */
+  public static File getRbwDir(File storageDir, String bpid) {
+    return new File(getBPDir(storageDir, bpid, Storage.STORAGE_DIR_CURRENT)
+        + DataStorage.STORAGE_DIR_RBW );
+  }
+  
+  /**
+   * Get finalized directory for a block pool
+   * @param storageDir storage directory
+   * @param bpid Block pool Id
+   * @return finalized directory for a block pool
+   */
+  public static File getFinalizedDir(File storageDir, String bpid) {
+    return new File(getBPDir(storageDir, bpid, Storage.STORAGE_DIR_CURRENT)
+        + DataStorage.STORAGE_DIR_FINALIZED );
+  }
+  
+  /**
+   * Get file correpsonding to a block
+   * @param storageDir storage directory
+   * @param blk block to be corrupted
+   * @return file corresponding to the block
+   */
+  public static File getBlockFile(File storageDir, ExtendedBlock blk) {
+    return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), 
+        blk.getBlockName());
+  }
+  
+  /**
+   * Get all files related to a block from all the datanodes
+   * @param block block for which corresponding files are needed
+   */
+  public File[] getAllBlockFiles(ExtendedBlock block) {
+    if (dataNodes.size() == 0) return new File[0];
+    ArrayList<File> list = new ArrayList<File>();
+    for (int i=0; i < dataNodes.size(); i++) {
+      File blockFile = getBlockFile(i, block);
+      if (blockFile != null) {
+        list.add(blockFile);
+      }
+    }
+    return list.toArray(new File[list.size()]);
+  }
+  
+  /**
+   * Get files related to a block for a given datanode
+   * @param dnIndex Index of the datanode to get block files for
+   * @param block block for which corresponding files are needed
+   */
+  public static File getBlockFile(int dnIndex, ExtendedBlock block) {
+    // Check for block file in the two storage directories of the datanode
+    for (int i = 0; i <=1 ; i++) {
+      File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+      File blockFile = getBlockFile(storageDir, block);
+      if (blockFile.exists()) {
+        return blockFile;
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * Throw an exception if the MiniDFSCluster is not started with a single
+   * namenode
+   */
+  private void checkSingleNameNode() {
+    if (nameNodes.length != 1) {
+      throw new IllegalArgumentException("Namenode index is needed");
+    }
+  }
+
+  /**
+   * Add a namenode to a federated cluster and start it. Configuration of
+   * datanodes in the cluster is refreshed to register with the new namenode.
+   * 
+   * @return newly started namenode
+   */
+  public NameNode addNameNode(Configuration conf, int namenodePort)
+      throws IOException {
+    if(!federation)
+      throw new IOException("cannot add namenode to non-federated cluster");
+    
+    int nnIndex = nameNodes.length;
+    int numNameNodes = nameNodes.length + 1;
+    NameNodeInfo[] newlist = new NameNodeInfo[numNameNodes];
+    System.arraycopy(nameNodes, 0, newlist, 0, nameNodes.length);
+    nameNodes = newlist;
+    String nameserviceId = NAMESERVICE_ID_PREFIX + (nnIndex + 1);
+    
+    String nameserviceIds = conf.get(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES);
+    nameserviceIds += "," + nameserviceId;
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nameserviceIds);
+    
+    initFederatedNamenodeAddress(conf, nameserviceId, namenodePort);
+    createFederatedNameNode(nnIndex, conf, numDataNodes, true, true, null,
+        null, nameserviceId);
+
+    // Refresh datanodes with the newly started namenode
+    for (DataNodeProperties dn : dataNodes) {
+      DataNode datanode = dn.datanode;
+      datanode.refreshNamenodes(conf);
+    }
+
+    // Wait for new namenode to get registrations from all the datanodes
+    waitActive(nnIndex);
+    return nameNodes[nnIndex].nameNode;
+  }
   
   private int getFreeSocketPort() {
     int port = 0;
@@ -1322,9 +1771,7 @@ public class MiniDFSCluster {
   
   private void addToFile(String p, String address) throws IOException {
     File f = new File(p);
-    if (!f.exists()) {
-      f.createNewFile();
-    }
+    f.createNewFile();
     PrintWriter writer = new PrintWriter(new FileWriter(f, true));
     try {
       writer.println(address);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java Fri Apr 29 18:16:32 2011
@@ -19,21 +19,18 @@ package org.apache.hadoop.hdfs;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Properties;
 
 import junit.framework.TestCase;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.BlockMissingException;
@@ -64,7 +61,7 @@ public class TestBlockMissingException e
       // extract block locations from File system. Wait till file is closed.
       LocatedBlocks locations = null;
       locations = fileSys.dfs.getNamenode().getBlockLocations(file1.toString(),
-                                                             0, numBlocks * blockSize);
+          0, numBlocks * blockSize);
       // remove block of file
       LOG.info("Remove first block of file");
       corruptBlock(file1, locations.get(0).getBlock());
@@ -119,46 +116,15 @@ public class TestBlockMissingException e
     assertTrue("Expected BlockMissingException ", gotException);
   }
 
-  /*
-   * The Data directories for a datanode
-   */
-  private File[] getDataNodeDirs(int i) throws IOException {
-    String base_dir = MiniDFSCluster.getBaseDirectory();
-    File data_dir = new File(base_dir, "data");
-    File dir1 = new File(data_dir, "data"+(2*i+1));
-    File dir2 = new File(data_dir, "data"+(2*i+2));
-    if (dir1.isDirectory() && dir2.isDirectory()) {
-      File[] dir = new File[2];
-      dir[0] = new File(dir1, MiniDFSCluster.FINALIZED_DIR_NAME);
-      dir[1] = new File(dir2, MiniDFSCluster.FINALIZED_DIR_NAME); 
-      return dir;
-    }
-    return new File[0];
-  }
-
   //
   // Corrupt specified block of file
   //
-  void corruptBlock(Path file, Block blockNum) throws IOException {
-    long id = blockNum.getBlockId();
-
-    // Now deliberately remove/truncate data blocks from the block.
-    //
-    for (int i = 0; i < NUM_DATANODES; i++) {
-      File[] dirs = getDataNodeDirs(i);
-      
-      for (int j = 0; j < dirs.length; j++) {
-        File[] blocks = dirs[j].listFiles();
-        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
-        for (int idx = 0; idx < blocks.length; idx++) {
-          if (blocks[idx].getName().startsWith("blk_" + id) &&
-              !blocks[idx].getName().endsWith(".meta")) {
-            blocks[idx].delete();
-            LOG.info("Deleted block " + blocks[idx]);
-          }
-        }
-      }
+  void corruptBlock(Path file, ExtendedBlock blk) {
+    // Now deliberately remove/truncate data blocks from the file.
+    File[] blockFiles = dfs.getAllBlockFiles(blk);
+    for (File f : blockFiles) {
+      f.delete();
+      LOG.info("Deleted block " + f);
     }
   }
-
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Apr 29 18:16:32 2011
@@ -30,8 +30,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.never;
 
-import static org.junit.Assert.*;
-
 public class TestClientBlockVerification {
 
   static BlockReaderTestUtil util = null;

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Fri Apr 29 18:16:32 2011
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -51,7 +51,7 @@ public class TestClientProtocolForPipeli
       Path file = new Path("dataprotocol.dat");    
       DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
       // get the first blockid for the file
-      Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+      ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
 
       // test getNewStampAndToken on a finalized block
       try {
@@ -64,8 +64,8 @@ public class TestClientProtocolForPipeli
       // test getNewStampAndToken on a non-existent block
       try {
         long newBlockId = firstBlock.getBlockId() + 1;
-        Block newBlock = new Block(newBlockId, 0, 
-            firstBlock.getGenerationStamp());
+        ExtendedBlock newBlock = new ExtendedBlock(firstBlock.getBlockPoolId(),
+            newBlockId, 0, firstBlock.getGenerationStamp());
         namenode.updateBlockForPipeline(newBlock, "");
         Assert.fail("Cannot get a new GS from a non-existent block");
       } catch (IOException e) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Apr 29 18:16:32 2011
@@ -31,6 +31,7 @@ import static org.junit.Assert.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
 
 /**
@@ -82,8 +83,9 @@ public class TestCrcCorruption {
       // file disallows this Datanode to send data to another datanode.
       // However, a client is alowed access to this block.
       //
-      File data_dir = new File(System.getProperty("test.build.data"),
-                               "dfs/data/data1" + MiniDFSCluster.FINALIZED_DIR_NAME);
+      File storageDir = MiniDFSCluster.getStorageDir(0, 1);
+      String bpid = cluster.getNamesystem().getBlockPoolId();
+      File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
       File[] blocks = data_dir.listFiles();
       assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -135,12 +137,13 @@ public class TestCrcCorruption {
           }
         }
       }
+      
       //
       // Now deliberately corrupt all meta blocks from the second
       // directory of the first datanode
       //
-      data_dir = new File(System.getProperty("test.build.data"),
-                               "dfs/data/data2" + MiniDFSCluster.FINALIZED_DIR_NAME);
+      storageDir = MiniDFSCluster.getStorageDir(0, 1);
+      data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
       assertTrue("data directory does not exist", data_dir.exists());
       blocks = data_dir.listFiles();
       assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
@@ -256,7 +259,7 @@ public class TestCrcCorruption {
       DFSTestUtil.createFile(fs, file, fileSize, replFactor, 12345L /*seed*/);
       DFSTestUtil.waitReplication(fs, file, replFactor);
 
-      String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
       int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
       assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);
 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Apr 29 18:16:32 2011
@@ -43,9 +43,9 @@ import org.apache.hadoop.fs.FileChecksum
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -157,7 +157,7 @@ public class TestDFSClientRetries extend
     };
     when(mockNN.addBlock(anyString(), 
                          anyString(),
-                         any(Block.class),
+                         any(ExtendedBlock.class),
                          any(DatanodeInfo[].class))).thenAnswer(answer);
 
     final DFSClient client = new DFSClient(null, mockNN, conf, null);
@@ -489,7 +489,7 @@ public class TestDFSClientRetries extend
     
     public void run() {
       try {
-        fs = cluster.getNewFileSystemInstance();
+        fs = cluster.getNewFileSystemInstance(0);
         
         int bufferSize = len;
         byte[] buf = new byte[bufferSize];

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSFinalize.java Fri Apr 29 18:16:32 2011
@@ -69,7 +69,7 @@ public class TestDFSFinalize extends Tes
       assertEquals(
                    UpgradeUtilities.checksumContents(
                                                      DATA_NODE, new File(dataNodeDirs[i],"current")),
-                   UpgradeUtilities.checksumMasterContents(DATA_NODE));
+                   UpgradeUtilities.checksumMasterDataNodeContents());
     }
     for (int i = 0; i < nameNodeDirs.length; i++) {
       assertFalse(new File(nameNodeDirs[i],"previous").isDirectory());
@@ -95,10 +95,10 @@ public class TestDFSFinalize extends Tes
       String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
       
       log("Finalize with existing previous dir", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
       cluster = new MiniDFSCluster.Builder(conf)
                                   .format(false)
                                   .manageDataDfsDirs(false)

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSRollback.java Fri Apr 29 18:16:32 2011
@@ -76,7 +76,7 @@ public class TestDFSRollback extends Tes
         assertEquals(
                      UpgradeUtilities.checksumContents(
                                                        nodeType, new File(baseDirs[i],"current")),
-                     UpgradeUtilities.checksumMasterContents(nodeType));
+                     UpgradeUtilities.checksumMasterDataNodeContents());
       }
       break;
     }
@@ -104,17 +104,17 @@ public class TestDFSRollback extends Tes
   }
   
   /**
-   * Attempts to start a DataNode with the given operation.  Starting
-   * the DataNode should throw an exception.
+   * Attempts to start a DataNode with the given operation. Starting
+   * the given block pool should fail.
+   * @param operation startup option
+   * @param bpid block pool Id that should fail to start
+   * @throws IOException 
    */
-  void startDataNodeShouldFail(StartupOption operation) {
-    try {
-      cluster.startDataNodes(conf, 1, false, operation, null); // should fail
-      throw new AssertionError("DataNode should have failed to start");
-    } catch (Exception expected) {
-      // expected
-      assertFalse(cluster.isDataNodeUp());
-    }
+  void startBlockPoolShouldFail(StartupOption operation, String bpid)
+      throws IOException {
+    cluster.startDataNodes(conf, 1, false, operation, null); // should fail
+    assertFalse("Block pool " + bpid + " should have failed to start", 
+        cluster.getDataNodes().get(0).isBPServiceAlive(bpid));
   }
  
   /**
@@ -125,6 +125,7 @@ public class TestDFSRollback extends Tes
     File[] baseDirs;
     UpgradeUtilities.initialize();
     
+    StorageInfo storageInfo = null;
     for (int numDirs = 1; numDirs <= 2; numDirs++) {
       conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);      
@@ -133,8 +134,8 @@ public class TestDFSRollback extends Tes
       String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
       
       log("Normal NameNode rollback", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
@@ -146,16 +147,16 @@ public class TestDFSRollback extends Tes
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("Normal DataNode rollback", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
                                                 .startupOption(StartupOption.ROLLBACK)
                                                 .build();
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
       cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
       checkResult(DATA_NODE, dataNodeDirs);
       cluster.shutdown();
@@ -163,67 +164,77 @@ public class TestDFSRollback extends Tes
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
 
       log("NameNode rollback without existing previous dir", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       startNameNodeShouldFail(StartupOption.ROLLBACK);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("DataNode rollback without existing previous dir", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
                                                 .startupOption(StartupOption.UPGRADE)
                                                 .build();
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
       cluster.startDataNodes(conf, 1, false, StartupOption.ROLLBACK, null);
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
 
       log("DataNode rollback with future stored layout version in previous", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
                                                 .startupOption(StartupOption.ROLLBACK)
                                                 .build();
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
-      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
-                                         new StorageInfo(Integer.MIN_VALUE,
-                                                         UpgradeUtilities.getCurrentNamespaceID(cluster),
-                                                         UpgradeUtilities.getCurrentFsscTime(cluster)));
-      startDataNodeShouldFail(StartupOption.ROLLBACK);
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
+      storageInfo = new StorageInfo(Integer.MIN_VALUE, 
+          UpgradeUtilities.getCurrentNamespaceID(cluster), 
+          UpgradeUtilities.getCurrentClusterID(cluster), 
+          UpgradeUtilities.getCurrentFsscTime(cluster));
+      
+      UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+      
+      startBlockPoolShouldFail(StartupOption.ROLLBACK, 
+          cluster.getNamesystem().getBlockPoolId());
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
       
       log("DataNode rollback with newer fsscTime in previous", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
                                                 .startupOption(StartupOption.ROLLBACK)
                                                 .build();
-      UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
-      baseDirs = UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "previous");
-      UpgradeUtilities.createVersionFile(conf, DATA_NODE, baseDirs,
-                                         new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(),
-                                                         UpgradeUtilities.getCurrentNamespaceID(cluster),
-                                                         Long.MAX_VALUE));
-      startDataNodeShouldFail(StartupOption.ROLLBACK);
+      
+      UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
+      storageInfo = new StorageInfo(UpgradeUtilities.getCurrentLayoutVersion(), 
+            UpgradeUtilities.getCurrentNamespaceID(cluster), 
+            UpgradeUtilities.getCurrentClusterID(cluster), Long.MAX_VALUE);
+      
+      UpgradeUtilities.createDataNodeVersionFile(baseDirs, storageInfo,
+          UpgradeUtilities.getCurrentBlockPoolID(cluster));
+      
+      startBlockPoolShouldFail(StartupOption.ROLLBACK, 
+          cluster.getNamesystem().getBlockPoolId());
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
 
       log("NameNode rollback with no edits file", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       for (File f : baseDirs) { 
         FileUtil.fullyDelete(new File(f,"edits"));
       }
@@ -231,8 +242,8 @@ public class TestDFSRollback extends Tes
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with no image file", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       for (File f : baseDirs) { 
         FileUtil.fullyDelete(new File(f,"fsimage")); 
       }
@@ -240,8 +251,8 @@ public class TestDFSRollback extends Tes
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with corrupt version file", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       for (File f : baseDirs) { 
         UpgradeUtilities.corruptFile(new File(f,"VERSION")); 
       }
@@ -249,12 +260,15 @@ public class TestDFSRollback extends Tes
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with old layout version in previous", numDirs);
-      UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
-      baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "previous");
-      UpgradeUtilities.createVersionFile(conf, NAME_NODE, baseDirs,
-                                         new StorageInfo(1,
-                                                         UpgradeUtilities.getCurrentNamespaceID(null),
-                                                         UpgradeUtilities.getCurrentFsscTime(null)));
+      UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+      baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+      storageInfo = new StorageInfo(1, 
+          UpgradeUtilities.getCurrentNamespaceID(null),
+          UpgradeUtilities.getCurrentClusterID(null),
+          UpgradeUtilities.getCurrentFsscTime(null));
+      
+      UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs,
+          storageInfo, UpgradeUtilities.getCurrentBlockPoolID(cluster));
       startNameNodeShouldFail(StartupOption.UPGRADE);
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
     } // end numDir loop

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=1097905&r1=1097904&r2=1097905&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Fri Apr 29 18:16:32 2011
@@ -1108,11 +1108,12 @@ public class TestDFSShell extends TestCa
   static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
     List<File> files = new ArrayList<File>();
     List<DataNode> datanodes = cluster.getDataNodes();
-    Iterable<Block>[] blocks = cluster.getAllBlockReports();
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    Iterable<Block>[] blocks = cluster.getAllBlockReports(poolId);
     for(int i = 0; i < blocks.length; i++) {
       FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
       for(Block b : blocks[i]) {
-        files.add(ds.getBlockFile(b));
+        files.add(ds.getBlockFile(poolId, b));
       }        
     }
     return files;