You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [29/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java Tue Aug 19 23:49:39 2014
@@ -158,21 +158,23 @@ public class UpgradeUtilities {
       FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock"));
     }
     namenodeStorageChecksum = checksumContents(NAME_NODE, 
-        new File(namenodeStorage, "current"));
+        new File(namenodeStorage, "current"), false);
     File dnCurDir = new File(datanodeStorage, "current");
-    datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir);
+    datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir, false);
     
     File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
         "current");
-    blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir);
+    blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir, false);
     
     File bpCurFinalizeDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
         "current/"+DataStorage.STORAGE_DIR_FINALIZED);
-    blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE, bpCurFinalizeDir);
+    blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE,
+        bpCurFinalizeDir, true);
     
     File bpCurRbwDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
         "current/"+DataStorage.STORAGE_DIR_RBW);
-    blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir);
+    blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir,
+        false);
   }
   
   // Private helper method that writes a file to the given file system.
@@ -266,36 +268,47 @@ public class UpgradeUtilities {
   
   /**
    * Compute the checksum of all the files in the specified directory.
-   * The contents of subdirectories are not included. This method provides
-   * an easy way to ensure equality between the contents of two directories.
+   * This method provides an easy way to ensure equality between the contents
+   * of two directories.
    *
    * @param nodeType if DATA_NODE then any file named "VERSION" is ignored.
    *    This is because this file file is changed every time
    *    the Datanode is started.
-   * @param dir must be a directory. Subdirectories are ignored.
+   * @param dir must be a directory
+   * @param recursive whether or not to consider subdirectories
    *
    * @throws IllegalArgumentException if specified directory is not a directory
    * @throws IOException if an IOException occurs while reading the files
    * @return the computed checksum value
    */
-  public static long checksumContents(NodeType nodeType, File dir) throws IOException {
+  public static long checksumContents(NodeType nodeType, File dir,
+      boolean recursive) throws IOException {
+    CRC32 checksum = new CRC32();
+    checksumContentsHelper(nodeType, dir, checksum, recursive);
+    return checksum.getValue();
+  }
+
+  public static void checksumContentsHelper(NodeType nodeType, File dir,
+      CRC32 checksum, boolean recursive) throws IOException {
     if (!dir.isDirectory()) {
       throw new IllegalArgumentException(
-                                         "Given argument is not a directory:" + dir);
+          "Given argument is not a directory:" + dir);
     }
     File[] list = dir.listFiles();
     Arrays.sort(list);
-    CRC32 checksum = new CRC32();
     for (int i = 0; i < list.length; i++) {
       if (!list[i].isFile()) {
+        if (recursive) {
+          checksumContentsHelper(nodeType, list[i], checksum, recursive);
+        }
         continue;
       }
 
       // skip VERSION and dfsUsed file for DataNodes
-      if (nodeType == DATA_NODE && 
-         (list[i].getName().equals("VERSION") || 
-         list[i].getName().equals("dfsUsed"))) {
-        continue; 
+      if (nodeType == DATA_NODE &&
+          (list[i].getName().equals("VERSION") ||
+              list[i].getName().equals("dfsUsed"))) {
+        continue;
       }
 
       FileInputStream fis = null;
@@ -312,7 +325,6 @@ public class UpgradeUtilities {
         }
       }
     }
-    return checksum.getValue();
   }
   
   /**

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java Tue Aug 19 23:49:39 2014
@@ -49,6 +49,26 @@ public class TestExtendedBlock {
         new ExtendedBlock(POOL_A, BLOCK_1_GS1),
         new ExtendedBlock(POOL_A, BLOCK_1_GS2));
   }
+  
+  @Test
+  public void testHashcode() {
+    
+    // Different pools, same block id -> different hashcode
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+        new ExtendedBlock(POOL_B, BLOCK_1_GS1).hashCode());
+    
+    // Same pool, different block id -> different hashcode
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+        new ExtendedBlock(POOL_A, BLOCK_2_GS1).hashCode());
+    
+    // Same block -> same hashcode
+    assertEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(),
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode());
+
+  }
 
   private static void assertNotEquals(Object a, Object b) {
     assertFalse("expected not equal: '" + a + "' and '" + b + "'",

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Tue Aug 19 23:49:39 2014
@@ -31,25 +31,25 @@ import org.apache.hadoop.fs.permission.A
 import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -67,9 +67,18 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
@@ -175,8 +184,10 @@ public class TestPBHelper {
   private static BlockWithLocations getBlockWithLocations(int bid) {
     final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
     final String[] storageIDs = {"s1", "s2", "s3"};
+    final StorageType[] storageTypes = {
+        StorageType.DISK, StorageType.DISK, StorageType.DISK};
     return new BlockWithLocations(new Block(bid, 0, 1),
-        datanodeUuids, storageIDs);
+        datanodeUuids, storageIDs, storageTypes);
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
@@ -550,8 +561,10 @@ public class TestPBHelper {
     dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo();
     dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo();
     String[][] storageIDs = {{"s00"}, {"s10", "s11"}};
+    StorageType[][] storageTypes = {{StorageType.DEFAULT},
+        {StorageType.DEFAULT, StorageType.DEFAULT}};
     BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1",
-        blocks, dnInfos, storageIDs);
+        blocks, dnInfos, storageTypes, storageIDs);
     BlockCommandProto bcProto = PBHelper.convert(bc);
     BlockCommand bc2 = PBHelper.convert(bcProto);
     assertEquals(bc.getAction(), bc2.getAction());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java Tue Aug 19 23:49:39 2014
@@ -22,8 +22,12 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 
 import java.io.IOException;
+import java.net.BindException;
 import java.net.URI;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -37,14 +41,13 @@ public class MiniQJMHACluster {
   private MiniDFSCluster cluster;
   private MiniJournalCluster journalCluster;
   private final Configuration conf;
+  private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class);
   
   public static final String NAMESERVICE = "ns1";
   private static final String NN1 = "nn1";
   private static final String NN2 = "nn2";
-  private static final int NN1_IPC_PORT = 10000;
-  private static final int NN1_INFO_PORT = 10001;
-  private static final int NN2_IPC_PORT = 10002;
-  private static final int NN2_INFO_PORT = 10003;
+  private static final Random RANDOM = new Random();
+  private int basePort = 10000;
 
   public static class Builder {
     private final Configuration conf;
@@ -69,51 +72,62 @@ public class MiniQJMHACluster {
     }
   }
   
-  public static MiniDFSNNTopology createDefaultTopology() {
+  public static MiniDFSNNTopology createDefaultTopology(int basePort) {
     return new MiniDFSNNTopology()
       .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN(
-        new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT)
-            .setHttpPort(NN1_INFO_PORT)).addNN(
-        new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT)
-            .setHttpPort(NN2_INFO_PORT)));
+        new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort)
+            .setHttpPort(basePort + 1)).addNN(
+        new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2)
+            .setHttpPort(basePort + 3)));
   }
-  
+
   private MiniQJMHACluster(Builder builder) throws IOException {
     this.conf = builder.conf;
-    // start 3 journal nodes
-    journalCluster = new MiniJournalCluster.Builder(conf).format(true)
-        .build();
-    URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
-    
-    // start cluster with 2 NameNodes
-    MiniDFSNNTopology topology = createDefaultTopology();
-    
-    initHAConf(journalURI, builder.conf);
-    
-    // First start up the NNs just to format the namespace. The MinIDFSCluster
-    // has no way to just format the NameNodes without also starting them.
-    cluster = builder.dfsBuilder.nnTopology(topology)
-        .manageNameDfsSharedDirs(false).build();
-    cluster.waitActive();
-    cluster.shutdown();
-    
-    // initialize the journal nodes
-    Configuration confNN0 = cluster.getConfiguration(0);
-    NameNode.initializeSharedEdits(confNN0, true);
-    
-    cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
-    cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
-    
-    // restart the cluster
-    cluster.restartNameNodes();
+    int retryCount = 0;
+    while (true) {
+      try {
+        basePort = 10000 + RANDOM.nextInt(1000) * 4;
+        // start 3 journal nodes
+        journalCluster = new MiniJournalCluster.Builder(conf).format(true)
+            .build();
+        URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
+
+        // start cluster with 2 NameNodes
+        MiniDFSNNTopology topology = createDefaultTopology(basePort);
+
+        initHAConf(journalURI, builder.conf);
+
+        // First start up the NNs just to format the namespace. The MinIDFSCluster
+        // has no way to just format the NameNodes without also starting them.
+        cluster = builder.dfsBuilder.nnTopology(topology)
+            .manageNameDfsSharedDirs(false).build();
+        cluster.waitActive();
+        cluster.shutdown();
+
+        // initialize the journal nodes
+        Configuration confNN0 = cluster.getConfiguration(0);
+        NameNode.initializeSharedEdits(confNN0, true);
+
+        cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
+        cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+
+        // restart the cluster
+        cluster.restartNameNodes();
+        ++retryCount;
+        break;
+      } catch (BindException e) {
+        LOG.info("MiniQJMHACluster port conflicts, retried " +
+            retryCount + " times");
+      }
+    }
   }
   
   private Configuration initHAConf(URI journalURI, Configuration conf) {
     conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
         journalURI.toString());
     
-    String address1 = "127.0.0.1:" + NN1_IPC_PORT;
-    String address2 = "127.0.0.1:" + NN2_IPC_PORT;
+    String address1 = "127.0.0.1:" + basePort;
+    String address2 = "127.0.0.1:" + (basePort + 2);
     conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
         NAMESERVICE, NN1), address1);
     conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Tue Aug 19 23:49:39 2014
@@ -21,16 +21,12 @@ import static org.junit.Assert.*;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URL;
-import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -41,6 +37,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+
 public class TestNNWithQJM {
   final Configuration conf = new HdfsConfiguration();
   private MiniJournalCluster mjc = null;
@@ -204,55 +201,4 @@ public class TestNNWithQJM {
           "Unable to start log segment 1: too few journals", ioe);
     }
   }
-
-  @Test (timeout = 30000)
-  public void testWebPageHasQjmInfo() throws Exception {
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
-        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
-        mjc.getQuorumJournalURI("myjournal").toString());
-    // Speed up the test
-    conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
-    
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-      .numDataNodes(0)
-      .manageNameDfsDirs(false)
-      .build();
-    try {
-      URL url = new URL("http://localhost:"
-          + NameNode.getHttpAddress(cluster.getConfiguration(0)).getPort()
-          + "/dfshealth.jsp");
-      
-      cluster.getFileSystem().mkdirs(TEST_PATH);
-      
-      String contents = DFSTestUtil.urlGet(url); 
-      assertTrue(contents.contains("QJM to ["));
-      assertTrue(contents.contains("Written txid 2"));
-
-      // Stop one JN, do another txn, and make sure it shows as behind
-      // stuck behind the others.
-      mjc.getJournalNode(0).stopAndJoin(0);
-      
-      cluster.getFileSystem().delete(TEST_PATH, true);
-      
-      contents = DFSTestUtil.urlGet(url); 
-      System.out.println(contents);
-      assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
-          .find());
-
-      // Restart NN while JN0 is still down.
-      cluster.restartNameNode();
-
-      contents = DFSTestUtil.urlGet(url); 
-      System.out.println(contents);
-      assertTrue(Pattern.compile("never written").matcher(contents)
-          .find());
-      
-
-    } finally {
-      cluster.shutdown();
-    }
-
-  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Tue Aug 19 23:49:39 2014
@@ -25,7 +25,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -208,7 +208,7 @@ public class TestQuorumJournalManagerUni
         anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
     
     // And the third log not respond
-    SettableFuture<Void> slowLog = SettableFuture.<Void>create();
+    SettableFuture<Void> slowLog = SettableFuture.create();
     Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
         anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
     stm.flush();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Tue Aug 19 23:49:39 2014
@@ -170,11 +170,6 @@ public class TestJournalNode {
     assertTrue("Bad contents: " + pageContents,
         pageContents.contains(
             "Hadoop:service=JournalNode,name=JvmMetrics"));
-    
-    // Check JSP page.
-    pageContents = DFSTestUtil.urlGet(
-        new URL(urlRoot + "/journalstatus.jsp"));
-    assertTrue(pageContents.contains("JournalNode"));
 
     // Create some edits on server side
     byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Tue Aug 19 23:49:39 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.apache.hadoop.security.TestDoAsEffectiveUser;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
 import org.junit.AfterClass;
@@ -88,7 +89,8 @@ public class TestDelegationTokenForProxy
     builder.append("127.0.1.1,");
     builder.append(InetAddress.getLocalHost().getCanonicalHostName());
     LOG.info("Local Ip addresses: " + builder.toString());
-    conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName),
+    conf.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserIpConfKey(superUserShortName),
         builder.toString());
   }
   
@@ -100,7 +102,8 @@ public class TestDelegationTokenForProxy
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
     config.setLong(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
-    config.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER),
+    config.setStrings(DefaultImpersonationProvider.getTestProvider().
+            getProxySuperuserGroupConfKey(REAL_USER),
         "group1");
     config.setBoolean(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,23 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -48,6 +54,8 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
+import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -81,7 +89,7 @@ public class TestBalancer {
   private static final Random r = new Random();
 
   static {
-    Balancer.setBlockMoveWaitTime(1000L) ;
+    Dispatcher.setBlockMoveWaitTime(1000L) ;
   }
 
   static void initConf(Configuration conf) {
@@ -255,6 +263,18 @@ public class TestBalancer {
       }
     }
   }
+
+  /**
+   * Wait until balanced: each datanode gives utilization within
+   * BALANCE_ALLOWED_VARIANCE of average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
+  throws IOException, TimeoutException {
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
+  }
   
   /**
    * Wait until balanced: each datanode gives utilization within 
@@ -263,11 +283,17 @@ public class TestBalancer {
    * @throws TimeoutException
    */
   static void waitForBalancer(long totalUsedSpace, long totalCapacity,
-      ClientProtocol client, MiniDFSCluster cluster)
-  throws IOException, TimeoutException {
+      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
+      int expectedExcludedNodes) throws IOException, TimeoutException {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
         : Time.now() + timeout;
+    if (!p.nodesToBeIncluded.isEmpty()) {
+      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+    }
+    if (!p.nodesToBeExcluded.isEmpty()) {
+        totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+    }
     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
     boolean balanced;
     do {
@@ -275,9 +301,20 @@ public class TestBalancer {
           client.getDatanodeReport(DatanodeReportType.ALL);
       assertEquals(datanodeReport.length, cluster.getDataNodes().size());
       balanced = true;
+      int actualExcludedNodeCount = 0;
       for (DatanodeInfo datanode : datanodeReport) {
         double nodeUtilization = ((double)datanode.getDfsUsed())
             / datanode.getCapacity();
+        if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) {
+          assertTrue(nodeUtilization == 0);
+          actualExcludedNodeCount++;
+          continue;
+        }
+        if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) {
+          assertTrue(nodeUtilization == 0);
+          actualExcludedNodeCount++;
+          continue;
+        }
         if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
           balanced = false;
           if (Time.now() > failtime) {
@@ -294,6 +331,7 @@ public class TestBalancer {
           break;
         }
       }
+      assertEquals(expectedExcludedNodes,actualExcludedNodeCount);
     } while (!balanced);
   }
 
@@ -307,22 +345,118 @@ public class TestBalancer {
     }
     return b.append("]").toString();
   }
