You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/04/22 19:10:44 UTC

svn commit: r1095963 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: eli
Date: Fri Apr 22 17:10:43 2011
New Revision: 1095963

URL: http://svn.apache.org/viewvc?rev=1095963&view=rev
Log:
HDFS-1562. Add rack policy tests. Contributed by Eli Collins

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Apr 22 17:10:43 2011
@@ -130,6 +130,8 @@ Trunk (unreleased changes)
     HDFS-1854. make failure message more useful in
     DFSTestUtil.waitReplication(). (Matt Foley via eli)
 
+    HDFS-1562. Add rack policy tests. (eli)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Apr 22 17:10:43 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Fri Apr 22 17:10:43 2011
@@ -86,9 +86,8 @@ class UnderReplicatedBlocks implements I
                           int curReplicas, 
                           int decommissionedReplicas,
                           int expectedReplicas) {
-    if (curReplicas<0) {
-      return LEVEL;
-    } else if (curReplicas>=expectedReplicas) {
+    assert curReplicas >= 0 : "Negative replicas!";
+    if (curReplicas >= expectedReplicas) {
       return 3; // Block doesn't have enough racks
     } else if(curReplicas==0) {
       // If there are zero non-decommissioned replica but there are
@@ -116,9 +115,7 @@ class UnderReplicatedBlocks implements I
                            int curReplicas, 
                            int decomissionedReplicas,
                            int expectedReplicas) {
-    if(curReplicas<0) {
-      return false;
-    }
+    assert curReplicas >= 0 : "Negative replicas!";
     int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
@@ -159,8 +156,10 @@ class UnderReplicatedBlocks implements I
       }
       return true;
     } else {
+      // Try to remove the block from all queues if the block was
+      // not found in the queue for the given priority level.
       for(int i=0; i<LEVEL; i++) {
-        if(i!=priLevel && priorityQueues.get(i).remove(block)) {
+        if(priorityQueues.get(i).remove(block)) {
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -202,7 +201,7 @@ class UnderReplicatedBlocks implements I
           "BLOCK* NameSystem.UnderReplicationBlock.update:"
           + block
           + " has only "+curReplicas
-          + " replicas and need " + curExpectedReplicas
+          + " replicas and needs " + curExpectedReplicas
           + " replicas so is added to neededReplications"
           + " at priority level " + curPri);
       }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Apr 22 17:10:43 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -27,6 +28,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLConnection;
@@ -36,6 +39,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -50,15 +54,19 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
+
+import static org.junit.Assert.*;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -143,6 +151,12 @@ public class DFSTestUtil {
           replicationFactor, files[idx].getSeed());
     }
   }
+
+  public static String readFile(FileSystem fs, Path fileName) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
+    return os.toString();
+  }
   
   public static void createFile(FileSystem fs, Path fileName, long fileLen, 
       short replFactor, long seed) throws IOException {
@@ -172,8 +186,6 @@ public class DFSTestUtil {
   
   /** check if the files have been copied correctly. */
   public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
-    
-    //Configuration conf = new HdfsConfiguration();
     Path root = new Path(topdir);
     
     for (int idx = 0; idx < nFiles; idx++) {
@@ -206,9 +218,10 @@ public class DFSTestUtil {
     }
   }
 
-  // waits for the replication factor of all files to reach the
-  // specified target
-  //
+  /*
+   * Waits for the replication factor of all files to reach the
+   * specified target.
+   */
   public void waitReplication(FileSystem fs, String topdir, short value) 
                                               throws IOException {
     Path root = new Path(topdir);
@@ -219,6 +232,128 @@ public class DFSTestUtil {
     }
   }
 
+  /*
+   * Check if the given block in the given file is corrupt.
+   */
+  public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
+      Path file, int blockNo) throws IOException {
+    DFSClient client = new DFSClient(new InetSocketAddress("localhost",
+        cluster.getNameNodePort()), cluster.getConfiguration());
+    LocatedBlocks blocks;
+    try {
+       blocks = client.getNamenode().getBlockLocations(
+           file.toString(), 0, Long.MAX_VALUE);
+    } finally {
+      client.close();
+    }
+    return blocks.get(blockNo).isCorrupt();
+  }
+
+  /*
+   * Wait up to 20s for the given block to be replicated across
+   * the requested number of racks, with the requested number of
+   * replicas, and the requested number of replicas still needed.
+   */
+  public static void waitForReplication(MiniDFSCluster cluster, Block b,
+      int racks, int replicas, int neededReplicas)
+      throws IOException, TimeoutException, InterruptedException {
+    int curRacks = 0;
+    int curReplicas = 0;
+    int curNeededReplicas = 0;
+    int count = 0;
+    final int ATTEMPTS = 20;
+
+    do {
+      Thread.sleep(1000);
+      int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b);
+      curRacks = r[0];
+      curReplicas = r[1];
+      curNeededReplicas = r[2];
+      count++;
+    } while ((curRacks != racks ||
+              curReplicas != replicas ||
+              curNeededReplicas != neededReplicas) && count < ATTEMPTS);
+
+    if (count == ATTEMPTS) {
+      throw new TimeoutException("Timed out waiting for replication."
+          + " Needed replicas = "+neededReplicas
+          + " Cur needed replicas = "+curNeededReplicas
+          + " Replicas = "+replicas+" Cur replicas = "+curReplicas
+          + " Racks = "+racks+" Cur racks = "+curRacks);
+    }
+  }
+
+  /*
+   * Keep accessing the given file until the namenode reports that the
+   * given block in the file contains the given number of corrupt replicas.
+   */
+  public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
+      Path file, Block b, int corruptRepls)
+      throws IOException, TimeoutException {
+    int count = 0;
+    final int ATTEMPTS = 20;
+    int repls = ns.numCorruptReplicas(b);
+    while (repls != corruptRepls && count < ATTEMPTS) {
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
+            512, true);
+      } catch (IOException e) {
+        // Swallow exceptions
+      }
+      System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
+      repls = ns.numCorruptReplicas(b);
+      count++;
+    }
+    if (count == ATTEMPTS) {
+      throw new TimeoutException("Timed out waiting for corrupt replicas."
+          + " Waiting for "+corruptRepls+", but only found "+repls);
+    }
+  }
+
+  /*
+   * Wait up to 20s for the given DN (host:port) to be decommissioned.
+   */
+  public static void waitForDecommission(FileSystem fs, String name) 
+      throws IOException, InterruptedException, TimeoutException {
+    DatanodeInfo dn = null;
+    int count = 0;
+    final int ATTEMPTS = 20;
+
+    do {
+      Thread.sleep(1000);
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      for (DatanodeInfo info : dfs.getDataNodeStats()) {
+        if (name.equals(info.getName())) {
+          dn = info;
+        }
+      }
+      count++;
+    } while ((dn == null ||
+              dn.isDecommissionInProgress() ||
+              !dn.isDecommissioned()) && count < ATTEMPTS);
+
+    if (count == ATTEMPTS) {
+      throw new TimeoutException("Timed out waiting for datanode "
+          + name + " to decommission.");
+    }
+  }
+
+  /*
+   * Returns the index of the first datanode which has a copy
+   * of the given block, or -1 if no such datanode exists.
+   */
+  public static int firstDnWithBlock(MiniDFSCluster cluster, Block b)
+      throws IOException {
+    int numDatanodes = cluster.getDataNodes().size();
+    for (int i = 0; i < numDatanodes; i++) {
+      String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+      if (blockContent != null) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
   /** return list of filenames created as part of createFiles */
   public String[] getFileNames(String topDir) {
     if (nFiles == 0)
@@ -292,7 +427,7 @@ public class DFSTestUtil {
         ).getLogger().setLevel(org.apache.log4j.Level.ALL);
   }
 
-  static String readFile(File f) throws IOException {
+  public static String readFile(File f) throws IOException {
     StringBuilder b = new StringBuilder();
     BufferedReader in = new BufferedReader(new FileReader(f));
     for(int c; (c = in.read()) != -1; b.append((char)c));
@@ -300,6 +435,17 @@ public class DFSTestUtil {
     return b.toString();
   }
 
+  /* Write the given string to the given file */
+  public static void writeFile(FileSystem fs, Path p, String s) 
+      throws IOException {
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    InputStream is = new ByteArrayInputStream(s.getBytes());
+    FSDataOutputStream os = fs.create(p);
+    IOUtils.copyBytes(is, os, s.length(), true);
+  }
+
   // Returns url content as string.
   public static String urlGet(URL url) throws IOException {
     URLConnection conn = url.openConnection();
@@ -409,7 +555,7 @@ public class DFSTestUtil {
   /** For {@link TestTransferRbw} */
   public static DataTransferProtocol.Status transferRbw(final Block b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
-    Assert.assertEquals(2, datanodes.length);
+    assertEquals(2, datanodes.length);
     final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
         datanodes.length, dfsClient);
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Apr 22 17:10:43 2011
@@ -878,29 +878,69 @@ public class MiniDFSCluster {
     System.out.println("Cluster is active");
   }
 
-  /*
-   * Corrupt a block on all datanode
+  /**
+   * Return the contents of the given block on the given datanode.
+   *
+   * @param The index of the datanode
+   * @param The name of the block
+   * @throws IOException on error accessing the file for the given block
+   * @return The contents of the block file, null if none found
    */
-  void corruptBlockOnDataNodes(String blockName) throws Exception{
-    for (int i=0; i < dataNodes.size(); i++)
-      corruptBlockOnDataNode(i,blockName);
+  public String readBlockOnDataNode(int i, String blockName) throws IOException {
+    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+
+    // Each datanode has multiple data dirs, check each
+    for (int dn = i*2; dn < i*2+2; dn++) {
+      File dataDir = new File(getBaseDirectory() + "data");
+      File blockFile = new File(dataDir,
+          "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+      if (blockFile.exists()) {
+        return DFSTestUtil.readFile(blockFile);
+      }
+    }
+    return null;
   }
 
-  /*
-   * Corrupt a block on a particular datanode
+  /**
+   * Corrupt a block on all datanodes.
+   *
+   * @param The name of the block
+   * @throws IOException on error accessing the given block.
+   * @return The number of block files corrupted.
+   */  
+  public int corruptBlockOnDataNodes(String blockName) throws IOException {
+    int blocksCorrupted = 0;
+    for (int i=0; i < dataNodes.size(); i++) {
+      if (corruptReplica(blockName, i)) {
+        blocksCorrupted++;
+      }
+    }
+    return blocksCorrupted;
+  }
+
+  /**
+   * Corrupt a block on a particular datanode.
+   *
+   * @param The index of the datanode
+   * @param The name of the block
+   * @throws IOException on error accessing the given block or if
+   * the contents of the block (on the same datanode) differ.
+   * @return true if a replica was corrupted, false otherwise
    */
-  boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+  public boolean corruptReplica(String blockName, int i) throws IOException {
     Random random = new Random();
-    boolean corrupted = false;
-    File dataDir = new File(getBaseDirectory() + "data");
-    if (i < 0 || i >= dataNodes.size())
-      return false;
+    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+    int filesCorrupted = 0;
+
+    // Each datanode has multiple data dirs, check each
     for (int dn = i*2; dn < i*2+2; dn++) {
-      File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
-                                blockName);
-      System.out.println("Corrupting for: " + blockFile);
-      if (blockFile.exists()) {
-        // Corrupt replica by writing random bytes into replica
+      File dataDir = new File(getBaseDirectory() + "data");
+      File blockFile = new File(dataDir,
+          "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+
+      // Corrupt the replica by writing some bytes into a random offset
+      if (blockFile.exists()) { 
+        System.out.println("Corrupting " + blockFile);
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
         FileChannel channel = raFile.getChannel();
         String badString = "BADBAD";
@@ -908,10 +948,12 @@ public class MiniDFSCluster {
         raFile.seek(rand);
         raFile.write(badString.getBytes());
         raFile.close();
+        filesCorrupted++;
       }
-      corrupted = true;
     }
-    return corrupted;
+    assert filesCorrupted == 0 || filesCorrupted == 1
+      : "Unexpected # block files";
+    return filesCorrupted == 1;
   }
 
   /*

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Apr 22 17:10:43 2011
@@ -244,7 +244,7 @@ public class TestCrcCorruption {
   private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
     long fileSize = 4096;
     Path file = new Path("/testFile");
-
+    short replFactor = (short)numDataNodes;
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
@@ -253,11 +253,12 @@ public class TestCrcCorruption {
       cluster.waitActive();
       FileSystem fs = cluster.getFileSystem();
 
-      DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
-      DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+      DFSTestUtil.createFile(fs, file, fileSize, replFactor, 12345L /*seed*/);
+      DFSTestUtil.waitReplication(fs, file, replFactor);
 
       String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
-      cluster.corruptBlockOnDataNodes(block);
+      int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+      assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);
 
       try {
         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Fri Apr 22 17:10:43 2011
@@ -24,7 +24,6 @@ import java.net.URL;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.io.*;
-import java.nio.channels.FileChannel;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -142,36 +140,11 @@ public class TestDatanodeBlockScanner ex
     cluster.shutdown();
   }
 
-  public static boolean corruptReplica(String blockName, int replica) throws IOException {
-    Random random = new Random();
-    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
-    boolean corrupted = false;
-    for (int i=replica*2; i<replica*2+2; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1) + 
-          MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
-      if (blockFile.exists()) {
-        // Corrupt replica by writing random bytes into replica
-        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        FileChannel channel = raFile.getChannel();
-        String badString = "BADBAD";
-        int rand = random.nextInt((int)channel.size()/2);
-        raFile.seek(rand);
-        raFile.write(badString.getBytes());
-        raFile.close();
-        corrupted = true;
-      }
-    }
-    return corrupted;
-  }
-
-  public void testBlockCorruptionPolicy() throws IOException {
+  public void testBlockCorruptionPolicy() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
     Random random = new Random();
     FileSystem fs = null;
-    DFSClient dfsClient = null;
-    LocatedBlocks blocks = null;
-    int blockCount = 0;
     int rand = random.nextInt(3);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -181,44 +154,24 @@ public class TestDatanodeBlockScanner ex
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
     
-    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
-                                        cluster.getNameNodePort()), conf);
-    do {
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      blockCount = blocks.get(0).getLocations().length;
-      try {
-        LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-    } while (blockCount != 3);
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
 
     // Corrupt random replica of block 
-    assertTrue(corruptReplica(block, rand));
+    assertTrue(cluster.corruptReplica(block, rand));
 
     // Restart the datanode hoping the corrupt block to be reported
     cluster.restartDataNode(rand);
 
     // We have 2 good replicas and block is not corrupt
-    do {
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      blockCount = blocks.get(0).getLocations().length;
-      try {
-        LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-    } while (blockCount != 2);
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitReplication(fs, file1, (short)2);
+    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
   
     // Corrupt all replicas. Now, block should be marked as corrupt
     // and we should get all the replicas 
-    assertTrue(corruptReplica(block, 0));
-    assertTrue(corruptReplica(block, 1));
-    assertTrue(corruptReplica(block, 2));
+    assertTrue(cluster.corruptReplica(block, 0));
+    assertTrue(cluster.corruptReplica(block, 1));
+    assertTrue(cluster.corruptReplica(block, 2));
 
     // Read the file to trigger reportBadBlocks by client
     try {
@@ -230,18 +183,8 @@ public class TestDatanodeBlockScanner ex
 
     // We now have the blocks to be marked as corrupt and we get back all
     // its replicas
-    do {
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      blockCount = blocks.get(0).getLocations().length;
-      try {
-        LOG.info("Looping until expected blockCount of 3 is received");
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-    } while (blockCount != 3);
-    assertTrue(blocks.get(0).isCorrupt() == true);
-
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+    assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
     cluster.shutdown();
   }
   
@@ -261,7 +204,7 @@ public class TestDatanodeBlockScanner ex
    * 4. Test again waits until the block is reported with expected number
    *    of good replicas.
    */
-  public void testBlockCorruptionRecoveryPolicy() throws IOException {
+  public void testBlockCorruptionRecoveryPolicy() throws Exception {
     // Test recovery of 1 corrupt replica
     LOG.info("Testing corrupt replica recovery for one corrupt replica");
     blockCorruptionRecoveryPolicy(4, (short)3, 1);
@@ -274,50 +217,30 @@ public class TestDatanodeBlockScanner ex
   private void blockCorruptionRecoveryPolicy(int numDataNodes, 
                                              short numReplicas,
                                              int numCorruptReplicas) 
-                                             throws IOException {
+                                             throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 30);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    FileSystem fs = null;
-    DFSClient dfsClient = null;
-    LocatedBlocks blocks = null;
-    int replicaCount = 0;
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
     cluster.waitActive();
-    fs = cluster.getFileSystem();
+    FileSystem fs = cluster.getFileSystem();
     Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
     DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
     Block blk = DFSTestUtil.getFirstBlock(fs, file1);
     String block = blk.getBlockName();
-    
-    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
-                                        cluster.getNameNodePort()), conf);
-    blocks = dfsClient.getNamenode().
-               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-    replicaCount = blocks.get(0).getLocations().length;
 
     // Wait until block is replicated to numReplicas
-    while (replicaCount != numReplicas) {
-      try {
-        LOG.info("Looping until expected replicaCount of " + numReplicas +
-                  "is reached");
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      replicaCount = blocks.get(0).getLocations().length;
-    }
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitReplication(fs, file1, numReplicas);
 
     // Corrupt numCorruptReplicas replicas of block 
     int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
     for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
-      if (corruptReplica(block, i)) 
+      if (cluster.corruptReplica(block, i)) {
         corruptReplicasDNIDs[j++] = i;
+      }
     }
     
     // Restart the datanodes containing corrupt replicas 
@@ -331,61 +254,19 @@ public class TestDatanodeBlockScanner ex
           + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getSelfAddr());
      cluster.restartDataNode(corruptReplicasDNIDs[i]);
     }
-    
+
     // Loop until all corrupt replicas are reported
-    int corruptReplicaSize = cluster.getNamesystem().
-                              numCorruptReplicas(blk);
-    while (corruptReplicaSize != numCorruptReplicas) {
-      try {
-        IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
-                          conf, true);
-      } catch (IOException e) {
-      }
-      try {
-        LOG.info("Looping until expected " + numCorruptReplicas + " are " +
-                 "reported. Current reported " + corruptReplicaSize);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      corruptReplicaSize = cluster.getNamesystem().
-                              numCorruptReplicas(blk);
-    }
+    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
+        blk, numCorruptReplicas);
     
     // Loop until the block recovers after replication
-    blocks = dfsClient.getNamenode().
-               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-    replicaCount = blocks.get(0).getLocations().length;
-    while (replicaCount != numReplicas) {
-      try {
-        LOG.info("Looping until block gets rereplicated to " + numReplicas);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      blocks = dfsClient.getNamenode().
-                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      replicaCount = blocks.get(0).getLocations().length;
-    }
+    DFSTestUtil.waitReplication(fs, file1, numReplicas);
+    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
 
     // Make sure the corrupt replica is invalidated and removed from
     // corruptReplicasMap
-    corruptReplicaSize = cluster.getNamesystem().
-                          numCorruptReplicas(blk);
-    while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
-      try {
-        LOG.info("Looping until corrupt replica is invalidated");
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      corruptReplicaSize = cluster.getNamesystem().
-                            numCorruptReplicas(blk);
-      blocks = dfsClient.getNamenode().
-                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      replicaCount = blocks.get(0).getLocations().length;
-    }
-    // Make sure block is healthy 
-    assertTrue(corruptReplicaSize == 0);
-    assertTrue(replicaCount == numReplicas);
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
+        blk, 0);
     cluster.shutdown();
   }
   

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Fri Apr 22 17:10:43 2011
@@ -68,7 +68,7 @@ public class TestMissingBlocksAlert exte
 
       // Corrupt the block
       String block = DFSTestUtil.getFirstBlock(dfs, corruptFile).getBlockName();
-      TestDatanodeBlockScanner.corruptReplica(block, 0);
+      assertTrue(cluster.corruptReplica(block, 0));
 
       // read the file so that the corrupt block is reported to NN
       FSDataInputStream in = dfs.open(corruptFile); 

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Fri Apr 22 17:10:43 2011
@@ -151,6 +151,7 @@ public class TestReplication extends Tes
     DFSClient dfsClient = null;
     LocatedBlocks blocks = null;
     int replicaCount = 0;
+    short replFactor = 1;
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -159,16 +160,19 @@ public class TestReplication extends Tes
   
     // Create file with replication factor of 1
     Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
-    DFSTestUtil.waitReplication(fs, file1, (short)1);
+    DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+    DFSTestUtil.waitReplication(fs, file1, replFactor);
   
     // Corrupt the block belonging to the created file
     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
-    cluster.corruptBlockOnDataNodes(block);
-  
+
+    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+    assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted); 
+
     // Increase replication factor, this should invoke transfer request
     // Receiving datanode fails on checksum and reports it to namenode
-    fs.setReplication(file1, (short)2);
+    replFactor = 2;
+    fs.setReplication(file1, replFactor);
   
     // Now get block details and check if the block is corrupt
     blocks = dfsClient.getNamenode().

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Apr 22 17:10:43 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
@@ -57,4 +58,19 @@ public class NameNodeAdapter {
   public static Server getRpcServer(NameNode namenode) {
     return namenode.server;
   }
+
+  /**
+   * Return a tuple of the replica state (number racks, number live
+   * replicas, and number needed replicas) for the given block.
+   * @param namenode to proxy the invocation to.
+   */
+  public static int[] getReplicaInfo(NameNode namenode, Block b) {
+    FSNamesystem ns = namenode.getNamesystem();
+    ns.readLock();
+    int[] r = {ns.blockManager.getNumberOfRacks(b),
+               ns.blockManager.countNodes(b).liveReplicas(),
+               ns.blockManager.neededReplications.contains(b) ? 1 : 0};
+    ns.readUnlock();
+    return r;
+  }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java Fri Apr 22 17:10:43 2011
@@ -18,141 +18,458 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import junit.framework.TestCase;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
 
-public class TestBlocksWithNotEnoughRacks extends TestCase {
+import static org.junit.Assert.*;
+import org.junit.Test;
 
+public class TestBlocksWithNotEnoughRacks {
+  public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
   static {
-    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL) ;
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
   }
 
-  private static final Log LOG =
-    LogFactory.getLog(TestBlocksWithNotEnoughRacks.class.getName());
-  //Creates a block with all datanodes on same rack
-  //Adds additional datanode on a different rack
-  //The block should be replicated to the new rack
-  public void testSufficientlyReplicatedBlocksWithNotEnoughRacks() throws Exception {
+  /*
+   * Return a configuration object with low timeouts for testing and 
+   * a topology script set (which enables rack awareness).  
+   */
+  private Configuration getConf() {
     Configuration conf = new HdfsConfiguration();
+
+    // Lower the heart beat interval so the NN quickly learns of dead
+    // or decommissioned DNs and the NN issues replication and invalidation
+    // commands quickly (as replies to heartbeats)
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+    // Have the NN ReplicationMonitor compute the replication and
+    // invalidation commands to send DNs every second.
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+
+    // Have the NN check for pending replications every second so it
+    // quickly schedules additional replicas as they are identified.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+
+    // The DNs report blocks every second.
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+
+    // Indicates we have multiple racks
     conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+    return conf;
+  }
+
+  /*
+   * Creates a block with all datanodes on the same rack, though the block
+   * is sufficiently replicated. Adds an additional datanode on a new rack. 
+   * The block should be replicated to the new rack.
+   */
+  @Test
+  public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
+    Configuration conf = getConf();
     final short REPLICATION_FACTOR = 3;
-    final String FILE_NAME = "/testFile";
-    final Path FILE_PATH = new Path(FILE_NAME);
-    //All datanodes are on the same rack
-    String racks[] = {"/rack1","/rack1","/rack1",} ;
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).racks(racks).build();
+    final Path filePath = new Path("/testFile");
+    // All datanodes are on the same rack
+    String racks[] = {"/rack1", "/rack1", "/rack1"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+
+    try {
+      // Create a file with one block with a replication factor of 3
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+
+      // Add a new datanode on a different rack
+      String newRacks[] = {"/rack2"};
+      cluster.startDataNodes(conf, 1, true, null, newRacks);
+      cluster.waitActive();
+
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Like the previous test but the block starts with a single replica,
+   * and therefore unlike the previous test the block does not start
+   * off needing replicas.
+   */
+  @Test
+  public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 1;
+    final Path filePath = new Path("/testFile");
+
+    String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block with a replication factor of 1
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
+
+      REPLICATION_FACTOR = 2;
+      ns.setReplication("/testFile", REPLICATION_FACTOR);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Creates a block with all datanodes on the same rack. Add additional
+   * datanodes on a different rack and increase the replication factor, 
+   * making sure there are enough replicas across racks. If the previous
+   * test passes this one should too, however this test may pass when
+   * the previous one fails because the replication code is explicitly
+   * triggered by setting the replication factor.
+   */
+  @Test
+  public void testUnderReplicatedUsesNewRacks() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 3;
+    final Path filePath = new Path("/testFile");
+    // All datanodes are on the same rack
+    String racks[] = {"/rack1", "/rack1", "/rack1", "/rack1", "/rack1"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
     try {
-      // create a file with one block with a replication factor of 3
+      // Create a file with one block
       final FileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
-      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
       
-      Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
-      final FSNamesystem namesystem = cluster.getNamesystem();
-      int numRacks = namesystem.blockManager.getNumberOfRacks(b);
-      NumberReplicas number = namesystem.blockManager.countNodes(b);
-      int curReplicas = number.liveReplicas();
-      int neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
+      // Add new datanodes on a different rack and increase the
+      // replication factor so the block is underreplicated and make
+      // sure at least one of the hosts on the new rack is used. 
+      String newRacks[] = {"/rack2", "/rack2"};
+      cluster.startDataNodes(conf, 2, true, null, newRacks);
+      REPLICATION_FACTOR = 5;
+      ns.setReplication("/testFile", REPLICATION_FACTOR);
+
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Test that a block that is re-replicated because one of its replicas
+   * is found to be corrupt and is re-replicated across racks.
+   */
+  @Test
+  public void testCorruptBlockRereplicatedAcrossRacks() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 2;
+    int fileLen = 512;
+    final Path filePath = new Path("/testFile");
+    // Datanodes are spread across two racks
+    String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block with a replication factor of 2
+      final FileSystem fs = cluster.getFileSystem();
       
-      //Add a new datanode on a different rack
-      String newRacks[] = {"/rack2"} ;
-      cluster.startDataNodes(conf, 1, true, null, newRacks);
+      DFSTestUtil.createFile(fs, filePath, fileLen, REPLICATION_FACTOR, 1L);
+      final String fileContent = DFSTestUtil.readFile(fs, filePath);
+
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
 
-      while ( (numRacks < 2) || (curReplicas != REPLICATION_FACTOR) ||
-              (neededReplicationSize > 0) ) {
-        LOG.info("Waiting for replication");
-        Thread.sleep(600);
-        numRacks = namesystem.blockManager.getNumberOfRacks(b);
-        number = namesystem.blockManager.countNodes(b);
-        curReplicas = number.liveReplicas();
-        neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
+      // Corrupt a replica of the block
+      int dnToCorrupt = DFSTestUtil.firstDnWithBlock(cluster, b);
+      assertTrue(cluster.corruptReplica(b.getBlockName(), dnToCorrupt));
+
+      // Restart the datanode so blocks are re-scanned, and the corrupt
+      // block is detected.
+      cluster.restartDataNode(dnToCorrupt);
+
+      // Wait for the namenode to notice the corrupt replica
+      DFSTestUtil.waitCorruptReplicas(fs, ns, filePath, b, 1);
+
+      // The rack policy is still respected
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Ensure all replicas are valid (the corrupt replica may not
+      // have been cleaned up yet).
+      for (int i = 0; i < racks.length; i++) {
+        String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+        if (blockContent != null && i != dnToCorrupt) {
+          assertEquals("Corrupt replica", fileContent, blockContent);
+        }
       }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Reduce the replication factor of a file, making sure that the only
+   * cross rack replica is not removed when deleting replicas.
+   */
+  @Test
+  public void testReduceReplFactorRespectsRackPolicy() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 3;
+    final Path filePath = new Path("/testFile");
+    String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
 
-      LOG.info("curReplicas = " + curReplicas);
-      LOG.info("numRacks = " + numRacks);
-      LOG.info("Size = " + namesystem.blockManager.neededReplications.size());
+    try {
+      // Create a file with one block
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Decrease the replication factor, make sure the deleted replica
+      // was not the one that lived on the rack with only one replica,
+      // ie we should still have 2 racks after reducing the repl factor.
+      REPLICATION_FACTOR = 2;
+      ns.setReplication("/testFile", REPLICATION_FACTOR); 
 
-      assertEquals(2,numRacks);
-      assertTrue(curReplicas == REPLICATION_FACTOR);
-      assertEquals(0,namesystem.blockManager.neededReplications.size());
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
     }
-    
   }
 
-  public void testUnderReplicatedNotEnoughRacks() throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
-    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+  /*
+   * Test that when a block is replicated because a replica is lost due
+   * to host failure the the rack policy is preserved.
+   */
+  @Test
+  public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
+    Configuration conf = getConf();
     short REPLICATION_FACTOR = 3;
-    final String FILE_NAME = "/testFile";
-    final Path FILE_PATH = new Path(FILE_NAME);
-    //All datanodes are on the same rack
-    String racks[] = {"/rack1","/rack1","/rack1",} ;
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).racks(racks).build();
+    final Path filePath = new Path("/testFile");
+    // Last datanode is on a different rack
+    String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
     try {
-      // create a file with one block with a replication factor of 3
+      // Create a file with one block with a replication factor of 2
       final FileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
-      DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Make the last datanode look like it failed to heartbeat by 
+      // calling removeDatanode and stopping it.
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      int idx = datanodes.size() - 1;
+      DataNode dataNode = datanodes.get(idx);
+      cluster.stopDataNode(idx);
+      ns.removeDatanode(dataNode.dnRegistration);
+
+      // The block should still have sufficient # replicas, across racks.
+      // The last node may not have contained a replica, but if it did
+      // it should have been replicated within the same rack.
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
       
-      Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
-      final FSNamesystem namesystem = cluster.getNamesystem();
-      int numRacks = namesystem.blockManager.getNumberOfRacks(b);
-      NumberReplicas number = namesystem.blockManager.countNodes(b);
-      int curReplicas = number.liveReplicas();
-      int neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
+      // Fail the last datanode again, it's also on rack2 so there is
+      // only 1 rack for all the replicas
+      datanodes = cluster.getDataNodes();
+      idx = datanodes.size() - 1;
+      dataNode = datanodes.get(idx);
+      cluster.stopDataNode(idx);
+      ns.removeDatanode(dataNode.dnRegistration);
+
+      // Make sure we have enough live replicas even though we are
+      // short one rack and therefore need one replica
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /*
+   * Test that when the excess replicas of a block are reduced due to
+   * a node re-joining the cluster the rack policy is not violated.
+   */
+  @Test
+  public void testReduceReplFactorDueToRejoinRespectsRackPolicy() 
+      throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 2;
+    final Path filePath = new Path("/testFile");
+    // Last datanode is on a different rack
+    String racks[] = {"/rack1", "/rack1", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Make the last (cross rack) datanode look like it failed
+      // to heartbeat by stopping it and calling removeDatanode.
+      ArrayList<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(3, datanodes.size());
+      DataNode dataNode = datanodes.get(2);
+      cluster.stopDataNode(2);
+      ns.removeDatanode(dataNode.dnRegistration);
+
+      // The block gets re-replicated to another datanode so it has a 
+      // sufficient # replicas, but not across racks, so there should
+      // be 1 rack, and 1 needed replica (even though there are 2 hosts 
+      // available and only 2 replicas required).
+      DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+
+      // Start the "failed" datanode, which has a replica so the block is
+      // now over-replicated and therefore a replica should be removed but
+      // not on the restarted datanode as that would violate the rack policy.
+      String rack2[] = {"/rack2"};
+      cluster.startDataNodes(conf, 1, true, null, rack2);
+      cluster.waitActive();      
       
-      //Add a new datanode on a different rack
-      String newRacks[] = {"/rack2","/rack2","/rack2"} ;
-      cluster.startDataNodes(conf, 3, true, null, newRacks);
-      REPLICATION_FACTOR = 5;
-      namesystem.setReplication(FILE_NAME, REPLICATION_FACTOR); 
+      // The block now has sufficient # replicas, across racks
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
-      while ( (numRacks < 2) || (curReplicas < REPLICATION_FACTOR) ||
-              (neededReplicationSize > 0) ) {
-        LOG.info("Waiting for replication");
-        Thread.sleep(600);
-        numRacks = namesystem.blockManager.getNumberOfRacks(b);
-        number = namesystem.blockManager.countNodes(b);
-        curReplicas = number.liveReplicas();
-        neededReplicationSize = 
-                           namesystem.blockManager.neededReplications.size();
-      }
+  /*
+   * Test that rack policy is still respected when blocks are replicated
+   * due to node decommissioning.
+   */
+  @Test
+  public void testNodeDecomissionRespectsRackPolicy() throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 2;
+    final Path filePath = new Path("/testFile");
+
+    // Configure an excludes file
+    FileSystem localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    Path dir = new Path(workingDir, "build/test/data/temp/decommission");
+    Path excludeFile = new Path(dir, "exclude");
+    assertTrue(localFileSys.mkdirs(dir));
+    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
+    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+
+    // Two blocks and four racks
+    String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      // Create a file with one block
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Decommission one of the hosts with the block, this should cause 
+      // the block to get replicated to another host on the same rack,
+      // otherwise the rack policy is violated.
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
+      String name = locs[0].getNames()[0];
+      DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+      ns.refreshNodes(conf);
+      DFSTestUtil.waitForDecommission(fs, name);
 
-      LOG.info("curReplicas = " + curReplicas);
-      LOG.info("numRacks = " + numRacks);
-      LOG.info("Size = " + namesystem.blockManager.neededReplications.size());
+      // Check the block still has sufficient # replicas across racks
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /*
+   * Test that rack policy is still respected when blocks are replicated
+   * due to node decommissioning, when the blocks are over-replicated.
+   */
+  @Test
+  public void testNodeDecomissionWithOverreplicationRespectsRackPolicy() 
+      throws Exception {
+    Configuration conf = getConf();
+    short REPLICATION_FACTOR = 5;
+    final Path filePath = new Path("/testFile");
+
+    // Configure an excludes file
+    FileSystem localFileSys = FileSystem.getLocal(conf);
+    Path workingDir = localFileSys.getWorkingDirectory();
+    Path dir = new Path(workingDir, "build/test/data/temp/decommission");
+    Path excludeFile = new Path(dir, "exclude");
+    assertTrue(localFileSys.mkdirs(dir));
+    DFSTestUtil.writeFile(localFileSys, excludeFile, "");
+    conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+
+    // All hosts are on two racks, only one host on /rack2
+    String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(racks.length).racks(racks).build();
+    final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+      Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+      // Lower the replication factor so the blocks are over replicated
+      REPLICATION_FACTOR = 2;
+      fs.setReplication(filePath, REPLICATION_FACTOR);
+
+      // Decommission one of the hosts with the block that is not on
+      // the lone host on rack2 (if we decomission that host it would
+      // be impossible to respect the rack policy).
+      BlockLocation locs[] = fs.getFileBlockLocations(
+          fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
+      for (String top : locs[0].getTopologyPaths()) {
+        if (!top.startsWith("/rack2")) {
+          String name = top.substring("/rack1".length()+1);
+          DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+          ns.refreshNodes(conf);
+          DFSTestUtil.waitForDecommission(fs, name);
+          break;
+        }
+      }
 
-      assertEquals(2,numRacks);
-      assertTrue(curReplicas == REPLICATION_FACTOR);
-      assertEquals(0,namesystem.blockManager.neededReplications.size());
+      // Check the block still has sufficient # replicas across racks,
+      // ie we didn't remove the replica on the host on /rack1.
+      DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
     } finally {
       cluster.shutdown();
     }
-    
   }
-}
+}
\ No newline at end of file

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java Fri Apr 22 17:10:43 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.net.URL;
 import java.util.Collection;
@@ -31,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.junit.Test;
 
 /** A JUnit test for corrupt_files.jsp */
@@ -82,7 +82,7 @@ public class TestCorruptFilesJsp  {
       for (int idx = 0; idx < filepaths.length - 1; idx++) {
         String blockName = DFSTestUtil.getFirstBlock(fs, filepaths[idx])
             .getBlockName();
-        TestDatanodeBlockScanner.corruptReplica(blockName, 0);
+        assertTrue(cluster.corruptReplica(blockName, 0));
 
         // read the file so that the corrupt block is reported to NN
         FSDataInputStream in = fs.open(filepaths[idx]);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java Fri Apr 22 17:10:43 2011
@@ -54,7 +54,7 @@ public class TestOverReplicatedBlocks ex
       
       // corrupt the block on datanode 0
       Block block = DFSTestUtil.getFirstBlock(fs, fileName);
-      TestDatanodeBlockScanner.corruptReplica(block.getBlockName(), 0);
+      assertTrue(cluster.corruptReplica(block.getBlockName(), 0));
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
       File scanLog = new File(System.getProperty("test.build.data"),