Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:06:08 UTC

svn commit: r885143 [10/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs...

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Nov 28 20:05:56 2009
@@ -17,19 +17,21 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
@@ -41,18 +43,24 @@
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.mortbay.log.Log;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.io.IOUtils;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -120,8 +128,9 @@
         if ( ! metaData.renameTo( newmeta ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
-                                 " from tmp to " + 
-                                 dest.getAbsolutePath() );
+                                 " from " + src + " to " + 
+                                 " from " + src + " to " + 
+                                 + metaData + " to " + newmeta);
         }
         if (DataNode.LOG.isDebugEnabled()) {
           DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
@@ -166,46 +175,48 @@
       return children[ lastChildIdx ].addBlock(b, src, true, false); 
     }
 
-    /** Find the metadata file for the specified block file.
-     * Return the generation stamp from the name of the metafile.
-     */
-    long getGenerationStampFromFile(File[] listdir, File blockFile) {
-      String blockName = blockFile.getName();
-      for (int j = 0; j < listdir.length; j++) {
-        String path = listdir[j].getName();
-        if (!path.startsWith(blockName)) {
-          continue;
-        }
-        if (blockFile == listdir[j]) {
-          continue;
-        }
-        return Block.getGenerationStamp(listdir[j].getName());
-      }
-      DataNode.LOG.warn("Block " + blockFile + 
-                        " does not have a metafile!");
-      return Block.GRANDFATHER_GENERATION_STAMP;
-    }
-
-    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
+    void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) 
+    throws IOException {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
           children[i].getVolumeMap(volumeMap, volume);
         }
       }
 
-      File blockFiles[] = dir.listFiles();
-      if (blockFiles == null) {
-        throw new IllegalStateException("Not a valid directory: " + dir);
-      }
-      for (int i = 0; i < blockFiles.length; i++) {
-        if (Block.isBlockFilename(blockFiles[i])) {
-          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
-          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), 
-                        new ReplicaInfo(volume, blockFiles[i]));
+      recoverTempUnlinkedBlock();
+      volume.addToReplicasMap(volumeMap, dir, true);
+    }
+        
+    /**
+     * Recover unlinked tmp files on datanode restart. If the original block
+     * does not exist, then the tmp file is renamed to be the
+     * original file name; otherwise the tmp file is deleted.
+     */
+    private void recoverTempUnlinkedBlock() throws IOException {
+      File files[] = dir.listFiles();
+      for (File file : files) {
+        if (!FSDataset.isUnlinkTmpFile(file)) {
+          continue;
+        }
+        File blockFile = getOrigFile(file);
+        if (blockFile.exists()) {
+          //
+          // If the original block file still exists, then no recovery
+          // is needed.
+          //
+          if (!file.delete()) {
+            throw new IOException("Unable to cleanup unlinked tmp file " +
+                file);
+          }
+        } else {
+          if (!file.renameTo(blockFile)) {
+            throw new IOException("Unable to cleanup detached file " +
+                file);
+          }
         }
       }
     }
-        
+    
     /**
     * check if a data directory is healthy
      * @throws DiskErrorException
@@ -283,9 +294,10 @@
   }
 
   class FSVolume {
-    private FSDir dataDir;
-    private File tmpDir;
-    private File detachDir; // copy on write for blocks in snapshot
+    private File currentDir;
+    private FSDir dataDir;      // directory storing finalized replicas
+    private File rbwDir;        // directory storing RBW replicas
+    private File tmpDir;        // directory storing temporary replicas
     private DF usage;
     private DU dfsUsage;
     private long reserved;
@@ -293,13 +305,10 @@
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
-      boolean supportAppends = conf.getBoolean("dfs.support.append", false);
+      this.currentDir = currentDir; 
       File parent = currentDir.getParentFile();
-
-      this.detachDir = new File(parent, "detach");
-      if (detachDir.exists()) {
-        recoverDetachedBlocks(currentDir, detachDir);
-      }
+      final File finalizedDir = new File(
+          currentDir, DataStorage.STORAGE_DIR_FINALIZED);
 
       // Files that were being written when the datanode was last shutdown
       // are now moved back to the data directory. It is possible that
@@ -308,30 +317,38 @@
       //
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
-        if (supportAppends) {
-          recoverDetachedBlocks(currentDir, tmpDir);
-        } else {
-          FileUtil.fullyDelete(tmpDir);
+        FileUtil.fullyDelete(tmpDir);
+      }
+      this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+      if (rbwDir.exists() && !supportAppends) {
+        FileUtil.fullyDelete(rbwDir);
+      }
+      this.dataDir = new FSDir(finalizedDir);
+      if (!rbwDir.mkdirs()) {  // create rbw directory if it does not exist
+        if (!rbwDir.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + rbwDir.toString());
         }
       }
-      this.dataDir = new FSDir(currentDir);
       if (!tmpDir.mkdirs()) {
         if (!tmpDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
         }
       }
-      if (!detachDir.mkdirs()) {
-        if (!detachDir.isDirectory()) {
-          throw new IOException("Mkdirs failed to create " + detachDir.toString());
-        }
-      }
       this.usage = new DF(parent, conf);
       this.dfsUsage = new DU(parent, conf);
       this.dfsUsage.start();
     }
 
+    File getCurrentDir() {
+      return currentDir;
+    }
+    
     void decDfsUsed(long value) {
-      dfsUsage.decDfsUsed(value);
+      // The caller of this method (BlockFileDeleteTask.run()) does
+      // not yet hold the lock on FSDataset.this.
+      synchronized(FSDataset.this) {
+        dfsUsage.decDfsUsed(value);
+      }
     }
     
     long getDfsUsed() throws IOException {
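
A note on the layout this constructor establishes: finalized replicas live under current/finalized, replicas being written by clients under current/rbw, and temporary replicas in a tmp directory beside current. A minimal sketch of how the paths are derived, assuming DataStorage.STORAGE_DIR_FINALIZED and STORAGE_DIR_RBW resolve to "finalized" and "rbw" (their values are not shown in this diff):

    import java.io.File;

    public class VolumeLayoutSketch {
      public static void main(String[] args) {
        // Hypothetical volume root: one entry of dfs.data.dir plus "/current".
        File currentDir = new File("/data/1/dfs/dn/current");
        File parent = currentDir.getParentFile();

        File finalizedDir = new File(currentDir, "finalized"); // finalized replicas
        File rbwDir = new File(currentDir, "rbw");             // replicas being written
        File tmpDir = new File(parent, "tmp");                 // temporary replicas

        System.out.println(finalizedDir + "\n" + rbwDir + "\n" + tmpDir);
      }
    }
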
@@ -364,51 +381,23 @@
     }
     
     /**
-     * Temporary files. They get moved to the real block directory either when
-     * the block is finalized or the datanode restarts.
+     * Temporary files. They get moved to the finalized block directory when
+     * the block is finalized.
      */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
-      return createTmpFile(b, f);
+      return FSDataset.createTmpFile(b, f);
     }
 
     /**
-     * Returns the name of the temporary file for this block.
+     * RBW files. They get moved to the finalized block directory when
+     * the block is finalized.
      */
-    File getTmpFile(Block b) throws IOException {
-      File f = new File(tmpDir, b.getBlockName());
-      return f;
+    File createRbwFile(Block b) throws IOException {
+      File f = new File(rbwDir, b.getBlockName());
+      return FSDataset.createTmpFile(b, f);
     }
 
-    /**
-     * Files used for copy-on-write. They need recovery when datanode
-     * restarts.
-     */
-    File createDetachFile(Block b, String filename) throws IOException {
-      File f = new File(detachDir, filename);
-      return createTmpFile(b, f);
-    }
-
-    private File createTmpFile(Block b, File f) throws IOException {
-      if (f.exists()) {
-        throw new IOException("Unexpected problem in creating temporary file for "+
-                              b + ".  File " + f + " should not be present, but is.");
-      }
-      // Create the zero-length temp file
-      //
-      boolean fileCreated = false;
-      try {
-        fileCreated = f.createNewFile();
-      } catch (IOException ioe) {
-        throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
-      }
-      if (!fileCreated) {
-        throw new IOException("Unexpected problem in creating temporary file for "+
-                              b + ".  File " + f + " should be creatable, but is already present.");
-      }
-      return f;
-    }
-      
     File addBlock(Block b, File f) throws IOException {
       File blockFile = dataDir.addBlock(b, f);
       File metaFile = getMetaFile( blockFile , b);
@@ -419,55 +408,126 @@
     void checkDirs() throws DiskErrorException {
       dataDir.checkDirTree();
       DiskChecker.checkDir(tmpDir);
+      DiskChecker.checkDir(rbwDir);
     }
       
-    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+    void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+      // add finalized replicas
       dataDir.getVolumeMap(volumeMap, this);
-    }
-      
-    void clearPath(File f) {
-      dataDir.clearPath(f);
-    }
-      
-    public String toString() {
-      return dataDir.dir.getAbsolutePath();
+      // add rbw replicas
+      addToReplicasMap(volumeMap, rbwDir, false);
     }
 
     /**
-     * Recover detached files on datanode restart. If a detached block
-     * does not exist in the original directory, then it is moved to the
-     * original directory.
+     * Add replicas under the given directory to the volume map
+     * @param volumeMap the replicas map
+     * @param dir an input directory
+     * @param isFinalized true if the directory has finalized replicas;
+     *                    false if the directory has rbw replicas
      */
-    private void recoverDetachedBlocks(File dataDir, File dir) 
-                                           throws IOException {
-      File contents[] = dir.listFiles();
-      if (contents == null) {
-        return;
-      }
-      for (int i = 0; i < contents.length; i++) {
-        if (!contents[i].isFile()) {
-          throw new IOException ("Found " + contents[i] + " in " + dir +
-                                 " but it is not a file.");
+    private void addToReplicasMap(ReplicasMap volumeMap, 
+        File dir, boolean isFinalized) {
+      File blockFiles[] = dir.listFiles();
+      for (File blockFile : blockFiles) {
+        if (!Block.isBlockFilename(blockFile))
+          continue;
+        
+        long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+        long blockId = Block.filename2id(blockFile.getName());
+        ReplicaInfo newReplica = null;
+        if (isFinalized) {
+          newReplica = new FinalizedReplica(blockId, 
+              blockFile.length(), genStamp, this, blockFile.getParentFile());
+        } else {
+          newReplica = new ReplicaWaitingToBeRecovered(blockId,
+              validateIntegrity(blockFile, genStamp), 
+              genStamp, this, blockFile.getParentFile());
         }
 
-        //
-        // If the original block file still exists, then no recovery
-        // is needed.
-        //
-        File blk = new File(dataDir, contents[i].getName());
-        if (!blk.exists()) {
-          if (!contents[i].renameTo(blk)) {
-            throw new IOException("Unable to recover detached file " +
-                                  contents[i]);
-          }
-          continue;
+        ReplicaInfo oldReplica = volumeMap.add(newReplica);
+        if (oldReplica != null) {
+          DataNode.LOG.warn("Two block files with the same block id exist " +
+              "on disk: " + oldReplica.getBlockFile() +
+              " and " + blockFile );
         }
-        if (!contents[i].delete()) {
-            throw new IOException("Unable to cleanup detached file " +
-                                  contents[i]);
+      }
+    }
+    
+    /**
+     * Find out the number of bytes in the block that match its CRC.
+     * 
+     * This algorithm assumes that data corruption caused by an unexpected 
+     * datanode shutdown occurs only in the last CRC chunk, so it checks
+     * only the last chunk.
+     * 
+     * @param blockFile the block file
+     * @param genStamp generation stamp of the block
+     * @return the number of valid bytes
+     */
+    private long validateIntegrity(File blockFile, long genStamp) {
+      DataInputStream checksumIn = null;
+      InputStream blockIn = null;
+      try {
+        File metaFile = new File(getMetaFileName(blockFile.toString(), genStamp));
+        long blockFileLen = blockFile.length();
+        long metaFileLen = metaFile.length();
+        int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
+        if (!blockFile.exists() || blockFileLen == 0 ||
+            !metaFile.exists() || metaFileLen < (long)crcHeaderLen) {
+          return 0;
+        }
+        checksumIn = new DataInputStream(
+            new BufferedInputStream(new FileInputStream(metaFile),
+                BUFFER_SIZE));
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
+              + metaFile + ", ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        long numChunks = Math.min(
+            (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, 
+            (metaFileLen - crcHeaderLen)/checksumSize);
+        if (numChunks == 0) {
+          return 0;
+        }
+        IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
+        blockIn = new FileInputStream(blockFile);
+        long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
+        IOUtils.skipFully(blockIn, lastChunkStartPos);
+        int lastChunkSize = (int)Math.min(
+            bytesPerChecksum, blockFileLen-lastChunkStartPos);
+        byte[] buf = new byte[lastChunkSize+checksumSize];
+        checksumIn.readFully(buf, lastChunkSize, checksumSize);
+        IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
+
+        checksum.update(buf, 0, lastChunkSize);
+        if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+          return lastChunkStartPos + lastChunkSize;
+        } else { // last chunk is corrupt
+          return lastChunkStartPos;
         }
+      } catch (IOException e) {
+        DataNode.LOG.warn(e);
+        return 0;
+      } finally {
+        IOUtils.closeStream(checksumIn);
+        IOUtils.closeStream(blockIn);
       }
     }
+      
+    void clearPath(File f) {
+      dataDir.clearPath(f);
+    }
+      
+    public String toString() {
+      return getDir().getAbsolutePath();
+    }
   }
     
   static class FSVolumeSet {
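
The chunk arithmetic in validateIntegrity above rewards a worked example. A standalone sketch with hypothetical sizes (512-byte chunks, 4-byte CRCs, and a 7-byte meta header; the real header length comes from DataChecksum and BlockMetadataHeader):

    public class LastChunkSketch {
      public static void main(String[] args) {
        long blockFileLen = 1029;   // hypothetical on-disk block length
        long metaFileLen = 7 + 12;  // hypothetical header plus three checksums
        int bytesPerChecksum = 512;
        int checksumSize = 4;
        int crcHeaderLen = 7;

        // Same min() as validateIntegrity: trust only chunks covered by both files.
        long numChunks = Math.min(
            (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
            (metaFileLen - crcHeaderLen) / checksumSize);            // min(3, 3) = 3
        long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum; // 1024
        int lastChunkSize = (int) Math.min(
            bytesPerChecksum, blockFileLen - lastChunkStartPos);     // 5 bytes

        boolean lastChunkMatchesCrc = true; // suppose the recomputed CRC matched
        long validBytes = lastChunkMatchesCrc
            ? lastChunkStartPos + lastChunkSize  // 1029: the whole block is valid
            : lastChunkStartPos;                 // 1024: drop the torn last chunk
        System.out.println(validBytes);
      }
    }
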
@@ -530,7 +590,7 @@
       return remaining;
     }
       
-    synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+    synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
       }
@@ -571,10 +631,11 @@
           }
         }
         volumes = fsvs; // replace array of volumes
+        DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
+            + removed_vols.size() + " volumes. List of current volumes: "
+            + this);
       }
-      Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size + 
-          "volumes. List of current volumes: " +   toString());
-      
+
       return removed_vols;
     }
       
@@ -606,25 +667,22 @@
   //Find better place?
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
-    
+  static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
 
-  static class ActiveFile {
-    final File file;
-    final List<Thread> threads = new ArrayList<Thread>(2);
-
-    ActiveFile(File f, List<Thread> list) {
-      file = f;
-      if (list != null) {
-        threads.addAll(list);
-      }
-      threads.add(Thread.currentThread());
-    }
-    
-    public String toString() {
-      return getClass().getSimpleName() + "(file=" + file
-          + ", threads=" + threads + ")";
-    }
-  } 
+  private static boolean isUnlinkTmpFile(File f) {
+    String name = f.getName();
+    return name.endsWith(UNLINK_BLOCK_SUFFIX);
+  }
+  
+  static File getUnlinkTmpFile(File f) {
+    return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
+  }
+  
+  private static File getOrigFile(File unlinkTmpFile) {
+    String fileName = unlinkTmpFile.getName();
+    return new File(unlinkTmpFile.getParentFile(),
+        fileName.substring(0, fileName.length()-UNLINK_BLOCK_SUFFIX.length()));
+  }
   
   static String getMetaFileName(String blockFileName, long genStamp) {
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
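
The .unlinked suffix above is all that recoverTempUnlinkedBlock has to go on after a restart, so the mapping between a block file and its unlink tmp file must round-trip exactly. A small sketch of that round trip, with a hypothetical path:

    import java.io.File;

    public class UnlinkNameSketch {
      static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

      static File getUnlinkTmpFile(File f) {
        return new File(f.getParentFile(), f.getName() + UNLINK_BLOCK_SUFFIX);
      }

      static File getOrigFile(File unlinkTmpFile) {
        String name = unlinkTmpFile.getName();
        return new File(unlinkTmpFile.getParentFile(),
            name.substring(0, name.length() - UNLINK_BLOCK_SUFFIX.length()));
      }

      public static void main(String[] args) {
        File blk = new File("/data/current/finalized/blk_123"); // hypothetical
        File tmp = getUnlinkTmpFile(blk);  // .../blk_123.unlinked
        File orig = getOrigFile(tmp);      // .../blk_123 again
        System.out.println(tmp + " -> " + orig);
      }
    }
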
@@ -638,6 +696,26 @@
     return getMetaFile(getBlockFile(b), b);
   }
 
+  /** Find the metadata file for the specified block file.
+   * Return the generation stamp from the name of the metafile.
+   */
+  private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+    String blockName = blockFile.getName();
+    for (int j = 0; j < listdir.length; j++) {
+      String path = listdir[j].getName();
+      if (!path.startsWith(blockName)) {
+        continue;
+      }
+      if (blockFile == listdir[j]) {
+        continue;
+      }
+      return Block.getGenerationStamp(listdir[j].getName());
+    }
+    DataNode.LOG.warn("Block " + blockFile + 
+                      " does not have a metafile!");
+    return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
+  }
+
   /** Find the corresponding meta data file from a given block file */
   private static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
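
For reference, meta file names embed the generation stamp between the block name and the .meta extension, e.g. blk_123_1005.meta. A plain-string sketch of the forward and reverse mappings; Block.getGenerationStamp is the real parser, and this is only an illustration:

    public class MetaNameSketch {
      public static void main(String[] args) {
        // Forward direction, mirroring getMetaFileName():
        String metaName = "blk_123" + "_" + 1005 + ".meta"; // blk_123_1005.meta

        // Reverse direction: the stamp sits between the last '_' and the '.'.
        int us = metaName.lastIndexOf('_');
        int dot = metaName.lastIndexOf('.');
        long genStamp = Long.parseLong(metaName.substring(us + 1, dot));
        System.out.println(genStamp); // 1005
      }
    }
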
@@ -675,22 +753,7 @@
 
   /** Return the block file for the given ID */ 
   public File findBlockFile(long blockId) {
-    final Block b = new Block(blockId);
-    File blockfile = null;
-    ActiveFile activefile = ongoingCreates.get(b);
-    if (activefile != null) {
-      blockfile = activefile.file;
-    }
-    if (blockfile == null) {
-      blockfile = getFile(b);
-    }
-    if (blockfile == null) {
-      if (DataNode.LOG.isDebugEnabled()) {
-        DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
-        DataNode.LOG.debug("volumeMap=" + volumeMap);
-      }
-    }
-    return blockfile;
+    return getFile(blockId);
   }
 
   /** {@inheritDoc} */
@@ -704,6 +767,31 @@
         parseGenerationStamp(blockfile, metafile));
   }
 
+  /**
+   * Returns a clone of a replica stored in data-node memory.
+   * Should be primarily used for testing.
+   * @param blockId the id of the replica to fetch
+   * @return a clone of the replica, or null if none exists
+   */
+  synchronized ReplicaInfo fetchReplicaInfo(long blockId) {
+    ReplicaInfo r = volumeMap.get(blockId);
+    if(r == null)
+      return null;
+    switch(r.getState()) {
+    case FINALIZED:
+      return new FinalizedReplica((FinalizedReplica)r);
+    case RBW:
+      return new ReplicaBeingWritten((ReplicaBeingWritten)r);
+    case RWR:
+      return new ReplicaWaitingToBeRecovered((ReplicaWaitingToBeRecovered)r);
+    case RUR:
+      return new ReplicaUnderRecovery((ReplicaUnderRecovery)r);
+    case TEMPORARY:
+      return new ReplicaInPipeline((ReplicaInPipeline)r);
+    }
+    return null;
+  }
+
   public boolean metaFileExists(Block b) throws IOException {
     return getMetaFile(b).exists();
   }
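
fetchReplicaInfo returns a state-specific copy rather than the live object, so a caller (typically a test) can examine the replica without racing writer threads. A minimal illustration of the copy-constructor-per-state pattern it relies on; the types below are hypothetical stand-ins, not the real ReplicaInfo hierarchy:

    interface Replica {
      Replica copy();
    }

    final class Finalized implements Replica {
      final long blockId;
      Finalized(long blockId) { this.blockId = blockId; }
      Finalized(Finalized other) { this(other.blockId); } // copy constructor
      public Replica copy() { return new Finalized(this); }
    }

    public class CopySketch {
      public static void main(String[] args) {
        Replica live = new Finalized(123);
        Replica snapshot = live.copy();       // safe to hand to another thread
        System.out.println(snapshot != live); // true: never alias live state
      }
    }
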
@@ -720,27 +808,54 @@
                                                     checksumFile.length());
   }
 
+  static File createTmpFile(Block b, File f) throws IOException {
+    if (f.exists()) {
+      throw new IOException("Unexpected problem in creating temporary file for "+
+                            b + ".  File " + f + " should not be present, but is.");
+    }
+    // Create the zero-length temp file
+    //
+    boolean fileCreated = false;
+    try {
+      fileCreated = f.createNewFile();
+    } catch (IOException ioe) {
+      throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
+    }
+    if (!fileCreated) {
+      throw new IOException("Unexpected problem in creating temporary file for "+
+                            b + ".  File " + f + " should be creatable, but is already present.");
+    }
+    return f;
+  }
+    
   FSVolumeSet volumes;
-  private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  HashMap<Block,ReplicaInfo> volumeMap = null;
+  ReplicasMap volumeMap = new ReplicasMap();
   static  Random random = new Random();
+  FSDatasetAsyncDiskService asyncDiskService;
 
   // Used for synchronizing access to usage stats
   private Object statsLock = new Object();
 
+  boolean supportAppends = false;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
   public FSDataset(DataStorage storage, Configuration conf) throws IOException {
     this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+    this.supportAppends = conf.getBoolean("dfs.support.append", false);
     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
     volumes = new FSVolumeSet(volArray);
-    volumeMap = new HashMap<Block, ReplicaInfo>();
     volumes.getVolumeMap(volumeMap);
+    File[] roots = new File[storage.getNumStorageDirs()];
+    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
+      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+    }
+    asyncDiskService = new FSDatasetAsyncDiskService(roots);
     registerMBean(storage.getStorageID());
   }
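
FSDatasetAsyncDiskService, constructed above from the volume roots, takes block deletion off the caller's thread. Its real API is not shown in this diff; a minimal sketch of the idea, with one single-threaded executor per volume root so deletes on one disk cannot stall another:

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncDeleteSketch {
      private final Map<File, ExecutorService> executors =
          new HashMap<File, ExecutorService>();

      AsyncDeleteSketch(File[] volumeRoots) {
        for (File root : volumeRoots) {
          // One worker per volume: deletes queue per disk, not globally.
          executors.put(root, Executors.newSingleThreadExecutor());
        }
      }

      void deleteAsync(File volumeRoot, final File blockFile, final File metaFile) {
        executors.get(volumeRoot).execute(new Runnable() {
          public void run() {
            // The sketch only logs failures; the real service also updates
            // the volume's usage, which is why decDfsUsed() above must
            // synchronize on FSDataset.this.
            if (!blockFile.delete() || (!metaFile.delete() && metaFile.exists())) {
              System.err.println("Unexpected error deleting " + blockFile);
            }
          }
        });
      }
    }
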
 
@@ -814,22 +929,33 @@
   }
 
   /**
-   * Returns handles to the block file and its metadata file
+   * Get the meta info of a block stored in volumeMap
+   * @param b block
+   * @return the meta replica information
+   * @throws IOException if no entry is in the map or 
+   *                        there is a generation stamp mismatch
    */
-  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
-                          long blkOffset, long ckoff) throws IOException {
-
+  private ReplicaInfo getReplicaInfo(Block b) throws IOException {
     ReplicaInfo info = volumeMap.get(b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
-    FSVolume v = info.getVolume();
-    File blockFile = v.getTmpFile(b);
+    return info;
+  }
+  
+  /**
+   * Returns handles to the block file and its metadata file
+   */
+  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
+                          long blkOffset, long ckoff) throws IOException {
+
+    ReplicaInfo info = getReplicaInfo(b);
+    File blockFile = info.getBlockFile();
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (blkOffset > 0) {
       blockInFile.seek(blkOffset);
     }
-    File metaFile = getMetaFile(blockFile, b);
+    File metaFile = info.getMetaFile();
     RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
     if (ckoff > 0) {
       metaInFile.seek(ckoff);
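
Callers of getTmpInputStreams pass both a block offset and a checksum-file offset, and the two must describe the same chunk boundary. A sketch of the usual relation, with hypothetical sizes (512-byte chunks, 4-byte checksums, 7-byte meta header):

    public class OffsetSketch {
      public static void main(String[] args) {
        long blkOffset = 2048;  // where to resume reading block data
        int bytesPerChecksum = 512;
        int checksumSize = 4;
        int metaHeaderLen = 7;  // hypothetical header length

        // blkOffset must sit on a chunk boundary for the checksums to line up.
        long chunk = blkOffset / bytesPerChecksum;          // chunk index 4
        long ckoff = metaHeaderLen + chunk * checksumSize;  // 7 + 16 = 23
        System.out.println("ckoff=" + ckoff);
      }
    }
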
@@ -838,140 +964,32 @@
                                 new FileInputStream(metaInFile.getFD()));
   }
     
-  private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
-      return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
-          new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
-
-  }
-
   /**
    * Make a copy of the block if this block is linked to an existing
    * snapshot. This ensures that modifying this block does not modify
    * data in any existing snapshots.
    * @param block Block
-   * @param numLinks Detach if the number of links exceed this value
+   * @param numLinks Unlink if the number of links exceed this value
    * @throws IOException
-   * @return - true if the specified block was detached
+   * @return - true if the specified block was unlinked or the block
+   *           is not in any snapshot.
    */
-  public boolean detachBlock(Block block, int numLinks) throws IOException {
+  public boolean unlinkBlock(Block block, int numLinks) throws IOException {
     ReplicaInfo info = null;
 
     synchronized (this) {
-      info = volumeMap.get(block);
+      info = getReplicaInfo(block);
     }
-    return info.detachBlock(block, numLinks);
-  }
-
-  static private <T> void updateBlockMap(Map<Block, T> blockmap,
-      Block oldblock, Block newblock) throws IOException {
-    if (blockmap.containsKey(oldblock)) {
-      T value = blockmap.remove(oldblock);
-      blockmap.put(newblock, value);
-    }
-  }
-
-  /** {@inheritDoc} */
-  public void updateBlock(Block oldblock, Block newblock) throws IOException {
-    if (oldblock.getBlockId() != newblock.getBlockId()) {
-      throw new IOException("Cannot update oldblock (=" + oldblock
-          + ") to newblock (=" + newblock + ").");
-    }
-    
-    for(;;) {
-      final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
-      if (threads == null) {
-        return;
-      }
-
-      // interrupt and wait for all ongoing create threads
-      for(Thread t : threads) {
-        t.interrupt();
-      }
-      for(Thread t : threads) {
-        try {
-          t.join();
-        } catch (InterruptedException e) {
-          DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
-        }
-      }
-    }
-  }
-
-  /**
-   * Try to update an old block to a new block.
-   * If there are ongoing create threads running for the old block,
-   * the threads will be returned without updating the block. 
-   * 
-   * @return ongoing create threads if there is any. Otherwise, return null.
-   */
-  private synchronized List<Thread> tryUpdateBlock(
-      Block oldblock, Block newblock) throws IOException {
-    //check ongoing create threads
-    final ActiveFile activefile = ongoingCreates.get(oldblock);
-    if (activefile != null && !activefile.threads.isEmpty()) {
-      //remove dead threads
-      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
-        final Thread t = i.next();
-        if (!t.isAlive()) {
-          i.remove();
-        }
-      }
-
-      //return living threads
-      if (!activefile.threads.isEmpty()) {
-        return new ArrayList<Thread>(activefile.threads);
-      }
-    }
-
-    //No ongoing create threads is alive.  Update block.
-    File blockFile = findBlockFile(oldblock.getBlockId());
-    if (blockFile == null) {
-      throw new IOException("Block " + oldblock + " does not exist.");
-    }
-
-    File oldMetaFile = findMetaFile(blockFile);
-    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
-    
-    //rename meta file to a tmp file
-    File tmpMetaFile = new File(oldMetaFile.getParent(),
-        oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
-    if (!oldMetaFile.renameTo(tmpMetaFile)){
-      throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
-    }
-
-    //update generation stamp
-    if (oldgs > newblock.getGenerationStamp()) {
-      throw new IOException("Cannot update block (id=" + newblock.getBlockId()
-          + ") generation stamp from " + oldgs
-          + " to " + newblock.getGenerationStamp());
-    }
-    
-    //update length
-    if (newblock.getNumBytes() > oldblock.getNumBytes()) {
-      throw new IOException("Cannot update block file (=" + blockFile
-          + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
-    }
-    if (newblock.getNumBytes() < oldblock.getNumBytes()) {
-      truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
-    }
-
-    //rename the tmp file to the new meta file (with new generation stamp)
-    File newMetaFile = getMetaFile(blockFile, newblock);
-    if (!tmpMetaFile.renameTo(newMetaFile)) {
-      throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
-    }
-
-    updateBlockMap(ongoingCreates, oldblock, newblock);
-    updateBlockMap(volumeMap, oldblock, newblock);
-
-    // paranoia! verify that the contents of the stored block 
-    // matches the block file on disk.
-    validateBlockMetadata(newblock);
-    return null;
+    return info.unlinkBlock(numLinks);
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
+    DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+        + ", metaFile=" + metaFile
+        + ", oldlen=" + oldlen
+        + ", newlen=" + newlen);
+
     if (newlen == oldlen) {
       return;
     }
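
truncateBlock, whose body is unchanged below the added log line, has to shrink the meta file consistently with the block file: the new meta length is the header plus one checksum per remaining chunk, and the checksum of a now-partial last chunk must be recomputed from the surviving bytes. A sketch of just the length bookkeeping, with hypothetical sizes:

    public class TruncateSketch {
      public static void main(String[] args) {
        long newlen = 1500;     // hypothetical truncated block length
        int bytesPerChecksum = 512;
        int checksumSize = 4;
        int metaHeaderLen = 7;  // hypothetical header length

        long newChunks = (newlen + bytesPerChecksum - 1) / bytesPerChecksum; // 3
        long newMetaLen = metaHeaderLen + newChunks * checksumSize;          // 19

        // A non-multiple of bytesPerChecksum leaves a partial last chunk whose
        // stored CRC is stale until recomputed over the remaining bytes.
        boolean lastChunkPartial = newlen % bytesPerChecksum != 0;           // true
        System.out.println("newMetaLen=" + newMetaLen
            + " partialLastChunk=" + lastChunkPartial);
      }
    }
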
@@ -1030,165 +1048,310 @@
     }
   }
 
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface append(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    // This can happen if the block was successfully finalized because all
+    // packets were successfully processed at the Datanode, but the acks for
+    // some of the packets were not received by the client; the client then
+    // re-opens the connection and retries sending those packets.
+    // The other reason is that an "append" is occurring to this block.
+    
+    // check the validity of the parameter
+    if (newGS < b.getGenerationStamp()) {
+      throw new IOException("The new generation stamp " + newGS + 
+          " should be greater than the replica " + b + "'s generation stamp");
+    }
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }  
+    DataNode.LOG.info("Appending to replica " + replicaInfo);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+    }
+    if (replicaInfo.getNumBytes() != expectedBlockLen) {
+      throw new IOException("Corrupted replica " + replicaInfo + 
+          " with a length of " + replicaInfo.getNumBytes() + 
+          " expected length is " + expectedBlockLen);
+    }
+
+    return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+  }
+  
+  /** Append to a finalized replica.
+   * Change a finalized replica to be an RBW replica and 
+   * bump its generation stamp to be the newGS.
+   * 
+   * @param replicaInfo a finalized replica
+   * @param newGS new generation stamp
+   * @param estimateBlockLen estimated block length
+   * @return an RBW replica
+   * @throws IOException if moving the replica from finalized directory 
+   *         to rbw directory fails
+   */
+  private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo, 
+      long newGS, long estimateBlockLen) throws IOException {
+    // unlink the finalized replica
+    replicaInfo.unlinkBlock(1);
+    
+    // construct an RBW replica with the new GS
+    File blkfile = replicaInfo.getBlockFile();
+    FSVolume v = volumes.getNextVolume(estimateBlockLen);
+    File newBlkFile = new File(v.rbwDir, replicaInfo.getBlockName());
+    File oldmeta = replicaInfo.getMetaFile();
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+        v, newBlkFile.getParentFile(), Thread.currentThread());
+    File newmeta = newReplicaInfo.getMetaFile();
+
+    // rename meta file to rbw directory
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    if (!oldmeta.renameTo(newmeta)) {
+      throw new IOException("Block " + replicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to rbw dir " + newmeta);
+    }
+
+    // rename block file to rbw directory
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+      DataNode.LOG.debug("Old block file length is " + blkfile.length());
+    }
+    if (!blkfile.renameTo(newBlkFile)) {
+      if (!newmeta.renameTo(oldmeta)) {  // restore the meta file
+        DataNode.LOG.warn("Cannot move meta file " + newmeta + 
+            "back to the finalized directory " + oldmeta);
+      }
+      throw new IOException("Block " + replicaInfo + " reopen failed. " +
+                              " Unable to move block file " + blkfile +
+                              " to rbw dir " + newBlkFile);
+    }
+    
+    // Replace the finalized replica with an RBW replica in the replicas map
+    volumeMap.add(newReplicaInfo);
+    
+    return newReplicaInfo;
+  }
+
+  private ReplicaInfo recoverCheck(Block b, long newGS, 
+      long expectedBlockLen) throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    
+    // check state
+    if (replicaInfo.getState() != ReplicaState.FINALIZED &&
+        replicaInfo.getState() != ReplicaState.RBW) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+    }
+
+    // check generation stamp
+    long replicaGenerationStamp = replicaInfo.getGenerationStamp();
+    if (replicaGenerationStamp < b.getGenerationStamp() ||
+        replicaGenerationStamp > newGS) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+          + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          newGS + "].");
+    }
+    
+    // stop the previous writer before checking a replica's length
+    long replicaLen = replicaInfo.getNumBytes();
+    if (replicaInfo.getState() == ReplicaState.RBW) {
+      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+      // kill the previous writer
+      rbw.stopWriter();
+      rbw.setWriter(Thread.currentThread());
+      // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
+      if (replicaLen != rbw.getBytesOnDisk() 
+          || replicaLen != rbw.getBytesAcked()) {
+        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
+            "bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
+            rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
+            ") are not the same.");
+      }
+    }
+    
+    // check block length
+    if (replicaLen != expectedBlockLen) {
+      throw new IOException("Corrupted replica " + replicaInfo + 
+          " with a length of " + replicaLen + 
+          " expected length is " + expectedBlockLen);
+    }
+    
+    return replicaInfo;
+  }
+
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed append to " + b);
+
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+
+    // change the replica's state/gs etc.
+    if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
+      return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+    } else { //RBW
+      bumpReplicaGS(replicaInfo, newGS);
+      return (ReplicaBeingWritten)replicaInfo;
+    }
+  }
+
+  @Override
+  public void recoverClose(Block b, long newGS,
+      long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed close " + b);
+    // check replica's state
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+    // bump the replica's GS
+    bumpReplicaGS(replicaInfo, newGS);
+    // finalize the replica if RBW
+    if (replicaInfo.getState() == ReplicaState.RBW) {
+      finalizeBlock(replicaInfo);
+    }
+  }
+  
   /**
-   * Start writing to a block file
-   * If isRecovery is true and the block pre-exists, then we kill all
-      volumeMap.put(b, v);
-      volumeMap.put(b, v);
-   * other threads that might be writing to this block, and then reopen the file.
-   */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
-    //
-    // Make sure the block isn't a valid one - we're still creating it!
-    //
-    if (isValidBlock(b)) {
-      if (!isRecovery) {
-        throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
-      }
-      // If the block was successfully finalized because all packets
-      // were successfully processed at the Datanode but the ack for
-      // some of the packets were not received by the client. The client 
-      // re-opens the connection and retries sending those packets.
-      // The other reason is that an "append" is occurring to this block.
-      detachBlock(b, 1);
+   * Bump a replica's generation stamp to a new one.
+   * Its on-disk meta file is renamed to match the new generation stamp.
+   * 
+   * @param replicaInfo a replica
+   * @param newGS new generation stamp
+   * @throws IOException if rename fails
+   */
+  private void bumpReplicaGS(ReplicaInfo replicaInfo, 
+      long newGS) throws IOException { 
+    long oldGS = replicaInfo.getGenerationStamp();
+    File oldmeta = replicaInfo.getMetaFile();
+    replicaInfo.setGenerationStamp(newGS);
+    File newmeta = replicaInfo.getMetaFile();
+
+    // rename meta file to new GS
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    if (!oldmeta.renameTo(newmeta)) {
+      replicaInfo.setGenerationStamp(oldGS); // restore old GS
+      throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to " + newmeta);
     }
-    long blockSize = b.getNumBytes();
+  }
 
-    //
-    // Serialize access to /tmp, and check if file already there.
-    //
-    File f = null;
-    List<Thread> threads = null;
-    synchronized (this) {
-      //
-      // Is it already in the create process?
-      //
-      ActiveFile activeFile = ongoingCreates.get(b);
-      if (activeFile != null) {
-        f = activeFile.file;
-        threads = activeFile.threads;
-        
-        if (!isRecovery) {
-          throw new BlockAlreadyExistsException("Block " + b +
-                                  " has already been started (though not completed), and thus cannot be created.");
-        } else {
-          for (Thread thread:threads) {
-            thread.interrupt();
-          }
-        }
-        ongoingCreates.remove(b);
-      }
-      FSVolume v = null;
-      if (!isRecovery) {
-        v = volumes.getNextVolume(blockSize);
-        // create temporary file to hold block in the designated volume
-        f = createTmpFile(v, b);
-        volumeMap.put(b, new ReplicaInfo(v));
-      } else if (f != null) {
-        DataNode.LOG.info("Reopen already-open Block for append " + b);
-        // create or reuse temporary file to hold block in the designated volume
-        v = volumeMap.get(b).getVolume();
-        volumeMap.put(b, new ReplicaInfo(v));
-      } else {
-        // reopening block for appending to it.
-        DataNode.LOG.info("Reopen Block for append " + b);
-        v = volumeMap.get(b).getVolume();
-        f = createTmpFile(v, b);
-        File blkfile = getBlockFile(b);
-        File oldmeta = getMetaFile(b);
-        File newmeta = getMetaFile(f, b);
-
-        // rename meta file to tmp directory
-        DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-        if (!oldmeta.renameTo(newmeta)) {
-          throw new IOException("Block " + b + " reopen failed. " +
-                                " Unable to move meta file  " + oldmeta +
-                                " to tmp dir " + newmeta);
-        }
-
-        // rename block file to tmp directory
-        DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
-        if (!blkfile.renameTo(f)) {
-          if (!f.delete()) {
-            throw new IOException("Block " + b + " reopen failed. " +
-                                  " Unable to remove file " + f);
-          }
-          if (!blkfile.renameTo(f)) {
-            throw new IOException("Block " + b + " reopen failed. " +
-                                  " Unable to move block file " + blkfile +
-                                  " to tmp dir " + f);
-          }
-        }
-        volumeMap.put(b, new ReplicaInfo(v));
-      }
-      if (f == null) {
-        DataNode.LOG.warn("Block " + b + " reopen failed " +
-                          " Unable to locate tmp file.");
-        throw new IOException("Block " + b + " reopen failed " +
-                              " Unable to locate tmp file.");
-      }
-      ongoingCreates.put(b, new ActiveFile(f, threads));
+  @Override
+  public synchronized ReplicaInPipelineInterface createRbw(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo != null) {
+      throw new ReplicaAlreadyExistsException("Block " + b +
+      " already exists in state " + replicaInfo.getState() +
+      " and thus cannot be created.");
+    }
+    // create a new block
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    // create a rbw file to hold block in the designated volume
+    File f = v.createRbwFile(b);
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
+        b.getGenerationStamp(), v, f.getParentFile());
+    volumeMap.add(newReplicaInfo);
+    return newReplicaInfo;
+  }
+  
+  @Override
+  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+      long newGS, long minBytesRcvd, long maxBytesRcvd)
+      throws IOException {
+    DataNode.LOG.info("Recover the RBW replica " + b);
+
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    
+    // check the replica's state
+    if (replicaInfo.getState() != ReplicaState.RBW) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
     }
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+    
+    DataNode.LOG.info("Recovering replica " + rbw);
 
-    try {
-      if (threads != null) {
-        for (Thread thread:threads) {
-          thread.join();
-        }
-      }
-    } catch (InterruptedException e) {
-      throw new IOException("Recovery waiting for thread interrupted.");
+    // Stop the previous writer
+    rbw.stopWriter();
+    rbw.setWriter(Thread.currentThread());
+
+    // check generation stamp
+    long replicaGenerationStamp = rbw.getGenerationStamp();
+    if (replicaGenerationStamp < b.getGenerationStamp() ||
+        replicaGenerationStamp > newGS) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+          ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          newGS + "].");
+    }
+    
+    // check replica length
+    if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+      throw new ReplicaNotFoundException("Unmatched length replica " + 
+          replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() + 
+          " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" + 
+          minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
 
-    //
-    // Finally, allow a writer to the block file
-    // REMIND - mjc - make this a filter stream that enforces a max
-    // block size, so clients can't go crazy
-    //
-    File metafile = getMetaFile(f, b);
-    DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
-    DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
-    return createBlockWriteStreams( f , metafile);
+    // bump the replica's generation stamp to newGS
+    bumpReplicaGS(rbw, newGS);
+    
+    return rbw;
   }
-
-  /**
-   * Retrieves the offset in the block to which the
-   * the next write will write data to.
-   */
-  public long getChannelPosition(Block b, BlockWriteStreams streams) 
-                                 throws IOException {
-    FileOutputStream file = (FileOutputStream) streams.dataOut;
-    return file.getChannel().position();
+  
+  @Override
+  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo != null) {
+      throw new ReplicaAlreadyExistsException("Block " + b +
+          " already exists in state " + replicaInfo.getState() +
+          " and thus cannot be created.");
+    }
+    
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    // create a temporary file to hold block in the designated volume
+    File f = v.createTmpFile(b);
+    ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
+        b.getGenerationStamp(), v, f.getParentFile());
+    volumeMap.add(newReplicaInfo);
+    
+    return newReplicaInfo;
   }
 
   /**
-   * Sets the offset in the block to which the
-   * the next write will write data to.
-   */
-  public void setChannelPosition(Block b, BlockWriteStreams streams, 
-                                 long dataOffset, long ckOffset) 
-                                 throws IOException {
-    long size = 0;
-    synchronized (this) {
-      FSVolume vol = volumeMap.get(b).getVolume();
-      size = vol.getTmpFile(b).length();
-    }
-    if (size < dataOffset) {
-      String msg = "Trying to change block file offset of block " + b +
-                     " to " + dataOffset +
-                     " but actual size of file is " +
-                     size;
-      throw new IOException(msg);
-    }
-    FileOutputStream file = (FileOutputStream) streams.dataOut;
-    file.getChannel().position(dataOffset);
-    file = (FileOutputStream) streams.checksumOut;
-    file.getChannel().position(ckOffset);
+   * Sets the offset in the meta file so that the
+   * last checksum will be overwritten.
+   */
+  public void adjustCrcChannelPosition(Block b, BlockWriteStreams streams, 
+      int checksumSize) throws IOException {
+    FileOutputStream file = (FileOutputStream) streams.checksumOut;
+    FileChannel channel = file.getChannel();
+    long oldPos = channel.position();
+    long newPos = oldPos - checksumSize;
+    DataNode.LOG.info("Changing meta file offset of block " + b + " from " + 
+        oldPos + " to " + newPos);
+    channel.position(newPos);
   }
 
   synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
     if ( vol == null ) {
-      vol = volumeMap.get( blk ).getVolume();
+      vol = getReplicaInfo( blk ).getVolume();
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
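
adjustCrcChannelPosition above rewinds the checksum stream by exactly one checksum so that the next write overwrites the CRC of a partial last chunk instead of appending after it. A standalone FileChannel sketch of the same rewind against a scratch file, with hypothetical sizes:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.channels.FileChannel;

    public class RewindSketch {
      public static void main(String[] args) throws IOException {
        File scratch = File.createTempFile("meta", ".tmp"); // stand-in meta file
        FileOutputStream out = new FileOutputStream(scratch);
        try {
          out.write(new byte[11]);  // header (7) plus one checksum (4)
          FileChannel channel = out.getChannel();
          int checksumSize = 4;
          long newPos = channel.position() - checksumSize;
          channel.position(newPos); // next write overwrites the last CRC
          System.out.println("rewound to " + newPos); // 7
        } finally {
          out.close();
          scratch.delete();
        }
      }
    }
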
@@ -1208,40 +1371,52 @@
    * Complete the block write!
    */
   public synchronized void finalizeBlock(Block b) throws IOException {
-    ActiveFile activeFile = ongoingCreates.get(b);
-    if (activeFile == null) {
-      throw new IOException("Block " + b + " is already finalized.");
-    }
-    File f = activeFile.file;
-    if (f == null || !f.exists()) {
-      throw new IOException("No temporary file " + f + " for block " + b);
-    }
-    FSVolume v = volumeMap.get(b).getVolume();
-    if (v == null) {
-      throw new IOException("No volume for temporary file " + f + 
-                            " for block " + b);
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
+    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+      // this is legal when recovery happens on a file that has
+      // been opened for append but never modified
+      return;
     }
-        
-    File dest = null;
-    dest = v.addBlock(b, f);
-    volumeMap.put(b, new ReplicaInfo(v, dest));
-    ongoingCreates.remove(b);
+    finalizeReplica(replicaInfo);
+  }
+  
+  private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
+  throws IOException {
+    FinalizedReplica newReplicaInfo = null;
+    if (replicaInfo.getState() == ReplicaState.RUR &&
+       ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == 
+         ReplicaState.FINALIZED) {
+      newReplicaInfo = (FinalizedReplica)
+             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+    } else {
+      FSVolume v = replicaInfo.getVolume();
+      File f = replicaInfo.getBlockFile();
+      if (v == null) {
+        throw new IOException("No volume for temporary file " + f + 
+            " for block " + replicaInfo);
+      }
+
+      File dest = v.addBlock(replicaInfo, f);
+      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+    }
+    volumeMap.add(newReplicaInfo);
+    return newReplicaInfo;
   }
 
   /**
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    // remove the block from in-memory data structure
-    ActiveFile activefile = ongoingCreates.remove(b);
-    if (activefile == null) {
-      return;
-    }
-    volumeMap.remove(b);
-    
-    // delete the on-disk temp file
-    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
-      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+      // remove from volumeMap
+      volumeMap.remove(b);
+      
+      // delete the on-disk temp file
+      if (delBlockFromDisk(replicaInfo.getBlockFile(), 
+          replicaInfo.getMetaFile(), b)) {
+        DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+      }
     }
   }
 
@@ -1272,18 +1447,34 @@
   }
 
   /**
-   * Return finalized blocks from the in-memory blockmap
+   * Generates a block report from the in-memory block map.
    */
-  public Block[] getBlockReport() {
-    ArrayList<Block> list =  new ArrayList<Block>(volumeMap.size());
+  public BlockListAsLongs getBlockReport() {
+    ArrayList<ReplicaInfo> finalized =
+      new ArrayList<ReplicaInfo>(volumeMap.size());
+    ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
     synchronized(this) {
-      for (Block b : volumeMap.keySet()) {
-        if (!ongoingCreates.containsKey(b)) {
-          list.add(new Block(b));
+      for (ReplicaInfo b : volumeMap.replicas()) {
+        switch(b.getState()) {
+        case FINALIZED:
+          finalized.add(b);
+          break;
+        case RBW:
+        case RWR:
+          uc.add(b);
+          break;
+        case RUR:
+          ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+          uc.add(rur.getOriginalReplica());
+          break;
+        case TEMPORARY:
+          break;
+        default:
+          assert false : "Illegal ReplicaInfo state.";
         }
       }
+      return new BlockListAsLongs(finalized, uc);
     }
-    return list.toArray(new Block[list.size()]);
   }
 
   /**
@@ -1293,7 +1484,7 @@
    * is needed to handle concurrent modification to the block.
    */
   synchronized Block[] getBlockList(boolean deepcopy) {
-    Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+    Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
     if (deepcopy) {
       for (int i = 0; i < list.length; i++) {
         list[i] = new Block(list[i]);
@@ -1303,17 +1494,29 @@
   }
 
   /**
+   * Get the list of finalized blocks from in-memory blockmap.
+   */
+  synchronized List<Block> getFinalizedBlocks() {
+    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
+    for (ReplicaInfo b : volumeMap.replicas()) {
+      if(b.getState() == ReplicaState.FINALIZED) {
+        finalized.add(new Block(b));
+      }
+    }
+    return finalized;
+  }
+
+  /**
    * Check whether the given block is a valid one.
+   * Valid means the replica is finalized.
    */
   public boolean isValidBlock(Block b) {
-    File f = null;;
-    try {
-      f = validateBlockFile(b);
-    } catch(IOException e) {
-      Log.warn("Block " + b + " is not valid:",e);
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo == null || 
+        replicaInfo.getState() != ReplicaState.FINALIZED) {
+      return false;
     }
-    
-    return f != null;
+    return replicaInfo.getBlockFile().exists();
   }
 
   /**
@@ -1338,51 +1541,25 @@
     return null;
   }
 
-  /** {@inheritDoc} */
-  public void validateBlockMetadata(Block b) throws IOException {
-    ReplicaInfo info = volumeMap.get(b);
-    if (info == null) {
-      throw new IOException("Block " + b + " does not exist in volumeMap.");
+  /** Check the files of a replica. */
+  static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
+    //check replica's file
+    final File f = r.getBlockFile();
+    if (!f.exists()) {
+      throw new FileNotFoundException("File " + f + " not found, r=" + r);
     }
-    FSVolume v = info.getVolume();
-    File tmp = v.getTmpFile(b);
-    File f = getFile(b);
-    if (f == null) {
-      f = tmp;
+    if (r.getBytesOnDisk() != f.length()) {
+      throw new IOException("File length mismatched.  The length of "
+          + f + " is " + f.length() + " but r=" + r);
     }
-    if (f == null) {
-      throw new IOException("Block " + b + " does not exist on disk.");
+
+    //check replica's meta file
+    final File metafile = getMetaFile(f, r);
+    if (!metafile.exists()) {
+      throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
     }
-    if (!f.exists()) {
-      throw new IOException("Block " + b + 
-                            " block file " + f +
-                            " does not exist on disk.");
-    }
-    if (b.getNumBytes() != f.length()) {
-      throw new IOException("Block " + b + 
-                            " length is " + b.getNumBytes()  +
-                            " does not match block file length " +
-                            f.length());
-    }
-    File meta = getMetaFile(f, b);
-    if (meta == null) {
-      throw new IOException("Block " + b + 
-                            " metafile does not exist.");
-    }
-    if (!meta.exists()) {
-      throw new IOException("Block " + b + 
-                            " metafile " + meta +
-                            " does not exist on disk.");
-    }
-    if (meta.length() == 0) {
-      throw new IOException("Block " + b + " metafile " + meta + " is empty.");
-    }
-    long stamp = parseGenerationStamp(f, meta);
-    if (stamp != b.getGenerationStamp()) {
-      throw new IOException("Block " + b + 
-                            " genstamp is " + b.getGenerationStamp()  +
-                            " does not match meta file stamp " +
-                            stamp);
+    if (metafile.length() == 0) {
+      throw new IOException("Metafile " + metafile + " is empty, r=" + r);
     }
   }
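
checkReplicaFiles pairs each data file with its meta file via getMetaFile. On
disk that pairing follows the blk_<id>_<genstamp>.meta naming convention; a
hedged sketch of deriving the name (illustrative code, not the actual
getMetaFile implementation):

    import java.io.File;

    public class MetaFileNameSketch {
      static final String METADATA_EXTENSION = ".meta";

      // e.g. blk_123 with generation stamp 7 -> blk_123_7.meta, same directory
      static File metaFile(File blockFile, long generationStamp) {
        return new File(blockFile.getParentFile(),
            blockFile.getName() + "_" + generationStamp + METADATA_EXTENSION);
      }

      public static void main(String[] args) {
        System.out.println(metaFile(new File("/data/current/blk_123"), 7));
        // prints /data/current/blk_123_7.meta
      }
    }
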
 
@@ -1399,7 +1576,8 @@
       synchronized (this) {
         f = getFile(invalidBlks[i]);
         ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
-        if (dinfo == null) {
+        if (dinfo == null || 
+            dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] + 
                            ". BlockInfo not found in volumeMap.");
@@ -1431,26 +1609,20 @@
           error = true;
           continue;
         }
-        v.clearPath(parent);
+        ReplicaState replicaState = dinfo.getState();
+        if (replicaState == ReplicaState.FINALIZED || 
+            (replicaState == ReplicaState.RUR && 
+                ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() == 
+                  ReplicaState.FINALIZED)) {
+          v.clearPath(parent);
+        }
         volumeMap.remove(invalidBlks[i]);
       }
       File metaFile = getMetaFile( f, invalidBlks[i] );
-      long blockSize = f.length()+metaFile.length();
-      if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
-        DataNode.LOG.warn("Unexpected error trying to delete block "
-                          + invalidBlks[i] + " at file " + f);
-        error = true;
-        continue;
-      }
-      v.decDfsUsed(blockSize);
-      DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
-      if (f.exists()) {
-        //
-        // This is a temporary check especially for hadoop-1220. 
-        // This will go away in the future.
-        //
-        DataNode.LOG.info("File " + f + " was deleted but still exists!");
-      }
+      long dfsBytes = f.length() + metaFile.length();
+      
+      // Delete the block asynchronously so that slow disk deletions do not block the caller
+      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes, invalidBlks[i].toString());
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
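
The deleteAsync call hands the actual file removal to
FSDatasetAsyncDiskService, which is not part of this hunk. A hedged sketch of
the underlying pattern, with illustrative names:

    import java.io.File;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Illustrative async-deletion pattern; not the actual Hadoop service. */
    public class AsyncDeleteSketch {
      private final ExecutorService executor = Executors.newFixedThreadPool(4);

      // Queue the file deletions so the caller does not wait on disk I/O.
      void deleteAsync(final File blockFile, final File metaFile) {
        executor.execute(new Runnable() {
          public void run() {
            if (!blockFile.delete()
                || (!metaFile.delete() && metaFile.exists())) {
              System.err.println("Unexpected error deleting " + blockFile);
            }
          }
        });
      }

      void shutdown() { executor.shutdown(); }
    }

Queuing deletions this way keeps block-invalidation processing responsive even
when a disk is slow.
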
@@ -1458,16 +1630,24 @@
   }
 
   /**
-   * Turn the block identifier into a filename.
+   * Turn the block identifier into a filename; note that the generation stamp is ignored.
    */
   public synchronized File getFile(Block b) {
-    ReplicaInfo info = volumeMap.get(b);
+    return getFile(b.getBlockId());
+  }
+
+  /**
+   * Turn the block identifier into a filename
+   * @param blockId a block's id
+   * @return on disk data file path; null if the replica does not exist
+   */
+  private File getFile(long blockId) {
+    ReplicaInfo info = volumeMap.get(blockId);
     if (info != null) {
-      return info.getFile();
+      return info.getBlockFile();
     }
-    return null;
+    return null;
   }
-
   /**
    * check if a data directory is healthy
    * if some volumes failed - make sure to remove all the blocks that belong
@@ -1486,12 +1666,12 @@
     // remove related blocks
     long mlsec = System.currentTimeMillis();
     synchronized (this) {
-      Iterator<Block> ib = volumeMap.keySet().iterator();
+      Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
       while(ib.hasNext()) {
-        Block b = ib.next();
+        ReplicaInfo b = ib.next();
         total_blocks ++;
         // check if the volume block belongs to still valid
-        FSVolume vol = volumeMap.get(b).getVolume();
+        FSVolume vol = b.getVolume();
         for(FSVolume fv: failed_vols) {
           if(vol == fv) {
             DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol " 
@@ -1504,7 +1684,7 @@
       }
     } // end of sync
     mlsec = System.currentTimeMillis() - mlsec;
-    DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+    DataNode.LOG.warn("Removed " + removed_blocks + " out of " + total_blocks +
         "(took " + mlsec + " millisecs)");
 
     // report the error
@@ -1554,6 +1734,10 @@
     if (mbeanName != null)
       MBeanUtil.unregisterMBean(mbeanName);
     
+    if (asyncDiskService != null) {
+      asyncDiskService.shutdown();
+    }
+    
     if(volumes != null) {
       for (FSVolume volume : volumes.volumes) {
         if(volume != null) {
@@ -1592,20 +1776,20 @@
    */
   public void checkAndUpdate(long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
-    Block block = new Block(blockId);
     DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
+    ReplicaInfo memBlockInfo;
     synchronized (this) {
-      if (ongoingCreates.get(block) != null) {
+      memBlockInfo = volumeMap.get(blockId);
+      if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
       }
 
       final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
           Block.getGenerationStamp(diskMetaFile.getName()) :
-            Block.GRANDFATHER_GENERATION_STAMP;
+            GenerationStamp.GRANDFATHER_GENERATION_STAMP;
 
-      ReplicaInfo memBlockInfo = volumeMap.get(block);
       if (diskFile == null || !diskFile.exists()) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
@@ -1617,14 +1801,14 @@
           }
           return;
         }
-        if (!memBlockInfo.getFile().exists()) {
+        if (!memBlockInfo.getBlockFile().exists()) {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
-          volumeMap.remove(block);
+          volumeMap.remove(blockId);
           if (datanode.blockScanner != null) {
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(new Block(blockId));
           }
-          DataNode.LOG.warn("Removed block " + block.getBlockId()
+          DataNode.LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
           if (diskMetaFile != null && diskMetaFile.exists()
@@ -1640,23 +1824,20 @@
        */
       if (memBlockInfo == null) {
         // Block is missing in memory - add the block to volumeMap
-        ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
-        Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
-        volumeMap.put(diskBlock, diskBlockInfo);
+        ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
+            diskFile.length(), diskGS, vol, diskFile.getParentFile());
+        volumeMap.add(diskBlockInfo);
         if (datanode.blockScanner != null) {
-          datanode.blockScanner.addBlock(diskBlock);
+          datanode.blockScanner.addBlock(diskBlockInfo);
         }
-        DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+        DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
         return;
       }
       /*
        * Block exists in volumeMap and the block file exists on the disk
        */
-      // Iterate to get key from volumeMap for the blockId
-      Block memBlock = getBlockKey(blockId);
-
       // Compare block files
-      File memFile = memBlockInfo.getFile();
+      File memFile = memBlockInfo.getBlockFile();
       if (memFile.exists()) {
         if (memFile.compareTo(diskFile) != 0) {
           DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
@@ -1673,19 +1854,17 @@
             + memFile.getAbsolutePath()
             + " does not exist. Updating it to the file found during scan "
             + diskFile.getAbsolutePath());
-        ReplicaInfo info = volumeMap.remove(memBlock);
-        info.setFile(diskFile);
+        memBlockInfo.setDir(diskFile.getParentFile());
         memFile = diskFile;
 
         DataNode.LOG.warn("Updating generation stamp for block " + blockId
-            + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
-        memBlock.setGenerationStamp(diskGS);
-        volumeMap.put(memBlock, info);
+            + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
+        memBlockInfo.setGenerationStamp(diskGS);
       }
 
       // Compare generation stamp
-      if (memBlock.getGenerationStamp() != diskGS) {
-        File memMetaFile = getMetaFile(diskFile, memBlock);
+      if (memBlockInfo.getGenerationStamp() != diskGS) {
+        File memMetaFile = getMetaFile(diskFile, memBlockInfo);
         if (memMetaFile.exists()) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
             DataNode.LOG.warn("Metadata file in memory "
@@ -1699,26 +1878,22 @@
           // as the block file, then use the generation stamp from it
           long gs = diskMetaFile != null && diskMetaFile.exists()
               && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
-              : Block.GRANDFATHER_GENERATION_STAMP;
+              : GenerationStamp.GRANDFATHER_GENERATION_STAMP;
 
           DataNode.LOG.warn("Updating generation stamp for block " + blockId
-              + " from " + memBlock.getGenerationStamp() + " to " + gs);
+              + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
 
-          ReplicaInfo info = volumeMap.remove(memBlock);
-          memBlock.setGenerationStamp(gs);
-          volumeMap.put(memBlock, info);
+          memBlockInfo.setGenerationStamp(gs);
         }
       }
 
       // Compare block size
-      if (memBlock.getNumBytes() != memFile.length()) {
+      if (memBlockInfo.getNumBytes() != memFile.length()) {
         // Update the length based on the block file
-        corruptBlock = new Block(memBlock);
+        corruptBlock = new Block(memBlockInfo);
         DataNode.LOG.warn("Updating size of block " + blockId + " from "
-            + memBlock.getNumBytes() + " to " + memFile.length());
-        ReplicaInfo info = volumeMap.remove(memBlock);
-        memBlock.setNumBytes(memFile.length());
-        volumeMap.put(memBlock, info);
+            + memBlockInfo.getNumBytes() + " to " + memFile.length());
+        memBlockInfo.setNumBytes(memFile.length());
       }
     }
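
The branches above amount to a small decision table. A simplified standalone
sketch of that table (it ignores the path, generation-stamp, and length
repairs the real method performs in place):

    /** Simplified decision table for checkAndUpdate; illustration only. */
    public class ScanReconcileSketch {
      enum Action { IGNORE, ADD_TO_MAP, REMOVE_FROM_MAP, COMPARE_AND_REPAIR }

      static Action reconcile(boolean onDisk, boolean inMemory,
          boolean finalized) {
        if (inMemory && !finalized) return Action.IGNORE;          // still being written
        if (!onDisk && !inMemory)   return Action.IGNORE;          // nothing to do
        if (!onDisk)                return Action.REMOVE_FROM_MAP; // stale map entry
        if (!inMemory)              return Action.ADD_TO_MAP;      // missing map entry
        return Action.COMPARE_AND_REPAIR;                          // both exist
      }

      public static void main(String[] args) {
        System.out.println(reconcile(true, false, false)); // ADD_TO_MAP
        System.out.println(reconcile(false, true, true));  // REMOVE_FROM_MAP
      }
    }
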
 
@@ -1738,18 +1913,172 @@
   }
 
   /**
-   * Get reference to the key in the volumeMap. To be called from methods that
-   * are synchronized on {@link FSDataset}
-   * @param blockId
-   * @return key from the volumeMap
+   * @deprecated use {@link #fetchReplicaInfo(long)} instead.
    */
-  Block getBlockKey(long blockId) {
+  @Override
+  @Deprecated
+  public ReplicaInfo getReplica(long blockId) {
     assert(Thread.holdsLock(this));
-    for (Block b : volumeMap.keySet()) {
-      if (b.getBlockId() == blockId) {
-        return b;
+    return volumeMap.get(blockId);
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized ReplicaRecoveryInfo initReplicaRecovery(
+      RecoveringBlock rBlock) throws IOException {
+    return initReplicaRecovery(
+        volumeMap, rBlock.getBlock(), rBlock.getNewGenerationStamp());
+  }
+
+  /** Static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
+  static ReplicaRecoveryInfo initReplicaRecovery(
+      ReplicasMap map, Block block, long recoveryId) throws IOException {
+    final ReplicaInfo replica = map.get(block.getBlockId());
+    DataNode.LOG.info("initReplicaRecovery: block=" + block
+        + ", recoveryId=" + recoveryId
+        + ", replica=" + replica);
+
+    //check replica
+    if (replica == null) {
+      return null;
+    }
+
+    //stop writer if there is any
+    if (replica instanceof ReplicaInPipeline) {
+      final ReplicaInPipeline rip = (ReplicaInPipeline)replica;
+      rip.stopWriter();
+
+      //check replica bytes on disk.
+      if (rip.getBytesOnDisk() < rip.getVisibleLength()) {
+        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+            + " getBytesOnDisk() < getVisibleLength(), rip=" + rip);
       }
+
+      //check the replica's files
+      checkReplicaFiles(rip);
     }
-    return null;
+
+    //check generation stamp
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+
+    //check recovery id
+    if (replica.getGenerationStamp() >= recoveryId) {
+      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+          + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+          + ", block=" + block + ", replica=" + replica);
+    }
+
+    //check RUR
+    final ReplicaUnderRecovery rur;
+    if (replica.getState() == ReplicaState.RUR) {
+      rur = (ReplicaUnderRecovery)replica;
+      if (rur.getRecoveryID() >= recoveryId) {
+        throw new RecoveryInProgressException(
+            "rur.getRecoveryID() >= recoveryId = " + recoveryId
+            + ", block=" + block + ", rur=" + rur);
+      }
+      final long oldRecoveryID = rur.getRecoveryID();
+      rur.setRecoveryID(recoveryId);
+      DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+          + " from " + oldRecoveryID + " to " + recoveryId);
+    }
+    else {
+      rur = new ReplicaUnderRecovery(replica, recoveryId);
+      map.add(rur);
+      DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+          + block + " from " + replica.getState()
+          + " to " + rur.getState());
+    }
+    return rur.createInfo();
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized ReplicaInfo updateReplicaUnderRecovery(
+                                    final Block oldBlock,
+                                    final long recoveryId,
+                                    final long newlength) throws IOException {
+    //get replica
+    final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockId());
+    DataNode.LOG.info("updateReplica: block=" + oldBlock
+        + ", recoveryId=" + recoveryId
+        + ", length=" + newlength
+        + ", replica=" + replica);
+
+    //check replica
+    if (replica == null) {
+      throw new ReplicaNotFoundException(oldBlock);
+    }
+
+    //check replica state
+    if (replica.getState() != ReplicaState.RUR) {
+      throw new IOException("replica.getState() != " + ReplicaState.RUR
+          + ", replica=" + replica);
+    }
+
+    //check replica's byte on disk
+    if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+          + " replica.getBytesOnDisk() != block.getNumBytes(), block="
+          + oldBlock + ", replica=" + replica);
+    }
+
+    //check replica files before update
+    checkReplicaFiles(replica);
+
+    //update replica
+    final FinalizedReplica finalized = updateReplicaUnderRecovery(
+        (ReplicaUnderRecovery)replica, recoveryId, newlength);
+
+    //check replica files after update
+    checkReplicaFiles(finalized);
+    return finalized;
+  }
+
+  private FinalizedReplica updateReplicaUnderRecovery(
+                                          ReplicaUnderRecovery rur,
+                                          long recoveryId,
+                                          long newlength) throws IOException {
+    //check recovery id
+    if (rur.getRecoveryID() != recoveryId) {
+      throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+          + ", rur=" + rur);
+    }
+
+    // bump rur's GS to be recovery id
+    bumpReplicaGS(rur, recoveryId);
+
+    //update length
+    final File replicafile = rur.getBlockFile();
+    if (rur.getNumBytes() < newlength) {
+      throw new IOException("rur.getNumBytes() < newlength = " + newlength
+          + ", rur=" + rur);
+    }
+    if (rur.getNumBytes() > newlength) {
+      rur.unlinkBlock(1);
+      truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
+      // update RUR with the new length
+      rur.setNumBytes(newlength);
+    }
+
+    // finalize the block
+    return finalizeReplica(rur);
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized long getReplicaVisibleLength(final Block block)
+  throws IOException {
+    final Replica replica = volumeMap.get(block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+    return replica.getVisibleLength();
   }
 }
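
updateReplicaUnderRecovery shrinks a too-long replica via truncateBlock, which
is defined elsewhere in this file. A minimal sketch of truncating just the
data file; the real truncateBlock also cuts the checksum file back to the
matching number of checksum chunks, which is omitted here:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class TruncateSketch {
      static void truncateData(File blockFile, long newLength)
          throws IOException {
        RandomAccessFile raf = new RandomAccessFile(blockFile, "rw");
        try {
          if (raf.length() < newLength) {
            throw new IOException("Cannot extend " + blockFile
                + " to " + newLength + " during recovery");
          }
          raf.setLength(newLength);  // drop bytes past the agreed-upon length
        } finally {
          raf.close();
        }
      }
    }
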

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Nov 28 20:05:56 2009
@@ -28,7 +28,10 @@
 
 
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -94,6 +97,15 @@
   public long getLength(Block b) throws IOException;
 
   /**
+   * Get reference to the replica meta info in the replicasMap. 
+   * To be called from methods that are synchronized on {@link FSDataset}
+   * @param blockId
+   * @return replica from the replicas map
+   */
+  @Deprecated
+  public Replica getReplica(long blockId);
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(long blkid) throws IOException;
@@ -144,6 +156,10 @@
         checksumOut = cOut;
       }
       
+      void close() throws IOException {
+        IOUtils.closeStream(dataOut);
+        IOUtils.closeStream(checksumOut);
+      }
     }
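
The new close() helper relies on IOUtils.closeStream, which tolerates null and
suppresses close-time IOExceptions. A minimal stand-in for illustration:

    import java.io.Closeable;
    import java.io.IOException;

    /** Minimal stand-in for org.apache.hadoop.io.IOUtils.closeStream. */
    public class CloseQuietlySketch {
      static void closeQuietly(Closeable c) {
        if (c == null) {
          return;  // tolerate null streams
        }
        try {
          c.close();
        } catch (IOException ignored) {
          // best-effort close; suppress close-time failures
        }
      }
    }
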
 
   /**
@@ -167,19 +183,74 @@
   }
     
   /**
-   * Creates the block and returns output streams to write data and CRC
-   * @param b
-   * @param isRecovery True if this is part of erro recovery, otherwise false
-   * @return a BlockWriteStreams object to allow writing the block data
-   *  and CRC
+   * Creates a temporary replica and returns the meta information of the replica
+   * 
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface createTemporary(Block b)
+  throws IOException;
+
+  /**
+   * Creates a RBW replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+
+  /**
+   * Recovers a RBW replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param minBytesRcvd the minimum number of bytes that the replica could have
+   * @param maxBytesRcvd the maximum number of bytes that the replica could have
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface recoverRbw(Block b, 
+      long newGS, long minBytesRcvd, long maxBytesRcvd)
+  throws IOException;
+
+  /**
+   * Append to a finalized replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+  public ReplicaInPipelineInterface append(Block b, 
+      long newGS, long expectedBlockLen) throws IOException;
 
   /**
-   * Update the block to the new generation stamp and length.  
+   * Recover a failed append to a finalized replica
+   * and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @return the meta info of the replica which is being written to
+   * @throws IOException
+   */
+  public ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException;
+  
+  /**
+   * Recover a failed pipeline close
+   * It bumps the replica's generation stamp and finalizes it if it is an RBW replica.
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @throws IOException
    */
-  public void updateBlock(Block oldblock, Block newblock) throws IOException;
+  public void recoverClose(Block b,
+      long newGS, long expectedBlockLen) throws IOException;
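
Taken together, createTemporary, createRbw, recoverRbw, append, recoverAppend,
and recoverClose cover every way a replica is opened for writing. A hedged
sketch of how a caller might choose among them; the Stage enum is illustrative
and not part of this patch, and the class is assumed to sit in the datanode
package so the interface types resolve:

    package org.apache.hadoop.hdfs.server.datanode;  // assumed location

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;

    public class PipelineStageSketch {
      /** Illustrative stages; not part of the interface in this patch. */
      enum Stage { TRANSFER, CREATE, APPEND, APPEND_RECOVERY, RBW_RECOVERY }

      static ReplicaInPipelineInterface open(FSDatasetInterface dataset,
          Block b, Stage stage, long newGS, long minRcvd, long maxRcvd,
          long expectedLen) throws IOException {
        switch (stage) {
        case TRANSFER:         // replica copied from another node
          return dataset.createTemporary(b);
        case CREATE:           // fresh pipeline write
          return dataset.createRbw(b);
        case APPEND:           // append to a finalized replica
          return dataset.append(b, newGS, expectedLen);
        case APPEND_RECOVERY:  // retry after a failed append
          return dataset.recoverAppend(b, newGS, expectedLen);
        case RBW_RECOVERY:     // pipeline recovery of an RBW replica
          return dataset.recoverRbw(b, newGS, minRcvd, maxRcvd);
        default:
          throw new IOException("Unexpected stage " + stage);
        }
      }
    }
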
   
   /**
    * Finalizes the block previously opened for writing using writeToBlock.
@@ -202,7 +273,7 @@
    * Returns the block report - the full list of blocks stored
    * @return - the block report - the full list of blocks stored
    */
-  public Block[] getBlockReport();
+  public BlockListAsLongs getBlockReport();
 
   /**
    * Is the block valid?
@@ -235,39 +306,41 @@
   public void shutdown();
 
   /**
-   * Returns the current offset in the data stream.
-   * @param b
-   * @param stream The stream to the data file and checksum file
-   * @return the position of the file pointer in the data stream
+   * Sets the file pointer of the checksum stream so that the last checksum
+   * will be overwritten
+   * @param b block
+   * @param stream The stream for the data file and checksum file
+   * @param checksumSize number of bytes each checksum has
    * @throws IOException
    */
-  public long getChannelPosition(Block b, BlockWriteStreams stream) throws IOException;
+  public void adjustCrcChannelPosition(Block b, BlockWriteStreams stream, 
+      int checksumSize) throws IOException;
 
   /**
-   * Sets the file pointer of the data stream and checksum stream to
-   * the specified values.
-   * @param b
-   * @param stream The stream for the data file and checksum file
-   * @param dataOffset The position to which the file pointre for the data stream
-   *        should be set
-   * @param ckOffset The position to which the file pointre for the checksum stream
-   *        should be set
-   * @throws IOException
+   * Checks how many valid storage volumes there are in the DataNode.
+   * @return true if more than the minimum number of valid volumes are left in the FSDataSet
    */
-  public void setChannelPosition(Block b, BlockWriteStreams stream, long dataOffset,
-                                 long ckOffset) throws IOException;
+  public boolean hasEnoughResource();
 
   /**
-   * Validate that the contents in the Block matches
-   * the file on disk. Returns true if everything is fine.
-   * @param b The block to be verified.
-   * @throws IOException
+   * Get visible length of the specified replica.
    */
-  public void validateBlockMetadata(Block b) throws IOException;
+  long getReplicaVisibleLength(final Block block) throws IOException;
 
   /**
-   * checks how many valid storage volumes are there in the DataNode
-   * @return true if more then minimum valid volumes left in the FSDataSet
+   * Initialize a replica recovery.
+   * 
+   * @return the actual state of the replica on this data-node or
+   * null if the data-node does not have the replica.
    */
-  public boolean hasEnoughResource();
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException;
+
+  /**
+   * Update replica's generation stamp and length and finalize it.
+   */
+  public ReplicaInfo updateReplicaUnderRecovery(
+                                          Block oldBlock,
+                                          long recoveryId,
+                                          long newLength) throws IOException;
 }
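
To close the loop on the two recovery methods, a hedged sketch of the calling
sequence a recovery coordinator might follow. It assumes ReplicaRecoveryInfo
exposes the replica's block id, length, and generation stamp; agreedLength
stands for the length agreed across the surviving replicas, and the class is
assumed to sit in the datanode package so the interface types resolve:

    package org.apache.hadoop.hdfs.server.datanode;  // assumed location

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
    import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

    public class RecoverySequenceSketch {
      // initReplicaRecovery freezes the replica under the new recovery id;
      // updateReplicaUnderRecovery later commits the agreed length and bumps
      // the generation stamp to the recovery id.
      static void recover(FSDatasetInterface dataset, RecoveringBlock rBlock,
          long agreedLength) throws IOException {
        ReplicaRecoveryInfo info = dataset.initReplicaRecovery(rBlock);
        if (info == null) {
          return;  // this data-node has no replica of the block
        }
        long recoveryId = rBlock.getNewGenerationStamp();
        // assumes ReplicaRecoveryInfo carries the replica's block fields
        dataset.updateReplicaUnderRecovery(
            new Block(info.getBlockId(), info.getNumBytes(),
                info.getGenerationStamp()),
            recoveryId, agreedLength);
      }
    }
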