-  /** This test start a cluster with specified number of nodes, 
+  /**
+   * Class which contains information about the
+   * new nodes to be added to the cluster for balancing.
+   */
+  static abstract class NewNodeInfo {
+
+    Set<String> nodesToBeExcluded = new HashSet<String>();
+    Set<String> nodesToBeIncluded = new HashSet<String>();
+
+     abstract String[] getNames();
+     abstract int getNumberofNewNodes();
+     abstract int getNumberofIncludeNodes();
+     abstract int getNumberofExcludeNodes();
+
+     public Set<String> getNodesToBeIncluded() {
+       return nodesToBeIncluded;
+     }
+     public Set<String> getNodesToBeExcluded() {
+       return nodesToBeExcluded;
+     }
+  }
+
+  /**
+   * The host names of new nodes are specified
+   */
+  static class HostNameBasedNodes extends NewNodeInfo {
+    String[] hostnames;
+
+    public HostNameBasedNodes(String[] hostnames,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+      this.hostnames = hostnames;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
+    }
+
+    @Override
+    String[] getNames() {
+      return hostnames;
+    }
+    @Override
+    int getNumberofNewNodes() {
+      return hostnames.length;
+    }
+    @Override
+    int getNumberofIncludeNodes() {
+      return nodesToBeIncluded.size();
+    }
+    @Override
+    int getNumberofExcludeNodes() {
+      return nodesToBeExcluded.size();
+    }
+  }
+
+  /**
+   * The number of data nodes to be started is specified.
+   * The data nodes will have the same host name, but different port numbers.
+   *
+   */
+  static class PortNumberBasedNodes extends NewNodeInfo {
+    int newNodes;
+    int excludeNodes;
+    int includeNodes;
+
+    public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) {
+      this.newNodes = newNodes;
+      this.excludeNodes = excludeNodes;
+      this.includeNodes = includeNodes;
+    }
+
+    @Override
+    String[] getNames() {
+      return null;
+    }
+    @Override
+    int getNumberofNewNodes() {
+      return newNodes;
+    }
+    @Override
+    int getNumberofIncludeNodes() {
+      return includeNodes;
+    }
+    @Override
+    int getNumberofExcludeNodes() {
+      return excludeNodes;
+    }
+  }
+
+  private void doTest(Configuration conf, long[] capacities, String[] racks,
+      long newCapacity, String newRack, boolean useTool) throws Exception {
+    doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
+  }
+
+  /** This test starts a cluster with the specified number of nodes,
    * and fills it to be 30% full (with a single file replicated identically
    * to all datanodes);
    * It then adds one new empty node and starts balancing.
-   * 
+   *
    * @param conf - configuration
    * @param capacities - array of capacities of original nodes in cluster
    * @param racks - array of racks for original nodes in cluster
    * @param newCapacity - new node's capacity
    * @param newRack - new node's rack
+   * @param nodes - information about new nodes to be started.
    * @param useTool - if true run test via Cli with command-line argument 
    *   parsing, etc.   Otherwise invoke balancer API directly.
+   * @param useFile - if true, the hosts to be included or excluded will be stored in a
+   *   file and then later read from the file.
    * @throws Exception
    */
