Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/04/29 05:03:27 UTC

svn commit: r1097671 [2/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/co...

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Apr 29 03:03:25 2011
@@ -461,7 +461,9 @@ abstract class INode implements Comparab
                         long nsQuota,
                         long dsQuota,
                         long preferredBlockSize) {
-    if (blocks == null) {
+    if (symlink.length() != 0) { // check if symbolic link
+      return new INodeSymlink(symlink, modificationTime, atime, permissions);
+    } else if (blocks == null) { // not a symlink and blocks == null: a directory
       if (nsQuota >= 0 || dsQuota >= 0) {
         return new INodeDirectoryWithQuota(
             permissions, modificationTime, nsQuota, dsQuota);
@@ -469,10 +471,6 @@ abstract class INode implements Comparab
       // regular directory
       return new INodeDirectory(permissions, modificationTime);
     }
-    // check if symbolic link
-    if (symlink.length() != 0) {
-      return new INodeSymlink(symlink, modificationTime, atime, permissions);
-    } 
     // file
     return new INodeFile(permissions, blocks, replication,
         modificationTime, atime, preferredBlockSize);
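
For readers skimming the hunk: the reorder matters because an INodeSymlink, like a directory, carries no block list, so the symlink test has to run before the blocks == null branch or a symlink would be reconstructed as a directory when an image or edit log is loaded. Roughly, the factory body now reads as follows (parameter list elided, names as in the hunk above; this is a sketch, not the full method):

    if (symlink.length() != 0) {
      // a symlink stores only its target string and has no blocks
      return new INodeSymlink(symlink, modificationTime, atime, permissions);
    } else if (blocks == null) {
      // not a symlink and no blocks: a directory, with or without quotas
      if (nsQuota >= 0 || dsQuota >= 0) {
        return new INodeDirectoryWithQuota(
            permissions, modificationTime, nsQuota, dsQuota);
      }
      return new INodeDirectory(permissions, modificationTime);
    }
    // anything else is a regular file
    return new INodeFile(permissions, blocks, replication,
        modificationTime, atime, preferredBlockSize);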

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Fri Apr 29 03:03:25 2011
@@ -86,9 +86,8 @@ class UnderReplicatedBlocks implements I
                           int curReplicas, 
                           int decommissionedReplicas,
                           int expectedReplicas) {
-    if (curReplicas<0) {
-      return LEVEL;
-    } else if (curReplicas>=expectedReplicas) {
+    assert curReplicas >= 0 : "Negative replicas!";
+    if (curReplicas >= expectedReplicas) {
       return 3; // Block doesn't have enough racks
     } else if(curReplicas==0) {
       // If there are zero non-decommissioned replica but there are
@@ -116,9 +115,7 @@ class UnderReplicatedBlocks implements I
                            int curReplicas, 
                            int decomissionedReplicas,
                            int expectedReplicas) {
-    if(curReplicas<0) {
-      return false;
-    }
+    assert curReplicas >= 0 : "Negative replicas!";
     int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
@@ -159,8 +156,10 @@ class UnderReplicatedBlocks implements I
       }
       return true;
     } else {
+      // Try to remove the block from all queues if the block was
+      // not found in the queue for the given priority level.
       for(int i=0; i<LEVEL; i++) {
-        if(i!=priLevel && priorityQueues.get(i).remove(block)) {
+        if(priorityQueues.get(i).remove(block)) {
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -202,7 +201,7 @@ class UnderReplicatedBlocks implements I
           "BLOCK* NameSystem.UnderReplicationBlock.update:"
           + block
           + " has only "+curReplicas
-          + " replicas and need " + curExpectedReplicas
+          + " replicas and needs " + curExpectedReplicas
           + " replicas so is added to neededReplications"
           + " at priority level " + curPri);
       }
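
The curReplicas < 0 guards become asserts, so the check is only active when the JVM runs with assertions enabled (-ea); callers are now expected never to pass a negative count. A standalone illustration of that behavior, not part of the patch:

    // Java assertions are disabled by default and only fire under "java -ea".
    public class AssertDemo {
      static int checkedReplicas(int curReplicas) {
        assert curReplicas >= 0 : "Negative replicas!";
        return curReplicas;
      }
      public static void main(String[] args) {
        // "java -ea AssertDemo -1" throws AssertionError: Negative replicas!
        // "java AssertDemo -1" silently prints -1.
        System.out.println(checkedReplicas(Integer.parseInt(args[0])));
      }
    }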

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Fri Apr 29 03:03:25 2011
@@ -37,8 +37,9 @@ import static org.apache.hadoop.hdfs.too
 @InterfaceStability.Unstable
 class EditsLoaderCurrent implements EditsLoader {
 
-  private static int [] supportedVersions = {
-    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
+  private static int[] supportedVersions = {
+      -18, -19, -20, -21, -22, -23, -24,
+      -25, -26, -27, -28, -30, -31, -32, -33, -34 };
 
   private EditsVisitor v;
   private int editsVersion = 0;

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Apr 29 03:03:25 2011
@@ -119,8 +119,8 @@ import org.apache.hadoop.security.token.
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = 
-    {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31};
+  private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
+      -24, -25, -26, -27, -28, -30, -31, -32, -33, -34 };
   private int imageVersion = 0;
 
   /* (non-Javadoc)
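
Both offline viewers extend their supported layout-version tables through -34. A hypothetical helper, not in this patch, showing how a caller might test membership in such a table (layout versions are negative and grow more negative as the format evolves):

    import java.util.Arrays;

    final class VersionSupport {
      private VersionSupport() {}

      // Linear scan is fine here: the tables are small.
      static boolean isSupported(int[] supportedVersions, int layoutVersion) {
        for (int v : supportedVersions) {
          if (v == layoutVersion) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        int[] versions = { -16, -17, -18, /* ... */ -32, -33, -34 };
        System.out.println(isSupported(versions, -34)); // true after this change
        System.out.println(isSupported(versions, -35)); // false: not yet listed
      }
    }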

Propchange: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:1086482-1095244
+/hadoop/hdfs/trunk/src/test/hdfs:1086482-1097628

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Apr 29 03:03:25 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -27,6 +28,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URL;
 import java.net.URLConnection;
@@ -36,6 +39,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -50,15 +54,19 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
+
+import static org.junit.Assert.*;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -143,6 +151,12 @@ public class DFSTestUtil {
           replicationFactor, files[idx].getSeed());
     }
   }
+
+  public static String readFile(FileSystem fs, Path fileName) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
+    return os.toString();
+  }
   
   public static void createFile(FileSystem fs, Path fileName, long fileLen, 
       short replFactor, long seed) throws IOException {
@@ -172,8 +186,6 @@ public class DFSTestUtil {
   
   /** check if the files have been copied correctly. */
   public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
-    
-    //Configuration conf = new HdfsConfiguration();
     Path root = new Path(topdir);
     
     for (int idx = 0; idx < nFiles; idx++) {
@@ -206,9 +218,10 @@ public class DFSTestUtil {
     }
   }
 
-  // waits for the replication factor of all files to reach the
-  // specified target
-  //
+  /*
+   * Waits for the replication factor of all files to reach the
+   * specified target.
+   */
   public void waitReplication(FileSystem fs, String topdir, short value) 
                                               throws IOException {
     Path root = new Path(topdir);
@@ -219,6 +232,128 @@ public class DFSTestUtil {
     }
   }
 
+  /*
+   * Check whether all replicas of the given block in the given file are marked corrupt.
+   */
+  public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
+      Path file, int blockNo) throws IOException {
+    DFSClient client = new DFSClient(new InetSocketAddress("localhost",
+        cluster.getNameNodePort()), cluster.getConfiguration());
+    LocatedBlocks blocks;
+    try {
+       blocks = client.getNamenode().getBlockLocations(
+           file.toString(), 0, Long.MAX_VALUE);
+    } finally {
+      client.close();
+    }
+    return blocks.get(blockNo).isCorrupt();
+  }
+
+  /*
+   * Wait up to 20s for the given block to be replicated across
+   * the requested number of racks, with the requested number of
+   * replicas, and the requested number of replicas still needed.
+   */
+  public static void waitForReplication(MiniDFSCluster cluster, Block b,
+      int racks, int replicas, int neededReplicas)
+      throws IOException, TimeoutException, InterruptedException {
+    int curRacks = 0;
+    int curReplicas = 0;
+    int curNeededReplicas = 0;
+    int count = 0;
+    final int ATTEMPTS = 20;
+
+    do {
+      Thread.sleep(1000);
+      int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b);
+      curRacks = r[0];
+      curReplicas = r[1];
+      curNeededReplicas = r[2];
+      count++;
+    } while ((curRacks != racks ||
+              curReplicas != replicas ||
+              curNeededReplicas != neededReplicas) && count < ATTEMPTS);
+
+    if (count == ATTEMPTS) {
+      throw new TimeoutException("Timed out waiting for replication."
+          + " Needed replicas = "+neededReplicas
+          + " Cur needed replicas = "+curNeededReplicas
+          + " Replicas = "+replicas+" Cur replicas = "+curReplicas
+          + " Racks = "+racks+" Cur racks = "+curRacks);
+    }
+  }
+
+  /*
+   * Keep accessing the given file until the namenode reports that the
+   * given block in the file contains the given number of corrupt replicas.
+   */
+  public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
+      Path file, Block b, int corruptRepls)
+      throws IOException, TimeoutException {
+    int count = 0;
+    final int ATTEMPTS = 20;
+    int repls = ns.numCorruptReplicas(b);
+    while (repls != corruptRepls && count < ATTEMPTS) {
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
+            512, true);
+      } catch (IOException e) {
+        // Swallow exceptions
+      }
+      System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
+      repls = ns.numCorruptReplicas(b);
+      count++;
+    }
+    if (count == ATTEMPTS) {
+      throw new TimeoutException("Timed out waiting for corrupt replicas."
+          + " Waiting for "+corruptRepls+", but only found "+repls);
+    }
+  }
+
+  /*
+   * Wait up to 20s for the given DN (host:port) to be decommissioned.
+   */
+  public static void waitForDecommission(FileSystem fs, String name) 
+      throws IOException, InterruptedException, TimeoutException {
+    DatanodeInfo dn = null;
+    int count = 0;
+    final int ATTEMPTS = 20;
+
+    do {
+      Thread.sleep(1000);
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      for (DatanodeInfo info : dfs.getDataNodeStats()) {
+        if (name.equals(info.getName())) {
+          dn = info;
+        }
+      }
+      count++;
+    } while ((dn == null ||
+              dn.isDecommissionInProgress() ||
+              !dn.isDecommissioned()) && count < ATTEMPTS);
+
+    if (count == ATTEMPTS) {
+      throw new TimeoutException("Timed out waiting for datanode "
+          + name + " to decommission.");
+    }
+  }
+
+  /*
+   * Returns the index of the first datanode which has a copy
+   * of the given block, or -1 if no such datanode exists.
+   */
+  public static int firstDnWithBlock(MiniDFSCluster cluster, Block b)
+      throws IOException {
+    int numDatanodes = cluster.getDataNodes().size();
+    for (int i = 0; i < numDatanodes; i++) {
+      String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+      if (blockContent != null) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
   /** return list of filenames created as part of createFiles */
   public String[] getFileNames(String topDir) {
     if (nFiles == 0)
@@ -241,10 +376,13 @@ public class DFSTestUtil {
       BlockLocation locs[] = fs.getFileBlockLocations(
         fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
       for (int j = 0; j < locs.length; j++) {
-        String[] loc = locs[j].getHosts();
-        if (loc.length != replFactor) {
-          System.out.println("File " + fileName + " has replication factor " +
-              loc.length);
+        String[] hostnames = locs[j].getNames();
+        if (hostnames.length != replFactor) {
+          String hostNameList = "";
+          for (String h : hostnames) hostNameList += h + " ";
+          System.out.println("Block " + j + " of file " + fileName 
+              + " has replication factor " + hostnames.length + "; locations "
+              + hostNameList);
           good = false;
           try {
             System.out.println("Waiting for replication factor to drain");
@@ -253,6 +391,10 @@ public class DFSTestUtil {
           break;
         }
       }
+      if (good) {
+        System.out.println("All blocks of file " + fileName
+            + " verified to have replication factor " + replFactor);
+      }
     } while(!good);
   }
   
@@ -285,7 +427,7 @@ public class DFSTestUtil {
         ).getLogger().setLevel(org.apache.log4j.Level.ALL);
   }
 
-  static String readFile(File f) throws IOException {
+  public static String readFile(File f) throws IOException {
     StringBuilder b = new StringBuilder();
     BufferedReader in = new BufferedReader(new FileReader(f));
     for(int c; (c = in.read()) != -1; b.append((char)c));
@@ -293,6 +435,17 @@ public class DFSTestUtil {
     return b.toString();
   }
 
+  /* Write the given string to the given file */
+  public static void writeFile(FileSystem fs, Path p, String s) 
+      throws IOException {
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    InputStream is = new ByteArrayInputStream(s.getBytes());
+    FSDataOutputStream os = fs.create(p);
+    IOUtils.copyBytes(is, os, s.length(), true);
+  }
+
   // Returns url content as string.
   public static String urlGet(URL url) throws IOException {
     URLConnection conn = url.openConnection();
@@ -402,7 +555,7 @@ public class DFSTestUtil {
   /** For {@link TestTransferRbw} */
   public static DataTransferProtocol.Status transferRbw(final Block b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
-    Assert.assertEquals(2, datanodes.length);
+    assertEquals(2, datanodes.length);
     final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
         datanodes.length, dfsClient);
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
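
The new helpers above (readFile, writeFile, allBlockReplicasCorrupt, waitForReplication, waitCorruptReplicas, waitForDecommission, firstDnWithBlock) replace the hand-rolled polling loops that the tests below used to carry. A rough sketch of how they compose in a test; it assumes the class lives in org.apache.hadoop.hdfs next to these utilities, and the cluster sizing and timing details are illustrative only, not part of this patch:

    package org.apache.hadoop.hdfs;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.junit.Test;

    public class SketchOfNewHelpers {
      @Test
      public void corruptReplicaRoundTrip() throws Exception {
        Configuration conf = new HdfsConfiguration();
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
        try {
          cluster.waitActive();
          FileSystem fs = cluster.getFileSystem();
          Path file = new Path("/sketch/file");

          // Round-trip a small file through the new write/read helpers.
          DFSTestUtil.writeFile(fs, file, "hello");
          assertEquals("hello", DFSTestUtil.readFile(fs, file));

          // Wait for full replication; no replica should be corrupt yet.
          DFSTestUtil.waitReplication(fs, file, (short)3);
          assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file, 0));

          // Corrupt the replica on the first datanode that holds the block.
          Block b = DFSTestUtil.getFirstBlock(fs, file);
          int dn = DFSTestUtil.firstDnWithBlock(cluster, b);
          assertTrue(dn >= 0);
          assertTrue(cluster.corruptReplica(b.getBlockName(), dn));

          // Re-read the file until the namenode registers one corrupt replica.
          DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file, b, 1);
        } finally {
          cluster.shutdown();
        }
      }
    }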

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Apr 29 03:03:25 2011
@@ -878,29 +878,69 @@ public class MiniDFSCluster {
     System.out.println("Cluster is active");
   }
 
-  /*
-   * Corrupt a block on all datanode
+  /**
+   * Return the contents of the given block on the given datanode.
+   *
+   * @param i The index of the datanode
+   * @param blockName The name of the block
+   * @throws IOException on error accessing the file for the given block
+   * @return The contents of the block file, null if none found
    */
-  void corruptBlockOnDataNodes(String blockName) throws Exception{
-    for (int i=0; i < dataNodes.size(); i++)
-      corruptBlockOnDataNode(i,blockName);
+  public String readBlockOnDataNode(int i, String blockName) throws IOException {
+    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+
+    // Each datanode has multiple data dirs, check each
+    for (int dn = i*2; dn < i*2+2; dn++) {
+      File dataDir = new File(getBaseDirectory() + "data");
+      File blockFile = new File(dataDir,
+          "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+      if (blockFile.exists()) {
+        return DFSTestUtil.readFile(blockFile);
+      }
+    }
+    return null;
   }
 
-  /*
-   * Corrupt a block on a particular datanode
+  /**
+   * Corrupt a block on all datanodes.
+   *
+   * @param blockName The name of the block
+   * @throws IOException on error accessing the given block.
+   * @return The number of block files corrupted.
+   */  
+  public int corruptBlockOnDataNodes(String blockName) throws IOException {
+    int blocksCorrupted = 0;
+    for (int i=0; i < dataNodes.size(); i++) {
+      if (corruptReplica(blockName, i)) {
+        blocksCorrupted++;
+      }
+    }
+    return blocksCorrupted;
+  }
+
+  /**
+   * Corrupt a block on a particular datanode.
+   *
+   * @param blockName The name of the block
+   * @param i The index of the datanode
+   * @throws IOException on error accessing the given block or if
+   * the contents of the block (on the same datanode) differ.
+   * @return true if a replica was corrupted, false otherwise
    */
-  boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+  public boolean corruptReplica(String blockName, int i) throws IOException {
     Random random = new Random();
-    boolean corrupted = false;
-    File dataDir = new File(getBaseDirectory() + "data");
-    if (i < 0 || i >= dataNodes.size())
-      return false;
+    assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+    int filesCorrupted = 0;
+
+    // Each datanode has multiple data dirs, check each
     for (int dn = i*2; dn < i*2+2; dn++) {
-      File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
-                                blockName);
-      System.out.println("Corrupting for: " + blockFile);
-      if (blockFile.exists()) {
-        // Corrupt replica by writing random bytes into replica
+      File dataDir = new File(getBaseDirectory() + "data");
+      File blockFile = new File(dataDir,
+          "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+
+      // Corrupt the replica by writing some bytes into a random offset
+      if (blockFile.exists()) { 
+        System.out.println("Corrupting " + blockFile);
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
         FileChannel channel = raFile.getChannel();
         String badString = "BADBAD";
@@ -908,10 +948,12 @@ public class MiniDFSCluster {
         raFile.seek(rand);
         raFile.write(badString.getBytes());
         raFile.close();
+        filesCorrupted++;
       }
-      corrupted = true;
     }
-    return corrupted;
+    assert filesCorrupted == 0 || filesCorrupted == 1
+      : "Unexpected # block files";
+    return filesCorrupted == 1;
   }
 
   /*
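
The corruption helpers are now public and return enough information for callers to assert on. A fragment, assuming fs, path, cluster and replFactor from an enclosing test (hypothetical names), showing how they compose; the final assertion mirrors the TestCrcCorruption change below:

    // Report which datanodes hold an on-disk copy of the block (null means none).
    String blockName = DFSTestUtil.getFirstBlock(fs, path).getBlockName();
    for (int i = 0; i < cluster.getDataNodes().size(); i++) {
      String data = cluster.readBlockOnDataNode(i, blockName);
      System.out.println("datanode " + i + (data == null
          ? " has no finalized replica" : " holds " + data.length() + " bytes"));
    }

    // Corrupt every replica that exists and assert on how many were actually hit.
    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(blockName);
    assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);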

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Apr 29 03:03:25 2011
@@ -244,7 +244,7 @@ public class TestCrcCorruption {
   private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
     long fileSize = 4096;
     Path file = new Path("/testFile");
-
+    short replFactor = (short)numDataNodes;
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
@@ -253,11 +253,12 @@ public class TestCrcCorruption {
       cluster.waitActive();
       FileSystem fs = cluster.getFileSystem();
 
-      DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
-      DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+      DFSTestUtil.createFile(fs, file, fileSize, replFactor, 12345L /*seed*/);
+      DFSTestUtil.waitReplication(fs, file, replFactor);
 
       String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
-      cluster.corruptBlockOnDataNodes(block);
+      int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+      assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);
 
       try {
         IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Apr 29 03:03:25 2011
@@ -380,7 +380,7 @@ public class TestDFSClientRetries extend
     int bufferSize = 4096;
     
     Configuration conf = new HdfsConfiguration();
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_KEY,xcievers);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers);
     conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 
                 retries);
     conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWin);

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Fri Apr 29 03:03:25 2011
@@ -29,17 +29,20 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 /**
 * This test ensures the appropriate response (successful or failure) from
 * the system when the system is upgraded under various storage state and
 * version conditions.
 */
-public class TestDFSUpgrade extends TestCase {
+public class TestDFSUpgrade {
  
   private static final Log LOG = LogFactory.getLog(
                                                    "org.apache.hadoop.hdfs.TestDFSUpgrade");
@@ -141,6 +144,7 @@ public class TestDFSUpgrade extends Test
    * This test attempts to upgrade the NameNode and DataNode under
    * a number of valid and invalid conditions.
    */
+  @Test
   public void testUpgrade() throws Exception {
     File[] baseDirs;
     UpgradeUtilities.initialize();
@@ -256,15 +260,23 @@ public class TestDFSUpgrade extends Test
     } // end numDir loop
   }
  
-  protected void tearDown() throws Exception {
-    LOG.info("Shutting down MiniDFSCluster");
-    if (cluster != null) cluster.shutdown();
+  @Test(expected=IOException.class)
+  public void testUpgradeFromPreUpgradeLVFails() throws IOException {
+    // Upgrade from versions prior to Storage#LAST_UPGRADABLE_LAYOUT_VERSION
+    // is not allowed
+    Storage.checkVersionUpgradable(Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION + 1);
+    fail("Expected IOException was not thrown");
   }
-    
+  
+  public void test203LayoutVersion() {
+    for (int lv : Storage.LAYOUT_VERSIONS_203) {
+      assertTrue(Storage.is203LayoutVersion(lv));
+    }
+  }
+  
   public static void main(String[] args) throws Exception {
     new TestDFSUpgrade().testUpgrade();
   }
-  
 }
 
 

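The class now runs under JUnit 4, where only methods annotated with @Test are executed and expected exceptions are declared on the annotation. A standalone sketch of that pattern, not taken from the patch:

    import java.io.IOException;
    import org.junit.Test;

    public class ExpectedExceptionSketch {

      @Test(expected = IOException.class)
      public void failsAsExpected() throws IOException {
        // Passes only if an IOException is thrown before the method returns.
        throw new IOException("simulated upgrade from an unsupported layout version");
      }

      @Test
      public void ordinaryTest() {
        // Under JUnit 4 a method without @Test is silently skipped, unlike
        // JUnit 3 where the test* naming convention was sufficient.
      }
    }
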
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Fri Apr 29 03:03:25 2011
@@ -21,10 +21,10 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.io.*;
-import java.nio.channels.FileChannel;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ public class TestDatanodeBlockScanner ex
   private static final Log LOG = 
                  LogFactory.getLog(TestDatanodeBlockScanner.class);
   
+  private static final long TIMEOUT = 20000; // 20 sec.
+  
   private static Pattern pattern = 
              Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
   
@@ -55,18 +56,35 @@ public class TestDatanodeBlockScanner ex
              Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
   /**
    * This connects to datanode and fetches block verification data.
-   * It repeats this until the given block has a verification time > 0.
+   * It repeats this until the given block has a verification time > newTime.
+   * @param newTime - validation timestamps before newTime are "old", the
+   *            result of previous validations.  This method waits until a "new"
+   *            validation timestamp is obtained.  If no validator runs soon
+   *            enough, the method will time out.
+   * @return - the new validation timestamp
+   * @throws IOException
+   * @throws TimeoutException
    */
   private static long waitForVerification(DatanodeInfo dn, FileSystem fs, 
-                                          Path file, int blocksValidated) throws IOException {
+                          Path file, int blocksValidated, 
+                          long newTime, long timeout) 
+  throws IOException, TimeoutException {
     URL url = new URL("http://localhost:" + dn.getInfoPort() +
                       "/blockScannerReport?listblocks");
     long lastWarnTime = System.currentTimeMillis();
+    if (newTime <= 0) newTime = 1L;
     long verificationTime = 0;
     
     String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
-    
-    while (verificationTime <= 0) {
+    long failtime = (timeout <= 0) ? Long.MAX_VALUE 
+        : System.currentTimeMillis() + timeout;
+    while (verificationTime < newTime) {
+      if (failtime < System.currentTimeMillis()) {
+        throw new TimeoutException("failed to achieve block verification after "
+            + timeout + " msec.  Current verification timestamp = "
+            + verificationTime + ", requested verification time > " 
+            + newTime);
+      }
       String response = DFSTestUtil.urlGet(url);
       if(blocksValidated >= 0) {
         for(Matcher matcher = pattern_blockVerify.matcher(response); matcher.find();) {
@@ -83,7 +101,7 @@ public class TestDatanodeBlockScanner ex
         }
       }
       
-      if (verificationTime <= 0) {
+      if (verificationTime < newTime) {
         long now = System.currentTimeMillis();
         if ((now - lastWarnTime) >= 5*1000) {
           LOG.info("Waiting for verification of " + block);
@@ -98,8 +116,7 @@ public class TestDatanodeBlockScanner ex
     return verificationTime;
   }
 
-  public void testDatanodeBlockScanner() throws IOException {
-    
+  public void testDatanodeBlockScanner() throws IOException, TimeoutException {
     long startTime = System.currentTimeMillis();
     
     Configuration conf = new HdfsConfiguration();
@@ -115,6 +132,7 @@ public class TestDatanodeBlockScanner ex
      */
     DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
     cluster.shutdown();
+
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(1)
                                 .format(false).build();
@@ -128,7 +146,7 @@ public class TestDatanodeBlockScanner ex
     /*
      * The cluster restarted. The block should be verified by now.
      */
-    assertTrue(waitForVerification(dn, fs, file1, 1) > startTime);
+    assertTrue(waitForVerification(dn, fs, file1, 1, startTime, TIMEOUT) >= startTime);
     
     /*
      * Create a new file and read the block. The block should be marked 
@@ -137,41 +155,16 @@ public class TestDatanodeBlockScanner ex
     DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
     IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), 
                       conf, true); 
-    assertTrue(waitForVerification(dn, fs, file2, 2) > startTime);
+    assertTrue(waitForVerification(dn, fs, file2, 2, startTime, TIMEOUT) >= startTime);
     
     cluster.shutdown();
   }
 
-  public static boolean corruptReplica(String blockName, int replica) throws IOException {
-    Random random = new Random();
-    File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
-    boolean corrupted = false;
-    for (int i=replica*2; i<replica*2+2; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1) + 
-          MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
-      if (blockFile.exists()) {
-        // Corrupt replica by writing random bytes into replica
-        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        FileChannel channel = raFile.getChannel();
-        String badString = "BADBAD";
-        int rand = random.nextInt((int)channel.size()/2);
-        raFile.seek(rand);
-        raFile.write(badString.getBytes());
-        raFile.close();
-        corrupted = true;
-      }
-    }
-    return corrupted;
-  }
-
-  public void testBlockCorruptionPolicy() throws IOException {
+  public void testBlockCorruptionPolicy() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
     Random random = new Random();
     FileSystem fs = null;
-    DFSClient dfsClient = null;
-    LocatedBlocks blocks = null;
-    int blockCount = 0;
     int rand = random.nextInt(3);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -181,44 +174,24 @@ public class TestDatanodeBlockScanner ex
     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
     
-    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
-                                        cluster.getNameNodePort()), conf);
-    do {
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      blockCount = blocks.get(0).getLocations().length;
-      try {
-        LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-    } while (blockCount != 3);
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
 
     // Corrupt random replica of block 
-    assertTrue(corruptReplica(block, rand));
+    assertTrue(cluster.corruptReplica(block, rand));
 
     // Restart the datanode hoping the corrupt block to be reported
     cluster.restartDataNode(rand);
 
     // We have 2 good replicas and block is not corrupt
-    do {
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      blockCount = blocks.get(0).getLocations().length;
-      try {
-        LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-    } while (blockCount != 2);
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitReplication(fs, file1, (short)2);
+    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
   
     // Corrupt all replicas. Now, block should be marked as corrupt
     // and we should get all the replicas 
-    assertTrue(corruptReplica(block, 0));
-    assertTrue(corruptReplica(block, 1));
-    assertTrue(corruptReplica(block, 2));
+    assertTrue(cluster.corruptReplica(block, 0));
+    assertTrue(cluster.corruptReplica(block, 1));
+    assertTrue(cluster.corruptReplica(block, 2));
 
     // Read the file to trigger reportBadBlocks by client
     try {
@@ -230,18 +203,8 @@ public class TestDatanodeBlockScanner ex
 
     // We now have the blocks to be marked as corrupt and we get back all
     // its replicas
-    do {
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      blockCount = blocks.get(0).getLocations().length;
-      try {
-        LOG.info("Looping until expected blockCount of 3 is received");
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-    } while (blockCount != 3);
-    assertTrue(blocks.get(0).isCorrupt() == true);
-
+    DFSTestUtil.waitReplication(fs, file1, (short)3);
+    assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
     cluster.shutdown();
   }
   
@@ -261,11 +224,13 @@ public class TestDatanodeBlockScanner ex
    * 4. Test again waits until the block is reported with expected number
    *    of good replicas.
    */
-  public void testBlockCorruptionRecoveryPolicy() throws IOException {
+  public void testBlockCorruptionRecoveryPolicy1() throws Exception {
     // Test recovery of 1 corrupt replica
     LOG.info("Testing corrupt replica recovery for one corrupt replica");
     blockCorruptionRecoveryPolicy(4, (short)3, 1);
+  }
 
+  public void testBlockCorruptionRecoveryPolicy2() throws Exception {
     // Test recovery of 2 corrupt replicas
     LOG.info("Testing corrupt replica recovery for two corrupt replicas");
     blockCorruptionRecoveryPolicy(5, (short)3, 2);
@@ -274,111 +239,58 @@ public class TestDatanodeBlockScanner ex
   private void blockCorruptionRecoveryPolicy(int numDataNodes, 
                                              short numReplicas,
                                              int numCorruptReplicas) 
-                                             throws IOException {
+                                             throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 30);
-    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-    FileSystem fs = null;
-    DFSClient dfsClient = null;
-    LocatedBlocks blocks = null;
-    int replicaCount = 0;
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
     cluster.waitActive();
-    fs = cluster.getFileSystem();
+    FileSystem fs = cluster.getFileSystem();
     Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
     DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
     Block blk = DFSTestUtil.getFirstBlock(fs, file1);
     String block = blk.getBlockName();
-    
-    dfsClient = new DFSClient(new InetSocketAddress("localhost", 
-                                        cluster.getNameNodePort()), conf);
-    blocks = dfsClient.getNamenode().
-               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-    replicaCount = blocks.get(0).getLocations().length;
 
     // Wait until block is replicated to numReplicas
-    while (replicaCount != numReplicas) {
-      try {
-        LOG.info("Looping until expected replicaCount of " + numReplicas +
-                  "is reached");
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      blocks = dfsClient.getNamenode().
-                   getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      replicaCount = blocks.get(0).getLocations().length;
-    }
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitReplication(fs, file1, numReplicas);
 
     // Corrupt numCorruptReplicas replicas of block 
     int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
     for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
-      if (corruptReplica(block, i)) 
+      if (cluster.corruptReplica(block, i)) {
         corruptReplicasDNIDs[j++] = i;
+        LOG.info("successfully corrupted block " + block + " on node " 
+                 + i + " " + cluster.getDataNodes().get(i).getSelfAddr());
+      }
     }
     
     // Restart the datanodes containing corrupt replicas 
     // so they would be reported to namenode and re-replicated
-    for (int i =0; i < numCorruptReplicas; i++) 
-     cluster.restartDataNode(corruptReplicasDNIDs[i]);
+    // They MUST be restarted in reverse order from highest to lowest index,
+    // because the act of restarting them removes them from the ArrayList
+    // and causes the indexes of all nodes above them in the list to change.
+    for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
+      LOG.info("restarting node with corrupt replica: position " 
+          + i + " node " + corruptReplicasDNIDs[i] + " " 
+          + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getSelfAddr());
+      cluster.restartDataNode(corruptReplicasDNIDs[i]);
+    }
 
     // Loop until all corrupt replicas are reported
-    int corruptReplicaSize = cluster.getNamesystem().
-                              numCorruptReplicas(blk);
-    while (corruptReplicaSize != numCorruptReplicas) {
-      try {
-        IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
-                          conf, true);
-      } catch (IOException e) {
-      }
-      try {
-        LOG.info("Looping until expected " + numCorruptReplicas + " are " +
-                 "reported. Current reported " + corruptReplicaSize);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      corruptReplicaSize = cluster.getNamesystem().
-                              numCorruptReplicas(blk);
-    }
+    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
+        blk, numCorruptReplicas);
     
     // Loop until the block recovers after replication
-    blocks = dfsClient.getNamenode().
-               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-    replicaCount = blocks.get(0).getLocations().length;
-    while (replicaCount != numReplicas) {
-      try {
-        LOG.info("Looping until block gets rereplicated to " + numReplicas);
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      blocks = dfsClient.getNamenode().
-                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      replicaCount = blocks.get(0).getLocations().length;
-    }
+    DFSTestUtil.waitReplication(fs, file1, numReplicas);
+    assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
 
     // Make sure the corrupt replica is invalidated and removed from
     // corruptReplicasMap
-    corruptReplicaSize = cluster.getNamesystem().
-                          numCorruptReplicas(blk);
-    while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
-      try {
-        LOG.info("Looping until corrupt replica is invalidated");
-        Thread.sleep(1000);
-      } catch (InterruptedException ignore) {
-      }
-      corruptReplicaSize = cluster.getNamesystem().
-                            numCorruptReplicas(blk);
-      blocks = dfsClient.getNamenode().
-                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
-      replicaCount = blocks.get(0).getLocations().length;
-    }
-    // Make sure block is healthy 
-    assertTrue(corruptReplicaSize == 0);
-    assertTrue(replicaCount == numReplicas);
-    assertTrue(blocks.get(0).isCorrupt() == false);
+    DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1, 
+        blk, 0);
     cluster.shutdown();
   }
   
@@ -386,36 +298,73 @@ public class TestDatanodeBlockScanner ex
   public void testTruncatedBlockReport() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     final short REPLICATION_FACTOR = (short)2;
+    final Path fileName = new Path("/file1");
+    String block; //block file name
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
 
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).build();
+    long startTime = System.currentTimeMillis();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+                                               .numDataNodes(REPLICATION_FACTOR)
+                                               .build();
     cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
+    
     try {
-      final Path fileName = new Path("/file1");
+      FileSystem fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
       DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
+      block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
+    } finally {
+      cluster.shutdown();
+    }
 
-      String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
-
+    // Restart cluster and confirm block is verified on datanode 0,
+    // then truncate it on datanode 0.
+    cluster = new MiniDFSCluster.Builder(conf)
+                                .numDataNodes(REPLICATION_FACTOR)
+                                .format(false)
+                                .build();
+    cluster.waitActive();
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      DatanodeInfo dn = new DatanodeInfo(cluster.getDataNodes().get(0).dnRegistration);
+      assertTrue(waitForVerification(dn, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
+      
       // Truncate replica of block
-      changeReplicaLength(block, 0, -1);
-
+      if (!changeReplicaLength(block, 0, -1)) {
+        throw new IOException(
+            "failed to find or change length of replica on node 0 "
+            + cluster.getDataNodes().get(0).getSelfAddr());
+      }      
+    } finally {
       cluster.shutdown();
+    }
 
-      // restart the cluster
-      cluster = new MiniDFSCluster.Builder(conf)
-                                  .numDataNodes(REPLICATION_FACTOR)
-                                  .format(false)
-                                  .build();
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();  // now we have 3 datanodes
+    // Restart the cluster, add a node, and check that the truncated block is 
+    // handled correctly
+    cluster = new MiniDFSCluster.Builder(conf)
+                                .numDataNodes(REPLICATION_FACTOR)
+                                .format(false)
+                                .build();
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();  // now we have 3 datanodes
+
+    // Assure the cluster has left safe mode.
+    cluster.waitClusterUp();
+    assertFalse("failed to leave safe mode", 
+        cluster.getNameNode().isInSafeMode());
 
-      // wait for truncated block be detected and the block to be replicated
+    try {
+      // wait for truncated block be detected by block scanner,
+      // and the block to be replicated
       DFSTestUtil.waitReplication(
           cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
       
       // Make sure that truncated block will be deleted
-      waitForBlockDeleted(block, 0);
+      waitForBlockDeleted(block, 0, TIMEOUT);
     } finally {
       cluster.shutdown();
     }
@@ -431,22 +380,35 @@ public class TestDatanodeBlockScanner ex
           MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
       if (blockFile.exists()) {
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
-        raFile.setLength(raFile.length()+lenDelta);
+        long origLen = raFile.length();
+        raFile.setLength(origLen + lenDelta);
         raFile.close();
+        LOG.info("assigned length " + (origLen + lenDelta) 
+            + " to block file " + blockFile.getPath()
+            + " on datanode " + dnIndex);
         return true;
       }
     }
+    LOG.info("failed to change length of block " + blockName);
     return false;
   }
   
-  private static void waitForBlockDeleted(String blockName, int dnIndex) 
-  throws IOException, InterruptedException {
+  private static void waitForBlockDeleted(String blockName, int dnIndex,
+      long timeout) 
+  throws IOException, TimeoutException, InterruptedException {
     File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
     File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1) + 
         MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
     File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2) + 
         MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
+    long failtime = System.currentTimeMillis() 
+                    + ((timeout > 0) ? timeout : Long.MAX_VALUE);
     while (blockFile1.exists() || blockFile2.exists()) {
+      if (failtime < System.currentTimeMillis()) {
+        throw new TimeoutException("waited too long for blocks to be deleted: "
+            + blockFile1.getPath() + (blockFile1.exists() ? " still exists; " : " is absent; ")
+            + blockFile2.getPath() + (blockFile2.exists() ? " still exists." : " is absent."));
+      }
       Thread.sleep(100);
     }
   }
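
The rewritten tests replace open-ended while loops with bounded waits that throw TimeoutException (TIMEOUT = 20000 ms above). A standalone sketch of that polling pattern, not taken from the patch:

    import java.util.concurrent.TimeoutException;

    final class WaitFor {
      /** A condition to poll; may throw if evaluating it fails. */
      interface Check {
        boolean ok() throws Exception;
      }

      /** Poll the condition every intervalMs, failing after timeoutMs. */
      static void waitFor(Check check, long intervalMs, long timeoutMs)
          throws Exception {
        long failtime = System.currentTimeMillis() + timeoutMs;
        while (!check.ok()) {
          if (System.currentTimeMillis() > failtime) {
            throw new TimeoutException(
                "condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(intervalMs);
        }
      }
    }

    // e.g. wait up to TIMEOUT ms for a block file to disappear:
    //   WaitFor.waitFor(new WaitFor.Check() {
    //     public boolean ok() { return !blockFile.exists(); }
    //   }, 100, TIMEOUT);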

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Apr 29 03:03:25 2011
@@ -44,6 +44,10 @@ import org.junit.Test;
 public class TestDistributedFileSystem {
   private static final Random RAN = new Random();
 
+  {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private boolean dualPortTesting = false;
   
   private HdfsConfiguration getTestConfiguration() {
@@ -100,26 +104,94 @@ public class TestDistributedFileSystem {
   @Test
   public void testDFSClient() throws Exception {
     Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
     MiniDFSCluster cluster = null;
 
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-      final Path filepath = new Path("/test/LeaseChecker/foo");
+      final String filepathstring = "/test/LeaseChecker/foo";
+      final Path[] filepaths = new Path[4];
+      for(int i = 0; i < filepaths.length; i++) {
+        filepaths[i] = new Path(filepathstring + i);
+      }
       final long millis = System.currentTimeMillis();
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
-  
-        //create a file
-        FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        dfs.dfs.leasechecker.setGraceSleepPeriod(grace);
+        assertFalse(dfs.dfs.leasechecker.isRunning());
   
-        //write something and close
-        out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-        out.close();
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        {
+          //create a file
+          final FSDataOutputStream out = dfs.create(filepaths[0]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //write something
+          out.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close
+          out.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.leasechecker.isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.leasechecker.isRunning());
+        }
+
+        {
+          //create file1
+          final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //create file2
+          final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+
+          //write something to file1
+          out1.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file1
+          out1.close();
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+
+          //write something to file2
+          out2.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file2
+          out2.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+        }
+
+        {
+          //create file3
+          final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          Thread.sleep(grace/4*3);
+          //passed previous grace period, should still running
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //write something to file3
+          out3.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file3
+          out3.close();
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.leasechecker.isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.leasechecker.isRunning());
+        }
+
         dfs.close();
       }
 
@@ -146,15 +218,15 @@ public class TestDistributedFileSystem {
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
 
         //open and check the file
-        FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        FSDataInputStream in = dfs.open(filepaths[0]);
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         in.close();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         dfs.close();
       }
       

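The test now drives DFSClient's lease checker through its grace period by polling dfs.dfs.leasechecker.isRunning(). A hypothetical helper, not part of the patch, capturing that repeated idiom; it relies on package-private access, so it would have to live in org.apache.hadoop.hdfs like the test itself:

    // Poll until the lease checker has shut down, or fail after maxPolls attempts.
    private static void waitForLeaseCheckerStop(DistributedFileSystem dfs,
        long graceMs, int maxPolls) throws InterruptedException {
      for (int i = 0; i < maxPolls && dfs.dfs.leasechecker.isRunning(); i++) {
        // Sleep half a grace period between polls, as the test above does.
        Thread.sleep(graceMs / 2);
      }
      assertFalse("lease checker still running after grace period",
          dfs.dfs.leasechecker.isRunning());
    }
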
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Fri Apr 29 03:03:25 2011
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.*;
+
 import java.io.File;
 import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.net.InetSocketAddress;
 import java.util.List;
 
-import junit.framework.TestCase;
+import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -41,7 +44,7 @@ import org.apache.hadoop.hdfs.server.dat
  * This class tests the building blocks that are needed to
  * support HDFS appends.
  */
-public class TestFileAppend extends TestCase {
+public class TestFileAppend {
   boolean simulatedStorage = false;
 
   private static byte[] fileContents = null;
@@ -101,6 +104,7 @@ public class TestFileAppend extends Test
    * Test that copy on write for blocks works correctly
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testCopyOnWrite() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
@@ -171,6 +175,7 @@ public class TestFileAppend extends Test
    * Test a simple flush on a simple HDFS file.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testSimpleFlush() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
@@ -226,6 +231,7 @@ public class TestFileAppend extends Test
    * Test that file data can be flushed.
    * @throws IOException an exception might be thrown
    */
+  @Test
   public void testComplexFlush() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
@@ -268,4 +274,26 @@ public class TestFileAppend extends Test
       cluster.shutdown();
     }
   }
+ 
+  /**
+   * FileNotFoundException is expected for appending to a non-existing file.
+   * 
+   * @throws FileNotFoundException as the expected result
+   */
+  @Test(expected = FileNotFoundException.class)
+  public void testFileNotFound() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      Path file1 = new Path("/nonexistingfile.dat");
+      fs.append(file1);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
 }
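
TestFileAppend above moves from JUnit 3 (extending TestCase) to JUnit 4: test methods carry @Test, and the new testFileNotFound relies on the expected attribute so the test passes only if fs.append() throws FileNotFoundException. A minimal standalone sketch of those two idioms (the class name and file path below are made up, not taken from the patch):

  import static org.junit.Assert.assertEquals;

  import java.io.FileNotFoundException;
  import java.io.FileReader;

  import org.junit.Test;

  public class JUnit4StyleExample {

    @Test
    public void plainTest() {
      // An ordinary @Test method replaces a JUnit 3 testXxx() method on TestCase.
      assertEquals(4, 2 + 2);
    }

    @Test(expected = FileNotFoundException.class)
    public void missingFile() throws Exception {
      // Passes only if this constructor throws FileNotFoundException.
      new FileReader("/this/path/should/not/exist.dat");
    }
  }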

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Fri Apr 29 03:03:25 2011
@@ -68,7 +68,7 @@ public class TestMissingBlocksAlert exte
 
       // Corrupt the block
       String block = DFSTestUtil.getFirstBlock(dfs, corruptFile).getBlockName();
-      TestDatanodeBlockScanner.corruptReplica(block, 0);
+      assertTrue(cluster.corruptReplica(block, 0));
 
       // read the file so that the corrupt block is reported to NN
       FSDataInputStream in = dfs.open(corruptFile); 

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Fri Apr 29 03:03:25 2011
@@ -151,6 +151,7 @@ public class TestReplication extends Tes
     DFSClient dfsClient = null;
     LocatedBlocks blocks = null;
     int replicaCount = 0;
+    short replFactor = 1;
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -159,16 +160,19 @@ public class TestReplication extends Tes
   
     // Create file with replication factor of 1
     Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
-    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
-    DFSTestUtil.waitReplication(fs, file1, (short)1);
+    DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+    DFSTestUtil.waitReplication(fs, file1, replFactor);
   
     // Corrupt the block belonging to the created file
     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
-    cluster.corruptBlockOnDataNodes(block);
-  
+
+    int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+    assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted); 
+
     // Increase replication factor, this should invoke transfer request
     // Receiving datanode fails on checksum and reports it to namenode
-    fs.setReplication(file1, (short)2);
+    replFactor = 2;
+    fs.setReplication(file1, replFactor);
   
     // Now get block details and check if the block is corrupt
     blocks = dfsClient.getNamenode().

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Fri Apr 29 03:03:25 2011
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.mortbay.log.Log;
 
 import junit.framework.TestCase;
 /**
@@ -53,6 +55,9 @@ public class TestBalancer extends TestCa
 
   ClientProtocol client;
 
+  static final long TIMEOUT = 20000L; //msec
+  static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
+  static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
   static final int DEFAULT_BLOCK_SIZE = 10;
   private Balancer balancer;
   private Random r = new Random();
@@ -186,28 +191,101 @@ public class TestBalancer extends TestCa
     cluster.shutdown();
   }
 
-  /* wait for one heartbeat */
-  private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
-  throws IOException {
-    long[] status = client.getStats();
-    while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
+  /**
+   * Wait until the heartbeat-reported stats match the expected values, within
+   * CAPACITY_ALLOWED_VARIANCE, summed over all nodes.  Times out after TIMEOUT msec.
+   * @param expectedUsedSpace
+   * @param expectedTotalSpace
+   * @throws IOException - if getStats() fails
+   * @throws TimeoutException
+   */
+  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+             : System.currentTimeMillis() + timeout;
+    
+    while (true) {
+      long[] status = client.getStats();
+      double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) 
+          / expectedTotalSpace;
+      double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) 
+          / expectedUsedSpace;
+      if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE 
+          && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
+        break; //done
+
+      if (System.currentTimeMillis() > failtime) {
+        throw new TimeoutException("Cluster failed to reach expected values of "
+            + "totalSpace (current: " + status[0] 
+            + ", expected: " + expectedTotalSpace 
+            + "), or usedSpace (current: " + status[1] 
+            + ", expected: " + expectedUsedSpace
+            + "), in more than " + timeout + " msec.");
+      }
       try {
         Thread.sleep(100L);
       } catch(InterruptedException ignored) {
       }
-      status = client.getStats();
     }
   }
+  
+  /**
+   * Wait until balanced: each datanode's utilization is within 
+   * BALANCE_ALLOWED_VARIANCE of the average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  private void waitForBalancer(long totalUsedSpace, long totalCapacity) 
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+        : System.currentTimeMillis() + timeout;
+    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+    boolean balanced;
+    do {
+      DatanodeInfo[] datanodeReport = 
+          client.getDatanodeReport(DatanodeReportType.ALL);
+      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+      balanced = true;
+      for (DatanodeInfo datanode : datanodeReport) {
+        double nodeUtilization = ((double)datanode.getDfsUsed())
+            / datanode.getCapacity();
+        if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
+          balanced = false;
+          if (System.currentTimeMillis() > failtime) {
+            throw new TimeoutException(
+                "Rebalancing expected avg utilization to become "
+                + avgUtilization + ", but on datanode " + datanode
+                + " it remains at " + nodeUtilization
+                + " after more than " + TIMEOUT + " msec.");
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ignored) {
+          }
+          break;
+        }
+      }
+    } while (!balanced);
+  }
 
-  /* This test start a one-node cluster, fill the node to be 30% full;
-   * It then adds an empty node and start balancing.
-   * @param newCapacity new node's capacity
-   * @param new 
+  /** This test starts a cluster with the specified number of nodes, 
+   * and fills it to be 30% full (with a single file replicated identically
+   * to all datanodes).
+   * It then adds one new empty node and starts balancing.
+   * 
+   * @param conf - configuration
+   * @param capacities - array of capacities of original nodes in cluster
+   * @param racks - array of racks for original nodes in cluster
+   * @param newCapacity - new node's capacity
+   * @param newRack - new node's rack
+   * @throws Exception
    */
-  private void test(Configuration conf, long[] capacities, String[] racks, 
+  private void doTest(Configuration conf, long[] capacities, String[] racks, 
       long newCapacity, String newRack) throws Exception {
+    assertEquals(capacities.length, racks.length);
     int numOfDatanodes = capacities.length;
-    assertEquals(numOfDatanodes, racks.length);
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(capacities.length)
                                 .racks(racks)
@@ -247,26 +325,8 @@ public class TestBalancer extends TestCa
     balancer.run(new String[0]);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
-    boolean balanced;
-    do {
-      DatanodeInfo[] datanodeReport = 
-        client.getDatanodeReport(DatanodeReportType.ALL);
-      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
-      balanced = true;
-      double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
-      for(DatanodeInfo datanode:datanodeReport) {
-        if(Math.abs(avgUtilization-
-            ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
-          balanced = false;
-          try {
-            Thread.sleep(100);
-          } catch(InterruptedException ignored) {
-          }
-          break;
-        }
-      }
-    } while(!balanced);
-
+    Log.info("Rebalancing.");
+    waitForBalancer(totalUsedSpace, totalCapacity);
   }
 
   private void runBalancerDefaultConstructor(Configuration conf,
@@ -279,37 +339,19 @@ public class TestBalancer extends TestCa
     balancer.run(new String[0]);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
-    boolean balanced;
-    do {
-      DatanodeInfo[] datanodeReport = client
-          .getDatanodeReport(DatanodeReportType.ALL);
-      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
-      balanced = true;
-      double avgUtilization = ((double) totalUsedSpace) / totalCapacity * 100;
-      for (DatanodeInfo datanode : datanodeReport) {
-        if (Math.abs(avgUtilization - ((double) datanode.getDfsUsed())
-            / datanode.getCapacity() * 100) > 10) {
-          balanced = false;
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException ignored) {
-          }
-          break;
-        }
-      }
-    } while (!balanced);
-
+    Log.info("Rebalancing with default ctor.");
+    waitForBalancer(totalUsedSpace, totalCapacity);
   }
   
   /** one-node cluster test*/
   private void oneNodeTest(Configuration conf) throws Exception {
     // add an empty node with half of the CAPACITY & the same rack
-    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
   }
   
   /** two-node cluster test */
   private void twoNodeTest(Configuration conf) throws Exception {
-    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
         CAPACITY, RACK2);
   }
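
The new waitForHeartBeat and waitForBalancer above replace open-ended busy loops with polling that tolerates a small relative variance and throws TimeoutException once TIMEOUT msec have passed. The core test is just a relative-error comparison; the class and method below are hypothetical, shown only to spell that check out:

  public class VarianceCheck {

    /** True when actual is within the given relative tolerance of a non-zero expected value. */
    public static boolean withinVariance(long actual, long expected, double tolerance) {
      return Math.abs((double) actual - expected) / expected < tolerance;
    }

    public static void main(String[] args) {
      // Mirrors the CAPACITY_ALLOWED_VARIANCE (0.5%) check in waitForHeartBeat above.
      System.out.println(withinVariance(1004L, 1000L, 0.005));  // true:  0.4% off
      System.out.println(withinVariance(1010L, 1000L, 0.005));  // false: 1.0% off
    }
  }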
   

Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Apr 29 03:03:25 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
@@ -57,4 +58,19 @@ public class NameNodeAdapter {
   public static Server getRpcServer(NameNode namenode) {
     return namenode.server;
   }
+
+  /**
+   * Return a tuple of the replica state (number of racks, number of live
+   * replicas, and number of needed replicas) for the given block.
+   * @param namenode to proxy the invocation to.
+   */
+  public static int[] getReplicaInfo(NameNode namenode, Block b) {
+    FSNamesystem ns = namenode.getNamesystem();
+    ns.readLock();
+    int[] r = {ns.blockManager.getNumberOfRacks(b),
+               ns.blockManager.countNodes(b).liveReplicas(),
+               ns.blockManager.neededReplications.contains(b) ? 1 : 0};
+    ns.readUnlock();
+    return r;
+  }
 }
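
getReplicaInfo above takes the namesystem read lock around its three lookups so the rack count, live-replica count and needed-replication flag come from one consistent view of the block map. As a generic sketch of that pattern using a plain ReentrantReadWriteLock (none of these names come from the patch), the reads would typically sit in a try/finally so the lock is released even if a lookup throws:

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  public class SnapshotReader {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int racks;
    private int liveReplicas;
    private int neededReplications;

    /** Read several related counters under one read lock so they form a consistent snapshot. */
    public int[] snapshot() {
      lock.readLock().lock();
      try {
        return new int[] { racks, liveReplicas, neededReplications };
      } finally {
        lock.readLock().unlock();
      }
    }
  }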