Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/06/11 00:25:39 UTC

svn commit: r953482 [1/2] - in /hadoop/common/branches/branch-0.20-append: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/common/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/nameno...

Author: dhruba
Date: Thu Jun 10 22:25:39 2010
New Revision: 953482

URL: http://svn.apache.org/viewvc?rev=953482&view=rev
Log:
HDFS-142. Blocks that are being written by a client are stored in the
blocksBeingWritten directory. 
(Dhruba Borthakur, Nicolas Spiegelberg, Todd Lipcon via dhruba)
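
In short: blocks being written by a client now go to a per-volume
"blocksBeingWritten" directory that survives a datanode restart, while blocks
received through replication keep going to "tmp", which is wiped at startup.
A minimal sketch of the idea, using a hypothetical helper name (the committed
logic is in FSVolume.createTmpFile() in the FSDataset.java diff below):

    // tmp/ is deleted when the datanode starts; blocksBeingWritten/ is
    // recovered at startup when dfs.support.append is enabled.
    // "volumeParent" is the parent of a volume's "current" directory.
    File chooseBlockDir(File volumeParent, boolean isReplicationRequest) {
      return isReplicationRequest
          ? new File(volumeParent, "tmp")
          : new File(volumeParent, "blocksBeingWritten");
    }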


Added:
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/FSImageAdapter.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestDFSConcurrentFileOperations.java
Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Thu Jun 10 22:25:39 2010
@@ -15,6 +15,10 @@ Release 0.20-append - Unreleased
     HDFS-826. Allow a mechanism for an application to detect that 
     datanode(s) have died in the write pipeline. (dhruba)
 
+    HDFS-142. Blocks that are being written by a client are stored in the
+    blocksBeingWritten directory. 
+    (Dhruba Borthakur, Nicolas Spiegelberg, Todd Lipcon via dhruba)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Thu Jun 10 22:25:39 2010
@@ -2803,16 +2803,24 @@ public class DFSClient implements FSCons
                                  bytesPerChecksum);
         }
 
-        // setup pipeline to append to the last block XXX retries??
+        // setup pipeline to append to the last block
         nodes = lastBlock.getLocations();
         errorIndex = -1;   // no errors yet.
         if (nodes.length < 1) {
           throw new IOException("Unable to retrieve blocks locations " +
                                 " for last block " + block +
                                 "of file " + src);
-                        
         }
-        processDatanodeError(true, true);
+        // keep trying to setup a pipeline until you know all DNs are dead
+        while (processDatanodeError(true, true)) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException  e) {
+          }
+        }
+        if (lastException != null) {
+          throw lastException;
+        }
         streamer.start();
       }
       else {

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Thu Jun 10 22:25:39 2010
@@ -51,5 +51,9 @@ public interface HdfsConstants {
   public static int WRITE_TIMEOUT = 8 * 60 * 1000;
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
 
+
+  // The lease holder for recovery initiated by the NameNode
+  public static final String NN_RECOVERY_LEASEHOLDER = "NN_Recovery";
+
 }
 

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jun 10 22:25:39 2010
@@ -96,7 +96,8 @@ class BlockReceiver implements java.io.C
       //
       // Open local disk out
       //
-      streams = datanode.data.writeToBlock(block, isRecovery);
+      streams = datanode.data.writeToBlock(block, isRecovery,
+                              clientName == null || clientName.length() == 0);
       this.finalized = false;
       if (streams != null) {
         this.out = streams.dataOut;

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Thu Jun 10 22:25:39 2010
@@ -1425,7 +1425,7 @@ public class DataNode extends Configured
         + "), datanode=" + dnRegistration.getName());
     data.updateBlock(oldblock, newblock);
     if (finalize) {
-      data.finalizeBlock(newblock);
+      data.finalizeBlockIfNeeded(newblock);
       myMetrics.blocksWritten.inc(); 
       notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
       LOG.info("Received block " + newblock +

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Jun 10 22:25:39 2010
@@ -43,6 +43,24 @@ import org.apache.hadoop.hdfs.server.pro
  ***************************************************/
 public class FSDataset implements FSConstants, FSDatasetInterface {
 
+  /**
+   * A data structure that encapsulates a Block along with the full pathname
+   * of the block file
+   */
+  static class BlockAndFile implements Comparable<BlockAndFile> {
+    final Block block;
+    final File pathfile;
+
+    BlockAndFile(File fullpathname, Block block) {
+      this.pathfile = fullpathname;
+      this.block = block;
+    }
+
+    public int compareTo(BlockAndFile o)
+    {
+      return this.block.compareTo(o.block);
+    }
+  }
 
   /**
    * A node type that can be built into a tree reflecting the
@@ -193,6 +211,28 @@ public class FSDataset implements FSCons
       }
     }
 
+    /**
+     * Populate the given blockSet with any child blocks
+     * found at this node. With each block, return the full path
+     * of the block file.
+     */
+    void getBlockAndFileInfo(TreeSet<BlockAndFile> blockSet) {
+      if (children != null) {
+        for (int i = 0; i < children.length; i++) {
+          children[i].getBlockAndFileInfo(blockSet);
+        }
+      }
+
+      File blockFiles[] = dir.listFiles();
+      for (int i = 0; i < blockFiles.length; i++) {
+        if (Block.isBlockFilename(blockFiles[i])) {
+          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
+          Block block = new Block(blockFiles[i], blockFiles[i].length(), genStamp);
+          blockSet.add(new BlockAndFile(blockFiles[i].getAbsoluteFile(), block));
+        }
+      }
+    }    
+    
     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
@@ -289,6 +329,7 @@ public class FSDataset implements FSCons
   class FSVolume {
     private FSDir dataDir;
     private File tmpDir;
+    private File blocksBeingWritten;     // clients write here
     private File detachDir; // copy on write for blocks in snapshot
     private DF usage;
     private DU dfsUsage;
@@ -297,6 +338,7 @@ public class FSDataset implements FSCons
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
+      this.dataDir = new FSDir(currentDir);
       boolean supportAppends = conf.getBoolean("dfs.support.append", false);
       File parent = currentDir.getParentFile();
 
@@ -305,20 +347,30 @@ public class FSDataset implements FSCons
         recoverDetachedBlocks(currentDir, detachDir);
       }
 
-      // Files that were being written when the datanode was last shutdown
-      // are now moved back to the data directory. It is possible that
-      // in the future, we might want to do some sort of datanode-local
-      // recovery for these blocks. For example, crc validation.
-      //
+      // remove all blocks from "tmp" directory. These were either created
+      // by pre-append clients (0.18.x) or are part of a replication request.
+      // They can be safely removed.
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
+        FileUtil.fullyDelete(tmpDir);
+      }
+      
+      // Files that were being written when the datanode was last shutdown
+      // should not be deleted.
+      blocksBeingWritten = new File(parent, "blocksBeingWritten");
+      if (blocksBeingWritten.exists()) {
         if (supportAppends) {
-          recoverDetachedBlocks(currentDir, tmpDir);
+          recoverBlocksBeingWritten(blocksBeingWritten);
         } else {
-          FileUtil.fullyDelete(tmpDir);
+          FileUtil.fullyDelete(blocksBeingWritten);
+        }
+      }
+      
+      if (!blocksBeingWritten.mkdirs()) {
+        if (!blocksBeingWritten.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + blocksBeingWritten.toString());
         }
       }
-      this.dataDir = new FSDir(currentDir);
       if (!tmpDir.mkdirs()) {
         if (!tmpDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
@@ -371,8 +423,13 @@ public class FSDataset implements FSCons
      * Temporary files. They get moved to the real block directory either when
      * the block is finalized or the datanode restarts.
      */
-    File createTmpFile(Block b) throws IOException {
-      File f = new File(tmpDir, b.getBlockName());
+    File createTmpFile(Block b, boolean replicationRequest) throws IOException {
+      File f= null;
+      if (!replicationRequest) {
+        f = new File(blocksBeingWritten, b.getBlockName());
+      } else {
+        f = new File(tmpDir, b.getBlockName());
+      }
       return createTmpFile(b, f);
     }
 
@@ -404,6 +461,7 @@ public class FSDataset implements FSCons
       try {
         fileCreated = f.createNewFile();
       } catch (IOException ioe) {
+        DataNode.LOG.warn("createTmpFile failed for file " + f + " Block " + b);
         throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
       }
       if (!fileCreated) {
@@ -423,12 +481,13 @@ public class FSDataset implements FSCons
     void checkDirs() throws DiskErrorException {
       dataDir.checkDirTree();
       DiskChecker.checkDir(tmpDir);
+      DiskChecker.checkDir(blocksBeingWritten);
     }
       
     void getBlockInfo(TreeSet<Block> blockSet) {
       dataDir.getBlockInfo(blockSet);
     }
-      
+
     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
       dataDir.getVolumeMap(volumeMap, this);
     }
@@ -442,6 +501,29 @@ public class FSDataset implements FSCons
     }
 
     /**
+     * Recover blocks that were being written when the datanode
+     * was earlier shut down. These blocks get re-inserted into
+     * ongoingCreates. Also, send a blockreceived message to the NN
+     * for each of these blocks because these are not part of a 
+     * block report.
+     */
+    private void recoverBlocksBeingWritten(File bbw) throws IOException {
+      FSDir fsd = new FSDir(bbw);
+      TreeSet<BlockAndFile> blockSet = new TreeSet<BlockAndFile>();
+      fsd.getBlockAndFileInfo(blockSet);
+      for (BlockAndFile b : blockSet) {
+        File f = b.pathfile;  // full path name of block file
+        volumeMap.put(b.block, new DatanodeBlockInfo(this, f));
+        ongoingCreates.put(b.block, new ActiveFile(f));
+        if (DataNode.LOG.isDebugEnabled()) {
+          DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block);
+        }
+        DataNode.getDataNode().notifyNamenodeReceivedBlock(b.block, 
+                               DataNode.EMPTY_DEL_HINT);
+      }
+    } 
+    
+    /**
      * Recover detached files on datanode restart. If a detached block
      * does not exist in the original directory, then it is moved to the
      * original directory.
@@ -572,6 +654,11 @@ public class FSDataset implements FSCons
       }
       threads.add(Thread.currentThread());
     }
+
+    // no active threads associated with this ActiveFile
+    ActiveFile(File f) {
+      file = f;
+    }
     
     public String toString() {
       return getClass().getSimpleName() + "(file=" + file
@@ -592,7 +679,7 @@ public class FSDataset implements FSCons
   }
 
   /** Find the corresponding meta data file from a given block file */
-  private static File findMetaFile(final File blockFile) throws IOException {
+  static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
     final File parent = blockFile.getParentFile();
     File[] matches = parent.listFiles(new FilenameFilter() {
@@ -676,7 +763,7 @@ public class FSDataset implements FSCons
   FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
+  HashMap<Block,DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();
   static  Random random = new Random();
   
   /**
@@ -689,7 +776,6 @@ public class FSDataset implements FSCons
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
     volumes = new FSVolumeSet(volArray);
-    volumeMap = new HashMap<Block, DatanodeBlockInfo>();
     volumes.getVolumeMap(volumeMap);
     registerMBean(storage.getStorageID());
   }
@@ -761,7 +847,10 @@ public class FSDataset implements FSCons
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
     FSVolume v = info.getVolume();
-    File blockFile = v.getTmpFile(b);
+    File blockFile = info.getFile();
+    if (blockFile == null) {
+      blockFile = v.getTmpFile(b);
+    }
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (blkOffset > 0) {
       blockInFile.seek(blkOffset);
@@ -813,6 +902,22 @@ public class FSDataset implements FSCons
       throw new IOException("Cannot update oldblock (=" + oldblock
           + ") to newblock (=" + newblock + ").");
     }
+
+
+    // Protect against a straggler updateblock call moving a block backwards
+    // in time.
+    boolean isValidUpdate =
+      (newblock.getGenerationStamp() > oldblock.getGenerationStamp()) ||
+      (newblock.getGenerationStamp() == oldblock.getGenerationStamp() &&
+       newblock.getNumBytes() == oldblock.getNumBytes());
+
+    if (!isValidUpdate) {
+      throw new IOException(
+        "Cannot update oldblock=" + oldblock +
+        " to newblock=" + newblock + " since generation stamps must " +
+        "increase, or else length must not change.");
+    }
+
     
     for(;;) {
       final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
@@ -829,6 +934,7 @@ public class FSDataset implements FSCons
           t.join();
         } catch (InterruptedException e) {
           DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
+          break; // retry with new threadlist from the beginning
         }
       }
     }
@@ -907,13 +1013,13 @@ public class FSDataset implements FSCons
     return null;
   }
 
-  static private void truncateBlock(File blockFile, File metaFile,
+  static void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     if (newlen == oldlen) {
       return;
     }
     if (newlen > oldlen) {
-      throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
+      throw new IOException("Cannot truncate block from oldlen (=" + oldlen
           + ") to newlen (=" + newlen + ")");
     }
 
@@ -973,8 +1079,11 @@ public class FSDataset implements FSCons
       volumeMap.put(b, v);
       volumeMap.put(b, v);
    * other threads that might be writing to this block, and then reopen the file.
+   * If replicationRequest is true, then this operation is part of a block
+   * replication request.
    */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
+  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery,
+                           boolean replicationRequest) throws IOException {
     //
     // Make sure the block isn't a valid one - we're still creating it!
     //
@@ -1019,8 +1128,7 @@ public class FSDataset implements FSCons
       if (!isRecovery) {
         v = volumes.getNextVolume(blockSize);
         // create temporary file to hold block in the designated volume
-        f = createTmpFile(v, b);
-        volumeMap.put(b, new DatanodeBlockInfo(v, f));
+        f = createTmpFile(v, b, replicationRequest);
       } else if (f != null) {
         DataNode.LOG.info("Reopen already-open Block for append " + b);
         // create or reuse temporary file to hold block in the designated volume
@@ -1030,7 +1138,7 @@ public class FSDataset implements FSCons
         // reopening block for appending to it.
         DataNode.LOG.info("Reopen Block for append " + b);
         v = volumeMap.get(b).getVolume();
-        f = createTmpFile(v, b);
+        f = createTmpFile(v, b, replicationRequest);
         File blkfile = getBlockFile(b);
         File oldmeta = getMetaFile(b);
         File newmeta = getMetaFile(f, b);
@@ -1056,7 +1164,6 @@ public class FSDataset implements FSCons
                                   " to tmp dir " + f);
           }
         }
-        volumeMap.put(b, new DatanodeBlockInfo(v, f));
       }
       if (f == null) {
         DataNode.LOG.warn("Block " + b + " reopen failed " +
@@ -1064,6 +1171,14 @@ public class FSDataset implements FSCons
         throw new IOException("Block " + b + " reopen failed " +
                               " Unable to locate tmp file.");
       }
+      // If this is a replication request, then this is not a permanent
+      // block yet, it could get removed if the datanode restarts. If this
+      // is a write or append request, then it is a valid block.
+      if (replicationRequest) {
+        volumeMap.put(b, new DatanodeBlockInfo(v));
+      } else {
+        volumeMap.put(b, new DatanodeBlockInfo(v, f));
+      }
       ongoingCreates.put(b, new ActiveFile(f, threads));
     }
 
@@ -1105,32 +1220,29 @@ public class FSDataset implements FSCons
   public void setChannelPosition(Block b, BlockWriteStreams streams, 
                                  long dataOffset, long ckOffset) 
                                  throws IOException {
-    long size = 0;
-    synchronized (this) {
-      FSVolume vol = volumeMap.get(b).getVolume();
-      size = vol.getTmpFile(b).length();
-    }
-    if (size < dataOffset) {
+    FileOutputStream file = (FileOutputStream) streams.dataOut;
+    if (file.getChannel().size() < dataOffset) {
       String msg = "Trying to change block file offset of block " + b +
+                     " file " + volumeMap.get(b).getVolume().getTmpFile(b) +
                      " to " + dataOffset +
                      " but actual size of file is " +
-                     size;
+                     file.getChannel().size();
       throw new IOException(msg);
     }
-    FileOutputStream file = (FileOutputStream) streams.dataOut;
     file.getChannel().position(dataOffset);
     file = (FileOutputStream) streams.checksumOut;
     file.getChannel().position(ckOffset);
   }
 
-  synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
+  synchronized File createTmpFile( FSVolume vol, Block blk,
+                        boolean replicationRequest) throws IOException {
     if ( vol == null ) {
       vol = volumeMap.get( blk ).getVolume();
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
     }
-    return vol.createTmpFile(blk);
+    return vol.createTmpFile(blk, replicationRequest);
   }
 
   //
@@ -1141,13 +1253,29 @@ public class FSDataset implements FSCons
   // we can GC it safely.
   //
 
+
+  @Override
+  public void finalizeBlock(Block b) throws IOException {
+    finalizeBlockInternal(b, false);
+  }
+
+  @Override
+  public void finalizeBlockIfNeeded(Block b) throws IOException {
+    finalizeBlockInternal(b, true);
+  }
+
   /**
    * Complete the block write!
    */
-  public synchronized void finalizeBlock(Block b) throws IOException {
+  private synchronized void finalizeBlockInternal(Block b, boolean reFinalizeOk) 
+    throws IOException {
     ActiveFile activeFile = ongoingCreates.get(b);
     if (activeFile == null) {
-      throw new IOException("Block " + b + " is already finalized.");
+      if (reFinalizeOk) {
+        return;
+      } else {
+        throw new IOException("Block " + b + " is already finalized.");
+      }
     }
     File f = activeFile.file;
     if (f == null || !f.exists()) {
@@ -1166,6 +1294,28 @@ public class FSDataset implements FSCons
   }
 
   /**
+   * is this block finalized? Returns true if the block is already
+   * finalized, otherwise returns false.
+   */
+  private synchronized boolean isFinalized(Block b) {
+    FSVolume v = volumeMap.get(b).getVolume();
+    if (v == null) {
+      DataNode.LOG.warn("No volume for block " + b);
+      return false;             // block is not finalized
+    }
+    ActiveFile activeFile = ongoingCreates.get(b);
+    if (activeFile == null) {
+      return true;            // block is already finalized
+    }
+    File f = activeFile.file;
+    if (f == null || !f.exists()) {
+      // we should never get into this position.
+      DataNode.LOG.warn("No temporary file " + f + " for block " + b);
+    }
+    return false;             // block is not finalized
+  }
+  
+  /**
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
@@ -1226,7 +1376,8 @@ public class FSDataset implements FSCons
    * Check whether the given block is a valid one.
    */
   public boolean isValidBlock(Block b) {
-    return validateBlockFile(b) != null;
+    File f = validateBlockFile(b);
+    return ((f != null) ? isFinalized(b) : false);
   }
 
   /**
@@ -1244,7 +1395,7 @@ public class FSDataset implements FSCons
   }
 
   /** {@inheritDoc} */
-  public void validateBlockMetadata(Block b) throws IOException {
+  public synchronized void validateBlockMetadata(Block b) throws IOException {
     DatanodeBlockInfo info = volumeMap.get(b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
@@ -1289,6 +1440,30 @@ public class FSDataset implements FSCons
                             " does not match meta file stamp " +
                             stamp);
     }
+    // verify that the checksum file has an integral number of checksum values.
+    DataChecksum dcs = BlockMetadataHeader.readHeader(meta).getChecksum();
+    int checksumsize = dcs.getChecksumSize();
+    long actual = meta.length() - BlockMetadataHeader.getHeaderSize();
+    long numChunksInMeta = actual/checksumsize;
+    if (actual % checksumsize != 0) {
+      throw new IOException("Block " + b +
+                            " has a checksum file of size " + meta.length() +
+                            " but it does not align with checksum size of " +
+                            checksumsize);
+    }
+    int bpc = dcs.getBytesPerChecksum();
+    long minDataSize = (numChunksInMeta - 1) * bpc;
+    long maxDataSize = numChunksInMeta * bpc;
+    if (f.length() > maxDataSize || f.length() <= minDataSize) {
+      throw new IOException("Block " + b +
+                            " is of size " + f.length() +
+                            " but has " + (numChunksInMeta + 1) +
+                            " checksums and each checksum size is " +
+                            checksumsize + " bytes.");
+    }
+    // We could crc-check the entire block here, but it will be a costly 
+    // operation. Instead we rely on the above check (file length mismatch)
+    // to detect corrupt blocks.
   }
 
   /**

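A quick worked example of the new length/checksum consistency check in
validateBlockMetadata() above, with assumed parameters (CRC32 checksums of 4
bytes, 512 bytes of data per checksum); this is an illustration, not code from
the patch:

    int checksumSize = 4, bytesPerChecksum = 512;   // assumed CRC32 settings
    long metaPayload = 40;                          // meta file length minus header
    long numChunks = metaPayload / checksumSize;    // 10 checksums recorded
    long minDataSize = (numChunks - 1) * bytesPerChecksum;  // 4608
    long maxDataSize = numChunks * bytesPerChecksum;        // 5120
    // the block file is accepted only if 4608 < blockFile.length() <= 5120;
    // anything outside that range is reported as a corrupt block
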
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Jun 10 22:25:39 2010
@@ -169,12 +169,14 @@ public interface FSDatasetInterface exte
   /**
    * Creates the block and returns output streams to write data and CRC
    * @param b
-   * @param isRecovery True if this is part of erro recovery, otherwise false
+   * @param isRecovery True if this is part of error recovery, otherwise false
+   * @param isReplicationRequest True if this is part of block replication request
    * @return a BlockWriteStreams object to allow writing the block data
    *  and CRC
    * @throws IOException
    */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery, 
+                                        boolean isReplicationRequest) throws IOException;
 
   /**
    * Update the block to the new generation stamp and length.  
@@ -191,6 +193,14 @@ public interface FSDatasetInterface exte
   public void finalizeBlock(Block b) throws IOException;
 
   /**
+   * Finalizes the block previously opened for writing using writeToBlock 
+   * if not already finalized
+   * @param b
+   * @throws IOException
+   */
+  public void finalizeBlockIfNeeded(Block b) throws IOException;
+
+  /**
    * Unfinalizes the block previously opened for writing using writeToBlock.
    * The temporary file associated with this block is deleted.
    * @param b
@@ -200,6 +210,7 @@ public interface FSDatasetInterface exte
 
   /**
    * Returns the block report - the full list of blocks stored
+   * Returns only finalized blocks
    * @return - the block report - the full list of blocks stored
    */
   public Block[] getBlockReport();
@@ -228,7 +239,7 @@ public interface FSDatasetInterface exte
      * Stringifies the name of the storage
      */
   public String toString();
-  
+ 
   /**
    * Shutdown the FSDataset
    */

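For context on the new third argument: the only caller shown in this patch is
BlockReceiver, which derives the flag from whether a client name accompanies
the write. Paraphrasing the BlockReceiver hunk earlier in this message:

    // An empty clientName means the data came from another datanode
    // (a replication or recovery transfer), not from a client write/append.
    boolean isReplicationRequest = (clientName == null || clientName.length() == 0);
    streams = datanode.data.writeToBlock(block, isRecovery, isReplicationRequest);
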
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Thu Jun 10 22:25:39 2010
@@ -367,6 +367,11 @@ class BlocksMap {
     return map.get(b);
   }
 
+  /** Return the block object without matching against generation stamp. */
+  BlockInfo getStoredBlockWithoutMatchingGS(Block b) {
+    return map.get(new Block(b.getBlockId()));
+  }
+
   /** Returned Iterator does not support. */
   Iterator<DatanodeDescriptor> nodeIterator(Block b) {
     return new NodeIterator(map.get(b));

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Thu Jun 10 22:25:39 2010
@@ -421,8 +421,13 @@ public class DatanodeDescriptor extends 
     // collect blocks that have not been reported
     // all of them are next to the delimiter
     Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this);
-    while(it.hasNext())
-      toRemove.add(it.next());
+    while(it.hasNext()) {
+      BlockInfo storedBlock = (BlockInfo)it.next();
+      INodeFile file = storedBlock.getINode();
+      if (file == null || !file.isUnderConstruction()) {
+        toRemove.add(storedBlock);
+      }
+    }
     this.removeBlock(delimiter);
   }
 

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jun 10 22:25:39 2010
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -236,6 +237,8 @@ public class FSNamesystem implements FSC
   private int minReplication;
   // Default replication
   private int defaultReplication;
+  // Variable to stall new replication checks for testing purposes
+  private boolean stallReplicationWork = false;
   // heartbeatRecheckInterval is how often namenode checks for expired datanodes
   private long heartbeatRecheckInterval;
   // heartbeatExpireInterval is how long namenode waits for datanode to report
@@ -1072,7 +1075,8 @@ public class FSNamesystem implements FSC
         // period, then start lease recovery.
         //
         if (lease.expiredSoftLimit()) {
-          LOG.info("startFile: recover lease " + lease + ", src=" + src);
+          LOG.info("startFile: recover lease " + lease + ", src=" + src +
+                   " from client " + pendingFile.clientName);
           internalReleaseLease(lease, src);
         }
         throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
@@ -1174,7 +1178,9 @@ public class FSNamesystem implements FSC
     //
     LocatedBlock lb = null;
     synchronized (this) {
-      INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
+      // Need to re-check existence here, since the file may have been deleted
+      // in between the synchronized blocks
+      INodeFileUnderConstruction file = checkLease(src, holder);
 
       Block[] blocks = file.getBlocks();
       if (blocks != null && blocks.length > 0) {
@@ -1203,9 +1209,15 @@ public class FSNamesystem implements FSC
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
-          for(Collection<Block> v : recentInvalidateSets.values()) {
+          for (Iterator<Collection<Block>> iter = recentInvalidateSets.values().iterator();
+               iter.hasNext();
+               ) {
+            Collection<Block> v = iter.next();
             if (v.remove(last)) {
               pendingDeletionBlocksCount--;
+              if (v.isEmpty()) {
+                iter.remove();
+              }
             }
           }
         }
@@ -1887,7 +1899,7 @@ public class FSNamesystem implements FSC
       if (pendingFile.getBlocks().length == 0) {
         finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
-          + " internalReleaseLease: No blocks found, lease removed.");
+          + " internalReleaseLease: No blocks found, lease removed for " +  src);
         return;
       }
       // setup the Inode.targets for the last block from the blocksMap
@@ -1904,11 +1916,24 @@ public class FSNamesystem implements FSC
     }
     // start lease recovery of the last block for this file.
     pendingFile.assignPrimaryDatanode();
-    leaseManager.renewLease(lease);
+    Lease reassignedLease = reassignLease(
+      lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
+    leaseManager.renewLease(reassignedLease);
+  }
+
+  private Lease reassignLease(Lease lease, String src, String newHolder,
+                      INodeFileUnderConstruction pendingFile) {
+    if(newHolder == null)
+      return lease;
+    pendingFile.setClientName(newHolder);
+    return leaseManager.reassignLease(lease, src, newHolder);
   }
 
+
   private void finalizeINodeFileUnderConstruction(String src,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    NameNode.stateChangeLog.info("Removing lease on  file " + src + 
+                                 " from client " + pendingFile.clientName);
     leaseManager.removeLease(pendingFile.clientName, src);
 
     // The file is no longer pending.
@@ -1968,13 +1993,22 @@ public class FSNamesystem implements FSC
       // There should be no locations in the blocksMap till now because the
       // file is underConstruction
       DatanodeDescriptor[] descriptors = null;
-      if (newtargets.length > 0) {
-        descriptors = new DatanodeDescriptor[newtargets.length];
-        for(int i = 0; i < newtargets.length; i++) {
-          descriptors[i] = getDatanode(newtargets[i]);
-          descriptors[i].addBlock(newblockinfo);
+      List<DatanodeDescriptor> descriptorsList =
+        new ArrayList<DatanodeDescriptor>(newtargets.length);
+      for(int i = 0; i < newtargets.length; i++) {
+        DatanodeDescriptor node =
+          datanodeMap.get(newtargets[i].getStorageID());
+        if (node != null) {
+          node.addBlock(newblockinfo);
+          descriptorsList.add(node);
+        } else {
+          LOG.warn("commitBlockSynchronization included a target DN " +
+            newtargets[i] + " which is not known to the NameNode. Ignoring.");
         }
       }
+      if (!descriptorsList.isEmpty()) {
+        descriptors = descriptorsList.toArray(new DatanodeDescriptor[0]);
+      }
       // add locations into the INodeUnderConstruction
       pendingFile.setLastBlock(newblockinfo, descriptors);
     }
@@ -2437,6 +2471,11 @@ public class FSNamesystem implements FSC
    */
   private int computeReplicationWork(
                                   int blocksToProcess) throws IOException {
+    // stall is only useful for unit tests (see TestFileAppend4.java)
+    if (stallReplicationWork)  {
+      return 0;
+    }
+    
     // Choose the blocks to be replicated
     List<List<Block>> blocksToReplicate = 
       chooseUnderReplicatedBlocks(blocksToProcess);
@@ -2559,7 +2598,7 @@ public class FSNamesystem implements FSC
       }
     }
 
-    // choose replication targets: NOT HODING THE GLOBAL LOCK
+    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
     DatanodeDescriptor targets[] = replicator.chooseTarget(
         requiredReplication - numEffectiveReplicas,
         srcNode, containingNodes, null, block.getNumBytes());
@@ -2970,12 +3009,14 @@ public class FSNamesystem implements FSC
                                     DatanodeDescriptor delNodeHint) {
     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if (storedBlock == null) {
-      // if the block with a WILDCARD generation stamp matches and the
-      // corresponding file is under construction, then accept this block.
-      // This block has a diferent generation stamp on the datanode 
-      // because of a lease-recovery-attempt.
-      Block nblk = new Block(block.getBlockId());
-      storedBlock = blocksMap.getStoredBlock(nblk);
+      // If we have a block in the block map with the same ID, but a different
+      // generation stamp, and the corresponding file is under construction,
+      // then we need to do some special processing.
+      storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block);
+
+      // If the block ID is valid, and it either (a) belongs to a file under
+      // construction, or (b) the reported genstamp is higher than what we
+      // know about, then we accept the block.
       if (storedBlock != null && storedBlock.getINode() != null &&
           (storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
            storedBlock.getINode().isUnderConstruction())) {
@@ -2994,9 +3035,8 @@ public class FSNamesystem implements FSC
                                    + block + " on " + node.getName()
                                    + " size " + block.getNumBytes()
                                    + " But it does not belong to any file.");
-      // we could add this block to invalidate set of this datanode. 
-      // it will happen in next block report otherwise.
-      return block;      
+      addToInvalidates(block, node);
+      return block;
     }
      
     // add block to the data-node
@@ -3116,6 +3156,18 @@ public class FSNamesystem implements FSC
     INodeFile fileINode = null;
     fileINode = storedBlock.getINode();
     if (fileINode.isUnderConstruction()) {
+      INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
+      Block[] blocks = fileINode.getBlocks();
+      // If this is the last block of this
+      // file, then set targets. This enables lease recovery to occur.
+      // This is especially important after a restart of the NN.
+      Block last = blocks[blocks.length-1];
+      if (last.equals(storedBlock)) {
+        Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
+        for (int i = 0; it != null && it.hasNext(); i++) {
+          cons.addTarget(it.next());
+        }
+      }
       return block;
     }
 
@@ -3721,6 +3773,9 @@ public class FSNamesystem implements FSC
   short getMaxReplication()     { return (short)maxReplication; }
   short getMinReplication()     { return (short)minReplication; }
   short getDefaultReplication() { return (short)defaultReplication; }
+  
+  public void stallReplicationWork()   { stallReplicationWork = true;   }
+  public void restartReplicationWork() { stallReplicationWork = false;  }
     
   /**
    * A immutable object that stores the number of live replicas and
@@ -4750,7 +4805,7 @@ public class FSNamesystem implements FSC
       throw new IOException(msg);
     }
     if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
-      String msg = block + " is beening recovered, ignoring this request.";
+      String msg = block + " is already being recovered, ignoring this request.";
       LOG.info(msg);
       throw new IOException(msg);
     }

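The stallReplicationWork()/restartReplicationWork() hooks above exist so that
tests can freeze the replication monitor. A hedged sketch of how a test might
use them (how the test obtains the FSNamesystem handle is not shown in this
message):

    namesystem.stallReplicationWork();   // computeReplicationWork() now returns 0
    try {
      // ... exercise append/lease recovery without re-replication interfering ...
    } finally {
      namesystem.restartReplicationWork();
    }
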
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu Jun 10 22:25:39 2010
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -25,7 +27,7 @@ import org.apache.hadoop.hdfs.server.nam
 
 
 class INodeFileUnderConstruction extends INodeFile {
-  final String clientName;         // lease holder
+  String clientName;         // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
 
@@ -68,6 +70,10 @@ class INodeFileUnderConstruction extends
     return clientName;
   }
 
+  void setClientName(String newName) {
+    clientName = newName;
+  }
+
   String getClientMachine() {
     return clientMachine;
   }
@@ -93,6 +99,30 @@ class INodeFileUnderConstruction extends
     this.primaryNodeIndex = -1;
   }
 
+  /**
+   * add this target if it does not already exist
+   */
+  void addTarget(DatanodeDescriptor node) {
+    if (this.targets == null) {
+      this.targets = new DatanodeDescriptor[0];
+    }
+
+    for (int j = 0; j < this.targets.length; j++) {
+      if (this.targets[j].equals(node)) {
+        return;  // target already exists
+      }
+    }
+      
+    // allocate new data structure to store additional target
+    DatanodeDescriptor[] newt = new DatanodeDescriptor[targets.length + 1];
+    for (int i = 0; i < targets.length; i++) {
+      newt[i] = this.targets[i];
+    }
+    newt[targets.length] = node;
+    this.targets = newt;
+    this.primaryNodeIndex = -1;
+  }
+
   //
   // converts a INodeFileUnderConstruction into a INodeFile
   // use the modification time as the access time
@@ -132,10 +162,29 @@ class INodeFileUnderConstruction extends
 
   synchronized void setLastBlock(BlockInfo newblock, DatanodeDescriptor[] newtargets
       ) throws IOException {
-    if (blocks == null) {
+    if (blocks == null || blocks.length == 0) {
       throw new IOException("Trying to update non-existant block (newblock="
           + newblock + ")");
     }
+    BlockInfo oldLast = blocks[blocks.length - 1];
+    if (oldLast.getBlockId() != newblock.getBlockId()) {
+      // This should not happen - this means that we're performing recovery
+      // on an internal block in the file!
+      NameNode.stateChangeLog.error(
+        "Trying to commit block synchronization for an internal block on"
+        + " inode=" + this
+        + " newblock=" + newblock + " oldLast=" + oldLast);
+      throw new IOException("Trying to update an internal block of " +
+                            "pending file " + this);
+    }
+
+    if (oldLast.getGenerationStamp() > newblock.getGenerationStamp()) {
+      NameNode.stateChangeLog.warn(
+        "Updating last block " + oldLast + " of inode " +
+        "under construction " + this + " with a block that " +
+        "has an older generation stamp: " + newblock);
+    }
+
     blocks[blocks.length - 1] = newblock;
     setTargets(newtargets);
     lastRecoveryTime = 0;

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Thu Jun 10 22:25:39 2010
@@ -102,7 +102,7 @@ public class LeaseManager {
   /**
    * Adds (or re-adds) the lease for the specified file.
    */
-  synchronized void addLease(String holder, String src) {
+  synchronized Lease addLease(String holder, String src) {
     Lease lease = getLease(holder);
     if (lease == null) {
       lease = new Lease(holder);
@@ -113,6 +113,7 @@ public class LeaseManager {
     }
     sortedLeasesByPath.put(src, lease);
     lease.paths.add(src);
+    return lease;
   }
 
   /**
@@ -133,6 +134,17 @@ public class LeaseManager {
   }
 
   /**
+   * Reassign lease for file src to the new holder.
+   */
+  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
+    assert newHolder != null : "new lease holder is null";
+    if (lease != null) {
+      removeLease(lease, src);
+    }
+    return addLease(newHolder, src);
+  }
+
+  /**
    * Remove the lease for the specified holder and src
    */
   synchronized void removeLease(String holder, String src) {

Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Jun 10 22:25:39 2010
@@ -267,4 +267,15 @@ public class DFSTestUtil extends TestCas
     IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
     return out.toString();
   }
+
+
+  public static byte[] generateSequentialBytes(int start, int length) {
+    byte[] result = new byte[length];
+
+    for (int i = 0; i < length; i++) {
+      result[i] = (byte)((start + i) % 127);
+    }
+
+    return result;
+  }  
 }

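A hedged usage sketch for the new test helper; "stm" stands for an output
stream opened by the test and is not taken from this patch:

    // Fill a file with a deterministic byte pattern that a reader can verify.
    byte[] expected = DFSTestUtil.generateSequentialBytes(0, 1024);
    stm.write(expected);
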
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Thu Jun 10 22:25:39 2010
@@ -31,6 +31,7 @@ import javax.security.auth.login.LoginEx
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -725,17 +726,56 @@ public class MiniDFSCluster {
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
 
-    // make sure all datanodes are alive
-    while(client.datanodeReport(DatanodeReportType.LIVE).length
-        != numDataNodes) {
+    // make sure all datanodes are alive and sent heartbeat
+    while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE))) {
       try {
-        Thread.sleep(500);
-      } catch (Exception e) {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
       }
     }
 
     client.close();
   }
+
+  private synchronized boolean shouldWait(DatanodeInfo[] dnInfo) {
+    if (dnInfo.length != numDataNodes) {
+      return true;
+    }
+    // make sure all datanodes have sent first heartbeat to namenode,
+    // using (capacity == 0) as proxy.
+    for (DatanodeInfo dn : dnInfo) {
+      if (dn.getCapacity() == 0) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Wait for the given datanode to heartbeat once.
+   */
+  public void waitForDNHeartbeat(int dnIndex, long timeoutMillis)
+    throws IOException, InterruptedException {
+    DataNode dn = getDataNodes().get(dnIndex);
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+                                                   getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() < startTime + timeoutMillis) {
+      DatanodeInfo report[] = client.datanodeReport(DatanodeReportType.LIVE);
+
+      for (DatanodeInfo thisReport : report) {
+        if (thisReport.getStorageID().equals(
+              dn.dnRegistration.getStorageID())) {
+          if (thisReport.getLastUpdate() > startTime)
+            return;
+        }
+      }
+
+      Thread.sleep(500);
+    }
+  }
   
   public void formatDataNodeDirs() throws IOException {
     base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");

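A hedged usage sketch for waitForDNHeartbeat(); the argument values are
illustrative and not taken from a test in this message:

    // Block until datanode 0 has heartbeated since this call was made,
    // giving up silently after 10 seconds.
    cluster.waitForDNHeartbeat(0, 10000);
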
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=953482&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Thu Jun 10 22:25:39 2010
@@ -0,0 +1,944 @@
+package org.apache.hadoop.hdfs;
+
+import junit.framework.TestCase;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/* File Append tests for HDFS-200 & HDFS-142, specifically focused on:
+ *  using append()/sync() to recover block information
+ */
+public class TestFileAppend4 extends TestCase {
+  static final Log LOG = LogFactory.getLog(TestFileAppend4.class);
+  static final long BLOCK_SIZE = 1024;
+  static final long BBW_SIZE = 500; // don't align on bytes/checksum
+
+  static final Object [] NO_ARGS = new Object []{};
+
+  Configuration conf;
+  MiniDFSCluster cluster;
+  Path file1;
+  FSDataOutputStream stm;
+  boolean simulatedStorage = false;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    this.conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    conf.setBoolean("dfs.support.append", true);
+
+    // lower heartbeat interval for fast recognition of DN death
+    conf.setInt("heartbeat.recheck.interval", 1000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    conf.setInt("dfs.socket.timeout", 5000);
+    // handle under-replicated blocks quickly (for replication asserts)
+//    conf.set("dfs.replication.pending.timeout.sec", Integer.toString(5));
+    conf.setInt("dfs.replication.pending.timeout.sec", 5);
+    conf.setInt("dfs.replication.interval", 1);
+    // handle failures in the DFSClient pipeline quickly
+    // (for cluster.shutdown(); fs.close() idiom)
+    conf.setInt("ipc.client.connect.max.retries", 1);
+    conf.setInt("dfs.client.block.recovery.retries", 1);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    
+  }
+  
+  private void createFile(FileSystem whichfs, String filename, 
+                  int rep, long fileSize) throws Exception {
+    file1 = new Path(filename);
+    stm = whichfs.create(file1, true, (int)fileSize+1, (short)rep, BLOCK_SIZE);
+    LOG.info("Created file " + filename);
+    LOG.info("Writing " + fileSize + " bytes to " + file1);
+    AppendTestUtil.write(stm, 0, (int)fileSize);
+  }
+  
+  private void assertFileSize(FileSystem whichfs, long expectedSize) throws Exception {
+    LOG.info("reading length of " + file1.getName() + " on namenode");
+    long realSize = whichfs.getFileStatus(file1).getLen();
+    assertTrue("unexpected file size! received=" + realSize 
+                                + " , expected=" + expectedSize, 
+               realSize == expectedSize);
+  }
+  
+  private void assertNumCurrentReplicas(short rep) throws Exception {
+    OutputStream hdfs_out = stm.getWrappedStream();
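+    // the wrapped stream is a DFSOutputStream; look up getNumCurrentReplicas() via reflection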
+    Method r = hdfs_out.getClass().getMethod("getNumCurrentReplicas",
+                                             new Class<?> []{});
+    r.setAccessible(true);
+    int actualRepl = ((Integer)r.invoke(hdfs_out, NO_ARGS)).intValue();
+    assertTrue(file1 + " should be replicated to " + rep + " datanodes, not " +
+               actualRepl + ".", actualRepl == rep);
+  }
+  
+  private void loseLeases(FileSystem whichfs) throws Exception {
+    LOG.info("leasechecker.interruptAndJoin()");
+    // lose the lease on the client
+    DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
+    dfs.dfs.leasechecker.interruptAndJoin();
+  }
+
+  /*
+   * Recover the file by trying to open it in append mode.
+   * Doing this, we get a hold of the file that the crashed writer
+   * was writing to.  Once we have it, close it.  This allows a
+   * subsequent reader to see data up to the last sync.
+   * NOTE: This is the same algorithm that HBase uses for file recovery
+   * @param fs
+   * @throws Exception
+   */
+  private void recoverFile(final FileSystem fs) throws Exception {
+    LOG.info("Recovering File Lease");
+
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery upon append request
+    cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+    // Trying recovery
+    int tries = 60;
+    boolean recovered = false;
+    FSDataOutputStream out = null;
+    while (!recovered && tries-- > 0) {
+      try {
+        out = fs.append(file1);
+        LOG.info("Successfully opened for appends");
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    if (out != null) {
+      try {
+        out.close();
+        LOG.info("Successfully obtained lease");
+      } catch (IOException e) {
+        LOG.info("Unable to close file after opening for appends. " + e);
+        recovered = false;
+      }
+//      out.close();
+    }
+    if (!recovered) {
+      fail((tries > 0) ? "Recovery failed" : "Recovery should take < 1 min");
+    }
+    LOG.info("Past out lease recovery");
+  }
+  
+  // Waits for all of the blocks to have expected replication
+  private void waitForBlockReplication(FileSystem whichfs, String filename, 
+                                       int expected, long maxWaitSec) 
+                                       throws IOException {
+    long start = System.currentTimeMillis();
+    
+    //wait for all the blocks to be replicated;
+    LOG.info("Checking for block replication for " + filename);
+    int iters = 0;
+    while (true) {
+      boolean replOk = true;
+
+      BlockLocation[] bl = whichfs.getFileBlockLocations(
+          whichfs.getFileStatus(file1), 0, BLOCK_SIZE);
+      if(bl.length == 0) {
+        replOk = false;
+      }
+      for (BlockLocation b : bl) {
+        
+        int actual = b.getNames().length;
+        if ( actual < expected ) {
+          LOG.info("Not enough replicas for " + b +
+                   " yet. Expecting " + expected + ", got " +
+                   actual + ".");
+          replOk = false;
+          break;
+        }
+      }
+      
+      if (replOk) {
+        return;
+      }
+      
+      iters++;
+      
+      if (maxWaitSec > 0 && 
+          (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
+        throw new IOException("Timedout while waiting for all blocks to " +
+                              " be replicated for " + filename);
+      }
+      
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {}
+    }
+  }
+
+  private void checkFile(FileSystem whichfs, long fileSize) throws Exception {
+    LOG.info("validating content from datanodes...");
+    AppendTestUtil.check(whichfs, file1, fileSize);
+  }
+  
+  private void corruptDatanode(int dnNumber) throws Exception {
+    // get the blocksBeingWritten dir of the given datanode
+    File data_dir = new File(System.getProperty("test.build.data"),
+                             "dfs/data/data" + 
+                             Integer.toString(dnNumber*2 + 1) + 
+                             "/blocksBeingWritten");
+    int corrupted = 0;
+    for (File block : data_dir.listFiles()) {
+      // only touch the actual data, not the metadata (with CRC)
+      if (block.getName().startsWith("blk_") && 
+         !block.getName().endsWith("meta")) {
+        RandomAccessFile file = new RandomAccessFile(block, "rw");
+        FileChannel channel = file.getChannel();
+
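+        // overwrite a random number of bytes within the last (partial) 512-byte
+        // checksum chunk; the .meta file is untouched, so its checksums no longer match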
+        Random r = new Random();
+        long lastBlockSize = channel.size() % 512;
+        long position = channel.size() - lastBlockSize;
+        int length = r.nextInt((int)(channel.size() - position + 1));
+        byte[] buffer = new byte[length];
+        r.nextBytes(buffer);
+
+        channel.write(ByteBuffer.wrap(buffer), position);
+        System.out.println("Deliberately corrupting file " + block.getName() + 
+                           " at offset " + position +
+                           " length " + length);
+        file.close();
+        ++corrupted;
+      }
+    }
+    assertTrue("Should have some data in bbw to corrupt", corrupted > 0);
+  }
+
+  // test [1 bbw, 0 HDFS block]
+  public void testAppendSyncBbw() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+    try {
+      createFile(fs1, "/bbw.test", 1, BBW_SIZE);
+      stm.sync();
+      // empty before close()
+      assertFileSize(fs1, 0); 
+      loseLeases(fs1);
+      recoverFile(fs2);
+      // close() should write recovered bbw to HDFS block
+      assertFileSize(fs2, BBW_SIZE); 
+      checkFile(fs2, BBW_SIZE);
+    } finally {
+      fs2.close();
+      fs1.close();
+      cluster.shutdown();
+    }
+    LOG.info("STOP");
+  }
+
+  // test [1 bbw, 0 HDFS block] with cluster restart
+  public void testAppendSyncBbwClusterRestart() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    FileSystem fs2 = null;
+    try {
+      createFile(fs1, "/bbwRestart.test", 1, BBW_SIZE);
+      stm.sync();
+      // empty before close()
+      assertFileSize(fs1, 0); 
+
+      cluster.shutdown();
+      fs1.close(); // same as: loseLeases()
+      LOG.info("STOPPED first instance of the cluster");
+
+      cluster = new MiniDFSCluster(conf, 1, false, null);
+      cluster.waitActive();
+      LOG.info("START second instance.");
+
+      fs2 = cluster.getFileSystem();
+
+      recoverFile(fs2);
+      
+      // close() should write recovered bbw to HDFS block
+      assertFileSize(fs2, BBW_SIZE); 
+      checkFile(fs2, BBW_SIZE);
+
+    } finally {
+      if(fs2 != null) {
+        fs2.close();
+      }
+      fs1.close();
+      cluster.shutdown();
+    }
+    LOG.info("STOP");
+  }
+
+
+  // test [3 bbw, 0 HDFS block] with cluster restart
+  // ** previous HDFS-142 patches hit a problem with multiple outstanding bbw on a single disk**
+  public void testAppendSync2XBbwClusterRestart() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    // assumption: this MiniDFS starts up 1 datanode with 2 dirs to load balance
+    assertTrue(cluster.getDataNodes().get(0).getConf().get("dfs.data.dir").matches("[^,]+,[^,]*"));
+    FileSystem fs1 = cluster.getFileSystem();
+    FileSystem fs2 = null;
+    try {
+      // create 3 bbw files [so at least one dir has 2 files]
+      int[] files = new int[]{0,1,2};
+      Path[] paths = new Path[files.length];
+      FSDataOutputStream[] stms = new FSDataOutputStream[files.length];
+      for (int i : files ) {
+        createFile(fs1, "/bbwRestart" + i + ".test", 1, BBW_SIZE);
+        stm.sync();
+        assertFileSize(fs1, 0); 
+        paths[i] = file1;
+        stms[i] = stm;
+      }
+
+      cluster.shutdown();
+      fs1.close(); // same as: loseLeases()
+      LOG.info("STOPPED first instance of the cluster");
+
+      cluster = new MiniDFSCluster(conf, 1, false, null);
+      cluster.waitActive();
+      LOG.info("START second instance.");
+
+      fs2 = cluster.getFileSystem();
+      
+      // recover 3 bbw files
+      for (int i : files) {
+        file1 = paths[i];
+        recoverFile(fs2);
+        assertFileSize(fs2, BBW_SIZE); 
+        checkFile(fs2, BBW_SIZE);
+      }
+    } finally {
+      if(fs2 != null) {
+        fs2.close();
+      }
+      fs1.close();
+      cluster.shutdown();
+    }
+    LOG.info("STOP");
+  }
+
+  // test [1 bbw, 1 HDFS block]
+  public void testAppendSyncBlockPlusBbw() throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+    try {
+      createFile(fs1, "/blockPlusBbw.test", 1, BLOCK_SIZE + BBW_SIZE);
+      // 0 before sync()
+      assertFileSize(fs1, 0); 
+      stm.sync();
+      // BLOCK_SIZE after sync()
+      assertFileSize(fs1, BLOCK_SIZE); 
+      loseLeases(fs1);
+      recoverFile(fs2);
+      // close() should write recovered bbw to HDFS block
+      assertFileSize(fs2, BLOCK_SIZE + BBW_SIZE); 
+      checkFile(fs2, BLOCK_SIZE + BBW_SIZE);
+    } finally {
+      stm = null;
+      fs2.close();
+      fs1.close();
+      cluster.shutdown();
+    }
+    LOG.info("STOP");
+  }
+
+  // we test different datanodes restarting to exercise 
+  // the start, middle, & end of the DFSOutputStream pipeline
+  public void testAppendSyncReplication0() throws Exception {
+    replicationTest(0);
+  }
+  public void testAppendSyncReplication1() throws Exception {
+    replicationTest(1);
+  }
+  public void testAppendSyncReplication2() throws Exception {
+    replicationTest(2);
+  }
+  
+  void replicationTest(int badDN) throws Exception {
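+    // write half a block with 3 replicas, kill one datanode, write and sync again,
+    // then restart the whole cluster and recover the file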
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      int halfBlock = (int)BLOCK_SIZE/2;
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE%4 == 0);
+
+      file1 = new Path("/appendWithReplication.dat");
+
+      // write 1/2 block & sync
+      stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, halfBlock);
+      stm.sync();
+      assertNumCurrentReplicas(rep);
+      
+      // close one of the datanodes
+      cluster.stopDataNode(badDN);
+      
+      // write 1/4 block & sync
+      AppendTestUtil.write(stm, halfBlock, (int)BLOCK_SIZE/4);
+      stm.sync();
+      assertNumCurrentReplicas((short)(rep - 1));
+      
+      // restart the cluster
+      /* 
+       * we put the namenode in safe mode first so it doesn't process 
+       * recoverBlock() commands from the remaining DFSClient as datanodes 
+       * are serially shutdown
+       */
+      cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      cluster.shutdown();
+      fs1.close();
+      LOG.info("STOPPED first instance of the cluster");
+      cluster = new MiniDFSCluster(conf, 3, false, null);
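+      // hold off background re-replication until we have verified which replicas survived recovery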
+      cluster.getNameNode().getNamesystem().stallReplicationWork();
+      cluster.waitActive();
+      fs1 = cluster.getFileSystem();
+      LOG.info("START second instance.");
+
+      recoverFile(fs1);
+      
+      // the 2 DNs with the larger sequence number should win
+      BlockLocation[] bl = fs1.getFileBlockLocations(
+          fs1.getFileStatus(file1), 0, BLOCK_SIZE);
+      assertTrue("Should have one block", bl.length == 1);
+      assertTrue("Should have 2 replicas for that block, not " + 
+                 bl[0].getNames().length, bl[0].getNames().length == 2);  
+
+      assertFileSize(fs1, BLOCK_SIZE*3/4);
+      checkFile(fs1, BLOCK_SIZE*3/4);
+
+      // verify that, over time, the block has been replicated to 3 DN
+      cluster.getNameNode().getNamesystem().restartReplicationWork();
+      waitForBlockReplication(fs1, file1.toString(), 3, 20);
+    } finally {
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
+  // we test different datanodes restarting to exercise 
+  // the start, middle, & end of the DFSOutputStream pipeline
+  public void testAppendSyncChecksum0() throws Exception {
+    checksumTest(0);
+  }
+  public void testAppendSyncChecksum1() throws Exception {
+    checksumTest(1);
+  }
+  public void testAppendSyncChecksum2() throws Exception {
+    checksumTest(2);
+  }
+
+  void checksumTest(int goodDN) throws Exception {
+    int deadDN = (goodDN + 1) % 3;
+    int corruptDN  = (goodDN + 2) % 3;
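+    // deadDN is stopped mid-write, corruptDN has its bbw data corrupted on disk,
+    // and goodDN keeps a clean replica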
+    
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      int halfBlock = (int)BLOCK_SIZE/2;
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE%8 == 0);
+
+      file1 = new Path("/appendWithReplication.dat");
+
+      // write 1/2 block & sync
+      stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, halfBlock);
+      stm.sync();
+      assertNumCurrentReplicas(rep);
+      
+      // close one of the datanodes
+      cluster.stopDataNode(deadDN);
+      
+      // write 1/4 block & sync
+      AppendTestUtil.write(stm, halfBlock, (int)BLOCK_SIZE/4);
+      stm.sync();
+      assertNumCurrentReplicas((short)(rep - 1));
+      
+      // stop the cluster
+      cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      cluster.shutdown();
+      fs1.close();
+      LOG.info("STOPPED first instance of the cluster");
+
+      // give the second datanode a bad CRC
+      corruptDatanode(corruptDN);
+      
+      // restart the cluster
+      cluster = new MiniDFSCluster(conf, 3, false, null);
+      cluster.getNameNode().getNamesystem().stallReplicationWork();
+      cluster.waitActive();
+      fs1 = cluster.getFileSystem();
+      LOG.info("START second instance.");
+
+      // verify that only the good datanode's file is used
+      recoverFile(fs1);
+
+      BlockLocation[] bl = fs1.getFileBlockLocations(
+          fs1.getFileStatus(file1), 0, BLOCK_SIZE);
+      assertTrue("Should have one block", bl.length == 1);
+      assertTrue("Should have 1 replica for that block, not " + 
+          bl[0].getNames().length, bl[0].getNames().length == 1);  
+
+      assertTrue("The replica should be the datanode with the correct CRC",
+                 cluster.getDataNodes().get(goodDN).getSelfAddr().toString()
+                   .endsWith(bl[0].getNames()[0]) );
+      assertFileSize(fs1, BLOCK_SIZE*3/4);
+
+      // should fail checkFile() if data with the bad CRC was used
+      checkFile(fs1, BLOCK_SIZE*3/4);
+
+      // ensure proper re-replication
+      cluster.getNameNode().getNamesystem().restartReplicationWork();
+      waitForBlockReplication(fs1, file1.toString(), 3, 20);
+    } finally {
+      cluster.shutdown();
+      fs1.close();
+    }
+  }
+  
+  // we test different datanodes dying and not coming back
+  public void testDnDeath0() throws Exception {
+    dnDeathTest(0);
+  }
+  public void testDnDeath1() throws Exception {
+    dnDeathTest(1);
+  }
+  public void testDnDeath2() throws Exception {
+    dnDeathTest(2);
+  }
+
+  /**
+   * Test case that writes and completes a file, and then
+   * tries to recover the file after the old primary
+   * DN has failed.
+   */
+  void dnDeathTest(int badDN) throws Exception {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      int halfBlock = (int)BLOCK_SIZE/2;
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE%4 == 0);
+
+      file1 = new Path("/dnDeath.dat");
+
+      // write 1/2 block & close
+      stm = fs1.create(file1, true, (int)BLOCK_SIZE*2, rep, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, halfBlock);
+      stm.close();
+      
+      // close one of the datanodes
+      cluster.stopDataNode(badDN);
+
+      // Recover the lease
+      recoverFile(fs1);
+      checkFile(fs1, halfBlock);
+    } finally {
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test case that stops a writer after finalizing a block but
+   * before calling completeFile, and then tries to recover
+   * the lease.
+   */
+  public void testRecoverFinalizedBlock() throws Throwable {
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+
+    try {
+      cluster.waitActive();
+      NameNode preSpyNN = cluster.getNameNode();
+      NameNode spyNN = spy(preSpyNN);
+
+      // Delay completeFile
+      DelayAnswer delayer = new DelayAnswer();
+      doAnswer(delayer).when(spyNN).complete(anyString(), anyString());
+
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      file1 = new Path("/testRecoverFinalized");
+      final OutputStream stm = client.create("/testRecoverFinalized", true);
+
+      // write 1/2 block
+      AppendTestUtil.write(stm, 0, 4096);
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread t = new Thread() { 
+          public void run() {
+            try {
+              stm.close();
+            } catch (Throwable t) {
+              err.set(t);
+            }
+          }};
+      t.start();
+      LOG.info("Waiting for close to get to latch...");
+      delayer.waitForCall();
+
+      // At this point, the block is finalized on the DNs, but the file
+      // has not been completed in the NN.
+      // Lose the leases
+      LOG.info("Killing lease checker");
+      client.leasechecker.interruptAndJoin();
+
+      FileSystem fs1 = cluster.getFileSystem();
+      FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(
+        fs1.getConf());
+
+      LOG.info("Recovering file");
+      recoverFile(fs2);
+
+      LOG.info("Telling close to proceed.");
+      delayer.proceed();
+      LOG.info("Waiting for close to finish.");
+      t.join();
+      LOG.info("Close finished.");
+
+      // We expect that close will get a "Could not complete file"
+      // error.
+      Throwable thrownByClose = err.get();
+      assertNotNull(thrownByClose);
+      assertTrue(thrownByClose instanceof IOException);
+      if (!thrownByClose.getMessage().contains("Could not complete write")) {
+        throw thrownByClose;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test for an intermittent failure of commitBlockSynchronization.
+   * This could happen if the DN crashed between calling updateBlocks
+   * and commitBlockSynchronization.
+   */
+  public void testDatanodeFailsToCommit() throws Throwable {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+    try {
+      createFile(fs1, "/datanodeFailsCommit.test", 1, BBW_SIZE);
+      stm.sync();
+      loseLeases(fs1);
+
+      // Make the NN fail to commitBlockSynchronization one time
+      NameNode nn = cluster.getNameNode();
+      nn.namesystem = spy(nn.namesystem);
+      doAnswer(new ThrowNTimesAnswer(IOException.class, 1)).
+        when(nn.namesystem).
+        commitBlockSynchronization((Block)anyObject(), anyInt(), anyInt(),
+                                   anyBoolean(), anyBoolean(),
+                                   (DatanodeID[])anyObject());
+
+      recoverFile(fs2);
+      // close() should write recovered bbw to HDFS block
+      assertFileSize(fs2, BBW_SIZE); 
+      checkFile(fs2, BBW_SIZE);
+    } finally {
+      fs2.close();
+      fs1.close();
+      cluster.shutdown();
+    }
+    LOG.info("STOP");
+  }
+
+  /**
+   * Test that when a DN starts up with bbws from a file that got
+   * removed or finalized when it was down, the block gets deleted.
+   */
+  public void testBBWCleanupOnStartup() throws Throwable {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    try {
+      int halfBlock = (int) BLOCK_SIZE / 2;
+      short rep = 3; // replication
+      assertTrue(BLOCK_SIZE % 4 == 0);
+
+      file1 = new Path("/bbwCleanupOnStartup.dat");
+
+      // write 1/2 block & sync
+      stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, rep, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, halfBlock);
+      stm.sync();
+
+      String dataDirs = cluster.getDataNodes().get(0).getConf().get("dfs.data.dir");
+      // close one of the datanodes
+      MiniDFSCluster.DataNodeProperties dnprops = cluster.stopDataNode(0);
+
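+      // complete the file while the datanode is down; its bbw replica is now stale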
+      stm.close();
+
+      List<File> bbwFilesAfterShutdown = getBBWFiles(dataDirs);
+      assertEquals(1, bbwFilesAfterShutdown.size());
+
+      assertTrue(cluster.restartDataNode(dnprops));
+
+      List<File> bbwFilesAfterRestart = null;
+      // Wait up to 10 heartbeats for the files to get removed - it should
+      // really happen after just a couple.
+      for (int i = 0; i < 10; i++) {
+        LOG.info("Waiting for heartbeat #" + i + " after DN restart");
+        cluster.waitForDNHeartbeat(0, 10000);
+
+        // Check if it has been deleted
+        bbwFilesAfterRestart = getBBWFiles(dataDirs);
+        if (bbwFilesAfterRestart.size() == 0) {
+          break;
+        }
+      }
+
+      assertEquals(0, bbwFilesAfterRestart.size());
+
+    } finally {
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
+  private List<File> getBBWFiles(String dfsDataDirs) {
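+    // collect the block files (excluding .meta files) sitting in each data dir's
+    // blocksBeingWritten directory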
+    ArrayList<File> files = new ArrayList<File>();
+    for (String dirString : dfsDataDirs.split(",")) {
+      File dir = new File(dirString);
+      assertTrue("data dir " + dir + " should exist",
+        dir.exists());
+      File bbwDir = new File(dir, "blocksBeingWritten");
+      assertTrue("bbw dir " + bbwDir + " should eixst",
+        bbwDir.exists());
+      for (File blockFile : bbwDir.listFiles()) {
+        if (!blockFile.getName().endsWith(".meta")) {
+          files.add(blockFile);
+        }
+      }
+    }
+    return files;
+  }
+
+  /**
+   * Test for following sequence:
+   * 1. Client finishes writing a block, but does not allocate next one
+   * 2. Client loses lease
+   * 3. Recovery process starts, but commitBlockSynchronization not called yet
+   * 4. Client calls addBlock and continues writing
+   * 5. commitBlockSynchronization proceeds
+   * 6. Original client tries to write/close
+   */
+  public void testRecoveryOnBlockBoundary() throws Throwable {
+    LOG.info("START");
+    cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs1 = cluster.getFileSystem();
+    final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+
+    // Allow us to delay commitBlockSynchronization
+    DelayAnswer delayer = new DelayAnswer();
+    NameNode nn = cluster.getNameNode();
+    nn.namesystem = spy(nn.namesystem);
+    doAnswer(delayer).
+      when(nn.namesystem).
+      commitBlockSynchronization((Block) anyObject(), anyInt(), anyInt(),
+        anyBoolean(), anyBoolean(),
+        (DatanodeID[]) anyObject());
+
+    try {
+      file1 = new Path("/testWritingDuringRecovery.test");
+      stm = fs1.create(file1, true, (int) BLOCK_SIZE * 2, (short) 3, BLOCK_SIZE);
+      AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
+      stm.sync();
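+      // step 1 from above: exactly one block has been written and synced,
+      // but the next block has not yet been allocated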
+
+      LOG.info("Losing lease");
+      loseLeases(fs1);
+
+
+      LOG.info("Triggering recovery in another thread");
+
+      final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+      Thread recoverThread = new Thread() {
+        public void run() {
+          try {
+            recoverFile(fs2);
+          } catch (Throwable t) {
+            err.set(t);
+          }
+        }
+      };
+      recoverThread.start();
+
+      LOG.info("Waiting for recovery about to call commitBlockSynchronization");
+      delayer.waitForCall();
+
+      LOG.info("Continuing to write to stream");
+      AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
+      try {
+        stm.sync();
+        fail("Sync was allowed after recovery started");
+      } catch (IOException ioe) {
+        LOG.info("Got expected IOE trying to write to a file from the writer " +
+          "that lost its lease", ioe);
+      }
+
+      LOG.info("Written more to stream, allowing commit to proceed");
+      delayer.proceed();
+
+      LOG.info("Joining on recovery thread");
+      recoverThread.join();
+      if (err.get() != null) {
+        throw err.get();
+      }
+
+      LOG.info("Now that recovery has finished, still expect further writes to fail.");
+      try {
+        AppendTestUtil.write(stm, 0, (int) (BLOCK_SIZE));
+        stm.sync();
+        fail("Further writes after recovery finished did not fail!");
+      } catch (IOException ioe) {
+        LOG.info("Got expected exception", ioe);
+      }
+
+
+      LOG.info("Checking that file looks good");
+
+      // recovery should have preserved only the data from the first
+      // successful writes (one full block)
+      assertFileSize(fs2, BLOCK_SIZE);
+      checkFile(fs2, BLOCK_SIZE);
+    } finally {
+      try {
+        fs2.close();
+        fs1.close();
+        cluster.shutdown();
+      } catch (Throwable t) {
+        LOG.warn("Didn't close down cleanly", t);
+      }
+    }
+    LOG.info("STOP");
+  }
+
+  /**
+   * Mockito answer helper that triggers one latch as soon as the
+   * method is called, then waits on another before continuing.
+   */
+  private static class DelayAnswer implements Answer {
+    private final CountDownLatch fireLatch = new CountDownLatch(1);
+    private final CountDownLatch waitLatch = new CountDownLatch(1);
+
+    /**
+     * Wait until the method is called.
+     */
+    public void waitForCall() throws InterruptedException {
+      fireLatch.await();
+    }
+
+    /**
+     * Tell the method to proceed.
+     * This should only be called after waitForCall()
+     */
+    public void proceed() {
+      waitLatch.countDown();
+    }
+
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      LOG.info("DelayAnswer firing fireLatch");
+      fireLatch.countDown();
+      try {
+        LOG.info("DelayAnswer waiting on waitLatch");
+        waitLatch.await();
+        LOG.info("DelayAnswer delay complete");
+      } catch (InterruptedException ie) {
+        throw new IOException("Interrupted waiting on latch", ie);
+      }
+      return invocation.callRealMethod();
+    }
+  }
+
+  /**
+   * Mockito answer helper that will throw an exception a given number
+   * of times before eventually succeeding.
+   */
+  private static class ThrowNTimesAnswer implements Answer {
+    private int numTimesToThrow;
+    private Class<? extends Throwable> exceptionClass;
+
+    public ThrowNTimesAnswer(Class<? extends Throwable> exceptionClass,
+                             int numTimesToThrow) {
+      this.exceptionClass = exceptionClass;
+      this.numTimesToThrow = numTimesToThrow;
+    }
+
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      if (numTimesToThrow-- > 0) {
+        throw exceptionClass.newInstance();
+      }
+
+      return invocation.callRealMethod();
+    }
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=953482&r1=953481&r2=953482&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java Thu Jun 10 22:25:39 2010
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetTestUtil;
+
 import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 
@@ -106,9 +108,10 @@ public class TestLeaseRecovery extends j
       //update blocks with random block sizes
       Block[] newblocks = new Block[REPLICATION_NUM];
       for(int i = 0; i < REPLICATION_NUM; i++) {
+        DataNode dn = datanodes[i];
+        FSDatasetTestUtil.truncateBlock(dn, lastblock, newblocksizes[i]);
         newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
             lastblock.getGenerationStamp());
-        idps[i].updateBlock(lastblock, newblocks[i], false);
         checkMetaInfo(newblocks[i], idps[i]);
       }
 

Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java?rev=953482&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java Thu Jun 10 22:25:39 2010
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.io.File;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+public abstract class FSDatasetTestUtil {
+
+  /**
+   * Truncate the given block in place, such that the new truncated block
+   * is still valid (i.e. checksums are updated to stay in sync with the block file)
+   */
+  public static void truncateBlock(DataNode dn,
+                                   Block block,
+                                   long newLength)
+    throws IOException
+  {
+    FSDataset ds = (FSDataset)dn.data;
+    
+    File blockFile = ds.findBlockFile(block.getBlockId());
+    if (blockFile == null) {
+      throw new IOException("Can't find block file for block " +
+                            block + " on DN " + dn);
+    }
+    File metaFile = ds.findMetaFile(blockFile);
+    FSDataset.truncateBlock(blockFile, metaFile,
+                            block.getNumBytes(), newLength);
+  }
+
+}
\ No newline at end of file