-  private void doTest(Configuration conf, long[] capacities, String[] racks, 
-      long newCapacity, String newRack, boolean useTool) throws Exception {
+  private void doTest(Configuration conf, long[] capacities,
+      String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
+      boolean useTool, boolean useFile) throws Exception {
     LOG.info("capacities = " +  long2String(capacities)); 
     LOG.info("racks      = " +  Arrays.asList(racks)); 
     LOG.info("newCapacity= " +  newCapacity); 
@@ -346,17 +480,75 @@ public class TestBalancer {
       long totalUsedSpace = totalCapacity*3/10;
       createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
           (short) numOfDatanodes, 0);
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{newRack}, new long[]{newCapacity});
 
-      totalCapacity += newCapacity;
+      if (nodes == null) { // there is no specification of new nodes.
+        // start up an empty node with the same capacity and on the same rack
+        cluster.startDataNodes(conf, 1, true, null,
+            new String[]{newRack}, null,new long[]{newCapacity});
+        totalCapacity += newCapacity;
+      } else {
+        //if running a test with "include list", include original nodes as well
+        if (nodes.getNumberofIncludeNodes()>0) {
+          for (DataNode dn: cluster.getDataNodes())
+            nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName());
+        }
+        String[] newRacks = new String[nodes.getNumberofNewNodes()];
+        long[] newCapacities = new long[nodes.getNumberofNewNodes()];
+        for (int i=0; i < nodes.getNumberofNewNodes(); i++) {
+          newRacks[i] = newRack;
+          newCapacities[i] = newCapacity;
+        }
+        // if host names are specified for the new nodes to be created.
+        if (nodes.getNames() != null) {
+          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+              newRacks, nodes.getNames(), newCapacities);
+          totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+        } else {  // host names are not specified
+          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+              newRacks, null, newCapacities);
+          totalCapacity += newCapacity*nodes.getNumberofNewNodes();
+          //populate the include nodes
+          if (nodes.getNumberofIncludeNodes() > 0) {
+            int totalNodes = cluster.getDataNodes().size();
+            for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) {
+              nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get(
+                  totalNodes-1-i).getDatanodeId().getXferAddr());
+            }
+          }
+          //populate the exclude nodes
+          if (nodes.getNumberofExcludeNodes() > 0) {
+            int totalNodes = cluster.getDataNodes().size();
+            for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) {
+              nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get(
+                  totalNodes-1-i).getDatanodeId().getXferAddr());
+            }
+          }
+        }
+      }
+      // build the balancer parameters from the include/exclude lists
+      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+      if (nodes != null) {
+        p = new Balancer.Parameters(
+            Balancer.Parameters.DEFAULT.policy,
+            Balancer.Parameters.DEFAULT.threshold,
+            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+      }
+
+      int expectedExcludedNodes = 0;
+      if (nodes != null) {
+        if (!nodes.getNodesToBeExcluded().isEmpty()) {
+          expectedExcludedNodes = nodes.getNodesToBeExcluded().size();
+        } else if (!nodes.getNodesToBeIncluded().isEmpty()) {
+          expectedExcludedNodes =
+              cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size();
+        }
+      }
 
       // run balancer and validate results
       if (useTool) {
-        runBalancerCli(conf, totalUsedSpace, totalCapacity);
+        runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
       } else {
-        runBalancer(conf, totalUsedSpace, totalCapacity);
+        runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
       }
     } finally {
       cluster.shutdown();
@@ -365,31 +557,86 @@ public class TestBalancer {
 
   private void runBalancer(Configuration conf,
       long totalUsedSpace, long totalCapacity) throws Exception {
+    runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
+  }
+
+  private void runBalancer(Configuration conf,
+     long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
+     int excludedNodes) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
-
+    final int r = Balancer.run(namenodes, p, conf);
+    if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      return;
+    } else {
+      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+    }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
   }
