Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/02 19:38:58 UTC

svn commit: r1308437 [2/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ src/main/java/org/apache/hadoop/...

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Apr  2 17:38:56 2012
@@ -15,52 +15,53 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.PrintStream;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.DU;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -72,10 +73,8 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -86,951 +85,18 @@ import org.apache.hadoop.util.Reflection
  *
  ***************************************************/
 @InterfaceAudience.Private
-public class FSDataset implements FsDatasetSpi<FSDataset.FSVolume> {
-  /**
-   * A factory for creating FSDataset objects.
-   */
-  public static class Factory extends FsDatasetSpi.Factory<FSDataset> {
-    @Override
-    public FSDataset newInstance(DataNode datanode,
-        DataStorage storage, Configuration conf) throws IOException {
-      return new FSDataset(datanode, storage, conf);
-    }
-  }
-
-  /**
-   * A node type that can be built into a tree reflecting the
-   * hierarchy of blocks on the local disk.
-   */
-  private static class FSDir {
-    final int maxBlocksPerDir;
-    final File dir;
-    int numBlocks = 0;
-    FSDir children[];
-    int lastChildIdx = 0;
-
-    private FSDir(File dir, int maxBlocksPerDir)
-      throws IOException {
-      this.dir = dir;
-      this.maxBlocksPerDir = maxBlocksPerDir;
-
-      this.children = null;
-      if (!dir.exists()) {
-        if (!dir.mkdirs()) {
-          throw new IOException("Mkdirs failed to create " + 
-                                dir.toString());
-        }
-      } else {
-        File[] files = FileUtil.listFiles(dir); 
-        List<FSDir> dirList = new ArrayList<FSDir>();
-        for (int idx = 0; idx < files.length; idx++) {
-          if (files[idx].isDirectory()) {
-            dirList.add(new FSDir(files[idx], maxBlocksPerDir));
-          } else if (Block.isBlockFilename(files[idx])) {
-            numBlocks++;
-          }
-        }
-        if (dirList.size() > 0) {
-          children = dirList.toArray(new FSDir[dirList.size()]);
-        }
-      }
-    }
-        
-    private File addBlock(Block b, File src) throws IOException {
-      //First try without creating subdirectories
-      File file = addBlock(b, src, false, false);          
-      return (file != null) ? file : addBlock(b, src, true, true);
-    }
-
-    private File addBlock(Block b, File src, boolean createOk, 
-                          boolean resetIdx) throws IOException {
-      if (numBlocks < maxBlocksPerDir) {
-        final File dest = moveBlockFiles(b, src, dir);
-        numBlocks += 1;
-        return dest;
-      }
-            
-      if (lastChildIdx < 0 && resetIdx) {
-        //reset so that all children will be checked
-        lastChildIdx = DFSUtil.getRandom().nextInt(children.length);              
-      }
-            
-      if (lastChildIdx >= 0 && children != null) {
-        //Check if any child-tree has room for a block.
-        for (int i=0; i < children.length; i++) {
-          int idx = (lastChildIdx + i)%children.length;
-          File file = children[idx].addBlock(b, src, false, resetIdx);
-          if (file != null) {
-            lastChildIdx = idx;
-            return file; 
-          }
-        }
-        lastChildIdx = -1;
-      }
-            
-      if (!createOk) {
-        return null;
-      }
-            
-      if (children == null || children.length == 0) {
-        children = new FSDir[maxBlocksPerDir];
-        for (int idx = 0; idx < maxBlocksPerDir; idx++) {
-          final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
-          children[idx] = new FSDir(sub, maxBlocksPerDir);
-        }
-      }
-            
-      //now pick a child randomly for creating a new set of subdirs.
-      lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
-      return children[ lastChildIdx ].addBlock(b, src, true, false); 
-    }
-
-    private void getVolumeMap(String bpid, ReplicasMap volumeMap, FSVolume volume) 
-    throws IOException {
-      if (children != null) {
-        for (int i = 0; i < children.length; i++) {
-          children[i].getVolumeMap(bpid, volumeMap, volume);
-        }
-      }
-
-      recoverTempUnlinkedBlock();
-      volume.addToReplicasMap(bpid, volumeMap, dir, true);
-    }
-        
-    /**
-     * Recover unlinked tmp files on datanode restart. If the original block
-     * does not exist, then the tmp file is renamed to be the
-     * original file name; otherwise the tmp file is deleted.
-     */
-    private void recoverTempUnlinkedBlock() throws IOException {
-      File files[] = FileUtil.listFiles(dir);
-      for (File file : files) {
-        if (!FSDataset.isUnlinkTmpFile(file)) {
-          continue;
-        }
-        File blockFile = getOrigFile(file);
-        if (blockFile.exists()) {
-          //
-          // If the original block file still exists, then no recovery
-          // is needed.
-          //
-          if (!file.delete()) {
-            throw new IOException("Unable to cleanup unlinked tmp file " +
-                file);
-          }
-        } else {
-          if (!file.renameTo(blockFile)) {
-            throw new IOException("Unable to cleanup detached file " +
-                file);
-          }
-        }
-      }
-    }
-    
-    /**
-     * check if a data directory is healthy
-     * @throws DiskErrorException
-     */
-    private void checkDirTree() throws DiskErrorException {
-      DiskChecker.checkDir(dir);
-            
-      if (children != null) {
-        for (int i = 0; i < children.length; i++) {
-          children[i].checkDirTree();
-        }
-      }
-    }
-        
-    private void clearPath(File f) {
-      String root = dir.getAbsolutePath();
-      String dir = f.getAbsolutePath();
-      if (dir.startsWith(root)) {
-        String[] dirNames = dir.substring(root.length()).
-          split(File.separator + "subdir");
-        if (clearPath(f, dirNames, 1))
-          return;
-      }
-      clearPath(f, null, -1);
-    }
-        
-    /*
-     * dirNames is an array of string integers derived from the
-     * usual directory structure data/subdirN/subdirXY/subdirM ...
-     * If the dirNames array is non-null, we only check the child at
-     * children[dirNames[idx]]. This avoids iterating over all
-     * children in the common case. If the directory structure changes
-     * in later versions, we need to revisit this.
-     */
-    private boolean clearPath(File f, String[] dirNames, int idx) {
-      if ((dirNames == null || idx == dirNames.length) &&
-          dir.compareTo(f) == 0) {
-        numBlocks--;
-        return true;
-      }
-          
-      if (dirNames != null) {
-        //guess the child index from the directory name
-        if (idx > (dirNames.length - 1) || children == null) {
-          return false;
-        }
-        int childIdx; 
-        try {
-          childIdx = Integer.parseInt(dirNames[idx]);
-        } catch (NumberFormatException ignored) {
-          // layout changed? we could print a warning.
-          return false;
-        }
-        return (childIdx >= 0 && childIdx < children.length) ?
-          children[childIdx].clearPath(f, dirNames, idx+1) : false;
-      }
-
-      //guesses failed. back to blind iteration.
-      if (children != null) {
-        for(int i=0; i < children.length; i++) {
-          if (children[i].clearPath(f, null, -1)){
-            return true;
-          }
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public String toString() {
-      return "FSDir{" +
-        "dir=" + dir +
-        ", children=" + (children == null ? null : Arrays.asList(children)) +
-        "}";
-    }
-  }
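
The FSDir tree above caps each directory at maxBlocksPerDir block files and, once a level fills up, fans out into maxBlocksPerDir "subdir<i>" children (DataStorage.BLOCK_SUBDIR_PREFIX). A minimal standalone sketch of that placement rule follows; the class and method names are illustrative, not part of the commit, and a random child probe stands in for FSDir's remembered lastChildIdx:

    import java.io.File;
    import java.io.IOException;
    import java.util.Random;

    public class SubdirPlacementSketch {
      private static final Random RAND = new Random();

      /**
       * Choose a directory for a new block file, mirroring FSDir.addBlock:
       * stay at this level while it holds fewer than maxBlocksPerDir block
       * files, otherwise descend into a "subdir<i>" child.
       */
      static File chooseDir(File dir, int maxBlocksPerDir) throws IOException {
        int blocks = 0;
        File[] files = dir.listFiles();
        if (files != null) {
          for (File f : files) {
            // count block files only, not meta files or subdirectories
            if (f.isFile() && f.getName().startsWith("blk_")
                && !f.getName().endsWith(".meta")) {
              blocks++;
            }
          }
        }
        if (blocks < maxBlocksPerDir) {
          return dir; // room at this level
        }
        File sub = new File(dir, "subdir" + RAND.nextInt(maxBlocksPerDir));
        if (!sub.isDirectory() && !sub.mkdirs()) {
          throw new IOException("Mkdirs failed to create " + sub);
        }
        return chooseDir(sub, maxBlocksPerDir);
      }
    }
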
-
-  /**
-   * A BlockPoolSlice represents a portion of a BlockPool stored on a volume.  
-   * Taken together, all BlockPoolSlices sharing a block pool ID across a 
-   * cluster represent a single block pool.
-   * 
-   * This class is synchronized by {@link FSVolume}.
-   */
-  private static class BlockPoolSlice {
-    private final String bpid;
-    private final FSVolume volume; // volume to which this BlockPool belongs
-    private final File currentDir; // StorageDirectory/current/bpid/current
-    private final FSDir finalizedDir; // directory storing finalized replicas
-    private final File rbwDir; // directory storing RBW replicas
-    private final File tmpDir; // directory storing temporary replicas
-    
-    // TODO:FEDERATION scalability issue - a thread per DU is needed
-    private final DU dfsUsage;
-
-    /**
-     * 
-     * @param bpid Block pool Id
-     * @param volume {@link FSVolume} to which this BlockPool belongs
-     * @param bpDir directory corresponding to the BlockPool
-     * @param conf
-     * @throws IOException
-     */
-    BlockPoolSlice(String bpid, FSVolume volume, File bpDir, Configuration conf)
-        throws IOException {
-      this.bpid = bpid;
-      this.volume = volume;
-      this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
-      final File finalizedDir = new File(
-          currentDir, DataStorage.STORAGE_DIR_FINALIZED);
-
-      // Files that were being written when the datanode was last shut down
-      // are now moved back to the data directory. It is possible that
-      // in the future, we might want to do some sort of datanode-local
-      // recovery for these blocks. For example, crc validation.
-      //
-      this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
-      if (tmpDir.exists()) {
-        FileUtil.fullyDelete(tmpDir);
-      }
-      this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
-      final boolean supportAppends = conf.getBoolean(
-          DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
-          DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
-      if (rbwDir.exists() && !supportAppends) {
-        FileUtil.fullyDelete(rbwDir);
-      }
-      final int maxBlocksPerDir = conf.getInt(
-          DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
-          DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
-      this.finalizedDir = new FSDir(finalizedDir, maxBlocksPerDir);
-      if (!rbwDir.mkdirs()) {  // create the rbw directory if it does not exist
-        if (!rbwDir.isDirectory()) {
-          throw new IOException("Mkdirs failed to create " + rbwDir.toString());
-        }
-      }
-      if (!tmpDir.mkdirs()) {
-        if (!tmpDir.isDirectory()) {
-          throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-        }
-      }
-      this.dfsUsage = new DU(bpDir, conf);
-      this.dfsUsage.start();
-    }
-
-    File getDirectory() {
-      return currentDir.getParentFile();
-    }
-
-    File getFinalizedDir() {
-      return finalizedDir.dir;
-    }
-    
-    File getRbwDir() {
-      return rbwDir;
-    }
-    
-    /**
-     * This should be used only by {@link FSVolume#decDfsUsed(String, long)}
-     * and it will be synchronized there.
-     */
-    void decDfsUsed(long value) {
-      dfsUsage.decDfsUsed(value);
-    }
-    
-    long getDfsUsed() throws IOException {
-      return dfsUsage.getUsed();
-    }
-    
-    /**
-     * Temporary files. They get moved to the finalized block directory when
-     * the block is finalized.
-     */
-    File createTmpFile(Block b) throws IOException {
-      File f = new File(tmpDir, b.getBlockName());
-      return DatanodeUtil.createTmpFile(b, f);
-    }
-
-    /**
-     * RBW files. They get moved to the finalized block directory when
-     * the block is finalized.
-     */
-    File createRbwFile(Block b) throws IOException {
-      File f = new File(rbwDir, b.getBlockName());
-      return DatanodeUtil.createTmpFile(b, f);
-    }
-
-    File addBlock(Block b, File f) throws IOException {
-      File blockFile = finalizedDir.addBlock(b, f);
-      File metaFile = DatanodeUtil.getMetaFile(blockFile, b.getGenerationStamp());
-      dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-      return blockFile;
-    }
-      
-    void checkDirs() throws DiskErrorException {
-      finalizedDir.checkDirTree();
-      DiskChecker.checkDir(tmpDir);
-      DiskChecker.checkDir(rbwDir);
-    }
-      
-    void getVolumeMap(ReplicasMap volumeMap) throws IOException {
-      // add finalized replicas
-      finalizedDir.getVolumeMap(bpid, volumeMap, volume);
-      // add rbw replicas
-      addToReplicasMap(volumeMap, rbwDir, false);
-    }
-
-    /**
-     * Add replicas under the given directory to the volume map
-     * @param volumeMap the replicas map
-     * @param dir an input directory
-     * @param isFinalized true if the directory has finalized replicas;
-     *                    false if the directory has rbw replicas
-     */
-    private void addToReplicasMap(ReplicasMap volumeMap, File dir,
-        boolean isFinalized) throws IOException {
-      File blockFiles[] = FileUtil.listFiles(dir);
-      for (File blockFile : blockFiles) {
-        if (!Block.isBlockFilename(blockFile))
-          continue;
-        
-        long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
-        long blockId = Block.filename2id(blockFile.getName());
-        ReplicaInfo newReplica = null;
-        if (isFinalized) {
-          newReplica = new FinalizedReplica(blockId, 
-              blockFile.length(), genStamp, volume, blockFile.getParentFile());
-        } else {
-          newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrity(blockFile, genStamp), 
-              genStamp, volume, blockFile.getParentFile());
-        }
-
-        ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
-        if (oldReplica != null) {
-          DataNode.LOG.warn("Two block files with the same block id exist " +
-              "on disk: " + oldReplica.getBlockFile() +
-              " and " + blockFile );
-        }
-      }
-    }
-    
-    /**
-     * Find out the number of bytes in the block that match its crc.
-     * 
-     * This algorithm assumes that data corruption caused by unexpected 
-     * datanode shutdown occurs only in the last crc chunk. So it checks
-     * only the last chunk.
-     * 
-     * @param blockFile the block file
-     * @param genStamp generation stamp of the block
-     * @return the number of valid bytes
-     */
-    private long validateIntegrity(File blockFile, long genStamp) {
-      DataInputStream checksumIn = null;
-      InputStream blockIn = null;
-      try {
-        final File metaFile = DatanodeUtil.getMetaFile(blockFile, genStamp);
-        long blockFileLen = blockFile.length();
-        long metaFileLen = metaFile.length();
-        int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
-        if (!blockFile.exists() || blockFileLen == 0 ||
-            !metaFile.exists() || metaFileLen < crcHeaderLen) {
-          return 0;
-        }
-        checksumIn = new DataInputStream(
-            new BufferedInputStream(new FileInputStream(metaFile),
-                HdfsConstants.IO_FILE_BUFFER_SIZE));
-
-        // read and handle the common header here. For now just a version
-        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
-        short version = header.getVersion();
-        if (version != BlockMetadataHeader.VERSION) {
-          DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
-              + metaFile + " ignoring ...");
-        }
-        DataChecksum checksum = header.getChecksum();
-        int bytesPerChecksum = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize();
-        long numChunks = Math.min(
-            (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, 
-            (metaFileLen - crcHeaderLen)/checksumSize);
-        if (numChunks == 0) {
-          return 0;
-        }
-        IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
-        blockIn = new FileInputStream(blockFile);
-        long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
-        IOUtils.skipFully(blockIn, lastChunkStartPos);
-        int lastChunkSize = (int)Math.min(
-            bytesPerChecksum, blockFileLen-lastChunkStartPos);
-        byte[] buf = new byte[lastChunkSize+checksumSize];
-        checksumIn.readFully(buf, lastChunkSize, checksumSize);
-        IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
-
-        checksum.update(buf, 0, lastChunkSize);
-        if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
-          return lastChunkStartPos + lastChunkSize;
-        } else { // last chunk is corrupt
-          return lastChunkStartPos;
-        }
-      } catch (IOException e) {
-        DataNode.LOG.warn(e);
-        return 0;
-      } finally {
-        IOUtils.closeStream(checksumIn);
-        IOUtils.closeStream(blockIn);
-      }
-    }
-      
-    void clearPath(File f) {
-      finalizedDir.clearPath(f);
-    }
-      
-    public String toString() {
-      return currentDir.getAbsolutePath();
-    }
-    
-    public void shutdown() {
-      dfsUsage.shutdown();
-    }
-  }
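
validateIntegrity above trusts everything up to the last checksum chunk, since a crash is assumed to corrupt only the tail of a partially flushed block. The same idea in isolation, using java.util.zip.CRC32 in place of the meta-file header and DataChecksum machinery (the stored CRC is passed in directly here; the class name is illustrative):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.zip.CRC32;

    public class LastChunkCheckSketch {
      /**
       * Return how many bytes of 'data' are covered by a valid checksum,
       * under the assumption stated above: a crash corrupts at most the
       * last chunk. 'storedCrcOfLastChunk' stands in for the value the
       * real code reads from the meta file via DataChecksum.
       */
      static long validBytes(RandomAccessFile data, int bytesPerChecksum,
          long storedCrcOfLastChunk) throws IOException {
        long len = data.length();
        if (len == 0) {
          return 0;
        }
        long numChunks = (len + bytesPerChecksum - 1) / bytesPerChecksum;
        long lastChunkStart = (numChunks - 1) * bytesPerChecksum;
        int lastChunkSize = (int) (len - lastChunkStart);
        byte[] buf = new byte[lastChunkSize];
        data.seek(lastChunkStart);
        data.readFully(buf);
        CRC32 crc = new CRC32();
        crc.update(buf, 0, lastChunkSize);
        // last chunk verifies: trust the whole file; otherwise trust
        // only the bytes before the last chunk
        return crc.getValue() == storedCrcOfLastChunk
            ? lastChunkStart + lastChunkSize : lastChunkStart;
      }
    }
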
-
-  /**
-   * The underlying volume used to store replicas.
-   * 
-   * It uses the {@link FSDataset} object for synchronization.
-   */
-  static class FSVolume implements FsVolumeSpi {
-    private final FSDataset dataset;
-    private final String storageID;
-    private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
-    private final File currentDir;    // <StorageDirectory>/current
-    private final DF usage;           
-    private final long reserved;
-    
-    FSVolume(FSDataset dataset, String storageID, File currentDir,
-        Configuration conf) throws IOException {
-      this.dataset = dataset;
-      this.storageID = storageID;
-      this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
-                                   DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
-      this.currentDir = currentDir; 
-      File parent = currentDir.getParentFile();
-      this.usage = new DF(parent, conf);
-    }
-    
-    File getCurrentDir() {
-      return currentDir;
-    }
-    
-    File getRbwDir(String bpid) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      return bp.getRbwDir();
-    }
-    
-    void decDfsUsed(String bpid, long value) {
-      synchronized(dataset) {
-        BlockPoolSlice bp = map.get(bpid);
-        if (bp != null) {
-          bp.decDfsUsed(value);
-        }
-      }
-    }
-    
-    long getDfsUsed() throws IOException {
-      long dfsUsed = 0;
-      synchronized(dataset) {
-        for(BlockPoolSlice s : map.values()) {
-          dfsUsed += s.getDfsUsed();
-        }
-      }
-      return dfsUsed;
-    }
-    
-    long getBlockPoolUsed(String bpid) throws IOException {
-      return getBlockPoolSlice(bpid).getDfsUsed();
-    }
-    
-    /**
-     * Calculate the capacity of the filesystem, after removing any
-     * reserved capacity.
-     * @return the unreserved number of bytes left in this filesystem. May be zero.
-     */
-    long getCapacity() {
-      long remaining = usage.getCapacity() - reserved;
-      return remaining > 0 ? remaining : 0;
-    }
-
-    @Override
-    public long getAvailable() throws IOException {
-      long remaining = getCapacity()-getDfsUsed();
-      long available = usage.getAvailable();
-      if (remaining>available) {
-        remaining = available;
-      }
-      return (remaining > 0) ? remaining : 0;
-    }
-      
-    long getReserved(){
-      return reserved;
-    }
-    
-    String getMount() throws IOException {
-      return usage.getMount();
-    }
-
-    private BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
-      BlockPoolSlice bp = map.get(bpid);
-      if (bp == null) {
-        throw new IOException("block pool " + bpid + " is not found");
-      }
-      return bp;
-    }
-
-    @Override
-    public String getPath(String bpid) throws IOException {
-      return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
-    }
-
-    @Override
-    public File getFinalizedDir(String bpid) throws IOException {
-      return getBlockPoolSlice(bpid).getFinalizedDir();
-    }
-
-    /**
-     * Make a copy of the list of currently active BPIDs
-     */
-    @Override
-    public String[] getBlockPoolList() {
-      synchronized(dataset) {
-        return map.keySet().toArray(new String[map.keySet().size()]);   
-      }
-    }
-
-    /**
-     * Temporary files. They get moved to the finalized block directory when
-     * the block is finalized.
-     */
-    File createTmpFile(String bpid, Block b) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      return bp.createTmpFile(b);
-    }
-
-    /**
-     * RBW files. They get moved to the finalized block directory when
-     * the block is finalized.
-     */
-    File createRbwFile(String bpid, Block b) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      return bp.createRbwFile(b);
-    }
-
-    File addBlock(String bpid, Block b, File f) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      return bp.addBlock(b, f);
-    }
-      
-    /**
-     * This should be used only by {@link FSVolumeSet#checkDirs()}
-     * and it will be synchronized there.
-     */
-    void checkDirs() throws DiskErrorException {
-      // TODO:FEDERATION valid synchronization
-      for(BlockPoolSlice s : map.values()) {
-        s.checkDirs();
-      }
-    }
-
-    void getVolumeMap(ReplicasMap volumeMap) throws IOException {
-      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
-      for (Entry<String, BlockPoolSlice> entry : set) {
-        entry.getValue().getVolumeMap(volumeMap);
-      }
-    }
-    
-    void getVolumeMap(String bpid, ReplicasMap volumeMap) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      bp.getVolumeMap(volumeMap);
-    }
-    
-    /**
-     * Add replicas under the given directory to the volume map
-     * @param volumeMap the replicas map
-     * @param dir an input directory
-     * @param isFinalized true if the directory has finalized replicas;
-     *                    false if the directory has rbw replicas
-     * @throws IOException 
-     */
-    private void addToReplicasMap(String bpid, ReplicasMap volumeMap, 
-        File dir, boolean isFinalized) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      // TODO move this up
-      // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-      bp.addToReplicasMap(volumeMap, dir, isFinalized);
-    }
-    
-    void clearPath(String bpid, File f) throws IOException {
-      BlockPoolSlice bp = getBlockPoolSlice(bpid);
-      bp.clearPath(f);
-    }
-
-    @Override
-    public String toString() {
-      return currentDir.getAbsolutePath();
-    }
-
-    public void shutdown() {
-      Set<Entry<String, BlockPoolSlice>> set = map.entrySet();
-      for (Entry<String, BlockPoolSlice> entry : set) {
-        entry.getValue().shutdown();
-      }
-    }
-
-    public void addBlockPool(String bpid, Configuration conf)
-        throws IOException {
-      File bpdir = new File(currentDir, bpid);
-      BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
-      map.put(bpid, bp);
-    }
-    
-    public void shutdownBlockPool(String bpid) {
-      BlockPoolSlice bp = map.get(bpid);
-      if (bp!=null) {
-        bp.shutdown();
-      }
-      map.remove(bpid);
-    }
-
-    private boolean isBPDirEmpty(String bpid)
-        throws IOException {
-      File volumeCurrentDir = this.getCurrentDir();
-      File bpDir = new File(volumeCurrentDir, bpid);
-      File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
-      File finalizedDir = new File(bpCurrentDir,
-          DataStorage.STORAGE_DIR_FINALIZED);
-      File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-      if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
-        return false;
-      }
-      if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
-        return false;
-      }
-      return true;
-    }
-    
-    private void deleteBPDirectories(String bpid, boolean force)
-        throws IOException {
-      File volumeCurrentDir = this.getCurrentDir();
-      File bpDir = new File(volumeCurrentDir, bpid);
-      if (!bpDir.isDirectory()) {
-        // nothing to be deleted
-        return;
-      }
-      File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
-      File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
-      File finalizedDir = new File(bpCurrentDir,
-          DataStorage.STORAGE_DIR_FINALIZED);
-      File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-      if (force) {
-        FileUtil.fullyDelete(bpDir);
-      } else {
-        if (!rbwDir.delete()) {
-          throw new IOException("Failed to delete " + rbwDir);
-        }
-        if (!finalizedDir.delete()) {
-          throw new IOException("Failed to delete " + finalizedDir);
-        }
-        FileUtil.fullyDelete(tmpDir);
-        for (File f : FileUtil.listFiles(bpCurrentDir)) {
-          if (!f.delete()) {
-            throw new IOException("Failed to delete " + f);
-          }
-        }
-        if (!bpCurrentDir.delete()) {
-          throw new IOException("Failed to delete " + bpCurrentDir);
-        }
-        for (File f : FileUtil.listFiles(bpDir)) {
-          if (!f.delete()) {
-            throw new IOException("Failed to delete " + f);
-          }
-        }
-        if (!bpDir.delete()) {
-          throw new IOException("Failed to delete " + bpDir);
-        }
-      }
-    }
-
-    String getStorageID() {
-      return storageID;
-    }
-  }
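
FSVolume.getCapacity() and getAvailable() above reduce the filesystem numbers to: capacity is the raw capacity minus the DFS_DATANODE_DU_RESERVED_KEY reservation, and space available for new blocks is the smaller of (capacity - dfsUsed) and what df reports as free, never negative. As a tiny pure-function sketch (illustrative names, not part of the commit):

    public class AvailableSpaceSketch {
      /**
       * Mirrors FSVolume.getCapacity()/getAvailable(): subtract the
       * configured reservation from the raw capacity, then take the
       * smaller of (capacity - dfsUsed) and the df-reported free space,
       * clamped at zero.
       */
      static long available(long fsCapacity, long reserved, long dfsUsed,
          long dfAvailable) {
        long capacity = Math.max(fsCapacity - reserved, 0);
        long remaining = Math.min(capacity - dfsUsed, dfAvailable);
        return Math.max(remaining, 0);
      }
    }
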
-    
-  static class FSVolumeSet {
-    /*
-     * Read access to this unmodifiable list is not synchronized.
-     * This list is replaced on modification holding "this" lock.
-     */
-    private volatile List<FSVolume> volumes = null;
-
-    final VolumeChoosingPolicy<FSVolume> blockChooser;
-    int numFailedVolumes;
-
-    FSVolumeSet(List<FSVolume> volumes, int failedVols,
-        VolumeChoosingPolicy<FSVolume> blockChooser) {
-      this.volumes = Collections.unmodifiableList(volumes);
-      this.blockChooser = blockChooser;
-      this.numFailedVolumes = failedVols;
-    }
-    
-    private int numberOfFailedVolumes() {
-      return numFailedVolumes;
-    }
-    
-    /** 
-     * Get next volume. Synchronized to ensure the volume choosing policy
-     * is updated by a single thread and the next volume is chosen with
-     * no concurrent update to {@link #volumes}.
-     * @param blockSize free space needed on the volume
-     * @return next volume to store the block in.
-     */
-    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
-      return blockChooser.chooseVolume(volumes, blockSize);
-    }
-      
-    private long getDfsUsed() throws IOException {
-      long dfsUsed = 0L;
-      for (FSVolume v : volumes) {
-        dfsUsed += v.getDfsUsed();
-      }
-      return dfsUsed;
-    }
-
-    private long getBlockPoolUsed(String bpid) throws IOException {
-      long dfsUsed = 0L;
-      for (FSVolume v : volumes) {
-        dfsUsed += v.getBlockPoolUsed(bpid);
-      }
-      return dfsUsed;
-    }
-
-    private long getCapacity() {
-      long capacity = 0L;
-      for (FSVolume v : volumes) {
-        capacity += v.getCapacity();
-      }
-      return capacity;
-    }
-      
-    private long getRemaining() throws IOException {
-      long remaining = 0L;
-      for (FsVolumeSpi vol : volumes) {
-        remaining += vol.getAvailable();
-      }
-      return remaining;
-    }
-      
-    private void getVolumeMap(ReplicasMap volumeMap) throws IOException {
-      for (FSVolume v : volumes) {
-        v.getVolumeMap(volumeMap);
-      }
-    }
-    
-    private void getVolumeMap(String bpid, ReplicasMap volumeMap)
-        throws IOException {
-      for (FSVolume v : volumes) {
-        v.getVolumeMap(bpid, volumeMap);
-      }
-    }
-      
-    /**
-     * Calls {@link FSVolume#checkDirs()} on each volume, removing any
-     * volumes from the active list that result in a DiskErrorException.
-     * 
-     * This method is synchronized to allow only one invocation of
-     * checkDirs() at a time.
-     * @return list of all the removed volumes.
-     */
-    private synchronized List<FSVolume> checkDirs() {
-      ArrayList<FSVolume> removedVols = null;
-      
-      // Make a copy of volumes for performing modification 
-      final List<FSVolume> volumeList = new ArrayList<FSVolume>(volumes);
-      
-      for(Iterator<FSVolume> i = volumeList.iterator(); i.hasNext(); ) {
-        final FSVolume fsv = i.next();
-        try {
-          fsv.checkDirs();
-        } catch (DiskErrorException e) {
-          DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
-          if (removedVols == null) {
-            removedVols = new ArrayList<FSVolume>(2);
-          }
-          removedVols.add(fsv);
-          fsv.shutdown(); 
-          i.remove(); // Remove the volume
-          numFailedVolumes++;
-        }
-      }
-      
-      if (removedVols != null && removedVols.size() > 0) {
-        // Replace volume list
-        volumes = Collections.unmodifiableList(volumeList);
-        DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
-            + removedVols.size() + " volumes. List of current volumes: "
-            + this);
-      }
-
-      return removedVols;
-    }
-
-    @Override
-    public String toString() {
-      return volumes.toString();
-    }
-
-
-    private void addBlockPool(String bpid, Configuration conf)
-        throws IOException {
-      for (FSVolume v : volumes) {
-        v.addBlockPool(bpid, conf);
-      }
-    }
-    
-    private void removeBlockPool(String bpid) {
-      for (FSVolume v : volumes) {
-        v.shutdownBlockPool(bpid);
-      }
-    }
-
-    private void shutdown() {
-      for (FSVolume volume : volumes) {
-        if(volume != null) {
-          volume.shutdown();
-        }
-      }
-    }
-  }
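
FSVolumeSet keeps its volumes in a volatile unmodifiable list so that readers like getDfsUsed() and getCapacity() never take a lock; checkDirs() mutates a private copy under the instance lock and republishes it. The same pattern reduced to a generic sketch (names are illustrative):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class CopyOnWriteListSketch<T> {
      // reads are unsynchronized; writers replace the whole list under lock
      private volatile List<T> items = Collections.emptyList();

      List<T> snapshot() {  // lock-free read, like iterating FSVolumeSet.volumes
        return items;
      }

      synchronized void remove(T bad) {
        List<T> copy = new ArrayList<T>(items);  // mutate a private copy
        if (copy.remove(bad)) {
          items = Collections.unmodifiableList(copy);  // publish atomically
        }
      }
    }

This trades a copy on the rare mutation (a failed volume) for lock-free iteration on the hot read paths.
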
-  
-  //////////////////////////////////////////////////////
-  //
-  // FSDataSet
-  //
-  //////////////////////////////////////////////////////
-
-  private static boolean isUnlinkTmpFile(File f) {
-    String name = f.getName();
-    return name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
-  }
-  
-  private static File getOrigFile(File unlinkTmpFile) {
-    String fileName = unlinkTmpFile.getName();
-    return new File(unlinkTmpFile.getParentFile(),
-        fileName.substring(0,
-            fileName.length() - DatanodeUtil.UNLINK_BLOCK_SUFFIX.length()));
-  }
-  
-  protected File getMetaFile(ExtendedBlock b) throws IOException {
-    return DatanodeUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
-  }
-
-  /** Find the metadata file for the specified block file.
-   * Return the generation stamp from the name of the metafile.
-   */
-  private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
-    String blockName = blockFile.getName();
-    for (int j = 0; j < listdir.length; j++) {
-      String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
-      }
-      if (blockFile == listdir[j]) {
-        continue;
-      }
-      return Block.getGenerationStamp(listdir[j].getName());
-    }
-    DataNode.LOG.warn("Block " + blockFile + 
-                      " does not have a metafile!");
-    return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
-  }
-  
-  /** Parse the generation stamp from the meta file of the given block file. */
-  private static long parseGenerationStamp(File blockFile, File metaFile
-      ) throws IOException {
-    String metaname = metaFile.getName();
-    String gs = metaname.substring(blockFile.getName().length() + 1,
-        metaname.length() - DatanodeUtil.METADATA_EXTENSION.length());
-    try {
-      return Long.parseLong(gs);
-    } catch(NumberFormatException nfe) {
-      throw (IOException)new IOException("blockFile=" + blockFile
-          + ", metaFile=" + metaFile).initCause(nfe);
-    }
-  }
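
parseGenerationStamp above (moved to FsDatasetUtil in this commit) relies on the meta file naming convention "<blockFileName>_<generationStamp>.meta". A self-contained sketch of just the string manipulation, with the ".meta" suffix written out in place of DatanodeUtil.METADATA_EXTENSION (class name illustrative):

    public class GenStampParseSketch {
      // Meta files are named "<blockFileName>_<generationStamp>.meta";
      // strip the block file name, the '_' separator and the ".meta"
      // extension, leaving the generation stamp digits.
      static long parseGenerationStamp(String blockFileName, String metaFileName) {
        String suffix = ".meta";  // DatanodeUtil.METADATA_EXTENSION
        String gs = metaFileName.substring(blockFileName.length() + 1,
            metaFileName.length() - suffix.length());
        return Long.parseLong(gs);
      }

      public static void main(String[] args) {
        // block blk_12345 with generation stamp 1001
        System.out.println(parseGenerationStamp(
            "blk_12345", "blk_12345_1001.meta"));  // prints 1001
      }
    }
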
+class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+  static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
 
   @Override // FsDatasetSpi
-  public List<FSVolume> getVolumes() {
+  public List<FsVolumeImpl> getVolumes() {
     return volumes.volumes;
   }
 
   @Override
-  public synchronized FSVolume getVolume(final ExtendedBlock b) {
+  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
     final ReplicaInfo r =  volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
-    return r != null? (FSVolume)r.getVolume(): null;
+    return r != null? (FsVolumeImpl)r.getVolume(): null;
   }
 
   @Override // FsDatasetSpi
@@ -1040,11 +106,12 @@ public class FSDataset implements FsData
     if (blockfile == null) {
       return null;
     }
-    final File metafile = DatanodeUtil.findMetaFile(blockfile);
-    return new Block(blkid, blockfile.length(),
-        parseGenerationStamp(blockfile, metafile));
+    final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+    final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+    return new Block(blkid, blockfile.length(), gs);
   }
 
+
   /**
    * Returns a clone of a replica stored in data-node memory.
    * Should be primarily used for testing.
@@ -1069,21 +136,21 @@ public class FSDataset implements FsData
     }
     return null;
   }
-
+  
   @Override // FsDatasetSpi
   public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       throws IOException {
-    final File meta = getMetaFile(b);
+    File meta = FsDatasetUtil.getMetaFile(getBlockFile(b), b.getGenerationStamp());
     if (meta == null || !meta.exists()) {
       return null;
     }
     return new LengthInputStream(new FileInputStream(meta), meta.length());
   }
     
-  private final DataNode datanode;
-  final FSVolumeSet volumes;
-  final ReplicasMap volumeMap;
-  final FSDatasetAsyncDiskService asyncDiskService;
+  final DataNode datanode;
+  final FsVolumeList volumes;
+  final ReplicaMap volumeMap;
+  final FsDatasetAsyncDiskService asyncDiskService;
   private final int validVolsRequired;
 
   // Used for synchronizing access to usage stats
@@ -1092,7 +159,7 @@ public class FSDataset implements FsData
   /**
    * An FSDataset has a directory where it loads its data files.
    */
-  private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
+  FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
       ) throws IOException {
     this.datanode = datanode;
     // The number of volumes required for operation is the total number 
@@ -1119,29 +186,29 @@ public class FSDataset implements FsData
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
 
-    final List<FSVolume> volArray = new ArrayList<FSVolume>(
+    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
         storage.getNumStorageDirs());
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       final File dir = storage.getStorageDir(idx).getCurrentDir();
-      volArray.add(new FSVolume(this, storage.getStorageID(), dir, conf));
-      DataNode.LOG.info("FSDataset added volume - " + dir);
+      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
+      LOG.info("Added volume - " + dir);
     }
-    volumeMap = new ReplicasMap(this);
+    volumeMap = new ReplicaMap(this);
 
     @SuppressWarnings("unchecked")
-    final VolumeChoosingPolicy<FSVolume> blockChooserImpl =
+    final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
         ReflectionUtils.newInstance(conf.getClass(
             DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
-    volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl);
+    volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
     volumes.getVolumeMap(volumeMap);
 
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
     }
-    asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
     registerMBean(storage.getStorageID());
   }
 
@@ -1221,8 +288,8 @@ public class FSDataset implements FsData
   File getBlockFile(String bpid, Block b) throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
-      if (DataNode.LOG.isDebugEnabled()) {
-        DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
       }
       throw new IOException("Block " + b + " is not valid.");
     }
@@ -1322,27 +389,12 @@ public class FSDataset implements FsData
     return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()),
                                 new FileInputStream(metaInFile.getFD()));
   }
-    
-  /**
-   * Make a copy of the block if this block is linked to an existing
-   * snapshot. This ensures that modifying this block does not modify
-   * data in any existing snapshots.
-   * @param block Block
-   * @param numLinks Unlink if the number of links exceed this value
-   * @throws IOException
-   * @return - true if the specified block was unlinked or the block
-   *           is not in any snapshot.
-   */
-  public boolean unlinkBlock(ExtendedBlock block, int numLinks) throws IOException {
-    ReplicaInfo info = getReplicaInfo(block);
-    return info.unlinkBlock(numLinks);
-  }
 
-  private static File moveBlockFiles(Block b, File srcfile, File destdir
+  static File moveBlockFiles(Block b, File srcfile, File destdir
       ) throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = DatanodeUtil.getMetaFile(srcfile, b.getGenerationStamp());
-    final File dstmeta = DatanodeUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
+    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
     if (!srcmeta.renameTo(dstmeta)) {
       throw new IOException("Failed to move meta file for " + b
           + " from " + srcmeta + " to " + dstmeta);
@@ -1351,16 +403,16 @@ public class FSDataset implements FsData
       throw new IOException("Failed to move block file for " + b
           + " from " + srcfile + " to " + dstfile.getAbsolutePath());
     }
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta);
-      DataNode.LOG.debug("addBlock: Moved " + srcfile + " to " + dstfile);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
+          + " and " + srcfile + " to " + dstfile);
     }
     return dstfile;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
-    DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+    LOG.info("truncateBlock: blockFile=" + blockFile
         + ", metaFile=" + metaFile
         + ", oldlen=" + oldlen
         + ", newlen=" + newlen);
@@ -1411,7 +463,7 @@ public class FSDataset implements FsData
 
 
   @Override  // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,
+  public synchronized ReplicaInPipeline append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
     // If the block was successfully finalized because all packets
     // were successfully processed at the Datanode but the ack for
@@ -1425,7 +477,7 @@ public class FSDataset implements FsData
           " should be greater than the replica " + b + "'s generation stamp");
     }
     ReplicaInfo replicaInfo = getReplicaInfo(b);
-    DataNode.LOG.info("Appending to replica " + replicaInfo);
+    LOG.info("Appending to replica " + replicaInfo);
     if (replicaInfo.getState() != ReplicaState.FINALIZED) {
       throw new ReplicaNotFoundException(
           ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
@@ -1460,7 +512,7 @@ public class FSDataset implements FsData
     
     // construct a RBW replica with the new GS
     File blkfile = replicaInfo.getBlockFile();
-    FSVolume v = (FSVolume)replicaInfo.getVolume();
+    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
     if (v.getAvailable() < estimateBlockLen - replicaInfo.getNumBytes()) {
       throw new DiskOutOfSpaceException("Insufficient space for appending to "
           + replicaInfo);
@@ -1473,8 +525,8 @@ public class FSDataset implements FsData
     File newmeta = newReplicaInfo.getMetaFile();
 
     // rename meta file to rbw directory
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
     }
     if (!oldmeta.renameTo(newmeta)) {
       throw new IOException("Block " + replicaInfo + " reopen failed. " +
@@ -1483,13 +535,13 @@ public class FSDataset implements FsData
     }
 
     // rename block file to rbw directory
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("Renaming " + blkfile + " to " + newBlkFile);
-      DataNode.LOG.debug("Old block file length is " + blkfile.length());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+          + ", file length=" + blkfile.length());
     }
     if (!blkfile.renameTo(newBlkFile)) {
       if (!newmeta.renameTo(oldmeta)) {  // restore the meta file
-        DataNode.LOG.warn("Cannot move meta file " + newmeta + 
+        LOG.warn("Cannot move meta file " + newmeta + 
+            " back to the finalized directory " + oldmeta);
       }
       throw new IOException("Block " + replicaInfo + " reopen failed. " +
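
The append path above moves the meta file first and the block file second, restoring the meta file if the second rename fails so the replica is never left half-moved. The rollback shape reduced to a sketch (illustrative names):

    import java.io.File;
    import java.io.IOException;

    public class RenamePairSketch {
      static void movePair(File blk, File meta, File newBlk, File newMeta)
          throws IOException {
        if (!meta.renameTo(newMeta)) {
          throw new IOException("Failed to rename " + meta + " to " + newMeta);
        }
        if (!blk.renameTo(newBlk)) {
          if (!newMeta.renameTo(meta)) {
            // best effort: the real code only logs a warning here
          }
          throw new IOException("Failed to rename " + blk + " to " + newBlk);
        }
      }
    }
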
@@ -1552,9 +604,9 @@ public class FSDataset implements FsData
   }
   
   @Override  // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface recoverAppend(ExtendedBlock b,
+  public synchronized ReplicaInPipeline recoverAppend(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    DataNode.LOG.info("Recover failed append to " + b);
+    LOG.info("Recover failed append to " + b);
 
     ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
 
@@ -1571,10 +623,9 @@ public class FSDataset implements FsData
   @Override // FsDatasetSpi
   public void recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
-    DataNode.LOG.info("Recover failed close " + b);
+    LOG.info("Recover failed close " + b);
     // check replica's state
-    ReplicaInfo replicaInfo = recoverCheck(b, newGS,
-        expectedBlockLen);
+    ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
     // bump the replica's GS
     bumpReplicaGS(replicaInfo, newGS);
     // finalize the replica if RBW
@@ -1599,8 +650,8 @@ public class FSDataset implements FsData
     File newmeta = replicaInfo.getMetaFile();
 
     // rename meta file to new GS
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
     }
     if (!oldmeta.renameTo(newmeta)) {
       replicaInfo.setGenerationStamp(oldGS); // restore old GS
@@ -1611,7 +662,7 @@ public class FSDataset implements FsData
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface createRbw(ExtendedBlock b)
+  public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
       throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
         b.getBlockId());
@@ -1621,7 +672,7 @@ public class FSDataset implements FsData
       " and thus cannot be created.");
     }
     // create a new block
-    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
     // create a rbw file to hold block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
@@ -1631,10 +682,10 @@ public class FSDataset implements FsData
   }
   
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface recoverRbw(ExtendedBlock b,
+  public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
       long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    DataNode.LOG.info("Recover the RBW replica " + b);
+    LOG.info("Recover the RBW replica " + b);
 
     ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
     
@@ -1645,7 +696,7 @@ public class FSDataset implements FsData
     }
     ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
     
-    DataNode.LOG.info("Recovering replica " + rbw);
+    LOG.info("Recovering replica " + rbw);
 
     // Stop the previous writer
     rbw.stopWriter();
@@ -1676,12 +727,12 @@ public class FSDataset implements FsData
   }
   
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface convertTemporaryToRbw(
+  public synchronized ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
     final long blockId = b.getBlockId();
     final long expectedGs = b.getGenerationStamp();
     final long visible = b.getNumBytes();
-    DataNode.LOG.info("Convert replica " + b
+    LOG.info("Convert replica " + b
         + " from Temporary to RBW, visible length=" + visible);
 
     final ReplicaInPipeline temp;
@@ -1717,7 +768,7 @@ public class FSDataset implements FsData
           + visible + ", temp=" + temp);
     }
     // check volume
-    final FSVolume v = (FSVolume)temp.getVolume();
+    final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
     if (v == null) {
       throw new IOException("r.getVolume() = null, temp="  + temp);
     }
@@ -1737,7 +788,7 @@ public class FSDataset implements FsData
   }
 
   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipelineInterface createTemporary(ExtendedBlock b)
+  public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
       throws IOException {
     ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
     if (replicaInfo != null) {
@@ -1746,7 +797,7 @@ public class FSDataset implements FsData
           " and thus cannot be created.");
     }
     
-    FSVolume v = volumes.getNextVolume(b.getNumBytes());
+    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
     // create a temporary file to hold block in the designated volume
     File f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
@@ -1763,12 +814,12 @@ public class FSDataset implements FsData
   @Override // FsDatasetSpi
   public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams streams, 
       int checksumSize) throws IOException {
-    FileOutputStream file = (FileOutputStream) streams.getChecksumOut();
+    FileOutputStream file = (FileOutputStream)streams.getChecksumOut();
     FileChannel channel = file.getChannel();
     long oldPos = channel.position();
     long newPos = oldPos - checksumSize;
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("Changing meta file offset of block " + b + " from " +
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Changing meta file offset of block " + b + " from " +
           oldPos + " to " + newPos);
     }
     channel.position(newPos);
@@ -1805,7 +856,7 @@ public class FSDataset implements FsData
       newReplicaInfo = (FinalizedReplica)
              ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
     } else {
-      FSVolume v = (FSVolume)replicaInfo.getVolume();
+      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
       File f = replicaInfo.getBlockFile();
       if (v == null) {
         throw new IOException("No volume for temporary file " + f + 
@@ -1833,7 +884,7 @@ public class FSDataset implements FsData
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(), 
           replicaInfo.getMetaFile(), b.getLocalBlock())) {
-        DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
+        LOG.warn("Block " + b + " unfinalized and removed. " );
       }
     }
   }
@@ -1847,17 +898,16 @@ public class FSDataset implements FsData
    */
   private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
     if (blockFile == null) {
-      DataNode.LOG.warn("No file exists for block: " + b);
+      LOG.warn("No file exists for block: " + b);
       return true;
     }
     
     if (!blockFile.delete()) {
-      DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
+      LOG.warn("Not able to delete the block file: " + blockFile);
       return false;
     } else { // remove the meta file
       if (metaFile != null && !metaFile.delete()) {
-        DataNode.LOG.warn(
-            "Not able to delete the meta block file: " + metaFile);
+        LOG.warn("Not able to delete the meta block file: " + metaFile);
         return false;
       }
     }
@@ -1958,8 +1008,8 @@ public class FSDataset implements FsData
       datanode.checkDiskError();
     }
     
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("b=" + b + ", f=" + f);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("b=" + b + ", f=" + f);
     }
     return null;
   }
@@ -1977,7 +1027,7 @@ public class FSDataset implements FsData
     }
 
     //check replica's meta file
-    final File metafile = DatanodeUtil.getMetaFile(f, r.getGenerationStamp());
+    final File metafile = FsDatasetUtil.getMetaFile(f, r.getGenerationStamp());
     if (!metafile.exists()) {
       throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
     }
@@ -1995,69 +1045,64 @@ public class FSDataset implements FsData
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
     boolean error = false;
     for (int i = 0; i < invalidBlks.length; i++) {
-      File f = null;
-      final FSVolume v;
+      final File f;
+      final FsVolumeImpl v;
       synchronized (this) {
         f = getFile(bpid, invalidBlks[i].getBlockId());
-        ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]);
-        if (dinfo == null || 
-            dinfo.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
-          DataNode.LOG.warn("Unexpected error trying to delete block "
-                           + invalidBlks[i] + 
-                           ". BlockInfo not found in volumeMap.");
+        ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
+        if (info == null) {
+          LOG.warn("Failed to delete replica " + invalidBlks[i]
+              + ": ReplicaInfo not found.");
+          error = true;
+          continue;
+        }
+        if (info.getGenerationStamp() != invalidBlks[i].getGenerationStamp()) {
+          LOG.warn("Failed to delete replica " + invalidBlks[i]
+              + ": GenerationStamp not matched, info=" + info);
           error = true;
           continue;
         }
-        v = (FSVolume)dinfo.getVolume();
+        v = (FsVolumeImpl)info.getVolume();
         if (f == null) {
-          DataNode.LOG.warn("Unexpected error trying to delete block "
-                            + invalidBlks[i] + 
-                            ". Block not found in blockMap." +
-                            ((v == null) ? " " : " Block found in volumeMap."));
+          LOG.warn("Failed to delete replica " + invalidBlks[i]
+              +  ": File not found, volume=" + v);
           error = true;
           continue;
         }
         if (v == null) {
-          DataNode.LOG.warn("Unexpected error trying to delete block "
-                            + invalidBlks[i] + 
-                            ". No volume for this block." +
-                            " Block found in blockMap. " + f + ".");
+          LOG.warn("Failed to delete replica " + invalidBlks[i]
+              +  ". No volume for this replica, file=" + f + ".");
           error = true;
           continue;
         }
         File parent = f.getParentFile();
         if (parent == null) {
-          DataNode.LOG.warn("Unexpected error trying to delete block "
-                            + invalidBlks[i] + 
-                            ". Parent not found for file " + f + ".");
+          LOG.warn("Failed to delete replica " + invalidBlks[i]
+              +  ". Parent not found for file " + f + ".");
           error = true;
           continue;
         }
-        ReplicaState replicaState = dinfo.getState();
+        ReplicaState replicaState = info.getState();
         if (replicaState == ReplicaState.FINALIZED || 
             (replicaState == ReplicaState.RUR && 
-                ((ReplicaUnderRecovery)dinfo).getOriginalReplica().getState() == 
+                ((ReplicaUnderRecovery)info).getOriginalReplica().getState() == 
                   ReplicaState.FINALIZED)) {
           v.clearPath(bpid, parent);
         }
         volumeMap.remove(bpid, invalidBlks[i]);
       }
-      File metaFile = DatanodeUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp());
 
       // Delete the block asynchronously to make sure we can do it fast enough
-      asyncDiskService.deleteAsync(v, f, metaFile,
+      asyncDiskService.deleteAsync(v, f,
+          FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
           new ExtendedBlock(bpid, invalidBlks[i]));
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
     }
   }
-  
-  public void notifyNamenodeDeletedBlock(ExtendedBlock block){
-    datanode.notifyNamenodeDeletedBlock(block);
-  }
 
-  @Override // {@link FsDatasetSpi}
+  @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();
     return getFile(block.getBlockPoolId(), blockId) != null;
@@ -2085,7 +1130,7 @@ public class FSDataset implements FsData
   @Override // FsDatasetSpi
   public void checkDataDir() throws DiskErrorException {
     long totalBlocks=0, removedBlocks=0;
-    List<FSVolume> failedVols =  volumes.checkDirs();
+    List<FsVolumeImpl> failedVols =  volumes.checkDirs();
     
    // If there are no failed volumes, return
     if (failedVols == null) { 
@@ -2095,16 +1140,16 @@ public class FSDataset implements FsData
     // Otherwise remove blocks for the failed volumes
     long mlsec = System.currentTimeMillis();
     synchronized (this) {
-      for (FSVolume fv: failedVols) {
-        for (String bpid : fv.map.keySet()) {
+      for (FsVolumeImpl fv: failedVols) {
+        for (String bpid : fv.getBlockPoolList()) {
           Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
           while(ib.hasNext()) {
             ReplicaInfo b = ib.next();
             totalBlocks++;
            // check if this replica belongs to the failed volume
             if(b.getVolume() == fv) {
-              DataNode.LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
-                  + " on failed volume " + fv.currentDir.getAbsolutePath());
+              LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
+                  + " on failed volume " + fv.getCurrentDir().getAbsolutePath());
               ib.remove();
               removedBlocks++;
             }
@@ -2113,16 +1158,15 @@ public class FSDataset implements FsData
       }
     } // end of sync
     mlsec = System.currentTimeMillis() - mlsec;
-    DataNode.LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
+    LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
        " (took " + mlsec + " millisecs)");
 
     // report the error
     StringBuilder sb = new StringBuilder();
-    for (FSVolume fv : failedVols) {
-      sb.append(fv.currentDir.getAbsolutePath() + ";");
+    for (FsVolumeImpl fv : failedVols) {
+      sb.append(fv.getCurrentDir().getAbsolutePath() + ";");
     }
-
-    throw  new DiskErrorException("DataNode failed volumes:" + sb);
+    throw new DiskErrorException("DataNode failed volumes:" + sb);
   }
     
 
@@ -2152,9 +1196,9 @@ public class FSDataset implements FsData
       bean = new StandardMBean(this,FSDatasetMBean.class);
       mbeanName = MBeans.register("DataNode", "FSDatasetState-" + storageName, bean);
     } catch (NotCompliantMBeanException e) {
-      DataNode.LOG.warn("Error registering FSDatasetState MBean", e);
+      LOG.warn("Error registering FSDatasetState MBean", e);
     }
-    DataNode.LOG.info("Registered FSDatasetState MBean");
+    LOG.info("Registered FSDatasetState MBean");
   }
 
   @Override // FsDatasetSpi
@@ -2221,7 +1265,7 @@ public class FSDataset implements FsData
           // If metadata file exists then delete it
           if (diskMetaFile != null && diskMetaFile.exists()
               && diskMetaFile.delete()) {
-            DataNode.LOG.warn("Deleted a metadata file without a block "
+            LOG.warn("Deleted a metadata file without a block "
                 + diskMetaFile.getAbsolutePath());
           }
           return;
@@ -2230,15 +1274,16 @@ public class FSDataset implements FsData
           // Block is in memory and not on the disk
           // Remove the block from volumeMap
           volumeMap.remove(bpid, blockId);
-          if (datanode.blockScanner != null) {
-            datanode.blockScanner.deleteBlock(bpid, new Block(blockId));
+          final DataBlockScanner blockScanner = datanode.getBlockScanner();
+          if (blockScanner != null) {
+            blockScanner.deleteBlock(bpid, new Block(blockId));
           }
-          DataNode.LOG.warn("Removed block " + blockId
+          LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
           if (diskMetaFile != null && diskMetaFile.exists()
               && diskMetaFile.delete()) {
-            DataNode.LOG.warn("Deleted a metadata file for the deleted block "
+            LOG.warn("Deleted a metadata file for the deleted block "
                 + diskMetaFile.getAbsolutePath());
           }
         }
@@ -2252,10 +1297,11 @@ public class FSDataset implements FsData
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, 
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
-        if (datanode.blockScanner != null) {
-          datanode.blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+        final DataBlockScanner blockScanner = datanode.getBlockScanner();
+        if (blockScanner != null) {
+          blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
         }
-        DataNode.LOG.warn("Added missing block to memory " + diskBlockInfo);
+        LOG.warn("Added missing block to memory " + diskBlockInfo);
         return;
       }
       /*
@@ -2265,7 +1311,7 @@ public class FSDataset implements FsData
       File memFile = memBlockInfo.getBlockFile();
       if (memFile.exists()) {
         if (memFile.compareTo(diskFile) != 0) {
-          DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
+          LOG.warn("Block file " + memFile.getAbsolutePath()
               + " does not match file found by scan "
               + diskFile.getAbsolutePath());
           // TODO: Should the diskFile be deleted?
@@ -2275,25 +1321,25 @@ public class FSDataset implements FsData
         // Update the block with the file found on the disk. Since the block
         // file and metadata file are found as a pair on the disk, update
         // the block based on the metadata file found on the disk
-        DataNode.LOG.warn("Block file in volumeMap "
+        LOG.warn("Block file in volumeMap "
             + memFile.getAbsolutePath()
             + " does not exist. Updating it to the file found during scan "
             + diskFile.getAbsolutePath());
         memBlockInfo.setDir(diskFile.getParentFile());
         memFile = diskFile;
 
-        DataNode.LOG.warn("Updating generation stamp for block " + blockId
+        LOG.warn("Updating generation stamp for block " + blockId
             + " from " + memBlockInfo.getGenerationStamp() + " to " + diskGS);
         memBlockInfo.setGenerationStamp(diskGS);
       }
 
       // Compare generation stamp
       if (memBlockInfo.getGenerationStamp() != diskGS) {
-        File memMetaFile = DatanodeUtil.getMetaFile(diskFile, 
+        File memMetaFile = FsDatasetUtil.getMetaFile(diskFile, 
             memBlockInfo.getGenerationStamp());
         if (memMetaFile.exists()) {
           if (memMetaFile.compareTo(diskMetaFile) != 0) {
-            DataNode.LOG.warn("Metadata file in memory "
+            LOG.warn("Metadata file in memory "
                 + memMetaFile.getAbsolutePath()
                 + " does not match file found by scan "
                 + (diskMetaFile == null? null: diskMetaFile.getAbsolutePath()));
@@ -2306,7 +1352,7 @@ public class FSDataset implements FsData
               && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
               : GenerationStamp.GRANDFATHER_GENERATION_STAMP;
 
-          DataNode.LOG.warn("Updating generation stamp for block " + blockId
+          LOG.warn("Updating generation stamp for block " + blockId
               + " from " + memBlockInfo.getGenerationStamp() + " to " + gs);
 
           memBlockInfo.setGenerationStamp(gs);
@@ -2317,7 +1363,7 @@ public class FSDataset implements FsData
       if (memBlockInfo.getNumBytes() != memFile.length()) {
         // Update the length based on the block file
         corruptBlock = new Block(memBlockInfo);
-        DataNode.LOG.warn("Updating size of block " + blockId + " from "
+        LOG.warn("Updating size of block " + blockId + " from "
             + memBlockInfo.getNumBytes() + " to " + memFile.length());
         memBlockInfo.setNumBytes(memFile.length());
       }
@@ -2325,12 +1371,12 @@ public class FSDataset implements FsData
 
     // Send corrupt block report outside the lock
     if (corruptBlock != null) {
-      DataNode.LOG.warn("Reporting the block " + corruptBlock
+      LOG.warn("Reporting the block " + corruptBlock
           + " as corrupt due to length mismatch");
       try {
         datanode.reportBadBlocks(new ExtendedBlock(bpid, corruptBlock));  
       } catch (IOException e) {
-        DataNode.LOG.warn("Failed to repot bad block " + corruptBlock, e);
+        LOG.warn("Failed to report bad block " + corruptBlock, e);
       }
     }
   }
@@ -2359,9 +1405,9 @@ public class FSDataset implements FsData
 
   /** static version of {@link #initReplicaRecovery(Block, long)}. */
   static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
-      ReplicasMap map, Block block, long recoveryId) throws IOException {
+      ReplicaMap map, Block block, long recoveryId) throws IOException {
     final ReplicaInfo replica = map.get(bpid, block.getBlockId());
-    DataNode.LOG.info("initReplicaRecovery: block=" + block
+    LOG.info("initReplicaRecovery: block=" + block
         + ", recoveryId=" + recoveryId
         + ", replica=" + replica);
 
@@ -2410,13 +1456,13 @@ public class FSDataset implements FsData
       }
       final long oldRecoveryID = rur.getRecoveryID();
       rur.setRecoveryID(recoveryId);
-      DataNode.LOG.info("initReplicaRecovery: update recovery id for " + block
+      LOG.info("initReplicaRecovery: update recovery id for " + block
           + " from " + oldRecoveryID + " to " + recoveryId);
     }
     else {
       rur = new ReplicaUnderRecovery(replica, recoveryId);
       map.add(bpid, rur);
-      DataNode.LOG.info("initReplicaRecovery: changing replica state for "
+      LOG.info("initReplicaRecovery: changing replica state for "
           + block + " from " + replica.getState()
           + " to " + rur.getState());
     }
@@ -2431,7 +1477,7 @@ public class FSDataset implements FsData
     //get replica
     final String bpid = oldBlock.getBlockPoolId();
     final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
-    DataNode.LOG.info("updateReplica: block=" + oldBlock
+    LOG.info("updateReplica: block=" + oldBlock
         + ", recoveryId=" + recoveryId
         + ", length=" + newlength
         + ", replica=" + replica);
@@ -2518,16 +1564,18 @@ public class FSDataset implements FsData
     return replica.getVisibleLength();
   }
   
+  @Override
   public synchronized void addBlockPool(String bpid, Configuration conf)
       throws IOException {
-    DataNode.LOG.info("Adding block pool " + bpid);
+    LOG.info("Adding block pool " + bpid);
     volumes.addBlockPool(bpid, conf);
     volumeMap.initBlockPool(bpid);
     volumes.getVolumeMap(bpid, volumeMap);
   }
-  
+
+  @Override
   public synchronized void shutdownBlockPool(String bpid) {
-    DataNode.LOG.info("Removing block pool " + bpid);
+    LOG.info("Removing block pool " + bpid);
     volumeMap.cleanUpBlockPool(bpid);
     volumes.removeBlockPool(bpid);
   }
@@ -2546,30 +1594,29 @@ public class FSDataset implements FsData
     final long freeSpace;
     final long reservedSpace;
 
-    VolumeInfo(String dir, long usedSpace, long freeSpace, long reservedSpace) {
-      this.directory = dir;
+    VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
+      this.directory = v.toString();
       this.usedSpace = usedSpace;
       this.freeSpace = freeSpace;
-      this.reservedSpace = reservedSpace;
+      this.reservedSpace = v.getReserved();
     }
   }  
 
   private Collection<VolumeInfo> getVolumeInfo() {
     Collection<VolumeInfo> info = new ArrayList<VolumeInfo>();
-    for (FSVolume volume : volumes.volumes) {
+    for (FsVolumeImpl volume : volumes.volumes) {
       long used = 0;
       long free = 0;
       try {
         used = volume.getDfsUsed();
         free = volume.getAvailable();
       } catch (IOException e) {
-        DataNode.LOG.warn(e.getMessage());
+        LOG.warn(e.getMessage());
         used = 0;
         free = 0;
       }
       
-      info.add(new VolumeInfo(volume.toString(), used, free, 
-          volume.getReserved()));
+      info.add(new VolumeInfo(volume, used, free));
     }
     return info;
   }
@@ -2592,16 +1639,15 @@ public class FSDataset implements FsData
   public synchronized void deleteBlockPool(String bpid, boolean force)
       throws IOException {
     if (!force) {
-      for (FSVolume volume : volumes.volumes) {
+      for (FsVolumeImpl volume : volumes.volumes) {
         if (!volume.isBPDirEmpty(bpid)) {
-          DataNode.LOG.warn(bpid
-              + " has some block files, cannot delete unless forced");
+          LOG.warn(bpid + " has some block files, cannot delete unless forced");
           throw new IOException("Cannot delete block pool, "
               + "it contains some block files");
         }
       }
     }
-    for (FSVolume volume : volumes.volumes) {
+    for (FsVolumeImpl volume : volumes.volumes) {
       volume.deleteBPDirectories(bpid, force);
     }
   }
@@ -2610,7 +1656,7 @@ public class FSDataset implements FsData
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
     File datafile = getBlockFile(block);
-    File metafile = DatanodeUtil.getMetaFile(datafile, block.getGenerationStamp());
+    File metafile = FsDatasetUtil.getMetaFile(datafile, block.getGenerationStamp());
     BlockLocalPathInfo info = new BlockLocalPathInfo(block,
         datafile.getAbsolutePath(), metafile.getAbsolutePath());
     return info;
@@ -2620,8 +1666,8 @@ public class FSDataset implements FsData
   public RollingLogs createRollingLogs(String bpid, String prefix
       ) throws IOException {
     String dir = null;
-    final List<FSVolume> volumes = getVolumes();
-    for (FSVolume vol : volumes) {
+    final List<FsVolumeImpl> volumes = getVolumes();
+    for (FsVolumeImpl vol : volumes) {
       String bpDir = vol.getPath(bpid);
       if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
         dir = bpDir;
@@ -2633,202 +1679,4 @@ public class FSDataset implements FsData
     }
     return new RollingLogsImpl(dir, prefix);
   }
-
-  static class RollingLogsImpl implements RollingLogs {
-    private static final String CURR_SUFFIX = ".curr";
-    private static final String PREV_SUFFIX = ".prev";
-
-    static boolean isFilePresent(String dir, String filePrefix) {
-      return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
-             new File(dir, filePrefix + PREV_SUFFIX).exists();
-    }
-
-    private final File curr;
-    private final File prev;
-    private PrintStream out; //require synchronized access
-
-    private Appender appender = new Appender() {
-      @Override
-      public Appendable append(CharSequence csq) {
-        synchronized(RollingLogsImpl.this) {
-          if (out == null) {
-            throw new IllegalStateException(RollingLogsImpl.this
-                + " is not yet opened.");
-          }
-          out.print(csq);
-        }
-        return this;
-      }
-
-      @Override
-      public Appendable append(char c) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Appendable append(CharSequence csq, int start, int end) {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void close() {
-        synchronized(RollingLogsImpl.this) {
-          if (out != null) {
-            out.close();
-            out = null;
-          }
-        }
-      }
-    };
-
-
-    private final AtomicInteger numReaders = new AtomicInteger();
-
-    private RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
-      curr = new File(dir, filePrefix + CURR_SUFFIX);
-      prev = new File(dir, filePrefix + PREV_SUFFIX);
-      out = new PrintStream(new FileOutputStream(curr, true));
-    }
-
-    @Override
-    public Reader iterator(boolean skipPrevFile) throws IOException {
-      numReaders.incrementAndGet(); 
-      return new Reader(skipPrevFile);
-    }
-
-    @Override
-    public Appender appender() {
-      return appender;
-    }
-
-    @Override
-    public boolean roll() throws IOException {
-      if (numReaders.get() > 0) {
-        return false;
-      }
-      if (!prev.delete() && prev.exists()) {
-        throw new IOException("Failed to delete " + prev);
-      }
-
-      synchronized(this) {
-        appender.close();
-        final boolean renamed = curr.renameTo(prev);
-        out = new PrintStream(new FileOutputStream(curr, true));
-        if (!renamed) {
-          throw new IOException("Failed to rename " + curr + " to " + prev);
-        }
-      }
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return curr.toString();
-    }
-    
-    /**
-     * This is used to read the lines in order.
-     * If the data is not read completely (i.e., until hasNext() returns
-     * false), it needs to be explicitly closed.
-     */
-    private class Reader implements RollingLogs.LineIterator {
-      private File file;
-      private BufferedReader reader;
-      private String line;
-      private boolean closed = false;
-      
-      private Reader(boolean skipPrevFile) throws IOException {
-        reader = null;
-        file = skipPrevFile? curr : prev;
-        readNext();        
-      }
-
-      @Override
-      public boolean isPrevious() {
-        return file == prev;
-      }
-
-      private boolean openFile() throws IOException {
-
-        for(int i=0; i<2; i++) {
-          if (reader != null || i > 0) {
-            // move to next file
-            file = isPrevious()? curr : null;
-          }
-          if (file == null) {
-            return false;
-          }
-          if (file.exists()) {
-            break;
-          }
-        }
-        
-        if (reader != null ) {
-          reader.close();
-          reader = null;
-        }
-        
-        reader = new BufferedReader(new FileReader(file));
-        return true;
-      }
-      
-      // read next line if possible.
-      private void readNext() throws IOException {
-        line = null;
-        try {
-          if (reader != null && (line = reader.readLine()) != null) {
-            return;
-          }
-          if (line == null) {
-            // move to the next file.
-            if (openFile()) {
-              readNext();
-            }
-          }
-        } finally {
-          if (!hasNext()) {
-            close();
-          }
-        }
-      }
-      
-      @Override
-      public boolean hasNext() {
-        return line != null;
-      }
-
-      @Override
-      public String next() {
-        String curLine = line;
-        try {
-          readNext();
-        } catch (IOException e) {
-          DataBlockScanner.LOG.warn("Failed to read next line.", e);
-        }
-        return curLine;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void close() throws IOException {
-        if (!closed) {
-          try {
-            if (reader != null) {
-              reader.close();
-            }
-          } finally {
-            file = null;
-            reader = null;
-            closed = true;
-            final int n = numReaders.decrementAndGet();
-            assert(n >= 0);
-          }
-        }
-      }
-    }
-  }
 }
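
The RollingLogsImpl class removed just above is still referenced by the new createRollingLogs, so it presumably moves to its own source file elsewhere in this changeset. Its rotation contract is worth spelling out: appenders write to a ".curr" file, and roll() promotes ".curr" to ".prev" only while no line iterator is open. A minimal, self-contained Java sketch of that scheme, with illustrative file names rather than the real block-scanner log names:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the curr/prev rotation used by RollingLogsImpl: writers append
// to ".curr"; roll() promotes it to ".prev" only when no reader holds either
// file, so an open iterator never has a file renamed out from under it.
class RollingSketch {
  private final File curr = new File("scanner.log.curr"); // illustrative name
  private final File prev = new File("scanner.log.prev"); // illustrative name
  private final AtomicInteger readers = new AtomicInteger();
  private PrintStream out;

  synchronized void append(String line) throws IOException {
    if (out == null) {
      out = new PrintStream(new FileOutputStream(curr, true)); // append mode
    }
    out.println(line);
  }

  synchronized boolean roll() throws IOException {
    if (readers.get() > 0) {
      return false; // an iterator is still reading; caller retries later
    }
    if (out != null) {
      out.close();
      out = null;
    }
    if (prev.exists() && !prev.delete()) {
      throw new IOException("Failed to delete " + prev);
    }
    if (curr.exists() && !curr.renameTo(prev)) {
      throw new IOException("Failed to rename " + curr + " to " + prev);
    }
    return true;
  }
}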

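One pattern runs through the whole FsDatasetImpl diff above: every DataNode.LOG call becomes a call on a logger owned by the dataset class itself (the LOG field is declared in a hunk outside this excerpt), so dataset messages get their own log category. A minimal sketch of that commons-logging idiom, using a hypothetical class name:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Per-class logger: messages are emitted under this class's own category
// rather than DataNode's, so log levels can be tuned per component.
class DatasetExample {
  static final Log LOG = LogFactory.getLog(DatasetExample.class);

  void removeReplica(String bpid, long blockId) {
    LOG.warn("Removing replica " + bpid + ":" + blockId);
    if (LOG.isDebugEnabled()) {
      // guard avoids building the debug string when the level is off
      LOG.debug("bpid=" + bpid + ", blockId=" + blockId);
    }
  }
}
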
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java Mon Apr  2 17:38:56 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+
+/** Utility methods. */
+@InterfaceAudience.Private
+public class FsDatasetUtil {
+  static boolean isUnlinkTmpFile(File f) {
+    return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
+  }
+
+  static File getOrigFile(File unlinkTmpFile) {
+    final String name = unlinkTmpFile.getName();
+    if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
+      throw new IllegalArgumentException("unlinkTmpFile=" + unlinkTmpFile
+          + " does not end with " + DatanodeUtil.UNLINK_BLOCK_SUFFIX);
+    }
+    final int n = name.length() - DatanodeUtil.UNLINK_BLOCK_SUFFIX.length(); 
+    return new File(unlinkTmpFile.getParentFile(), name.substring(0, n));
+  }
+  
+  static File getMetaFile(File f, long gs) {
+    return new File(f.getParent(),
+        DatanodeUtil.getMetaName(f.getName(), gs));
+  }
+
+  /** Find the corresponding meta data file from a given block file */
+  static File findMetaFile(final File blockFile) throws IOException {
+    final String prefix = blockFile.getName() + "_";
+    final File parent = blockFile.getParentFile();
+    final File[] matches = parent.listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return dir.equals(parent) && name.startsWith(prefix)
+            && name.endsWith(Block.METADATA_EXTENSION);
+      }
+    });
+
+    if (matches == null || matches.length == 0) {
+      throw new IOException("Meta file not found, blockFile=" + blockFile);
+    }
+    if (matches.length > 1) {
+      throw new IOException("Found more than one meta file: "
+          + Arrays.asList(matches));
+    }
+    return matches[0];
+  }
+
+  /**
+   * Find the meta-file for the specified block file
+   * and then return the generation stamp from the name of the meta-file.
+   */
+  static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+    String blockName = blockFile.getName();
+    for (int j = 0; j < listdir.length; j++) {
+      String path = listdir[j].getName();
+      if (!path.startsWith(blockName)) {
+        continue;
+      }
+      if (blockFile == listdir[j]) {
+        continue;
+      }
+      return Block.getGenerationStamp(listdir[j].getName());
+    }
+    FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
+    return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
+  }
+
+  /** Parse the generation stamp from the name of the given meta file. */
+  static long parseGenerationStamp(File blockFile, File metaFile
+      ) throws IOException {
+    final String metaname = metaFile.getName();
+    final String gs = metaname.substring(blockFile.getName().length() + 1,
+        metaname.length() - Block.METADATA_EXTENSION.length());
+    try {
+      return Long.parseLong(gs);
+    } catch(NumberFormatException nfe) {
+      throw new IOException("Failed to parse generation stamp: blockFile="
+          + blockFile + ", metaFile=" + metaFile, nfe);
+    }
+  }
+}
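
The FsDatasetUtil helpers above all lean on one on-disk convention: a block file named blk_<id> pairs with a metadata file named blk_<id>_<generationStamp>.meta, which is exactly the arithmetic parseGenerationStamp performs. A self-contained sketch of that round trip, with the extension constant inlined (the real one is Block.METADATA_EXTENSION) and illustrative names throughout:

// Sketch of the block/meta-file naming convention assumed by FsDatasetUtil.
public class MetaNameSketch {
  static final String META_EXT = ".meta"; // mirrors Block.METADATA_EXTENSION

  // Consistent with DatanodeUtil.getMetaName as used by getMetaFile above.
  static String metaName(String blockName, long gs) {
    return blockName + "_" + gs + META_EXT;
  }

  // Same substring arithmetic as parseGenerationStamp: skip "<blockName>_",
  // then strip the ".meta" suffix, leaving only the generation stamp digits.
  static long genStampOf(String blockName, String metaName) {
    return Long.parseLong(metaName.substring(
        blockName.length() + 1, metaName.length() - META_EXT.length()));
  }

  public static void main(String[] args) {
    String meta = metaName("blk_123", 7);            // "blk_123_7.meta"
    System.out.println(genStampOf("blk_123", meta)); // prints 7
  }
}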