Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 01:39:33 UTC

svn commit: r820497 [3/7] - in /hadoop/hdfs/trunk: ./ .eclipse.templates/.launches/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apach...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Sep 30 23:39:30 2009
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -26,10 +29,8 @@
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
 import javax.management.NotCompliantMBeanException;
@@ -41,17 +42,23 @@
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.io.IOUtils;
 
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
@@ -119,8 +126,9 @@
         if ( ! metaData.renameTo( newmeta ) ||
             ! src.renameTo( dest ) ) {
           throw new IOException( "could not move files for " + b +
-                                 " from tmp to " + 
-                                 dest.getAbsolutePath() );
+                                 " from " + src + " to " + 
+                                 dest.getAbsolutePath() + " or from "
+                                 + metaData + " to " + newmeta);
         }
         if (DataNode.LOG.isDebugEnabled()) {
           DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
@@ -185,23 +193,48 @@
       return Block.GRANDFATHER_GENERATION_STAMP;
     }
 
-    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap, FSVolume volume) {
+    void getVolumeMap(ReplicasMap volumeMap, FSVolume volume) 
+    throws IOException {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
           children[i].getVolumeMap(volumeMap, volume);
         }
       }
 
-      File blockFiles[] = dir.listFiles();
-      for (int i = 0; i < blockFiles.length; i++) {
-        if (Block.isBlockFilename(blockFiles[i])) {
-          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
-          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), 
-                        new ReplicaInfo(volume, blockFiles[i]));
+      recoverTempUnlinkedBlock();
+      volume.addToReplicasMap(volumeMap, dir, true);
+    }
+        
+    /**
+     * Recover unlinked tmp files on datanode restart. If the original block
+     * does not exist, then the tmp file is renamed to be the
+     * original file name; otherwise the tmp file is deleted.
+     */
+    private void recoverTempUnlinkedBlock() throws IOException {
+      File files[] = dir.listFiles();
+      for (File file : files) {
+        if (!FSDataset.isUnlinkTmpFile(file)) {
+          continue;
+        }
+        File blockFile = getOrigFile(file);
+        if (blockFile.exists()) {
+          //
+          // If the original block file still exists, then no recovery
+          // is needed.
+          //
+          if (!file.delete()) {
+            throw new IOException("Unable to cleanup unlinked tmp file " +
+                file);
+          }
+        } else {
+          if (!file.renameTo(blockFile)) {
+            throw new IOException("Unable to cleanup detached file " +
+                file);
+          }
         }
       }
     }
-        
+    
     /**
      * check if a data directory is healthy
      * @throws DiskErrorException
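
For reference, the recovery rule implemented by recoverTempUnlinkedBlock()
above reduces to the following standalone sketch (the wrapper class and the
directory argument are illustrative, not part of this patch):

    import java.io.File;
    import java.io.IOException;

    class UnlinkTmpRecoverySketch {
      // same suffix as FSDataset.UNLINK_BLOCK_SUFFIX introduced in this patch
      static final String UNLINK_BLOCK_SUFFIX = ".unlinked";

      static void recover(File dir) throws IOException {
        File[] files = dir.listFiles();
        if (files == null) return;
        for (File f : files) {
          String name = f.getName();
          if (!name.endsWith(UNLINK_BLOCK_SUFFIX)) continue;
          File orig = new File(dir,
              name.substring(0, name.length() - UNLINK_BLOCK_SUFFIX.length()));
          if (orig.exists()) {
            // the original block file survived; the unlinked tmp copy is stale
            if (!f.delete())
              throw new IOException("Unable to cleanup unlinked tmp file " + f);
          } else {
            // the original is gone; promote the tmp copy back to the block file
            if (!f.renameTo(orig))
              throw new IOException("Unable to recover detached file " + f);
          }
        }
      }
    }
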
@@ -279,9 +312,9 @@
   }
 
   class FSVolume {
-    private FSDir dataDir;
-    private File tmpDir;
-    private File detachDir; // copy on write for blocks in snapshot
+    private FSDir dataDir;      // directory storing finalized replicas
+    private File rbwDir;        // directory storing RBW replicas
+    private File tmpDir;        // directory storing temporary replicas
     private DF usage;
     private DU dfsUsage;
     private long reserved;
@@ -289,13 +322,9 @@
     
     FSVolume(File currentDir, Configuration conf) throws IOException {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
-      boolean supportAppends = conf.getBoolean("dfs.support.append", false);
       File parent = currentDir.getParentFile();
-
-      this.detachDir = new File(parent, "detach");
-      if (detachDir.exists()) {
-        recoverDetachedBlocks(currentDir, detachDir);
-      }
+      final File finalizedDir = new File(
+          currentDir, DataStorage.STORAGE_DIR_FINALIZED);
 
       // Files that were being written when the datanode was last shutdown
       // are now moved back to the data directory. It is possible that
@@ -304,23 +333,23 @@
       //
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
-        if (supportAppends) {
-          recoverDetachedBlocks(currentDir, tmpDir);
-        } else {
-          FileUtil.fullyDelete(tmpDir);
+        FileUtil.fullyDelete(tmpDir);
+      }
+      this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+      if (rbwDir.exists() && !supportAppends) {
+        FileUtil.fullyDelete(rbwDir);
+      }
+      this.dataDir = new FSDir(finalizedDir);
+      if (!rbwDir.mkdirs()) {  // create the rbw directory if it does not exist
+        if (!rbwDir.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + rbwDir.toString());
         }
       }
-      this.dataDir = new FSDir(currentDir);
       if (!tmpDir.mkdirs()) {
         if (!tmpDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
         }
       }
-      if (!detachDir.mkdirs()) {
-        if (!detachDir.isDirectory()) {
-          throw new IOException("Mkdirs failed to create " + detachDir.toString());
-        }
-      }
       this.usage = new DF(parent, conf);
       this.dfsUsage = new DU(parent, conf);
       this.dfsUsage.start();
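
The constructor above implies the following per-volume layout (directory
names come from DataStorage as referenced in this patch; <volume> is
illustrative):

    <volume>/current/finalized   // FSDir dataDir: finalized replicas
    <volume>/current/rbw         // rbwDir: replicas being written; contents
                                 //   survive a restart only when
                                 //   dfs.support.append is true
    <volume>/tmp                 // tmpDir: temporary replicas, wiped on restart

Both rbwDir and tmpDir are (re)created with the same mkdirs-or-verify idiom,
equivalent to:

    if (!rbwDir.mkdirs() && !rbwDir.isDirectory()) {
      throw new IOException("Mkdirs failed to create " + rbwDir);
    }
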
@@ -360,51 +389,23 @@
     }
     
     /**
-     * Temporary files. They get moved to the real block directory either when
-     * the block is finalized or the datanode restarts.
+     * Temporary files. They get moved to the finalized block directory when
+     * the block is finalized.
      */
     File createTmpFile(Block b) throws IOException {
       File f = new File(tmpDir, b.getBlockName());
-      return createTmpFile(b, f);
-    }
-
-    /**
-     * Returns the name of the temporary file for this block.
-     */
-    File getTmpFile(Block b) throws IOException {
-      File f = new File(tmpDir, b.getBlockName());
-      return f;
+      return FSDataset.createTmpFile(b, f);
     }
 
     /**
-     * Files used for copy-on-write. They need recovery when datanode
-     * restarts.
+     * RBW files. They get moved to the finalized block directory when
+     * the block is finalized.
      */
-    File createDetachFile(Block b, String filename) throws IOException {
-      File f = new File(detachDir, filename);
-      return createTmpFile(b, f);
+    File createRbwFile(Block b) throws IOException {
+      File f = new File(rbwDir, b.getBlockName());
+      return FSDataset.createTmpFile(b, f);
     }
 
-    private File createTmpFile(Block b, File f) throws IOException {
-      if (f.exists()) {
-        throw new IOException("Unexpected problem in creating temporary file for "+
-                              b + ".  File " + f + " should not be present, but is.");
-      }
-      // Create the zero-length temp file
-      //
-      boolean fileCreated = false;
-      try {
-        fileCreated = f.createNewFile();
-      } catch (IOException ioe) {
-        throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
-      }
-      if (!fileCreated) {
-        throw new IOException("Unexpected problem in creating temporary file for "+
-                              b + ".  File " + f + " should be creatable, but is already present.");
-      }
-      return f;
-    }
-      
     File addBlock(Block b, File f) throws IOException {
       File blockFile = dataDir.addBlock(b, f);
       File metaFile = getMetaFile( blockFile , b);
@@ -415,55 +416,126 @@
     void checkDirs() throws DiskErrorException {
       dataDir.checkDirTree();
       DiskChecker.checkDir(tmpDir);
+      DiskChecker.checkDir(rbwDir);
     }
       
-    void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+    void getVolumeMap(ReplicasMap volumeMap) throws IOException {
+      // add finalized replicas
       dataDir.getVolumeMap(volumeMap, this);
-    }
-      
-    void clearPath(File f) {
-      dataDir.clearPath(f);
-    }
-      
-    public String toString() {
-      return dataDir.dir.getAbsolutePath();
+      // add rbw replicas
+      addToReplicasMap(volumeMap, rbwDir, false);
     }
 
     /**
-     * Recover detached files on datanode restart. If a detached block
-     * does not exist in the original directory, then it is moved to the
-     * original directory.
+     * Add replicas under the given directory to the volume map
+     * @param volumeMap the replicas map
+     * @param dir an input directory
+     * @param isFinalized true if the directory has finalized replicas;
+     *                    false if the directory has rbw replicas
      */
-    private void recoverDetachedBlocks(File dataDir, File dir) 
-                                           throws IOException {
-      File contents[] = dir.listFiles();
-      if (contents == null) {
-        return;
-      }
-      for (int i = 0; i < contents.length; i++) {
-        if (!contents[i].isFile()) {
-          throw new IOException ("Found " + contents[i] + " in " + dir +
-                                 " but it is not a file.");
+    private void addToReplicasMap(ReplicasMap volumeMap, 
+        File dir, boolean isFinalized) {
+      File blockFiles[] = dir.listFiles();
+      for (File blockFile : blockFiles) {
+        if (!Block.isBlockFilename(blockFile))
+          continue;
+        
+        long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+        long blockId = Block.filename2id(blockFile.getName());
+        ReplicaInfo newReplica = null;
+        if (isFinalized) {
+          newReplica = new FinalizedReplica(blockId, 
+              blockFile.length(), genStamp, this, blockFile.getParentFile());
+        } else {
+          newReplica = new ReplicaWaitingToBeRecovered(blockId,
+              validateIntegrity(blockFile, genStamp), 
+              genStamp, this, blockFile.getParentFile());
         }
 
-        //
-        // If the original block file still exists, then no recovery
-        // is needed.
-        //
-        File blk = new File(dataDir, contents[i].getName());
-        if (!blk.exists()) {
-          if (!contents[i].renameTo(blk)) {
-            throw new IOException("Unable to recover detached file " +
-                                  contents[i]);
-          }
-          continue;
+        ReplicaInfo oldReplica = volumeMap.add(newReplica);
+        if (oldReplica != null) {
+          DataNode.LOG.warn("Two block files with the same block id exist " +
+              "on disk: " + oldReplica.getBlockFile() +
+              " and " + blockFile );
         }
-        if (!contents[i].delete()) {
-            throw new IOException("Unable to cleanup detached file " +
-                                  contents[i]);
+      }
+    }
+    
+    /**
+     * Find out the number of bytes in the block that match its crc.
+     * 
+     * This algorithm assumes that data corruption caused by unexpected 
+     * datanode shutdown occurs only in the last crc chunk. So it checks
+     * only the last chunk.
+     * 
+     * @param blockFile the block file
+     * @param genStamp generation stamp of the block
+     * @return the number of valid bytes
+     */
+    private long validateIntegrity(File blockFile, long genStamp) {
+      DataInputStream checksumIn = null;
+      InputStream blockIn = null;
+      try {
+        File metaFile = new File(getMetaFileName(blockFile.toString(), genStamp));
+        long blockFileLen = blockFile.length();
+        long metaFileLen = metaFile.length();
+        int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
+        if (!blockFile.exists() || blockFileLen == 0 ||
+            !metaFile.exists() || metaFileLen < (long)crcHeaderLen) {
+          return 0;
+        }
+        checksumIn = new DataInputStream(
+            new BufferedInputStream(new FileInputStream(metaFile),
+                BUFFER_SIZE));
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
+              + metaFile + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        long numChunks = Math.min(
+            (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, 
+            (metaFileLen - crcHeaderLen)/checksumSize);
+        if (numChunks == 0) {
+          return 0;
+        }
+        IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
+        blockIn = new FileInputStream(blockFile);
+        long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
+        IOUtils.skipFully(blockIn, lastChunkStartPos);
+        int lastChunkSize = (int)Math.min(
+            bytesPerChecksum, blockFileLen-lastChunkStartPos);
+        byte[] buf = new byte[lastChunkSize+checksumSize];
+        checksumIn.readFully(buf, lastChunkSize, checksumSize);
+        IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
+
+        checksum.update(buf, 0, lastChunkSize);
+        if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+          return lastChunkStartPos + lastChunkSize;
+        } else { // last chunk is corrupt
+          return lastChunkStartPos;
         }
+      } catch (IOException e) {
+        DataNode.LOG.warn(e);
+        return 0;
+      } finally {
+        IOUtils.closeStream(checksumIn);
+        IOUtils.closeStream(blockIn);
       }
     }
+      
+    void clearPath(File f) {
+      dataDir.clearPath(f);
+    }
+      
+    public String toString() {
+      return getDir().getAbsolutePath();
+    }
   }
     
   static class FSVolumeSet {
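
A worked example of the last-chunk arithmetic in validateIntegrity() above,
with illustrative numbers (the 7-byte header length is an assumption for the
example; the real code obtains it from DataChecksum.getChecksumHeaderSize()):

    long blockFileLen     = 2000;     // bytes of block data on disk
    int  bytesPerChecksum = 512;
    int  checksumSize     = 4;        // a CRC32 checksum is 4 bytes
    int  crcHeaderLen     = 7;        // assumed header size
    long metaFileLen      = crcHeaderLen + 4 * checksumSize;       // 4 stored CRCs

    long numChunks = Math.min(
        (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,  // ceil(2000/512) = 4
        (metaFileLen - crcHeaderLen) / checksumSize);              // 16/4 = 4
    long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;   // 3*512 = 1536
    int  lastChunkSize = (int) Math.min(
        bytesPerChecksum, blockFileLen - lastChunkStartPos);       // min(512, 464) = 464

If the recomputed CRC of bytes [1536, 2000) matches the stored checksum, the
replica is credited with 1536 + 464 = 2000 valid bytes; otherwise only the
1536 bytes before the suspect chunk are trusted.
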
@@ -526,7 +598,7 @@
       return remaining;
     }
       
-    synchronized void getVolumeMap(HashMap<Block, ReplicaInfo> volumeMap) {
+    synchronized void getVolumeMap(ReplicasMap volumeMap) throws IOException {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
       }
@@ -603,25 +675,22 @@
   //Find better place?
   public static final String METADATA_EXTENSION = ".meta";
   public static final short METADATA_VERSION = 1;
-    
+  static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
 
-  static class ActiveFile {
-    final File file;
-    final List<Thread> threads = new ArrayList<Thread>(2);
-
-    ActiveFile(File f, List<Thread> list) {
-      file = f;
-      if (list != null) {
-        threads.addAll(list);
-      }
-      threads.add(Thread.currentThread());
-    }
-    
-    public String toString() {
-      return getClass().getSimpleName() + "(file=" + file
-          + ", threads=" + threads + ")";
-    }
-  } 
+  private static boolean isUnlinkTmpFile(File f) {
+    String name = f.getName();
+    return name.endsWith(UNLINK_BLOCK_SUFFIX);
+  }
+  
+  static File getUnlinkTmpFile(File f) {
+    return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
+  }
+  
+  private static File getOrigFile(File unlinkTmpFile) {
+    String fileName = unlinkTmpFile.getName();
+    return new File(unlinkTmpFile.getParentFile(),
+        fileName.substring(0, fileName.length()-UNLINK_BLOCK_SUFFIX.length()));
+  }
   
   static String getMetaFileName(String blockFileName, long genStamp) {
     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
@@ -635,6 +704,26 @@
     return getMetaFile(getBlockFile(b), b);
   }
 
+  /** Find the metadata file for the specified block file.
+   * Return the generation stamp from the name of the metafile.
+   */
+  private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+    String blockName = blockFile.getName();
+    for (int j = 0; j < listdir.length; j++) {
+      String path = listdir[j].getName();
+      if (!path.startsWith(blockName)) {
+        continue;
+      }
+      if (blockFile == listdir[j]) {
+        continue;
+      }
+      return Block.getGenerationStamp(listdir[j].getName());
+    }
+    DataNode.LOG.warn("Block " + blockFile + 
+                      " does not have a metafile!");
+    return Block.GRANDFATHER_GENERATION_STAMP;
+  }
+
   /** Find the corresponding meta data file from a given block file */
   private static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
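
Both lookups above rely on the on-disk naming convention (file names below
are illustrative):

    block file: blk_123456
    meta file:  blk_123456_1007.meta   // "_1007" is the generation stamp

    long gs = Block.getGenerationStamp("blk_123456_1007.meta");    // 1007
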
@@ -672,22 +761,7 @@
 
   /** Return the block file for the given ID */ 
   public File findBlockFile(long blockId) {
-    final Block b = new Block(blockId);
-    File blockfile = null;
-    ActiveFile activefile = ongoingCreates.get(b);
-    if (activefile != null) {
-      blockfile = activefile.file;
-    }
-    if (blockfile == null) {
-      blockfile = getFile(b);
-    }
-    if (blockfile == null) {
-      if (DataNode.LOG.isDebugEnabled()) {
-        DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
-        DataNode.LOG.debug("volumeMap=" + volumeMap);
-      }
-    }
-    return blockfile;
+    return getFile(blockId);
   }
 
   /** {@inheritDoc} */
@@ -717,26 +791,47 @@
                                                     checksumFile.length());
   }
 
+  static File createTmpFile(Block b, File f) throws IOException {
+    if (f.exists()) {
+      throw new IOException("Unexpected problem in creating temporary file for "+
+                            b + ".  File " + f + " should not be present, but is.");
+    }
+    // Create the zero-length temp file
+    //
+    boolean fileCreated = false;
+    try {
+      fileCreated = f.createNewFile();
+    } catch (IOException ioe) {
+      throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
+    }
+    if (!fileCreated) {
+      throw new IOException("Unexpected problem in creating temporary file for "+
+                            b + ".  File " + f + " should be creatable, but is already present.");
+    }
+    return f;
+  }
+    
   FSVolumeSet volumes;
-  private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  HashMap<Block,ReplicaInfo> volumeMap = null;
+  ReplicasMap volumeMap = new ReplicasMap();
   static  Random random = new Random();
 
   // Used for synchronizing access to usage stats
   private Object statsLock = new Object();
 
+  boolean supportAppends = false;
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
   public FSDataset(DataStorage storage, Configuration conf) throws IOException {
     this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
+    this.supportAppends = conf.getBoolean("dfs.support.append", false);
     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
     }
     volumes = new FSVolumeSet(volArray);
-    volumeMap = new HashMap<Block, ReplicaInfo>();
     volumes.getVolumeMap(volumeMap);
     registerMBean(storage.getStorageID());
   }
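
A minimal sketch of the configuration consulted by this constructor (the key
names appear in this patch; the values and the DataStorage instance are
illustrative):

    Configuration conf = new Configuration();
    conf.setInt("dfs.datanode.numblocks", 64);     // maxBlocksPerDir
    conf.setBoolean("dfs.support.append", true);   // preserve rbw/ across restarts
    FSDataset dataset = new FSDataset(storage, conf);
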
@@ -811,22 +906,33 @@
   }
 
   /**
-   * Returns handles to the block file and its metadata file
+   * Get the meta info of a block stored in volumeMap
+   * @param b block
+   * @return the meta replica information
+   * @throws IOException if no entry is in the map
    */
-  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
-                          long blkOffset, long ckoff) throws IOException {
-
+  private ReplicaInfo getReplicaInfo(Block b) throws IOException {
     ReplicaInfo info = volumeMap.get(b);
     if (info == null) {
       throw new IOException("Block " + b + " does not exist in volumeMap.");
     }
-    FSVolume v = info.getVolume();
-    File blockFile = v.getTmpFile(b);
+    return info;
+  }
+  
+  /**
+   * Returns handles to the block file and its metadata file
+   */
+  public synchronized BlockInputStreams getTmpInputStreams(Block b, 
+                          long blkOffset, long ckoff) throws IOException {
+
+    ReplicaInfo info = getReplicaInfo(b);
+    File blockFile = info.getBlockFile();
     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
     if (blkOffset > 0) {
       blockInFile.seek(blkOffset);
     }
-    File metaFile = getMetaFile(blockFile, b);
+    File metaFile = info.getMetaFile();
     RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
     if (ckoff > 0) {
       metaInFile.seek(ckoff);
@@ -835,36 +941,23 @@
                                 new FileInputStream(metaInFile.getFD()));
   }
     
-  private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
-      return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
-          new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
-
-  }
-
   /**
    * Make a copy of the block if this block is linked to an existing
    * snapshot. This ensures that modifying this block does not modify
    * data in any existing snapshots.
    * @param block Block
-   * @param numLinks Detach if the number of links exceed this value
+   * @param numLinks Unlink if the number of links exceeds this value
    * @throws IOException
-   * @return - true if the specified block was detached
+   * @return - true if the specified block was unlinked or the block
+   *           is not in any snapshot.
    */
-  public boolean detachBlock(Block block, int numLinks) throws IOException {
+  public boolean unlinkBlock(Block block, int numLinks) throws IOException {
     ReplicaInfo info = null;
 
     synchronized (this) {
-      info = volumeMap.get(block);
-    }
-    return info.detachBlock(block, numLinks);
-  }
-
-  static private <T> void updateBlockMap(Map<Block, T> blockmap,
-      Block oldblock, Block newblock) throws IOException {
-    if (blockmap.containsKey(oldblock)) {
-      T value = blockmap.remove(oldblock);
-      blockmap.put(newblock, value);
+      info = getReplicaInfo(block);
     }
+    return info.unlinkBlock(numLinks);
   }
 
   /** {@inheritDoc} */
@@ -874,60 +967,20 @@
           + ") to newblock (=" + newblock + ").");
     }
     
-    for(;;) {
-      final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
-      if (threads == null) {
-        return;
-      }
-
-      // interrupt and wait for all ongoing create threads
-      for(Thread t : threads) {
-        t.interrupt();
-      }
-      for(Thread t : threads) {
-        try {
-          t.join();
-        } catch (InterruptedException e) {
-          DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
-        }
-      }
+    final ReplicaInfo replicaInfo = volumeMap.get(oldblock.getBlockId());
+    File blockFile = replicaInfo==null?null:replicaInfo.getBlockFile();
+    if (blockFile == null) {
+      throw new IOException("Block " + oldblock + " does not exist.");
     }
-  }
-
-  /**
-   * Try to update an old block to a new block.
-   * If there are ongoing create threads running for the old block,
-   * the threads will be returned without updating the block. 
-   * 
-   * @return ongoing create threads if there is any. Otherwise, return null.
-   */
-  private synchronized List<Thread> tryUpdateBlock(
-      Block oldblock, Block newblock) throws IOException {
-    //check ongoing create threads
-    final ActiveFile activefile = ongoingCreates.get(oldblock);
-    if (activefile != null && !activefile.threads.isEmpty()) {
-      //remove dead threads
-      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
-        final Thread t = i.next();
-        if (!t.isAlive()) {
-          i.remove();
-        }
-      }
 
-      //return living threads
-      if (!activefile.threads.isEmpty()) {
-        return new ArrayList<Thread>(activefile.threads);
-      }
+    //check write threads
+    if (replicaInfo instanceof ReplicaInPipeline) {
+      ((ReplicaInPipeline)replicaInfo).stopWriter();
     }
 
     //No ongoing create thread is alive.  Update the block.
-    File blockFile = findBlockFile(oldblock.getBlockId());
-    if (blockFile == null) {
-      throw new IOException("Block " + oldblock + " does not exist.");
-    }
-
-    File oldMetaFile = findMetaFile(blockFile);
-    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
+    File oldMetaFile = replicaInfo.getMetaFile();
+    long oldgs = replicaInfo.getGenerationStamp();
     
     //rename meta file to a tmp file
     File tmpMetaFile = new File(oldMetaFile.getParent(),
@@ -937,7 +990,7 @@
     }
 
     //update generation stamp
-    if (oldgs > newblock.getGenerationStamp()) {
+    if (oldgs >= newblock.getGenerationStamp()) {
       throw new IOException("Cannot update block (id=" + newblock.getBlockId()
           + ") generation stamp from " + oldgs
           + " to " + newblock.getGenerationStamp());
@@ -952,23 +1005,28 @@
       truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
     }
 
+    // update replicaInfo
+    replicaInfo.setGenerationStamp(newblock.getGenerationStamp());
+    replicaInfo.setNumBytes(newblock.getNumBytes());
+    
     //rename the tmp file to the new meta file (with new generation stamp)
-    File newMetaFile = getMetaFile(blockFile, newblock);
+    File newMetaFile = replicaInfo.getMetaFile();
     if (!tmpMetaFile.renameTo(newMetaFile)) {
       throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
     }
 
-    updateBlockMap(ongoingCreates, oldblock, newblock);
-    updateBlockMap(volumeMap, oldblock, newblock);
-
     // paranoia! verify that the contents of the stored block 
     // matches the block file on disk.
     validateBlockMetadata(newblock);
-    return null;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
+    DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+        + ", metaFile=" + metaFile
+        + ", oldlen=" + oldlen
+        + ", newlen=" + newlen);
+
     if (newlen == oldlen) {
       return;
     }
@@ -1027,125 +1085,290 @@
     }
   }
 
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface append(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    // One reason a client calls append is that the block was successfully
+    // finalized (all packets were processed at the Datanode) but the acks
+    // for some of the packets were not received by the client; the client
+    // then re-opens the connection and retries sending those packets.
+    // The other reason is that an "append" is occurring to this block.
+    
+    // check the validity of the parameter
+    if (newGS < b.getGenerationStamp()) {
+      throw new IOException("The new generation stamp " + newGS + 
+          " should be greater than the replica " + b + "'s generation stamp");
+    }
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }  
+    DataNode.LOG.info("Appending to replica " + replicaInfo);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+    }
+    if (replicaInfo.getNumBytes() != expectedBlockLen) {
+      throw new IOException("Corrupted replica " + replicaInfo + 
+          " with a length of " + replicaInfo.getNumBytes() + 
+          " expected length is " + expectedBlockLen);
+    }
+
+    return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+  }
+  
+  /** Append to a finalized replica:
+   * change a finalized replica to be an RBW replica and 
+   * bump its generation stamp to be the newGS.
+   * 
+   * @param replicaInfo a finalized replica
+   * @param newGS new generation stamp
+   * @param estimateBlockLen estimated block length
+   * @return an RBW replica
+   * @throws IOException if moving the replica from the finalized directory 
+   *         to the rbw directory fails
+   */
+  private synchronized ReplicaBeingWritten append(FinalizedReplica replicaInfo, 
+      long newGS, long estimateBlockLen) throws IOException {
+    // unlink the finalized replica
+    replicaInfo.unlinkBlock(1);
+    
+    // construct a RBW replica with the new GS
+    File blkfile = replicaInfo.getBlockFile();
+    FSVolume v = volumes.getNextVolume(estimateBlockLen);
+    File newBlkFile = v.createRbwFile(replicaInfo);
+    File oldmeta = replicaInfo.getMetaFile();
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+        v, newBlkFile.getParentFile(), Thread.currentThread());
+    File newmeta = newReplicaInfo.getMetaFile();
+
+    // rename meta file to rbw directory
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    if (!oldmeta.renameTo(newmeta)) {
+      throw new IOException("Block " + replicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to rbw dir " + newmeta);
+    }
+
+    // rename block file to rbw directory
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
+      DataNode.LOG.debug("Old block file length is " + blkfile.length());
+    }
+    if (!blkfile.renameTo(newBlkFile)) {
+      if (!newmeta.renameTo(oldmeta)) {  // restore the meta file
+        DataNode.LOG.warn("Cannot move meta file " + newmeta + 
+            " back to the finalized directory " + oldmeta);
+      }
+      throw new IOException("Block " + replicaInfo + " reopen failed. " +
+                              " Unable to move block file " + blkfile +
+                              " to rbw dir " + newBlkFile);
+    }
+    
+    // Replace the finalized replica with an RBW replica in the replicas map
+    volumeMap.add(newReplicaInfo);
+    
+    return newReplicaInfo;
+  }
+
+  private ReplicaInfo recoverCheck(Block b, long newGS, 
+      long expectedBlockLen) throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    
+    // check state
+    if (replicaInfo.getState() != ReplicaState.FINALIZED &&
+        replicaInfo.getState() != ReplicaState.RBW) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_AND_NONRBW_REPLICA + replicaInfo);
+    }
+
+    // check generation stamp
+    long replicaGenerationStamp = replicaInfo.getGenerationStamp();
+    if (replicaGenerationStamp < b.getGenerationStamp() ||
+        replicaGenerationStamp > newGS) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + replicaGenerationStamp
+          + ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          newGS + "].");
+    }
+    
+    // stop the previous writer before checking a replica's length
+    long replicaLen = replicaInfo.getNumBytes();
+    if (replicaInfo.getState() == ReplicaState.RBW) {
+      ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+      // kill the previous writer
+      rbw.stopWriter();
+      rbw.setWriter(Thread.currentThread());
+      // check length: bytesRcvd, bytesOnDisk, and bytesAcked should be the same
+      if (replicaLen != rbw.getBytesOnDisk() 
+          || replicaLen != rbw.getBytesAcked()) {
+        throw new ReplicaAlreadyExistsException("RBW replica " + replicaInfo + 
+            ": bytesRcvd(" + rbw.getNumBytes() + "), bytesOnDisk(" + 
+            rbw.getBytesOnDisk() + "), and bytesAcked(" + rbw.getBytesAcked() +
+            ") are not the same.");
+      }
+    }
+    
+    // check block length
+    if (replicaLen != expectedBlockLen) {
+      throw new IOException("Corrupted replica " + replicaInfo + 
+          " with a length of " + replicaLen + 
+          " expected length is " + expectedBlockLen);
+    }
+    
+    return replicaInfo;
+  }
+  @Override  // FSDatasetInterface
+  public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed append to " + b);
+
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+
+    // change the replica's state/gs etc.
+    if (replicaInfo.getState() == ReplicaState.FINALIZED ) {
+      return append((FinalizedReplica)replicaInfo, newGS, b.getNumBytes());
+    } else { //RBW
+      bumpReplicaGS(replicaInfo, newGS);
+      return (ReplicaBeingWritten)replicaInfo;
+    }
+  }
+
+  @Override
+  public void recoverClose(Block b, long newGS,
+      long expectedBlockLen) throws IOException {
+    DataNode.LOG.info("Recover failed close " + b);
+    // check replica's state
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
+    // bump the replica's GS
+    bumpReplicaGS(replicaInfo, newGS);
+    // finalize the replica if RBW
+    if (replicaInfo.getState() == ReplicaState.RBW) {
+      finalizeBlock(replicaInfo);
+    }
+  }
+  
   /**
-   * Start writing to a block file
-   * If isRecovery is true and the block pre-exists, then we kill all
-      volumeMap.put(b, v);
-      volumeMap.put(b, v);
-   * other threads that might be writing to this block, and then reopen the file.
-   */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
-    //
-    // Make sure the block isn't a valid one - we're still creating it!
-    //
-    if (isValidBlock(b)) {
-      if (!isRecovery) {
-        throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
-      }
-      // If the block was successfully finalized because all packets
-      // were successfully processed at the Datanode but the ack for
-      // some of the packets were not received by the client. The client 
-      // re-opens the connection and retries sending those packets.
-      // The other reason is that an "append" is occurring to this block.
-      detachBlock(b, 1);
+   * Bump a replica's generation stamp to a new one.
+   * Its on-disk meta file is renamed to match the new stamp.
+   * 
+   * @param replicaInfo a replica
+   * @param newGS new generation stamp
+   * @throws IOException if rename fails
+   */
+  private void bumpReplicaGS(ReplicaInfo replicaInfo, 
+      long newGS) throws IOException { 
+    long oldGS = replicaInfo.getGenerationStamp();
+    File oldmeta = replicaInfo.getMetaFile();
+    replicaInfo.setGenerationStamp(newGS);
+    File newmeta = replicaInfo.getMetaFile();
+
+    // rename meta file to new GS
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    if (!oldmeta.renameTo(newmeta)) {
+      replicaInfo.setGenerationStamp(oldGS); // restore old GS
+      throw new IOException("Block " + (Block)replicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to " + newmeta);
     }
-    long blockSize = b.getNumBytes();
+  }
 
-    //
-    // Serialize access to /tmp, and check if file already there.
-    //
-    File f = null;
-    List<Thread> threads = null;
-    synchronized (this) {
-      //
-      // Is it already in the create process?
-      //
-      ActiveFile activeFile = ongoingCreates.get(b);
-      if (activeFile != null) {
-        f = activeFile.file;
-        threads = activeFile.threads;
-        
-        if (!isRecovery) {
-          throw new BlockAlreadyExistsException("Block " + b +
-                                  " has already been started (though not completed), and thus cannot be created.");
-        } else {
-          for (Thread thread:threads) {
-            thread.interrupt();
-          }
-        }
-        ongoingCreates.remove(b);
-      }
-      FSVolume v = null;
-      if (!isRecovery) {
-        v = volumes.getNextVolume(blockSize);
-        // create temporary file to hold block in the designated volume
-        f = createTmpFile(v, b);
-        volumeMap.put(b, new ReplicaInfo(v));
-      } else if (f != null) {
-        DataNode.LOG.info("Reopen already-open Block for append " + b);
-        // create or reuse temporary file to hold block in the designated volume
-        v = volumeMap.get(b).getVolume();
-        volumeMap.put(b, new ReplicaInfo(v));
-      } else {
-        // reopening block for appending to it.
-        DataNode.LOG.info("Reopen Block for append " + b);
-        v = volumeMap.get(b).getVolume();
-        f = createTmpFile(v, b);
-        File blkfile = getBlockFile(b);
-        File oldmeta = getMetaFile(b);
-        File newmeta = getMetaFile(f, b);
-
-        // rename meta file to tmp directory
-        DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-        if (!oldmeta.renameTo(newmeta)) {
-          throw new IOException("Block " + b + " reopen failed. " +
-                                " Unable to move meta file  " + oldmeta +
-                                " to tmp dir " + newmeta);
-        }
-
-        // rename block file to tmp directory
-        DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
-        if (!blkfile.renameTo(f)) {
-          if (!f.delete()) {
-            throw new IOException("Block " + b + " reopen failed. " +
-                                  " Unable to remove file " + f);
-          }
-          if (!blkfile.renameTo(f)) {
-            throw new IOException("Block " + b + " reopen failed. " +
-                                  " Unable to move block file " + blkfile +
-                                  " to tmp dir " + f);
-          }
-        }
-        volumeMap.put(b, new ReplicaInfo(v));
-      }
-      if (f == null) {
-        DataNode.LOG.warn("Block " + b + " reopen failed " +
-                          " Unable to locate tmp file.");
-        throw new IOException("Block " + b + " reopen failed " +
-                              " Unable to locate tmp file.");
-      }
-      ongoingCreates.put(b, new ActiveFile(f, threads));
+  @Override
+  public synchronized ReplicaInPipelineInterface createRbw(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo != null) {
+      throw new ReplicaAlreadyExistsException("Block " + b +
+      " already exists in state " + replicaInfo.getState() +
+      " and thus cannot be created.");
+    }
+    // create a new block
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    // create a rbw file to hold block in the designated volume
+    File f = v.createRbwFile(b);
+    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
+        b.getGenerationStamp(), v, f.getParentFile());
+    volumeMap.add(newReplicaInfo);
+    return newReplicaInfo;
+  }
+  
+  @Override
+  public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+      long newGS, long minBytesRcvd, long maxBytesRcvd)
+      throws IOException {
+    DataNode.LOG.info("Recover the RBW replica " + b);
+
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo == null) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+    }
+    
+    // check the replica's state
+    if (replicaInfo.getState() != ReplicaState.RBW) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.NON_RBW_REPLICA + replicaInfo);
     }
+    ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
+    
+    DataNode.LOG.info("Recovering replica " + rbw);
 
-    try {
-      if (threads != null) {
-        for (Thread thread:threads) {
-          thread.join();
-        }
-      }
-    } catch (InterruptedException e) {
-      throw new IOException("Recovery waiting for thread interrupted.");
+    // Stop the previous writer
+    rbw.stopWriter();
+    rbw.setWriter(Thread.currentThread());
+
+    // check generation stamp
+    long replicaGenerationStamp = rbw.getGenerationStamp();
+    if (replicaGenerationStamp < b.getGenerationStamp() ||
+        replicaGenerationStamp > newGS) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+          ". Expected GS range is [" + b.getGenerationStamp() + ", " + 
+          newGS + "].");
+    }
+    
+    // check replica length
+    if (rbw.getBytesAcked() < minBytesRcvd || rbw.getNumBytes() > maxBytesRcvd){
+      throw new ReplicaNotFoundException("Unmatched length replica " + 
+          replicaInfo + ": BytesAcked = " + rbw.getBytesAcked() + 
+          " BytesRcvd = " + rbw.getNumBytes() + " are not in the range of [" + 
+          minBytesRcvd + ", " + maxBytesRcvd + "].");
     }
 
-    //
-    // Finally, allow a writer to the block file
-    // REMIND - mjc - make this a filter stream that enforces a max
-    // block size, so clients can't go crazy
-    //
-    File metafile = getMetaFile(f, b);
-    DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
-    DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
-    return createBlockWriteStreams( f , metafile);
+    // bump the replica's generation stamp to newGS
+    bumpReplicaGS(rbw, newGS);
+    
+    return rbw;
+  }
+  
+  @Override
+  public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+      throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockId());
+    if (replicaInfo != null) {
+      throw new ReplicaAlreadyExistsException("Block " + b +
+          " already exists in state " + replicaInfo.getState() +
+          " and thus cannot be created.");
+    }
+    
+    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    // create a temporary file to hold block in the designated volume
+    File f = v.createTmpFile(b);
+    ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
+        b.getGenerationStamp(), v, f.getParentFile());
+    volumeMap.add(newReplicaInfo);
+    
+    return newReplicaInfo;
   }
 
   /**
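
The recovery paths above share the same acceptance window: a replica may be
recovered only if its generation stamp lies in [block GS, newGS], and
recoverRbw additionally requires its byte counts to lie in
[minBytesRcvd, maxBytesRcvd]. Schematically (values illustrative):

    long blockGS = 40, newGS = 45, replicaGS = 42;
    boolean gsOk = replicaGS >= blockGS && replicaGS <= newGS;  // 42 in [40,45]: true

    long minBytesRcvd = 0, maxBytesRcvd = 1024;
    long bytesAcked = 512, bytesRcvd = 900;
    boolean lenOk = bytesAcked >= minBytesRcvd
                 && bytesRcvd <= maxBytesRcvd;                  // true
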
@@ -1167,8 +1390,7 @@
                                  throws IOException {
     long size = 0;
     synchronized (this) {
-      FSVolume vol = volumeMap.get(b).getVolume();
-      size = vol.getTmpFile(b).length();
+      size = getReplicaInfo(b).getBlockFile().length();
     }
     if (size < dataOffset) {
       String msg = "Trying to change block file offset of block " + b +
@@ -1185,7 +1407,7 @@
 
   synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
     if ( vol == null ) {
-      vol = volumeMap.get( blk ).getVolume();
+      vol = getReplicaInfo( blk ).getVolume();
       if ( vol == null ) {
         throw new IOException("Could not find volume for block " + blk);
       }
@@ -1205,40 +1427,52 @@
    * Complete the block write!
    */
   public synchronized void finalizeBlock(Block b) throws IOException {
-    ActiveFile activeFile = ongoingCreates.get(b);
-    if (activeFile == null) {
-      throw new IOException("Block " + b + " is already finalized.");
-    }
-    File f = activeFile.file;
-    if (f == null || !f.exists()) {
-      throw new IOException("No temporary file " + f + " for block " + b);
-    }
-    FSVolume v = volumeMap.get(b).getVolume();
-    if (v == null) {
-      throw new IOException("No volume for temporary file " + f + 
-                            " for block " + b);
+    ReplicaInfo replicaInfo = getReplicaInfo(b);
+    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+      // this is legal when recovery happens on a file that has
+      // been opened for append but never modified
+      return;
     }
-        
-    File dest = null;
-    dest = v.addBlock(b, f);
-    volumeMap.put(b, new ReplicaInfo(v, dest));
-    ongoingCreates.remove(b);
+    finalizeReplica(replicaInfo);
+  }
+  
+  private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
+  throws IOException {
+    FinalizedReplica newReplicaInfo = null;
+    if (replicaInfo.getState() == ReplicaState.RUR &&
+       ((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() == 
+         ReplicaState.FINALIZED) {
+      newReplicaInfo = (FinalizedReplica)
+             ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+    } else {
+      FSVolume v = replicaInfo.getVolume();
+      File f = replicaInfo.getBlockFile();
+      if (v == null) {
+        throw new IOException("No volume for temporary file " + f + 
+            " for block " + replicaInfo);
+      }
+
+      File dest = v.addBlock(replicaInfo, f);
+      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+    }
+    volumeMap.add(newReplicaInfo);
+    return newReplicaInfo;
   }
 
   /**
    * Remove the temporary block file (if any)
    */
   public synchronized void unfinalizeBlock(Block b) throws IOException {
-    // remove the block from in-memory data structure
-    ActiveFile activefile = ongoingCreates.remove(b);
-    if (activefile == null) {
-      return;
-    }
-    volumeMap.remove(b);
-    
-    // delete the on-disk temp file
-    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
-      DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+      // remove from volumeMap
+      volumeMap.remove(b);
+      
+      // delete the on-disk temp file
+      if (delBlockFromDisk(replicaInfo.getBlockFile(), 
+          replicaInfo.getMetaFile(), b)) {
+        DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+      }
     }
   }
 
@@ -1269,18 +1503,34 @@
   }
 
   /**
-   * Return finalized blocks from the in-memory blockmap
+   * Generates a block report from the in-memory block map.
    */
-  public Block[] getBlockReport() {
-    ArrayList<Block> list =  new ArrayList<Block>(volumeMap.size());
+  public BlockListAsLongs getBlockReport() {
+    ArrayList<ReplicaInfo> finalized =
+      new ArrayList<ReplicaInfo>(volumeMap.size());
+    ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
     synchronized(this) {
-      for (Block b : volumeMap.keySet()) {
-        if (!ongoingCreates.containsKey(b)) {
-          list.add(new Block(b));
+      for (ReplicaInfo b : volumeMap.replicas()) {
+        switch(b.getState()) {
+        case FINALIZED:
+          finalized.add(b);
+          break;
+        case RBW:
+        case RWR:
+          uc.add(b);
+          break;
+        case RUR:
+          ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+          uc.add(rur.getOriginalReplica());
+          break;
+        case TEMPORARY:
+          break;
+        default:
+          assert false : "Illegal ReplicaInfo state.";
         }
       }
+      return new BlockListAsLongs(finalized, uc);
     }
-    return list.toArray(new Block[list.size()]);
   }
 
   /**
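
The switch in getBlockReport() buckets replica states as follows (a summary
of the new behavior, not code from this patch):

    static boolean reportedAsFinalized(ReplicaState s) {
      return s == ReplicaState.FINALIZED;
    }
    static boolean reportedAsUnderConstruction(ReplicaState s) {
      // an RUR replica is reported via its original, pre-recovery replica
      return s == ReplicaState.RBW || s == ReplicaState.RWR
          || s == ReplicaState.RUR;
    }
    // TEMPORARY replicas are omitted from the report entirely.
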
@@ -1290,7 +1540,7 @@
    * is needed to handle concurrent modification to the block.
    */
   synchronized Block[] getBlockList(boolean deepcopy) {
-    Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+    Block[] list = volumeMap.replicas().toArray(new Block[volumeMap.size()]);
     if (deepcopy) {
       for (int i = 0; i < list.length; i++) {
         list[i] = new Block(list[i]);
@@ -1300,17 +1550,29 @@
   }
 
   /**
+   * Get the list of finalized blocks from in-memory blockmap.
+   */
+  synchronized List<Block> getFinalizedBlocks() {
+    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size());
+    for (ReplicaInfo b : volumeMap.replicas()) {
+      if(b.getState() == ReplicaState.FINALIZED) {
+        finalized.add(new Block(b));
+      }
+    }
+    return finalized;
+  }
+
+  /**
    * Check whether the given block is a valid one.
+   * Valid means finalized.
    */
   public boolean isValidBlock(Block b) {
-    File f = null;;
-    try {
-      f = validateBlockFile(b);
-    } catch(IOException e) {
-      DataNode.LOG.warn("Block " + b + " is not valid:",e);
+    ReplicaInfo replicaInfo = volumeMap.get(b);
+    if (replicaInfo == null || 
+        replicaInfo.getState() != ReplicaState.FINALIZED) {
+      return false;
     }
-    
-    return f != null;
+    return replicaInfo.getBlockFile().exists();
   }
 
   /**
@@ -1337,49 +1599,25 @@
 
   /** {@inheritDoc} */
   public void validateBlockMetadata(Block b) throws IOException {
-    ReplicaInfo info = volumeMap.get(b);
-    if (info == null) {
-      throw new IOException("Block " + b + " does not exist in volumeMap.");
-    }
-    FSVolume v = info.getVolume();
-    File tmp = v.getTmpFile(b);
-    File f = getFile(b);
-    if (f == null) {
-      f = tmp;
+    checkReplicaFiles(getReplicaInfo(b));
+  }
+
+  /** Check the files of a replica. */
+  static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
+    final File f = r.getBlockFile();
+    if (!f.exists()) {
+      throw new FileNotFoundException("File " + f + " not found, r=" + r);
     }
-    if (f == null) {
-      throw new IOException("Block " + b + " does not exist on disk.");
+    if (r.getNumBytes() != f.length()) {
+      throw new IOException("File length mismatched."
+          + f + " length is " + f.length() + " but r=" + r);
+    }
+    final File metafile = getMetaFile(f, r);
+    if (!metafile.exists()) {
+      throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
     }
-    if (!f.exists()) {
-      throw new IOException("Block " + b + 
-                            " block file " + f +
-                            " does not exist on disk.");
-    }
-    if (b.getNumBytes() != f.length()) {
-      throw new IOException("Block " + b + 
-                            " length is " + b.getNumBytes()  +
-                            " does not match block file length " +
-                            f.length());
-    }
-    File meta = getMetaFile(f, b);
-    if (meta == null) {
-      throw new IOException("Block " + b + 
-                            " metafile does not exist.");
-    }
-    if (!meta.exists()) {
-      throw new IOException("Block " + b + 
-                            " metafile " + meta +
-                            " does not exist on disk.");
-    }
-    if (meta.length() == 0) {
-      throw new IOException("Block " + b + " metafile " + meta + " is empty.");
-    }
-    long stamp = parseGenerationStamp(f, meta);
-    if (stamp != b.getGenerationStamp()) {
-      throw new IOException("Block " + b + 
-                            " genstamp is " + b.getGenerationStamp()  +
-                            " does not match meta file stamp " +
-                            stamp);
+    if (metafile.length() == 0) {
+      throw new IOException("Metafile " + metafile + " is empty, r=" + r);
     }
   }
 
@@ -1396,7 +1634,8 @@
       synchronized (this) {
         f = getFile(invalidBlks[i]);
         ReplicaInfo dinfo = volumeMap.get(invalidBlks[i]);
-        if (dinfo == null) {
+        if (dinfo == null || 
+            dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
           DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] + 
                            ". BlockInfo not found in volumeMap.");
@@ -1428,7 +1667,13 @@
           error = true;
           continue;
         }
-        v.clearPath(parent);
+        ReplicaState replicaState = dinfo.getState();
+        if (replicaState == ReplicaState.FINALIZED || 
+            (replicaState == ReplicaState.RUR && 
+                ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() == 
+                  ReplicaState.FINALIZED)) {
+          v.clearPath(parent);
+        }
         volumeMap.remove(invalidBlks[i]);
       }
       File metaFile = getMetaFile( f, invalidBlks[i] );
@@ -1455,16 +1700,24 @@
   }
 
   /**
-   * Turn the block identifier into a filename.
+   * Turn the block identifier into a filename; ignore generation stamp!!!
    */
   public synchronized File getFile(Block b) {
-    ReplicaInfo info = volumeMap.get(b);
+    return getFile(b.getBlockId());
+  }
+
+  /**
+   * Turn the block identifier into a filename
+   * @param blockId a block's id
+   * @return on disk data file path; null if the replica does not exist
+   */
+  private File getFile(long blockId) {
+    ReplicaInfo info = volumeMap.get(blockId);
     if (info != null) {
-      return info.getFile();
+      return info.getBlockFile();
     }
-    return null;
+    return null;    
   }
-
   /**
    * check if a data directory is healthy
    * if some volumes failed - make sure to remove all the blocks that belong
@@ -1483,12 +1736,12 @@
     // remove related blocks
     long mlsec = System.currentTimeMillis();
     synchronized (this) {
-      Iterator<Block> ib = volumeMap.keySet().iterator();
+      Iterator<ReplicaInfo> ib = volumeMap.replicas().iterator();
       while(ib.hasNext()) {
-        Block b = ib.next();
+        ReplicaInfo b = ib.next();
         total_blocks ++;
         // check if the volume the block belongs to is still valid
-        FSVolume vol = volumeMap.get(b).getVolume();
+        FSVolume vol = b.getVolume();
         for(FSVolume fv: failed_vols) {
           if(vol == fv) {
             DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol " 
@@ -1589,11 +1842,12 @@
    */
   public void checkAndUpdate(long blockId, File diskFile,
       File diskMetaFile, FSVolume vol) {
-    Block block = new Block(blockId);
     DataNode datanode = DataNode.getDataNode();
     Block corruptBlock = null;
+    ReplicaInfo memBlockInfo;
     synchronized (this) {
-      if (ongoingCreates.get(block) != null) {
+      memBlockInfo = volumeMap.get(blockId);
+      if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
         return;
       }
@@ -1602,7 +1856,6 @@
           Block.getGenerationStamp(diskMetaFile.getName()) :
             Block.GRANDFATHER_GENERATION_STAMP;
 
-      ReplicaInfo memBlockInfo = volumeMap.get(block);
       if (diskFile == null || !diskFile.exists()) {
         if (memBlockInfo == null) {
           // Block file does not exist and block does not exist in memory
@@ -1614,14 +1867,14 @@
           }
           return;
         }
-        if (!memBlockInfo.getFile().exists()) {
+        if (!memBlockInfo.getBlockFile().exists()) {
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
-          volumeMap.remove(block);
+          volumeMap.remove(blockId);
           if (datanode.blockScanner != null) {
-            datanode.blockScanner.deleteBlock(block);
+            datanode.blockScanner.deleteBlock(new Block(blockId));
           }
-          DataNode.LOG.warn("Removed block " + block.getBlockId()
+          DataNode.LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
           if (diskMetaFile != null && diskMetaFile.exists()
@@ -1637,23 +1890,20 @@
        */
       if (memBlockInfo == null) {
         // Block is missing in memory - add the block to volumeMap
-        ReplicaInfo diskBlockInfo = new ReplicaInfo(vol, diskFile);
-        Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
-        volumeMap.put(diskBlock, diskBlockInfo);
+        ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
+            diskFile.length(), diskGS, vol, diskFile.getParentFile());
+        volumeMap.add(diskBlockInfo);
         if (datanode.blockScanner != null) {
-          datanode.blockScanner.addBlock(diskBlock);
+          datanode.blockScanner.addBlock(diskBlockInfo);
         }
-        DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+        DataNode.LOG.warn("Added missing block to memory " + (Block)diskBlockInfo);
         return;
       }
       /*
        * Block exists in volumeMap and the block file exists on the disk
        */
-      // Iterate to get key from volumeMap for the blockId
-      Block memBlock = getBlockKey(blockId);
-
       // Compare block files
-      File memFile = memBlockInfo.getFile();
+      File memFile = memBlockInfo.getBlockFile();
       if (memFile.exists()) {
         if (memFile.compareTo(diskFile) != 0) {
           DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
@@ -1670,19 +1920,17 @@
             + memFile.getAbsolutePath()
             + " does not exist. Updating it to the file found during scan "
             + diskFile.getAbsolutePath());
-        ReplicaInfo info = volumeMap.remove(memBlock);
-        info.setFile(diskFile);
+        memBlockInfo.setDir(diskFile.getParentFile());
         memFile = diskFile;
 
         DataNode.LOG.warn("Updating generation stamp for block " + blockId
-            + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
-        memBlock.setGenerationStamp(diskGS);
-        volumeMap.put(memBlock, info);
+            + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
+        memBlockInfo.setGenerationStamp(diskGS);
       }
 
       // Compare generation stamp
-      if (memBlock.getGenerationStamp() != diskGS) {
-        File memMetaFile = getMetaFile(diskFile, memBlock);
+      if (memBlockInfo.getGenerationStamp() != diskGS) {
+        File memMetaFile = getMetaFile(diskFile, memBlockInfo);
         if (memMetaFile.exists()) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
             DataNode.LOG.warn("Metadata file in memory "
@@ -1699,23 +1947,19 @@
               : Block.GRANDFATHER_GENERATION_STAMP;
 
           DataNode.LOG.warn("Updating generation stamp for block " + blockId
-              + " from " + memBlock.getGenerationStamp() + " to " + gs);
+              + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
 
-          ReplicaInfo info = volumeMap.remove(memBlock);
-          memBlock.setGenerationStamp(gs);
-          volumeMap.put(memBlock, info);
+          memBlockInfo.setGenerationStamp(gs);
         }
       }
 
       // Compare block size
-      if (memBlock.getNumBytes() != memFile.length()) {
+      if (memBlockInfo.getNumBytes() != memFile.length()) {
         // Update the length based on the block file
-        corruptBlock = new Block(memBlock);
+        corruptBlock = new Block(memBlockInfo);
         DataNode.LOG.warn("Updating size of block " + blockId + " from "
-            + memBlock.getNumBytes() + " to " + memFile.length());
-        ReplicaInfo info = volumeMap.remove(memBlock);
-        memBlock.setNumBytes(memFile.length());
-        volumeMap.put(memBlock, info);
+            + memBlockInfo.getNumBytes() + " to " + memFile.length());
+        memBlockInfo.setNumBytes(memFile.length());
       }
     }
 
@@ -1734,19 +1978,158 @@
     }
   }
 
-  /**
-   * Get reference to the key in the volumeMap. To be called from methods that
-   * are synchronized on {@link FSDataset}
-   * @param blockId
-   * @return key from the volumeMap
-   */
-  Block getBlockKey(long blockId) {
+  @Override
+  public ReplicaInfo getReplica(long blockId) {
     assert(Thread.holdsLock(this));
-    for (Block b : volumeMap.keySet()) {
-      if (b.getBlockId() == blockId) {
-        return b;
-      }
+    return volumeMap.get(blockId);
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized ReplicaRecoveryInfo initReplicaRecovery(
+      RecoveringBlock rBlock) throws IOException {
+    return initReplicaRecovery(
+        volumeMap, rBlock.getBlock(), rBlock.getNewGenerationStamp());
+  }
+
+  /** static version of {@link #initReplicaRecovery(RecoveringBlock)}. */
+  static ReplicaRecoveryInfo initReplicaRecovery(
+      ReplicasMap map, Block block, long recoveryId) throws IOException {
+    final ReplicaInfo replica = map.get(block.getBlockId());
+    DataNode.LOG.info("initReplicaRecovery: block=" + block
+        + ", recoveryId=" + recoveryId
+        + ", replica=" + replica);
+
+    //check replica
+    if (replica == null) {
+      return null;
     }
-    return null;
+
+    //stop writer if there is any
+    if (replica instanceof ReplicaInPipeline) {
+      ((ReplicaInPipeline)replica).stopWriter();
+    }
+
+    //check generation stamp
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+
+    //check recovery id
+    if (replica.getGenerationStamp() >= recoveryId) {
+      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+          + " replica.getGenerationStamp() >= recoveryId = " + recoveryId
+          + ", block=" + block + ", replica=" + replica);
+    }
+
+    //check RUR
+    final ReplicaUnderRecovery rur;
+    if (replica.getState() == ReplicaState.RUR) {
+      rur = (ReplicaUnderRecovery)replica;
+      if (rur.getRecoveryID() >= recoveryId) {
+        throw new RecoveryInProgressException(
+            "rur.getRecoveryID() >= recoveryId = " + recoveryId
+            + ", block=" + block + ", rur=" + rur);
+      }
+      final long oldRecoveryID = rur.getRecoveryID();
+      rur.setRecoveryID(recoveryId);
+      DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+          + " from " + oldRecoveryID + " to " + recoveryId);
+    }
+    else {
+      rur = new ReplicaUnderRecovery(replica, recoveryId);
+      map.add(rur);
+      DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+          + block + " from " + replica.getState()
+          + " to " + rur.getState());
+    }
+    return rur.createInfo();
+  }
+
+  /** Update a replica of a block. */
+  synchronized void updateReplica(final Block block, final long recoveryId,
+      final long newlength) throws IOException {
+    //get replica
+    final ReplicaInfo replica = volumeMap.get(block.getBlockId());
+    DataNode.LOG.info("updateReplica: block=" + block
+        + ", recoveryId=" + recoveryId
+        + ", length=" + newlength
+        + ", replica=" + replica);
+
+    //check replica
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+
+    //check replica state
+    if (replica.getState() != ReplicaState.RUR) {
+      throw new IOException("replica.getState() != " + ReplicaState.RUR
+          + ", replica=" + replica);
+    }
+
+    //check replica files before update
+    checkReplicaFiles(replica);
+
+    //update replica
+    final ReplicaInfo finalized = (ReplicaInfo)updateReplicaUnderRecovery(
+                                    replica, recoveryId, newlength);
+
+    //check replica files after update
+    checkReplicaFiles(finalized);
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized FinalizedReplica updateReplicaUnderRecovery(
+                                          Block oldBlock,
+                                          long recoveryId,
+                                          long newlength) throws IOException {
+    Replica r = getReplica(oldBlock.getBlockId());
+    if (r == null) {
+      throw new ReplicaNotFoundException(oldBlock);
+    }
+    if (r.getState() != ReplicaState.RUR) {
+      throw new IOException("Replica " + r + " must be under recovery.");
+    }
+    ReplicaUnderRecovery rur = (ReplicaUnderRecovery)r;
+    DataNode.LOG.info("updateReplicaUnderRecovery: recoveryId=" + recoveryId
+        + ", newlength=" + newlength
+        + ", rur=" + rur);
+
+    //check recovery id
+    if (rur.getRecoveryID() != recoveryId) {
+      throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+          + ", rur=" + rur);
+    }
+
+    // bump rur's GS to be recovery id
+    bumpReplicaGS(rur, recoveryId);
+
+    //update length
+    final File replicafile = rur.getBlockFile();
+    if (rur.getNumBytes() < newlength) {
+      throw new IOException("rur.getNumBytes() < newlength = " + newlength
+          + ", rur=" + rur);
+    }
+    if (rur.getNumBytes() > newlength) {
+      rur.unlinkBlock(1);
+      truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
+      // update RUR with the new length
+      rur.setNumBytes(newlength);
+    }
+
+    // finalize the block
+    return finalizeReplica(rur);
+  }
+
+  @Override // FSDatasetInterface
+  public synchronized long getReplicaVisibleLength(final Block block)
+  throws IOException {
+    final Replica replica = getReplica(block.getBlockId());
+    if (replica == null) {
+      throw new ReplicaNotFoundException(block);
+    }
+    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+      throw new IOException(
+          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+          + block + ", replica=" + replica);
+    }
+    return replica.getVisibleLength();
   }
 }
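
A minimal sketch (not part of this commit) of how a datanode-side
caller might drive the two recovery methods above; it assumes that
ReplicaRecoveryInfo exposes the replica length via getNumBytes() and
that the recovering datanodes have already agreed on that length:

// Hypothetical recovery driver; names and flow are assumptions.
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;

class BlockRecoverySketch {
  static void recover(FSDatasetInterface dataset, RecoveringBlock rBlock)
      throws IOException {
    // step 1: stop any writer, move the replica to RUR state and
    // learn its current on-disk state
    ReplicaRecoveryInfo info = dataset.initReplicaRecovery(rBlock);
    if (info == null) {
      return;     // this datanode does not have the replica
    }
    // step 2: once a length is agreed upon, bump the generation
    // stamp to the recovery id, truncate if needed and finalize
    long agreedLength = info.getNumBytes();   // assumed agreed length
    dataset.updateReplicaUnderRecovery(rBlock.getBlock(),
        rBlock.getNewGenerationStamp(), agreedLength);
  }
}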

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Sep 30 23:39:30 2009
@@ -28,7 +28,10 @@
 
 
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
@@ -94,6 +97,14 @@
   public long getLength(Block b) throws IOException;
 
   /**
+   * Get reference to the replica meta info in the replicasMap. 
+   * To be called from methods that are synchronized on {@link FSDataset}
+   * @param blockId a block's id
+   * @return replica from the replicas map
+   */
+  public Replica getReplica(long blockId);
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(long blkid) throws IOException;
@@ -144,6 +155,10 @@
         checksumOut = cOut;
       }
       
+      void close() throws IOException {
+        IOUtils.closeStream(dataOut);
+        IOUtils.closeStream(checksumOut);
+      }
     }
 
   /**
@@ -167,16 +182,76 @@
   }
     
   /**
-   * Creates the block and returns output streams to write data and CRC
-   * @param b
-   * @param isRecovery True if this is part of erro recovery, otherwise false
-   * @return a BlockWriteStreams object to allow writing the block data
-   *  and CRC
+   * Creates a temporary replica and returns the meta information of the replica
+   * 
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface createTemporary(Block b)
+  throws IOException;
+
+  /**
+   * Creates an RBW replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface createRbw(Block b) throws IOException;
+
+  /**
+   * Recovers an RBW replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param minBytesRcvd the minimum number of bytes that the replica could have
+   * @param maxBytesRcvd the maximum number of bytes that the replica could have
+   * @return the meta info of the replica which is being written to
+   * @throws IOException if an error occurs
+   */
+  public ReplicaInPipelineInterface recoverRbw(Block b, 
+      long newGS, long minBytesRcvd, long maxBytesRcvd)
+  throws IOException;
+
+  /**
+   * Appends to a finalized replica and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @return the meta info of the replica which is being written to
    * @throws IOException
    */
-  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
+  public ReplicaInPipelineInterface append(Block b, 
+      long newGS, long expectedBlockLen) throws IOException;
 
   /**
+   * Recovers a failed append to a finalized replica
+   * and returns the meta info of the replica
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @return the meta info of the replica which is being written to
+   * @throws IOException
+   */
+  public ReplicaInPipelineInterface recoverAppend(Block b,
+      long newGS, long expectedBlockLen) throws IOException;
+  
+  /**
+   * Recovers a failed pipeline close.
+   * It bumps the replica's generation stamp and finalizes it if it is an
+   * RBW replica.
+   * 
+   * @param b block
+   * @param newGS the new generation stamp for the replica
+   * @param expectedBlockLen the number of bytes the replica is expected to have
+   * @throws IOException
+   */
+  public void recoverClose(Block b,
+      long newGS, long expectedBlockLen) throws IOException;
+  
+  /**
    * Update the block to the new generation stamp and length.  
    */
   public void updateBlock(Block oldblock, Block newblock) throws IOException;
@@ -202,7 +277,7 @@
    * Returns the block report - the full list of blocks stored
    * @return - the block report - the full list of blocks stored
    */
-  public Block[] getBlockReport();
+  public BlockListAsLongs getBlockReport();
 
   /**
    * Is the block valid?
@@ -270,4 +345,25 @@
   * @return true if more than the minimum number of valid volumes are left in the FSDataSet
    */
   public boolean hasEnoughResource();
+
+  /**
+   * Get visible length of the specified replica.
+   */
+  long getReplicaVisibleLength(final Block block) throws IOException;
+
+  /**
+   * Initialize a replica recovery.
+   * 
+   * @return actual state of the replica on this data-node or 
+   * null if data-node does not have the replica.
+   */
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException;
+
+  /**
+   * Update replica's generation stamp and length and finalize it.
+   */
+  public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+                                          long recoveryId,
+                                          long newLength) throws IOException;
 }
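
A minimal sketch (not part of this commit) of how a caller might pick
among the replica-creation methods that replace writeToBlock(); the
flags and their mapping to calls are illustrative assumptions:

// Hypothetical dispatch helper; the decision logic is an assumption.
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;

class WritePathSketch {
  static ReplicaInPipelineInterface setupWrite(FSDatasetInterface dataset,
      Block b, boolean isTransfer, boolean isPipelineRecovery,
      long newGS, long minBytesRcvd, long maxBytesRcvd)
      throws IOException {
    if (isTransfer) {
      return dataset.createTemporary(b);   // e.g. replication traffic
    }
    if (isPipelineRecovery) {
      // a restarted client pipeline recovers the existing RBW replica
      return dataset.recoverRbw(b, newGS, minBytesRcvd, maxBytesRcvd);
    }
    return dataset.createRbw(b);           // fresh client write
  }
}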

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * This class describes a replica that has been finalized.
+ */
+class FinalizedReplica extends ReplicaInfo {
+  private boolean unlinked;      // copy-on-write done for block
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  FinalizedReplica(long blockId, long len, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, len, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  FinalizedReplica(Block block, FSVolume vol, File dir) {
+    super(block, vol, dir);
+  }
+  
+  @Override  // ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.FINALIZED;
+  }
+  
+  @Override // ReplicaInfo
+  boolean isUnlinked() {
+    return unlinked;
+  }
+
+  @Override  // ReplicaInfo
+  void setUnlinked() {
+    unlinked = true;
+  }
+  
+  @Override
+  public long getVisibleLength() {
+    return getNumBytes();       // all bytes are visible
+  }
+
+  @Override
+  public long getBytesOnDisk() {
+    return getNumBytes();
+  }
+
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+  
+  @Override
+  public String toString() {
+    return super.toString()
+        + "\n  unlinked=" + unlinked;
+  }
+}
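
A small sketch (not part of this commit) of the invariants this class
encodes, using arbitrary sample values:

// Illustrative only; blockId=1, len=1024, genStamp=100 are made up.
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;

import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;

class FinalizedReplicaSketch {
  static void demo(FSVolume vol, File dir) {
    FinalizedReplica r = new FinalizedReplica(1L, 1024L, 100L, vol, dir);
    // for a finalized replica, every received byte is on disk and
    // visible to readers
    assert r.getVisibleLength() == r.getNumBytes();
    assert r.getBytesOnDisk() == r.getNumBytes();
    assert !r.isUnlinked();   // copy-on-write unlink not done yet
  }
}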

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+
+/** 
+ * This represents block replicas that are stored in a DataNode.
+ */
+public interface Replica {
+  /** get block ID  */
+  public long getBlockId();
+
+  /** get generation stamp */
+  public long getGenerationStamp();
+
+  /**
+   * Get the replica state
+   * @return the replica state
+   */
+  public ReplicaState getState();
+
+  /**
+   * Get the number of bytes received
+   * @return the number of bytes that have been received
+   */
+  public long getNumBytes();
+  
+  /**
+   * Get the number of bytes that have been written to disk
+   * @return the number of bytes that have been written to disk
+   */
+  public long getBytesOnDisk();
+
+  /**
+   * Get the number of bytes that are visible to readers
+   * @return the number of bytes that are visible to readers
+   */
+  public long getVisibleLength();
+}
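
The interface separates bytes received, bytes on disk, and bytes
visible to readers; a minimal consumer sketch (not part of this
commit):

// Illustrative helper; the formatting is arbitrary.
package org.apache.hadoop.hdfs.server.datanode;

class ReplicaFormatter {
  static String describe(Replica r) {
    return "replica " + r.getBlockId()
        + " gs=" + r.getGenerationStamp()
        + " state=" + r.getState()
        + " received=" + r.getNumBytes()
        + " onDisk=" + r.getBytesOnDisk()
        + " visible=" + r.getVisibleLength();
  }
}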

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that the target block already exists 
+ * and is not set to be recovered/overwritten.  
+ */
+class ReplicaAlreadyExistsException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public ReplicaAlreadyExistsException() {
+    super();
+  }
+
+  public ReplicaAlreadyExistsException(String msg) {
+    super(msg);
+  }
+}
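
A minimal sketch (not part of this commit) of a caller reacting to the
exception; that createRbw throws it when a replica of the block
already exists is an assumption here:

// Hypothetical caller; the error-handling policy is an assumption.
package org.apache.hadoop.hdfs.server.datanode;

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.Block;

class CreateRbwSketch {
  static ReplicaInPipelineInterface create(FSDatasetInterface dataset,
      Block b) throws IOException {
    try {
      return dataset.createRbw(b);
    } catch (ReplicaAlreadyExistsException e) {
      // the replica exists and is not set to be recovered/overwritten:
      // fail this pipeline rather than clobber the existing copy
      DataNode.LOG.warn("cannot create replica for " + b, e);
      throw e;
    }
  }
}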

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/** This class represents replicas being written, that is,
+ * replicas created in a write pipeline initiated by a dfs client.
+ */
+class ReplicaBeingWritten extends ReplicaInPipeline {
+  /**
+   * Constructor for a zero length replica
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  ReplicaBeingWritten(long blockId, long genStamp,
+      FSVolume vol, File dir) {
+    super(blockId, genStamp, vol, dir);
+  }
+  
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   */
+  ReplicaBeingWritten(Block block, 
+      FSVolume vol, File dir, Thread writer) {
+    super(block, vol, dir, writer);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   */
+  ReplicaBeingWritten(long blockId, long len, long genStamp,
+      FSVolume vol, File dir, Thread writer) {
+    super(blockId, len, genStamp, vol, dir, writer);
+  }
+  
+  @Override
+  public long getVisibleLength() {
+    return getBytesAcked();       // all acked bytes are visible
+  }
+
+  @Override   //ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.RBW;
+  }
+  
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+  
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+}
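
A minimal sketch (not part of this commit) of the visible-length
semantics; it assumes ReplicaInPipeline exposes a setBytesAcked(long)
setter, and the sample values are arbitrary:

// Illustrative only.
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;

import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;

class RbwSketch {
  static void demo(FSVolume vol, File dir) {
    ReplicaBeingWritten rbw =
        new ReplicaBeingWritten(1L, 100L, vol, dir);  // zero-length
    rbw.setNumBytes(4096L);     // bytes received so far
    rbw.setBytesAcked(1024L);   // bytes acked downstream (assumed setter)
    assert rbw.getState() == ReplicaState.RBW;
    // only acknowledged bytes are visible to readers
    assert rbw.getVisibleLength() == 1024L;
  }
}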