-  
+
   private void runBalancerCli(Configuration conf,
-      long totalUsedSpace, long totalCapacity) throws Exception {
+      long totalUsedSpace, long totalCapacity,
+      Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+    List <String> args = new ArrayList<String>();
+    args.add("-policy");
+    args.add("datanode");
+
+    File excludeHostsFile = null;
+    if (!p.nodesToBeExcluded.isEmpty()) {
+      args.add("-exclude");
+      if (useFile) {
+        excludeHostsFile = new File ("exclude-hosts-file");
+        PrintWriter pw = new PrintWriter(excludeHostsFile);
+        for (String host: p.nodesToBeExcluded) {
+          pw.write( host + "\n");
+        }
+        pw.close();
+        args.add("-f");
+        args.add("exclude-hosts-file");
+      } else {
+        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+      }
+    }
+
+    File includeHostsFile = null;
+    if (!p.nodesToBeIncluded.isEmpty()) {
+      args.add("-include");
+      if (useFile) {
+        includeHostsFile = new File ("include-hosts-file");
+        PrintWriter pw = new PrintWriter(includeHostsFile);
+        for (String host: p.nodesToBeIncluded){
+          pw.write( host + "\n");
+        }
+        pw.close();
+        args.add("-f");
+        args.add("include-hosts-file");
+      } else {
+        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+      }
+    }
 
-    final String[] args = { "-policy", "datanode" };
     final Tool tool = new Cli();    
     tool.setConf(conf);
-    final int r = tool.run(args); // start rebalancing
+    final int r = tool.run(args.toArray(new String[0])); // start rebalancing
     
     assertEquals("Tools should exit 0 on success", 0, r);
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);
+
+    if (excludeHostsFile != null && excludeHostsFile.exists()) {
+      excludeHostsFile.delete();
+    }
+    if (includeHostsFile != null && includeHostsFile.exists()) {
+      includeHostsFile.delete();
+    }
   }
   
   /** one-node cluster test*/
@@ -411,6 +658,71 @@ public class TestBalancer {
     oneNodeTest(conf, false);
   }
   
+  /* we first start a cluster and fill the cluster up to a certain size.
+   * then redistribute blocks according to the required distribution.
+   * Then we start an empty datanode.
+   * Afterwards a balancer is run to balance the cluster.
+   * A partially filled datanode is excluded during balancing.
+   * This triggers a situation where one of a block's locations is unknown.
+   */
+  @Test(timeout=100000)
+  public void testUnknownDatanode() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    long distribution[] = new long[] {50*CAPACITY/100, 70*CAPACITY/100, 0*CAPACITY/100};
+    long capacities[] = new long[]{CAPACITY, CAPACITY, CAPACITY};
+    String racks[] = new String[] {RACK0, RACK1, RACK1};
+
+    int numDatanodes = distribution.length;
+    if (capacities.length != numDatanodes || racks.length != numDatanodes) {
+      throw new IllegalArgumentException("Array length is not the same");
+    }
+
+    // calculate total space that need to be filled
+    final long totalUsedSpace = sum(distribution);
+
+    // fill the cluster
+    ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace,
+        (short) numDatanodes);
+
+    // redistribute blocks
+    Block[][] blocksDN = distributeBlocks(
+        blocks, (short)(numDatanodes-1), distribution);
+
+    // restart the cluster: do NOT format the cluster
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes)
+        .format(false)
+        .racks(racks)
+        .simulatedCapacities(capacities)
+        .build();
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+
+      for(int i = 0; i < 3; i++) {
+        cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
+      }
+
+      cluster.startDataNodes(conf, 1, true, null,
+          new String[]{RACK0}, null,new long[]{CAPACITY});
+      cluster.triggerHeartbeats();
+
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      Set<String>  datanodes = new HashSet<String>();
+      datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+      Balancer.Parameters p = new Balancer.Parameters(
+          Balancer.Parameters.DEFAULT.policy,
+          Balancer.Parameters.DEFAULT.threshold,
+          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test parse method in Balancer#Cli class with threshold value out of
    * boundaries.
@@ -435,7 +747,7 @@ public class TestBalancer {
     }
   }
   
-  /** Test a cluster with even distribution, 
+  /** Test a cluster with even distribution,
    * then a new empty node is added to the cluster*/
   @Test(timeout=100000)
   public void testBalancer0() throws Exception {
@@ -463,6 +775,20 @@ public class TestBalancer {
   }
   
   @Test(timeout=100000)
+  public void testBalancerWithZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
+    testBalancer1Internal (conf);
+  }
+
+  @Test(timeout=100000)
+  public void testBalancerWithNonZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
+    testBalancer1Internal (conf);
+  }
+  
+  @Test(timeout=100000)
   public void testBalancer2() throws Exception {
     testBalancer2Internal(new HdfsConfiguration());
   }
@@ -528,14 +854,49 @@ public class TestBalancer {
     } catch (IllegalArgumentException e) {
 
     }
-    parameters = new String[] { "-threshold 1 -policy" };
+    parameters = new String[] {"-threshold", "1", "-policy"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail(reason);
+    } catch (IllegalArgumentException e) {
+
+    }
+    parameters = new String[] {"-threshold", "1", "-include"};
     try {
       Balancer.Cli.parse(parameters);
       fail(reason);
     } catch (IllegalArgumentException e) {
 
     }
+    parameters = new String[] {"-threshold", "1", "-exclude"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail(reason);
+    } catch (IllegalArgumentException e) {
 
+    }
+    parameters = new String[] {"-include",  "-f"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail(reason);
+    } catch (IllegalArgumentException e) {
+
+    }
+    parameters = new String[] {"-exclude",  "-f"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail(reason);
+    } catch (IllegalArgumentException e) {
+
+    }
+
+    parameters = new String[] {"-include",  "testnode1", "-exclude", "testnode2"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail("IllegalArgumentException is expected when both -exclude and -include are specified");
+    } catch (IllegalArgumentException e) {
+
+    }
   }
 
   /**
@@ -551,6 +912,183 @@ public class TestBalancer {
   }
   
   /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithExcludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add( "datanodeY");
+    excludeHosts.add( "datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithExcludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add( "datanodeY");
+    excludeHosts.add( "datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+      new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
+      Parameters.DEFAULT.nodesToBeIncluded), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add( "datanodeY");
+    excludeHosts.add( "datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the new nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithIncludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add( "datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the new nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithIncludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the new nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add( "datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the new nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the new nodes in the include list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add( "datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+        Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the new nodes in the include list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
+  }
+
+  /**
    * @param args
    */
   public static void main(String[] args) throws Exception {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Tue Aug 19 23:49:39 2014
@@ -44,7 +44,7 @@ public class TestBalancerWithHANameNodes
   ClientProtocol client;
 
   static {
-    Balancer.setBlockMoveWaitTime(1000L);
+    Dispatcher.setBlockMoveWaitTime(1000L);
   }
 
   /**
@@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes
       Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
       assertEquals(1, namenodes.size());
       assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
-      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-      assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
       TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
-          cluster);
+          cluster, Balancer.Parameters.DEFAULT);
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Tue Aug 19 23:49:39 2014
@@ -73,7 +73,7 @@ public class TestBalancerWithMultipleNam
   private static final Random RANDOM = new Random();
 
   static {
-    Balancer.setBlockMoveWaitTime(1000L) ;
+    Dispatcher.setBlockMoveWaitTime(1000L) ;
   }
 
   /** Common objects used in various methods. */
@@ -159,8 +159,8 @@ public class TestBalancerWithMultipleNam
 
     // start rebalancing
     final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
-    Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
+    Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
 
     LOG.info("BALANCER 2");
     wait(s.clients, totalUsed, totalCapacity);
@@ -195,7 +195,7 @@ public class TestBalancerWithMultipleNam
       balanced = true;
       for(int d = 0; d < used.length; d++) {
         final double p = used[d]*100.0/cap[d];
-        balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold;
+        balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
         if (!balanced) {
           if (i % 100 == 0) {
             LOG.warn("datanodes " + d + " is not yet balanced: "

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Tue Aug 19 23:49:39 2014
@@ -22,8 +22,9 @@ import static org.junit.Assert.assertEqu
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -39,6 +40,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
 import org.apache.hadoop.net.NetworkTopology;
@@ -53,7 +57,7 @@ public class TestBalancerWithNodeGroup {
   private static final Log LOG = LogFactory.getLog(
   "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
   
-  final private static long CAPACITY = 6000L;
+  final private static long CAPACITY = 5000L;
   final private static String RACK0 = "/rack0";
   final private static String RACK1 = "/rack1";
   final private static String NODEGROUP0 = "/nodegroup0";
@@ -71,12 +75,13 @@ public class TestBalancerWithNodeGroup {
   static final int DEFAULT_BLOCK_SIZE = 100;
 
   static {
-    Balancer.setBlockMoveWaitTime(1000L) ;
+    Dispatcher.setBlockMoveWaitTime(1000L) ;
   }
 
   static Configuration createConf() {
     Configuration conf = new HdfsConfiguration();
     TestBalancer.initConf(conf);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
         NetworkTopologyWithNodeGroup.class.getName());
     conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, 
@@ -170,8 +175,8 @@ public class TestBalancerWithNodeGroup {
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+    assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
     LOG.info("Rebalancing with default factor.");
@@ -184,13 +189,26 @@ public class TestBalancerWithNodeGroup {
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-    Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
-        (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+    Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
+        (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
     waitForHeartBeat(totalUsedSpace, totalCapacity);
     LOG.info("Rebalancing with default factor.");
   }
 
+  private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
+    Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>();
+    for (LocatedBlock blk : blks) {
+      for (DatanodeInfo di : blk.getLocations()) {
+        if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
+          ret.add(blk.getBlock());
+          break;
+        }
+      }
+    }
+    return ret;
+  }
+
   /**
    * Create a cluster with even distribution, and a new empty node is added to
    * the cluster, then test rack locality for balancer policy. 
@@ -220,9 +238,14 @@ public class TestBalancerWithNodeGroup {
 
       // fill up the cluster to be 30% full
       long totalUsedSpace = totalCapacity * 3 / 10;
-      TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+      long length = totalUsedSpace / numOfDatanodes;
+      TestBalancer.createFile(cluster, filePath, length,
           (short) numOfDatanodes, 0);
       
+      LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0,
+          length);
+      Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
+
       long newCapacity = CAPACITY;
       String newRack = RACK1;
       String newNodeGroup = NODEGROUP2;
@@ -235,22 +258,9 @@ public class TestBalancerWithNodeGroup {
       // run balancer and validate results
       runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
       
-      DatanodeInfo[] datanodeReport = 
-              client.getDatanodeReport(DatanodeReportType.ALL);
-      
-      Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
-      for (DatanodeInfo datanode: datanodeReport) {
-        String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
-        int usedCapacity = (int) datanode.getDfsUsed();
-         
-        if (rackToUsedCapacity.get(rack) != null) {
-          rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
-        } else {
-          rackToUsedCapacity.put(rack, usedCapacity);
-        }
-      }
-      assertEquals(rackToUsedCapacity.size(), 2);
-      assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
+      lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
+      Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
+      assertEquals(before, after);
       
     } finally {
       cluster.shutdown();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Tue Aug 19 23:49:39 2014
@@ -101,7 +101,6 @@ public class BlockManagerTestUtil {
   }
 
   /**
-   * @param blockManager
    * @return replication monitor thread instance from block manager.
    */
   public static Daemon getReplicationThread(final BlockManager blockManager)
@@ -111,7 +110,6 @@ public class BlockManagerTestUtil {
   
   /**
    * Stop the replication monitor thread
-   * @param blockManager
    */
   public static void stopReplicationThread(final BlockManager blockManager) 
       throws IOException {
@@ -126,7 +124,6 @@ public class BlockManagerTestUtil {
   }
 
   /**
-   * @param blockManager
    * @return corruptReplicas from block manager
    */
   public static  CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){
@@ -135,7 +132,6 @@ public class BlockManagerTestUtil {
   }
 
   /**
-   * @param blockManager
    * @return computed block replication and block invalidation work that can be
    *         scheduled on data-nodes.
    * @throws IOException
@@ -158,7 +154,7 @@ public class BlockManagerTestUtil {
    * regardless of invalidation/replication limit configurations.
    * 
    * NB: you may want to set
-   * {@link DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
+   * {@link DFSConfigKeys#DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
    * a high value to ensure that all work is calculated.
    */
   public static int computeAllPendingWork(BlockManager bm) {
@@ -200,7 +196,7 @@ public class BlockManagerTestUtil {
   /**
    * Change whether the block placement policy will prefer the writer's
    * local Datanode or not.
-   * @param prefer
+   * @param prefer if true, prefer local node
    */
   public static void setWritingPrefersLocalNode(
       BlockManager bm, boolean prefer) {
@@ -240,8 +236,13 @@ public class BlockManagerTestUtil {
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
       String rackLocation, DatanodeStorage storage) {
+    return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host");
+  }
+
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation, DatanodeStorage storage, String hostname) {
       DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr,
-          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation);
+          DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname);
       if (storage != null) {
         dn.updateStorage(storage);
       }
@@ -267,4 +268,14 @@ public class BlockManagerTestUtil {
     }
     return reports.toArray(StorageReport.EMPTY_ARRAY);
   }
+
+  /**
+   * Have the DatanodeManager check the decommission state of the given node.
+   * @param dm the DatanodeManager to manipulate
+   */
+  public static void checkDecommissionState(DatanodeManager dm,
+      DatanodeDescriptor node) {
+    dm.checkDecommissionState(node);
+  }
+
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -29,6 +30,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -42,6 +45,41 @@ public class TestBlockInfo {
   private static final Log LOG = LogFactory
       .getLog("org.apache.hadoop.hdfs.TestBlockInfo");
 
+
+  @Test
+  public void testAddStorage() throws Exception {
+    BlockInfo blockInfo = new BlockInfo(3);
+
+    final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
+
+    boolean added = blockInfo.addStorage(storage);
+
+    Assert.assertTrue(added);
+    Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
+  }
+
+
+  @Test
+  public void testReplaceStorage() throws Exception {
+
+    // Create two dummy storages.
+    final DatanodeStorageInfo storage1 = DFSTestUtil.createDatanodeStorageInfo("storageID1", "127.0.0.1");
+    final DatanodeStorageInfo storage2 = new DatanodeStorageInfo(storage1.getDatanodeDescriptor(), new DatanodeStorage("storageID2"));
+    final int NUM_BLOCKS = 10;
+    BlockInfo[] blockInfos = new BlockInfo[NUM_BLOCKS];
+
+    // Create a few dummy blocks and add them to the first storage.
+    for (int i = 0; i < NUM_BLOCKS; ++i) {
+      blockInfos[i] = new BlockInfo(3);
+      storage1.addBlock(blockInfos[i]);
+    }
+
+    // Try to move one of the blocks to a different storage.
+    boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+    Assert.assertThat(added, is(false));
+    Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
+  }
+
   @Test
   public void testBlockListMoveToHead() throws Exception {
     LOG.info("BlockInfo moveToHead tests...");

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Aug 19 23:49:39 2014
@@ -368,7 +368,7 @@ public class TestBlockManager {
       DatanodeStorageInfo[] pipeline) throws IOException {
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
-      bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null);
+      bm.addBlock(storage, blockInfo, null);
       blockInfo.addStorage(storage);
     }
   }
@@ -549,12 +549,12 @@ public class TestBlockManager {
     // send block report, should be processed
     reset(node);
     
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", 
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
 
@@ -566,7 +566,7 @@ public class TestBlockManager {
     assertEquals(0, ds.getBlockReportCount()); // ready for report again
     // send block report, should be processed after restart
     reset(node);
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
   }
@@ -595,7 +595,7 @@ public class TestBlockManager {
     // send block report while pretending to already have blocks
     reset(node);
     doReturn(1).when(node).numBlocks();
-    bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool",
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         new BlockListAsLongs(null, null));
     assertEquals(1, ds.getBlockReportCount());
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Tue Aug 19 23:49:39 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS {
           setConfiguration(conf).
           setRemotePeerFactory(new RemotePeerFactory() {
             @Override
-            public Peer newConnectedPeer(InetSocketAddress addr)
+            public Peer newConnectedPeer(InetSocketAddress addr,
+                Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                 throws IOException {
               Peer peer = null;
               Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
@@ -209,6 +211,8 @@ public class TestBlockTokenWithDFS {
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
     conf.setInt("ipc.client.connect.max.retries", 0);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
     return conf;
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.junit.Test;
 
 
@@ -89,14 +90,14 @@ public class TestCorruptReplicaInfo {
       DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor();
       DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor();
       
-      crm.addToCorruptReplicasMap(getBlock(0), dn1, "TEST");
+      addToCorruptReplicasMap(crm, getBlock(0), dn1);
       assertEquals("Number of corrupt blocks not returning correctly",
                    1, crm.size());
-      crm.addToCorruptReplicasMap(getBlock(1), dn1, "TEST");
+      addToCorruptReplicasMap(crm, getBlock(1), dn1);
       assertEquals("Number of corrupt blocks not returning correctly",
                    2, crm.size());
       
-      crm.addToCorruptReplicasMap(getBlock(1), dn2, "TEST");
+      addToCorruptReplicasMap(crm, getBlock(1), dn2);
       assertEquals("Number of corrupt blocks not returning correctly",
                    2, crm.size());
       
@@ -109,7 +110,7 @@ public class TestCorruptReplicaInfo {
                    0, crm.size());
       
       for (Long block_id: block_ids) {
-        crm.addToCorruptReplicasMap(getBlock(block_id), dn1, "TEST");
+        addToCorruptReplicasMap(crm, getBlock(block_id), dn1);
       }
             
       assertEquals("Number of corrupt blocks not returning correctly",
@@ -127,4 +128,9 @@ public class TestCorruptReplicaInfo {
                               crm.getCorruptReplicaBlockIds(10, 7L)));
       
   }
+  
+  private static void addToCorruptReplicasMap(CorruptReplicasMap crm,
+      Block blk, DatanodeDescriptor dn) {
+    crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE);
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Tue Aug 19 23:49:39 2014
@@ -63,16 +63,16 @@ public class TestDatanodeDescriptor {
     assertTrue(storages.length > 0);
     final String storageID = storages[0].getStorageID();
     // add first block
-    assertTrue(dd.addBlock(storageID, blk));
+    assertTrue(storages[0].addBlock(blk));
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(dd.addBlock(storageID, blk));
+    assertFalse(storages[0].addBlock(blk));
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(dd.addBlock(storageID, blk1));
+    assertTrue(storages[0].addBlock(blk1));
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
@@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages
   @Test
   public void testQueues() {
     DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED);
+    DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
+    DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
+    msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
 
     assertEquals(2, msgs.count());
     

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Aug 19 23:49:39 2014
@@ -82,7 +82,7 @@ public class TestReplicationPolicy {
   private static NameNode namenode;
   private static BlockPlacementPolicy replicator;
   private static final String filename = "/dummyfile.txt";
-  private static DatanodeDescriptor dataNodes[];
+  private static DatanodeDescriptor[] dataNodes;
   private static DatanodeStorageInfo[] storages;
   // The interval for marking a datanode as stale,
   private static final long staleInterval =
@@ -905,49 +905,46 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testChooseReplicaToDelete() throws Exception {
-    List<DatanodeDescriptor> replicaNodeList = new 
-        ArrayList<DatanodeDescriptor>();
-    final Map<String, List<DatanodeDescriptor>> rackMap
-        = new HashMap<String, List<DatanodeDescriptor>>();
+    List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>();
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
     
     dataNodes[0].setRemaining(4*1024*1024);
-    replicaNodeList.add(dataNodes[0]);
+    replicaList.add(storages[0]);
     
     dataNodes[1].setRemaining(3*1024*1024);
-    replicaNodeList.add(dataNodes[1]);
+    replicaList.add(storages[1]);
     
     dataNodes[2].setRemaining(2*1024*1024);
-    replicaNodeList.add(dataNodes[2]);
+    replicaList.add(storages[2]);
     
     dataNodes[5].setRemaining(1*1024*1024);
-    replicaNodeList.add(dataNodes[5]);
+    replicaList.add(storages[5]);
     
     // Refresh the last update time for all the datanodes
     for (int i = 0; i < dataNodes.length; i++) {
       dataNodes[i].setLastUpdate(Time.now());
     }
     
-    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
-    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
-    replicator.splitNodesWithRack(
-        replicaNodeList, rackMap, first, second);
-    // dataNodes[0] and dataNodes[1] are in first set as their rack has two 
-    // replica nodes, while datanodes[2] and dataNodes[5] are in second set.
+    List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();
+    List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>();
+    replicator.splitNodesWithRack(replicaList, rackMap, first, second);
+    // storages[0] and storages[1] are in first set as their rack has two 
+    // replica nodes, while storages[2] and storages[5] are in second set.
     assertEquals(2, first.size());
     assertEquals(2, second.size());
-    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
+    DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete(
         null, null, (short)3, first, second);
-    // Within first set, dataNodes[1] with less free space
-    assertEquals(chosenNode, dataNodes[1]);
+    // Within first set, storages[1] with less free space
+    assertEquals(chosen, storages[1]);
 
-    replicator.adjustSetsWithChosenReplica(
-        rackMap, first, second, chosenNode);
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen);
     assertEquals(0, first.size());
     assertEquals(3, second.size());
-    // Within second set, dataNodes[5] with less free space
-    chosenNode = replicator.chooseReplicaToDelete(
+    // Within second set, storages[5] with less free space
+    chosen = replicator.chooseReplicaToDelete(
         null, null, (short)2, first, second);
-    assertEquals(chosenNode, dataNodes[5]);
+    assertEquals(chosen, storages[5]);
   }
   
   /**
@@ -1121,8 +1118,7 @@ public class TestReplicationPolicy {
     // Adding this block will increase its current replication, and that will
     // remove it from the queue.
     bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info,
-              ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0],
-            "STORAGE");
+              ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
     // from QUEUE_VERY_UNDER_REPLICATED.