Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/04/02 19:38:58 UTC

svn commit: r1308437 [3/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ src/main/java/org/apache/hadoop/...

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Mon Apr  2 17:38:56 2012
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * The underlying volume used to store replicas.
+ * 
+ * It uses the {@link FsDatasetImpl} object for synchronization.
+ */
+@InterfaceAudience.Private
+class FsVolumeImpl implements FsVolumeSpi {
+  private final FsDatasetImpl dataset;
+  private final String storageID;
+  private final Map<String, BlockPoolSlice> bpSlices
+      = new HashMap<String, BlockPoolSlice>();
+  private final File currentDir;    // <StorageDirectory>/current
+  private final DF usage;           
+  private final long reserved;
+  
+  FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
+      Configuration conf) throws IOException {
+    this.dataset = dataset;
+    this.storageID = storageID;
+    this.reserved = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
+        DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
+    this.currentDir = currentDir; 
+    File parent = currentDir.getParentFile();
+    this.usage = new DF(parent, conf);
+  }
+  
+  File getCurrentDir() {
+    return currentDir;
+  }
+  
+  File getRbwDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getRbwDir();
+  }
+  
+  void decDfsUsed(String bpid, long value) {
+    synchronized(dataset) {
+      BlockPoolSlice bp = bpSlices.get(bpid);
+      if (bp != null) {
+        bp.decDfsUsed(value);
+      }
+    }
+  }
+  
+  long getDfsUsed() throws IOException {
+    long dfsUsed = 0;
+    synchronized(dataset) {
+      for(BlockPoolSlice s : bpSlices.values()) {
+        dfsUsed += s.getDfsUsed();
+      }
+    }
+    return dfsUsed;
+  }
+
+  long getBlockPoolUsed(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getDfsUsed();
+  }
+  
+  /**
+   * Calculate the capacity of the filesystem, after removing any
+   * reserved capacity.
+   * @return the unreserved number of bytes left in this filesystem. May be zero.
+   */
+  long getCapacity() {
+    long remaining = usage.getCapacity() - reserved;
+    return remaining > 0 ? remaining : 0;
+  }
+
+  @Override
+  public long getAvailable() throws IOException {
+    long remaining = getCapacity()-getDfsUsed();
+    long available = usage.getAvailable();
+    if (remaining > available) {
+      remaining = available;
+    }
+    return (remaining > 0) ? remaining : 0;
+  }
+    
+  long getReserved(){
+    return reserved;
+  }
+
+  BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
+    BlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp == null) {
+      throw new IOException("block pool " + bpid + " is not found");
+    }
+    return bp;
+  }
+
+  @Override
+  public String getPath(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
+  }
+
+  @Override
+  public File getFinalizedDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getFinalizedDir();
+  }
+
+  /**
+   * Return a copy of the list of currently active BPIDs.
+   */
+  @Override
+  public String[] getBlockPoolList() {
+    return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);   
+  }
+    
+  /**
+   * Temporary files. They get moved to the finalized block directory when
+   * the block is finalized.
+   */
+  File createTmpFile(String bpid, Block b) throws IOException {
+    return getBlockPoolSlice(bpid).createTmpFile(b);
+  }
+
+  /**
+   * RBW files. They get moved to the finalized block directory when
+   * the block is finalized.
+   */
+  File createRbwFile(String bpid, Block b) throws IOException {
+    return getBlockPoolSlice(bpid).createRbwFile(b);
+  }
+
+  File addBlock(String bpid, Block b, File f) throws IOException {
+    return getBlockPoolSlice(bpid).addBlock(b, f);
+  }
+    
+  void checkDirs() throws DiskErrorException {
+    // TODO:FEDERATION valid synchronization
+    for(BlockPoolSlice s : bpSlices.values()) {
+      s.checkDirs();
+    }
+  }
+    
+  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+    for(BlockPoolSlice s : bpSlices.values()) {
+      s.getVolumeMap(volumeMap);
+    }
+  }
+  
+  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
+  }
+  
+  /**
+   * Add replicas under the given directory to the volume map
+   * @param volumeMap the replicas map
+   * @param dir an input directory
+   * @param isFinalized true if the directory has finalized replicas;
+   *                    false if the directory has rbw replicas
+   * @throws IOException 
+   */
+  void addToReplicasMap(String bpid, ReplicaMap volumeMap, 
+      File dir, boolean isFinalized) throws IOException {
+    BlockPoolSlice bp = getBlockPoolSlice(bpid);
+    // TODO move this up
+    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
+    bp.addToReplicasMap(volumeMap, dir, isFinalized);
+  }
+  
+  void clearPath(String bpid, File f) throws IOException {
+    getBlockPoolSlice(bpid).clearPath(f);
+  }
+
+  @Override
+  public String toString() {
+    return currentDir.getAbsolutePath();
+  }
+
+  void shutdown() {
+    Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
+    for (Entry<String, BlockPoolSlice> entry : set) {
+      entry.getValue().shutdown();
+    }
+  }
+
+  void addBlockPool(String bpid, Configuration conf) throws IOException {
+    File bpdir = new File(currentDir, bpid);
+    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+    bpSlices.put(bpid, bp);
+  }
+  
+  void shutdownBlockPool(String bpid) {
+    BlockPoolSlice bp = bpSlices.get(bpid);
+    if (bp != null) {
+      bp.shutdown();
+    }
+    bpSlices.remove(bpid);
+  }
+
+  boolean isBPDirEmpty(String bpid) throws IOException {
+    File volumeCurrentDir = this.getCurrentDir();
+    File bpDir = new File(volumeCurrentDir, bpid);
+    File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+    File finalizedDir = new File(bpCurrentDir,
+        DataStorage.STORAGE_DIR_FINALIZED);
+    File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
+    if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+      return false;
+    }
+    if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
+      return false;
+    }
+    return true;
+  }
+  
+  void deleteBPDirectories(String bpid, boolean force) throws IOException {
+    File volumeCurrentDir = this.getCurrentDir();
+    File bpDir = new File(volumeCurrentDir, bpid);
+    if (!bpDir.isDirectory()) {
+      // nothing to be deleted
+      return;
+    }
+    File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); 
+    File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
+    File finalizedDir = new File(bpCurrentDir,
+        DataStorage.STORAGE_DIR_FINALIZED);
+    File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
+    if (force) {
+      FileUtil.fullyDelete(bpDir);
+    } else {
+      if (!rbwDir.delete()) {
+        throw new IOException("Failed to delete " + rbwDir);
+      }
+      if (!finalizedDir.delete()) {
+        throw new IOException("Failed to delete " + finalizedDir);
+      }
+      FileUtil.fullyDelete(tmpDir);
+      for (File f : FileUtil.listFiles(bpCurrentDir)) {
+        if (!f.delete()) {
+          throw new IOException("Failed to delete " + f);
+        }
+      }
+      if (!bpCurrentDir.delete()) {
+        throw new IOException("Failed to delete " + bpCurrentDir);
+      }
+      for (File f : FileUtil.listFiles(bpDir)) {
+        if (!f.delete()) {
+          throw new IOException("Failed to delete " + f);
+        }
+      }
+      if (!bpDir.delete()) {
+        throw new IOException("Failed to delete " + bpDir);
+      }
+    }
+  }
+
+  String getStorageID() {
+    return storageID;
+  }
+}
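
The space accounting in FsVolumeImpl above comes down to two lines of arithmetic: getCapacity() subtracts the configured dfs.datanode.du.reserved value from the partition capacity reported by DF, and getAvailable() caps capacity minus DFS usage at whatever DF currently reports as free, never dropping below zero. A small standalone sketch of that calculation follows; the class name and the sample numbers are invented for illustration and are not part of the patch.

public class VolumeSpaceSketch {
  public static void main(String[] args) {
    // Hypothetical sample values; in FsVolumeImpl the first two come from DF,
    // dfsUsed from the BlockPoolSlices, and reserved from dfs.datanode.du.reserved.
    long diskCapacity  = 100L << 30;  // partition size
    long diskAvailable =  30L << 30;  // free space the OS reports right now
    long dfsUsed       =  50L << 30;  // bytes held by replicas on this volume
    long reserved      =  10L << 30;  // space reserved for non-DFS use

    long capacity  = Math.max(diskCapacity - reserved, 0L);        // getCapacity()
    long available = Math.min(capacity - dfsUsed, diskAvailable);  // getAvailable()
    available = Math.max(available, 0L);
    System.out.println("capacity=" + capacity + " available=" + available);
  }
}

With these numbers the volume would report 90 GiB of capacity and 30 GiB available, since the OS free-space figure (30 GiB) is the tighter of the two bounds.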

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Mon Apr  2 17:38:56 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+class FsVolumeList {
+  /**
+   * Read access to this unmodifiable list is not synchronized.
+   * This list is replaced on modification while holding the "this" lock.
+   */
+  volatile List<FsVolumeImpl> volumes = null;
+
+  private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
+  private volatile int numFailedVolumes;
+
+  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
+    this.volumes = Collections.unmodifiableList(volumes);
+    this.blockChooser = blockChooser;
+    this.numFailedVolumes = failedVols;
+  }
+  
+  int numberOfFailedVolumes() {
+    return numFailedVolumes;
+  }
+  
+  /** 
+   * Get the next volume. Synchronized to ensure the next volume is chosen
+   * by a single thread and with no concurrent update to
+   * {@link #volumes}.
+   * @param blockSize free space needed on the volume
+   * @return next volume to store the block in.
+   */
+  synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
+    return blockChooser.chooseVolume(volumes, blockSize);
+  }
+    
+  long getDfsUsed() throws IOException {
+    long dfsUsed = 0L;
+    for (FsVolumeImpl v : volumes) {
+      dfsUsed += v.getDfsUsed();
+    }
+    return dfsUsed;
+  }
+
+  long getBlockPoolUsed(String bpid) throws IOException {
+    long dfsUsed = 0L;
+    for (FsVolumeImpl v : volumes) {
+      dfsUsed += v.getBlockPoolUsed(bpid);
+    }
+    return dfsUsed;
+  }
+
+  long getCapacity() {
+    long capacity = 0L;
+    for (FsVolumeImpl v : volumes) {
+      capacity += v.getCapacity();
+    }
+    return capacity;
+  }
+    
+  long getRemaining() throws IOException {
+    long remaining = 0L;
+    for (FsVolumeSpi vol : volumes) {
+      remaining += vol.getAvailable();
+    }
+    return remaining;
+  }
+    
+  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+    for (FsVolumeImpl v : volumes) {
+      v.getVolumeMap(volumeMap);
+    }
+  }
+  
+  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+    for (FsVolumeImpl v : volumes) {
+      v.getVolumeMap(bpid, volumeMap);
+    }
+  }
+    
+  /**
+   * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
+   * volumes from the active list that result in a DiskErrorException.
+   * 
+   * This method is synchronized to allow only one instance of checkDirs()
+   * to run at a time.
+   * @return list of all the removed volumes.
+   */
+  synchronized List<FsVolumeImpl> checkDirs() {
+    ArrayList<FsVolumeImpl> removedVols = null;
+    
+    // Make a copy of volumes for performing modification 
+    final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
+
+    for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
+      final FsVolumeImpl fsv = i.next();
+      try {
+        fsv.checkDirs();
+      } catch (DiskErrorException e) {
+        FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ",e);
+        if (removedVols == null) {
+          removedVols = new ArrayList<FsVolumeImpl>(1);
+        }
+        removedVols.add(fsv);
+        fsv.shutdown(); 
+        i.remove(); // Remove the volume
+        numFailedVolumes++;
+      }
+    }
+    
+    if (removedVols != null && removedVols.size() > 0) {
+      // Replace volume list
+      volumes = Collections.unmodifiableList(volumeList);
+      FsDatasetImpl.LOG.info("Completed checkDirs. Removed " + removedVols.size()
+          + " volumes. Current volumes: " + this);
+    }
+
+    return removedVols;
+  }
+
+  @Override
+  public String toString() {
+    return volumes.toString();
+  }
+
+
+  void addBlockPool(String bpid, Configuration conf) throws IOException {
+    for (FsVolumeImpl v : volumes) {
+      v.addBlockPool(bpid, conf);
+    }
+  }
+  
+  void removeBlockPool(String bpid) {
+    for (FsVolumeImpl v : volumes) {
+      v.shutdownBlockPool(bpid);
+    }
+  }
+
+  void shutdown() {
+    for (FsVolumeImpl volume : volumes) {
+      if(volume != null) {
+        volume.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file
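
FsVolumeList above publishes its volumes through a volatile, unmodifiable list: read paths such as getDfsUsed() and getCapacity() iterate it without locking, while checkDirs() copies the list, drops failed volumes from the copy, and swaps the reference under the object lock. The sketch below isolates that copy-on-write idea in plain Java; the Volume class and its isHealthy() method are placeholders invented for the example, not types from the patch.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class CopyOnWriteVolumes {
  /** Placeholder for a volume; isHealthy() stands in for FsVolumeImpl.checkDirs(). */
  static class Volume {
    final boolean healthy;
    Volume(boolean healthy) { this.healthy = healthy; }
    boolean isHealthy() { return healthy; }
  }

  // Replaced wholesale under "this" lock; readers see one immutable snapshot.
  private volatile List<Volume> volumes;

  CopyOnWriteVolumes(List<Volume> initial) {
    volumes = Collections.unmodifiableList(new ArrayList<Volume>(initial));
  }

  /** Unsynchronized read path, analogous to getDfsUsed()/getCapacity(). */
  int size() {
    return volumes.size();
  }

  /** Synchronized write path, analogous to checkDirs(). */
  synchronized List<Volume> removeUnhealthy() {
    List<Volume> copy = new ArrayList<Volume>(volumes);
    List<Volume> removed = new ArrayList<Volume>();
    for (Iterator<Volume> it = copy.iterator(); it.hasNext(); ) {
      Volume v = it.next();
      if (!v.isHealthy()) {
        removed.add(v);
        it.remove();
      }
    }
    if (!removed.isEmpty()) {
      volumes = Collections.unmodifiableList(copy);  // publish the new snapshot
    }
    return removed;
  }
}

The benefit is that readers never block on a disk check: an iteration that started before the swap simply completes over the old snapshot.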

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LDir.java Mon Apr  2 17:38:56 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+
+/**
+ * A node type that can be built into a tree reflecting the
+ * hierarchy of replicas on the local disk.
+ */
+class LDir {
+  final File dir;
+  final int maxBlocksPerDir;
+
+  private int numBlocks = 0;
+  private LDir[] children = null;
+  private int lastChildIdx = 0;
+
+  LDir(File dir, int maxBlocksPerDir) throws IOException {
+    this.dir = dir;
+    this.maxBlocksPerDir = maxBlocksPerDir;
+
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + dir);
+      }
+    } else {
+      File[] files = FileUtil.listFiles(dir); 
+      List<LDir> dirList = new ArrayList<LDir>();
+      for (int idx = 0; idx < files.length; idx++) {
+        if (files[idx].isDirectory()) {
+          dirList.add(new LDir(files[idx], maxBlocksPerDir));
+        } else if (Block.isBlockFilename(files[idx])) {
+          numBlocks++;
+        }
+      }
+      if (dirList.size() > 0) {
+        children = dirList.toArray(new LDir[dirList.size()]);
+      }
+    }
+  }
+      
+  File addBlock(Block b, File src) throws IOException {
+    //First try without creating subdirectories
+    File file = addBlock(b, src, false, false);          
+    return (file != null) ? file : addBlock(b, src, true, true);
+  }
+
+  private File addBlock(Block b, File src, boolean createOk, boolean resetIdx
+      ) throws IOException {
+    if (numBlocks < maxBlocksPerDir) {
+      final File dest = FsDatasetImpl.moveBlockFiles(b, src, dir);
+      numBlocks += 1;
+      return dest;
+    }
+          
+    if (lastChildIdx < 0 && resetIdx) {
+      //reset so that all children will be checked
+      lastChildIdx = DFSUtil.getRandom().nextInt(children.length);              
+    }
+          
+    if (lastChildIdx >= 0 && children != null) {
+      //Check if any child-tree has room for a block.
+      for (int i=0; i < children.length; i++) {
+        int idx = (lastChildIdx + i)%children.length;
+        File file = children[idx].addBlock(b, src, false, resetIdx);
+        if (file != null) {
+          lastChildIdx = idx;
+          return file; 
+        }
+      }
+      lastChildIdx = -1;
+    }
+          
+    if (!createOk) {
+      return null;
+    }
+          
+    if (children == null || children.length == 0) {
+      children = new LDir[maxBlocksPerDir];
+      for (int idx = 0; idx < maxBlocksPerDir; idx++) {
+        final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
+        children[idx] = new LDir(sub, maxBlocksPerDir);
+      }
+    }
+          
+    //now pick a child randomly for creating a new set of subdirs.
+    lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
+    return children[ lastChildIdx ].addBlock(b, src, true, false); 
+  }
+
+  void getVolumeMap(String bpid, ReplicaMap volumeMap, FsVolumeImpl volume
+      ) throws IOException {
+    if (children != null) {
+      for (int i = 0; i < children.length; i++) {
+        children[i].getVolumeMap(bpid, volumeMap, volume);
+      }
+    }
+
+    recoverTempUnlinkedBlock();
+    volume.addToReplicasMap(bpid, volumeMap, dir, true);
+  }
+      
+  /**
+   * Recover unlinked tmp files on datanode restart. If the original block
+   * file does not exist, the tmp file is renamed to the original file name;
+   * otherwise the tmp file is deleted.
+   */
+  private void recoverTempUnlinkedBlock() throws IOException {
+    File files[] = FileUtil.listFiles(dir);
+    for (File file : files) {
+      if (!FsDatasetUtil.isUnlinkTmpFile(file)) {
+        continue;
+      }
+      File blockFile = FsDatasetUtil.getOrigFile(file);
+      if (blockFile.exists()) {
+        // If the original block file still exists, then no recovery is needed.
+        if (!file.delete()) {
+          throw new IOException("Unable to cleanup unlinked tmp file " + file);
+        }
+      } else {
+        if (!file.renameTo(blockFile)) {
+          throw new IOException("Unable to cleanup detached file " + file);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Check if a data directory is healthy.
+   * @throws DiskErrorException
+   */
+  void checkDirTree() throws DiskErrorException {
+    DiskChecker.checkDir(dir);
+          
+    if (children != null) {
+      for (int i = 0; i < children.length; i++) {
+        children[i].checkDirTree();
+      }
+    }
+  }
+      
+  void clearPath(File f) {
+    String root = dir.getAbsolutePath();
+    String dir = f.getAbsolutePath();
+    if (dir.startsWith(root)) {
+      String[] dirNames = dir.substring(root.length()).
+        split(File.separator + DataStorage.BLOCK_SUBDIR_PREFIX);
+      if (clearPath(f, dirNames, 1))
+        return;
+    }
+    clearPath(f, null, -1);
+  }
+      
+  /**
+   * dirNames is an array of string integers derived from the
+   * usual directory structure data/subdirN/subdirXY/subdirM ...
+   * If the dirNames array is non-null, we only check the child at
+   * children[dirNames[idx]]. This avoids iterating over all
+   * children in the common case. If the directory structure changes
+   * in later versions, we need to revisit this.
+   */
+  private boolean clearPath(File f, String[] dirNames, int idx) {
+    if ((dirNames == null || idx == dirNames.length) &&
+        dir.compareTo(f) == 0) {
+      numBlocks--;
+      return true;
+    }
+        
+    if (dirNames != null) {
+      //guess the child index from the directory name
+      if (idx > (dirNames.length - 1) || children == null) {
+        return false;
+      }
+      int childIdx; 
+      try {
+        childIdx = Integer.parseInt(dirNames[idx]);
+      } catch (NumberFormatException ignored) {
+        // layout changed? we could print a warning.
+        return false;
+      }
+      return (childIdx >= 0 && childIdx < children.length) ?
+        children[childIdx].clearPath(f, dirNames, idx+1) : false;
+    }
+
+    //guesses failed. back to blind iteration.
+    if (children != null) {
+      for(int i=0; i < children.length; i++) {
+        if (children[i].clearPath(f, null, -1)){
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "FSDir{dir=" + dir + ", children="
+        + (children == null ? null : Arrays.asList(children)) + "}";
+  }
+}
\ No newline at end of file
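
LDir.clearPath() above tries to avoid scanning every child by parsing the child indices straight out of the path: it strips the volume root, splits on the "subdir" prefix, and treats each remaining piece as an index into children[]. The standalone snippet below illustrates just that parsing step; the paths are invented, and like the original split it assumes a Unix-style file separator.

import java.io.File;

public class SubdirIndexSketch {
  public static void main(String[] args) {
    String root = "/data/current/BP-1/current/finalized";  // hypothetical volume root
    String path = root + "/subdir3/subdir17";               // hypothetical replica dir
    // Mirrors the split in LDir.clearPath(): everything after each "subdir"
    // prefix is parsed as the index of the child to descend into.
    String[] parts = path.substring(root.length())
        .split(File.separator + "subdir");
    for (int i = 1; i < parts.length; i++) {
      int childIdx = Integer.parseInt(parts[i]);
      System.out.println("descend into children[" + childIdx + "]");
    }
  }
}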

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java Mon Apr  2 17:38:56 2012
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -23,11 +23,12 @@ import java.util.Map;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 
 /**
- * Maintains the replicas map. 
+ * Maintains the replica map. 
  */
-class ReplicasMap {
+class ReplicaMap {
   // Object using which this class is synchronized
   private final Object mutex;
   
@@ -35,7 +36,7 @@ class ReplicasMap {
   private Map<String, Map<Long, ReplicaInfo>> map = 
     new HashMap<String, Map<Long, ReplicaInfo>>();
   
-  ReplicasMap(Object mutex) {
+  ReplicaMap(Object mutex) {
     if (mutex == null) {
       throw new HadoopIllegalArgumentException(
           "Object to synchronize on cannot be null");

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java Mon Apr  2 17:38:56 2012
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
+
+class RollingLogsImpl implements RollingLogs {
+  private static final String CURR_SUFFIX = ".curr";
+  private static final String PREV_SUFFIX = ".prev";
+
+  static boolean isFilePresent(String dir, String filePrefix) {
+    return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
+           new File(dir, filePrefix + PREV_SUFFIX).exists();
+  }
+
+  private final File curr;
+  private final File prev;
+  private PrintStream out; //requires synchronized access
+
+  private Appender appender = new Appender() {
+    @Override
+    public Appendable append(CharSequence csq) {
+      synchronized(RollingLogsImpl.this) {
+        if (out == null) {
+          throw new IllegalStateException(RollingLogsImpl.this
+              + " is not yet opened.");
+        }
+        out.print(csq);
+      }
+      return this;
+    }
+
+    @Override
+    public Appendable append(char c) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Appendable append(CharSequence csq, int start, int end) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {
+      synchronized(RollingLogsImpl.this) {
+        if (out != null) {
+          out.close();
+          out = null;
+        }
+      }
+    }
+  };
+
+
+  private final AtomicInteger numReaders = new AtomicInteger();
+
+  RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
+    curr = new File(dir, filePrefix + CURR_SUFFIX);
+    prev = new File(dir, filePrefix + PREV_SUFFIX);
+    out = new PrintStream(new FileOutputStream(curr, true));
+  }
+
+  @Override
+  public Reader iterator(boolean skipPrevFile) throws IOException {
+    numReaders.incrementAndGet(); 
+    return new Reader(skipPrevFile);
+  }
+
+  @Override
+  public Appender appender() {
+    return appender;
+  }
+
+  @Override
+  public boolean roll() throws IOException {
+    if (numReaders.get() > 0) {
+      return false;
+    }
+    if (!prev.delete() && prev.exists()) {
+      throw new IOException("Failed to delete " + prev);
+    }
+
+    synchronized(this) {
+      appender.close();
+      final boolean renamed = curr.renameTo(prev);
+      out = new PrintStream(new FileOutputStream(curr, true));
+      if (!renamed) {
+        throw new IOException("Failed to rename " + curr + " to " + prev);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return curr.toString();
+  }
+  
+  /**
+   * This is used to read the lines in order.
+   * If the data is not read completely (i.e., until hasNext() returns
+   * false), it needs to be explicitly closed.
+   */
+  private class Reader implements RollingLogs.LineIterator {
+    private File file;
+    private BufferedReader reader;
+    private String line;
+    private boolean closed = false;
+    
+    private Reader(boolean skipPrevFile) throws IOException {
+      reader = null;
+      file = skipPrevFile? curr : prev;
+      readNext();        
+    }
+
+    @Override
+    public boolean isPrevious() {
+      return file == prev;
+    }
+
+    private boolean openFile() throws IOException {
+
+      for(int i=0; i<2; i++) {
+        if (reader != null || i > 0) {
+          // move to next file
+          file = isPrevious()? curr : null;
+        }
+        if (file == null) {
+          return false;
+        }
+        if (file.exists()) {
+          break;
+        }
+      }
+      
+      if (reader != null ) {
+        reader.close();
+        reader = null;
+      }
+      
+      reader = new BufferedReader(new FileReader(file));
+      return true;
+    }
+    
+    // read next line if possible.
+    private void readNext() throws IOException {
+      line = null;
+      try {
+        if (reader != null && (line = reader.readLine()) != null) {
+          return;
+        }
+        if (line == null) {
+          // move to the next file.
+          if (openFile()) {
+            readNext();
+          }
+        }
+      } finally {
+        if (!hasNext()) {
+          close();
+        }
+      }
+    }
+    
+    @Override
+    public boolean hasNext() {
+      return line != null;
+    }
+
+    @Override
+    public String next() {
+      String curLine = line;
+      try {
+        readNext();
+      } catch (IOException e) {
+        DataBlockScanner.LOG.warn("Failed to read next line.", e);
+      }
+      return curLine;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        try {
+          if (reader != null) {
+            reader.close();
+          }
+        } finally {
+          file = null;
+          reader = null;
+          closed = true;
+          final int n = numReaders.decrementAndGet();
+          assert(n >= 0);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
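
The rolling-log behaviour implemented above is small: appender() writes to the "<prefix>.curr" file, roll() renames it to "<prefix>.prev" only when no readers are open, and iterator(false) replays the previous file followed by the current one. Below is a hedged usage sketch against the RollingLogs interface; how the instance is obtained and what gets logged are assumptions for illustration, not taken from the patch.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;

public class RollingLogsUsageSketch {
  // The RollingLogsImpl constructor is package-private, so a real caller would
  // live inside fsdataset.impl; here the instance is simply passed in.
  static void demo(RollingLogs logs) throws IOException {
    RollingLogs.Appender out = logs.appender();
    try {
      out.append("block 123 verified\n");       // appended to <prefix>.curr
    } finally {
      out.close();
    }

    logs.roll();                                 // .curr becomes .prev (if no readers)

    RollingLogs.LineIterator lines = logs.iterator(false);  // prev first, then curr
    try {
      while (lines.hasNext()) {
        System.out.println((lines.isPrevious() ? "prev: " : "curr: ") + lines.next());
      }
    } finally {
      lines.close();
    }
  }
}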

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java Mon Apr  2 17:38:56 2012
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java Mon Apr  2 17:38:56 2012
@@ -22,12 +22,16 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.mockito.Mockito;
 
 import com.google.common.base.Preconditions;
@@ -100,6 +104,19 @@ public class DataNodeTestUtils {  
     return spy;
   }
 
+  public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
+      DataNode dn, DatanodeID datanodeid, final Configuration conf
+      ) throws IOException {
+    return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
+        dn.getDnConf().socketTimeout);
+  }
+  
+  public static void shutdownBlockScanner(DataNode dn) {
+    if (dn.blockScanner != null) {
+      dn.blockScanner.shutdown();
+    }
+  }
+
   /**
    * This method is used for testing. 
    * Examples are adding and deleting blocks directly.
@@ -111,26 +128,22 @@ public class DataNodeTestUtils {  
     return dn.getFSDataset();
   }
 
-  public static FSDataset getFsDatasetImpl(DataNode dn) {
-    return (FSDataset)dn.getFSDataset();
-  }
-
   public static File getFile(DataNode dn, String bpid, long bid) {
-    return getFsDatasetImpl(dn).getFile(bpid, bid);
+    return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
   }
 
   public static File getBlockFile(DataNode dn, String bpid, Block b
       ) throws IOException {
-    return getFsDatasetImpl(dn).getBlockFile(bpid, b);
+    return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
   }
 
-  public static boolean unlinkBlock(DataNode dn, ExtendedBlock block, int numLinks
+  public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
       ) throws IOException {
-    return getFsDatasetImpl(dn).getReplicaInfo(block).unlinkBlock(numLinks);
+    return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
   }
 
   public static long getPendingAsyncDeletions(DataNode dn) {
-    return getFsDatasetImpl(dn).asyncDiskService.countPendingDeletions();
+    return FsDatasetTestUtil.getPendingAsyncDeletions(dn.getFSDataset());
   }
 
   /**
@@ -142,6 +155,6 @@ public class DataNodeTestUtils {  
    */
   public static ReplicaInfo fetchReplicaInfo(final DataNode dn,
       final String bpid, final long blkId) {
-    return getFsDatasetImpl(dn).fetchReplicaInfo(bpid, blkId);
+    return FsDatasetTestUtil.fetchReplicaInfo(dn.getFSDataset(), bpid, blkId);
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java Mon Apr  2 17:38:56 2012
@@ -613,14 +613,13 @@ public class TestBlockReport {
     
     // Look about specified DN for the replica of the block from 1st DN
     final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
-    final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1);
     String bpid = cluster.getNamesystem().getBlockPoolId();
-    Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
+    Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
     long start = System.currentTimeMillis();
     int count = 0;
     while (r == null) {
       waitTil(5);
-      r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
+      r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
       long waiting_period = System.currentTimeMillis() - start;
       if (count++ % 100 == 0)
         if(LOG.isDebugEnabled()) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Apr  2 17:38:56 2012
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -376,7 +377,7 @@ public class TestDataNodeVolumeFailure {
         new FilenameFilter() {
           public boolean accept(File dir, String name) {
             return name.startsWith("blk_") &&
-            name.endsWith(DatanodeUtil.METADATA_EXTENSION);
+            name.endsWith(Block.METADATA_EXTENSION);
           }
         }
     );

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Mon Apr  2 17:38:56 2012
@@ -38,7 +38,9 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 
 /**
  * Tests {@link DirectoryScanner} handling of differences
@@ -51,7 +53,7 @@ public class TestDirectoryScanner extend
 
   private MiniDFSCluster cluster;
   private String bpid;
-  private FSDataset fds = null;
+  private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
   private DirectoryScanner scanner = null;
   private Random rand = new Random();
   private Random r = new Random();
@@ -72,7 +74,7 @@ public class TestDirectoryScanner extend
   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
     synchronized (fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
+      for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Truncate a block file that has a corresponding metadata file
@@ -91,7 +93,7 @@ public class TestDirectoryScanner extend
   /** Delete a block file */
   private long deleteBlockFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
+      for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
         // Delete a block file that has corresponding metadata file
@@ -107,7 +109,7 @@ public class TestDirectoryScanner extend
   /** Delete block meta file */
   private long deleteMetaFile() {
     synchronized(fds) {
-      for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
+      for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
         if (file.exists() && file.delete()) {
@@ -124,7 +126,7 @@ public class TestDirectoryScanner extend
     long id = rand.nextLong();
     while (true) {
       id = rand.nextLong();
-      if (fds.fetchReplicaInfo(bpid, id) == null) {
+      if (FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, id) == null) {
         break;
       }
     }
@@ -142,7 +144,7 @@ public class TestDirectoryScanner extend
 
   /** Create a block file in a random volume*/
   private long createBlockFile() throws IOException {
-    List<FSVolume> volumes = fds.getVolumes();
+    List<? extends FsVolumeSpi> volumes = fds.getVolumes();
     int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
     File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -155,7 +157,7 @@ public class TestDirectoryScanner extend
 
   /** Create a metafile in a random volume*/
   private long createMetaFile() throws IOException {
-    List<FSVolume> volumes = fds.getVolumes();
+    List<? extends FsVolumeSpi> volumes = fds.getVolumes();
     int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
     File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -168,7 +170,7 @@ public class TestDirectoryScanner extend
 
   /** Create block file and corresponding metafile in a rondom volume */
   private long createBlockMetaFile() throws IOException {
-    List<FSVolume> volumes = fds.getVolumes();
+    List<? extends FsVolumeSpi> volumes = fds.getVolumes();
     int index = rand.nextInt(volumes.size() - 1);
     long id = getFreeBlockId();
     File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
@@ -228,8 +230,7 @@ public class TestDirectoryScanner extend
     try {
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
-      fds = (FSDataset)DataNodeTestUtils.getFSDataset(
-          cluster.getDataNodes().get(0));
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       DataNode dn = cluster.getDataNodes().get(0);
@@ -348,12 +349,13 @@ public class TestDirectoryScanner extend
 
   private void verifyAddition(long blockId, long genStamp, long size) {
     final ReplicaInfo replicainfo;
-    replicainfo = fds.fetchReplicaInfo(bpid, blockId);
+    replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
     assertNotNull(replicainfo);
 
     // Added block has the same file as the one created by the test
     File file = new File(getBlockFile(blockId));
-    assertEquals(file.getName(), fds.getFile(bpid, blockId).getName());
+    assertEquals(file.getName(),
+        FsDatasetTestUtil.getFile(fds, bpid, blockId).getName());
 
     // Generation stamp is same as that of created file
     assertEquals(genStamp, replicainfo.getGenerationStamp());
@@ -364,12 +366,12 @@ public class TestDirectoryScanner extend
 
   private void verifyDeletion(long blockId) {
     // Ensure block does not exist in memory
-    assertNull(fds.fetchReplicaInfo(bpid, blockId));
+    assertNull(FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId));
   }
 
   private void verifyGenStamp(long blockId, long genStamp) {
     final ReplicaInfo memBlock;
-    memBlock = fds.fetchReplicaInfo(bpid, blockId);
+    memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
     assertNotNull(memBlock);
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java Mon Apr  2 17:38:56 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.util.DataChecksum;
 
 /**
@@ -90,7 +91,7 @@ public class TestSimulatedFSDataset exte
   public void testFSDatasetFactory() {
     final Configuration conf = new Configuration();
     FsDatasetSpi.Factory<?> f = FsDatasetSpi.Factory.getFactory(conf);
-    assertEquals(FSDataset.Factory.class, f.getClass());
+    assertEquals(FsDatasetFactory.class, f.getClass());
     assertFalse(f.isSimulated());
 
     SimulatedFSDataset.setFactory(conf);
@@ -243,7 +244,7 @@ public class TestSimulatedFSDataset exte
     }
   }
 
-  public void checkInvalidBlock(ExtendedBlock b) throws IOException {
+  public void checkInvalidBlock(ExtendedBlock b) {
     final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
     assertFalse(fsdataset.isValidBlock(b));
     try {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1308437&r1=1308436&r2=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Mon Apr  2 17:38:56 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -58,8 +59,8 @@ public class TestTransferRbw {
   }
   private static ReplicaInPipeline getReplica(final DataNode datanode,
       final String bpid, final ReplicaState expectedState) throws InterruptedException {
-    final FSDataset dataset = ((FSDataset)datanode.data);
-    final Collection<ReplicaInfo> replicas = dataset.volumeMap.replicas(bpid);
+    final Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(
+        datanode.getFSDataset(), bpid);
     for(int i = 0; i < 5 && replicas.size() == 0; i++) {
       LOG.info("wait since replicas.size() == 0; i=" + i);
       Thread.sleep(1000);

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java?rev=1308437&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java Mon Apr  2 17:38:56 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+
+public class FsDatasetTestUtil {
+
+  public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
+    return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+  }
+
+  public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
+      ) throws IOException {
+    return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
+  }
+
+  public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
+      ExtendedBlock block, int numLinks) throws IOException {
+    final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+    return info.unlinkBlock(numLinks);
+  }
+
+  public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
+      final String bpid, final long blockId) {
+    return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
+  }
+
+  public static long getPendingAsyncDeletions(FsDatasetSpi<?> fsd) {
+    return ((FsDatasetImpl)fsd).asyncDiskService.countPendingDeletions();
+  }
+  
+  public static Collection<ReplicaInfo> getReplicas(FsDatasetSpi<?> fsd,
+      String bpid) {
+    return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
+  }
+}
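
FsDatasetTestUtil above exists so that tests outside the implementation package never cast to FsDatasetImpl themselves: the helper lives next to the implementation, performs the downcast once, and exposes only the narrow accessors the tests need. The generic sketch below shows the same pattern with invented names (PublicApi, Impl, ImplTestHelper); nothing in it comes from Hadoop.

// A generic sketch of the "test helper in the impl package" pattern used by
// FsDatasetTestUtil; none of these types come from the patch.
interface PublicApi {
  int publicOperation();
}

class Impl implements PublicApi {
  int hiddenCounter = 42;                 // package-private internal state
  public int publicOperation() { return hiddenCounter; }
}

final class ImplTestHelper {
  private ImplTestHelper() {}

  /** Tests hand in the public handle; the downcast stays in one place. */
  static int peekHiddenCounter(PublicApi api) {
    return ((Impl) api).hiddenCounter;
  }
}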

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDatanodeRestart.java Mon Apr  2 17:38:56 2012
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -36,7 +36,10 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
@@ -98,8 +101,8 @@ public class TestDatanodeRestart {
       out.write(writeBuf);
       out.hflush();
       DataNode dn = cluster.getDataNodes().get(0);
-      for (FsVolumeSpi v : dn.data.getVolumes()) {
-        FSVolume volume = (FSVolume)v;
+      for (FsVolumeSpi v : dataset(dn).getVolumes()) {
+        final FsVolumeImpl volume = (FsVolumeImpl)v;
         File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
         File rbwDir = new File(currentDir, "rbw");
         for (File file : rbwDir.listFiles()) {
@@ -114,7 +117,7 @@ public class TestDatanodeRestart {
 
       // check volumeMap: one rwr replica
       String bpid = cluster.getNamesystem().getBlockPoolId();
-      ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
+      ReplicaMap replicas = dataset(dn).volumeMap;
       Assert.assertEquals(1, replicas.size(bpid));
       ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
       Assert.assertEquals(ReplicaState.RWR, replica.getState());
@@ -123,7 +126,7 @@ public class TestDatanodeRestart {
       } else {
         Assert.assertEquals(fileLen, replica.getNumBytes());
       }
-      dn.data.invalidate(bpid, new Block[]{replica});
+      dataset(dn).invalidate(bpid, new Block[]{replica});
     } finally {
       IOUtils.closeStream(out);
       if (fs.exists(src)) {
@@ -151,7 +154,7 @@ public class TestDatanodeRestart {
       String bpid = cluster.getNamesystem().getBlockPoolId();
       DataNode dn = cluster.getDataNodes().get(0);
       Iterator<ReplicaInfo> replicasItor = 
-        ((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
+          dataset(dn).volumeMap.replicas(bpid).iterator();
       ReplicaInfo replica = replicasItor.next();
       createUnlinkTmpFile(replica, true, true); // rename block file
       createUnlinkTmpFile(replica, false, true); // rename meta file
@@ -167,8 +170,7 @@ public class TestDatanodeRestart {
       dn = cluster.getDataNodes().get(0);
 
       // check volumeMap: 4 finalized replica
-      Collection<ReplicaInfo> replicas = 
-        ((FSDataset)(dn.data)).volumeMap.replicas(bpid);
+      Collection<ReplicaInfo> replicas = dataset(dn).volumeMap.replicas(bpid);
       Assert.assertEquals(4, replicas.size());
       replicasItor = replicas.iterator();
       while (replicasItor.hasNext()) {
@@ -180,6 +182,10 @@ public class TestDatanodeRestart {
     }
   }
 
+  private static FsDatasetImpl dataset(DataNode dn) {
+    return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
+  }
+
   private static void createUnlinkTmpFile(ReplicaInfo replicaInfo, 
       boolean changeBlockFile, 
       boolean isRename) throws IOException {

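The migration pattern used above carries over to other tests in this package: replace direct access to the removed dn.data field with DataNodeTestUtils.getFSDataset(dn) plus a local cast helper. Below is a minimal sketch of that helper and a typical call site; the enclosing test class is hypothetical and its imports are omitted, but every identifier appears in the hunks above.

      // Cast helper, as added to TestDatanodeRestart above.
      private static FsDatasetImpl dataset(DataNode dn) {
        return (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
      }

      // Typical call site inside a test method with a running MiniDFSCluster 'cluster'.
      static void inspectRbwFiles(MiniDFSCluster cluster, String bpid) {
        DataNode dn = cluster.getDataNodes().get(0);
        ReplicaMap replicas = dataset(dn).volumeMap;       // replica map keyed by block pool id
        int count = replicas.size(bpid);                   // number of replicas in this pool

        for (FsVolumeSpi v : dataset(dn).getVolumes()) {
          FsVolumeImpl volume = (FsVolumeImpl) v;          // concrete type exposes getCurrentDir()
          File storageDir = volume.getCurrentDir().getParentFile().getParentFile();
          File rbwDir = new File(storageDir, "rbw");
          // Inspect rbwDir.listFiles() the same way TestDatanodeRestart does above.
        }
      }
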
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java Mon Apr  2 17:38:56 2012
@@ -15,22 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.List;
 import java.net.InetSocketAddress;
-
 import java.net.SocketTimeoutException;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -41,16 +34,28 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -103,14 +108,14 @@ public class TestInterDatanodeProtocol {
   }
 
   public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
-    Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
+    Block metainfo = DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
+        b.getBlockPoolId(), b.getBlockId());
     Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
     Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes());
   }
 
   public static LocatedBlock getLastLocatedBlock(
-      ClientProtocol namenode, String src
-  ) throws IOException {
+      ClientProtocol namenode, String src) throws IOException {
     //get block info for the last block
     LocatedBlocks locations = namenode.getBlockLocations(src, 0, Long.MAX_VALUE);
     List<LocatedBlock> blocks = locations.getLocatedBlocks();
@@ -148,13 +153,11 @@ public class TestInterDatanodeProtocol {
 
       //connect to a data node
       DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
-      InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
-          datanodeinfo[0], conf, datanode.getDnConf().socketTimeout);
+      InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
+          datanode, datanodeinfo[0], conf);
       
       //stop block scanner, so we could compare lastScanTime
-      if (datanode.blockScanner != null) {
-        datanode.blockScanner.shutdown();
-      }
+      DataNodeTestUtils.shutdownBlockScanner(datanode);
 
       //verify BlockMetaDataInfo
       ExtendedBlock b = locatedblock.getBlock();
@@ -187,14 +190,14 @@ public class TestInterDatanodeProtocol {
   }
 
   /** Test 
-   * {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)}
+   * {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long)}
    */
   @Test
   public void testInitReplicaRecovery() throws IOException {
     final long firstblockid = 10000L;
     final long gs = 7777L;
     final long length = 22L;
-    final ReplicasMap map = new ReplicasMap(this);
+    final ReplicaMap map = new ReplicaMap(this);
     String bpid = "BP-TEST";
     final Block[] blocks = new Block[5];
     for(int i = 0; i < blocks.length; i++) {
@@ -208,7 +211,8 @@ public class TestInterDatanodeProtocol {
       final ReplicaInfo originalInfo = map.get(bpid, b);
 
       final long recoveryid = gs + 1;
-      final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid);
+      final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(
+          bpid, map, blocks[0], recoveryid);
       assertEquals(originalInfo, recoveryInfo);
 
       final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -217,7 +221,7 @@ public class TestInterDatanodeProtocol {
 
       //recover one more time 
       final long recoveryid2 = gs + 2;
-      final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
+      final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
       assertEquals(originalInfo, recoveryInfo2);
 
       final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
@@ -226,7 +230,7 @@ public class TestInterDatanodeProtocol {
       
       //case RecoveryInProgressException
       try {
-        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
         Assert.fail();
       }
       catch(RecoveryInProgressException ripe) {
@@ -237,7 +241,7 @@ public class TestInterDatanodeProtocol {
     { // BlockRecoveryFI_01: replica not found
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid - 1, length, gs);
-      ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+      ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
       Assert.assertNull("Data-node should not have this replica.", r);
     }
     
@@ -245,7 +249,7 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs - 1;
       final Block b = new Block(firstblockid + 1, length, gs);
       try {
-        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
         Assert.fail();
       }
       catch(IOException ioe) {
@@ -258,7 +262,7 @@ public class TestInterDatanodeProtocol {
       final long recoveryid = gs + 1;
       final Block b = new Block(firstblockid, length, gs+1);
       try {
-        FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
+        FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
         fail("InitReplicaRecovery should fail because replica's " +
         		"gs is less than the block's gs");
       } catch (IOException e) {
@@ -270,7 +274,7 @@ public class TestInterDatanodeProtocol {
 
   /** 
    * Test  for
-   * {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)} 
+   * {@link FsDatasetImpl#updateReplicaUnderRecovery(ExtendedBlock, long, long)} 
    * */
   @Test
   public void testUpdateReplicaUnderRecovery() throws IOException {
@@ -296,22 +300,22 @@ public class TestInterDatanodeProtocol {
       //get DataNode and FSDataset objects
       final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
       Assert.assertTrue(datanode != null);
-      Assert.assertTrue(datanode.data instanceof FSDataset);
-      final FSDataset fsdataset = (FSDataset)datanode.data;
 
       //initReplicaRecovery
       final ExtendedBlock b = locatedblock.getBlock();
       final long recoveryid = b.getGenerationStamp() + 1;
       final long newlength = b.getNumBytes() - 1;
+      final FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode);
       final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery(
           new RecoveringBlock(b, null, recoveryid));
 
       //check replica
-      final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId());
+      final ReplicaInfo replica = FsDatasetTestUtil.fetchReplicaInfo(
+          fsdataset, bpid, b.getBlockId());
       Assert.assertEquals(ReplicaState.RUR, replica.getState());
 
       //check meta data before update
-      FSDataset.checkReplicaFiles(replica);
+      FsDatasetImpl.checkReplicaFiles(replica);
 
       //case "THIS IS NOT SUPPOSED TO HAPPEN"
       //with (block length) != (stored replica's on disk length). 

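Since FsDatasetImpl.initReplicaRecovery(String, ReplicaMap, Block, long) is a static method that takes the replica map explicitly, it can be exercised without a running datanode, which is what testInitReplicaRecovery above relies on. The following minimal sketch shows the call sequence: the block constants follow the test's values, but the single FinalizedReplica construction with a null volume and directory is illustrative rather than quoted from this patch, and the enclosing class and imports are omitted.

      // Standalone map guarded by an arbitrary mutex object.
      ReplicaMap map = new ReplicaMap(new Object());
      String bpid = "BP-TEST";

      // Register one finalized replica; volume and directory are not needed on this path
      // (illustrative -- the real test builds several blocks in a loop).
      Block b = new Block(10000L, 22L, 7777L);   // blockId, numBytes, generationStamp
      map.add(bpid, new FinalizedReplica(b, null, null));

      // Start recovery with a newer recovery id (a bumped generation stamp).
      long recoveryId = b.getGenerationStamp() + 1;
      ReplicaRecoveryInfo info = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryId);

      // The map entry is now a ReplicaUnderRecovery carrying recoveryId; calling again with a
      // recovery id that is not newer raises RecoveryInProgressException, as tested above.
      ReplicaUnderRecovery rur = (ReplicaUnderRecovery) map.get(bpid, b);
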
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReplicasMap.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestReplicaMap.java Mon Apr  2 17:38:56 2012
@@ -15,21 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReplicaMap;
 import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Unit test for ReplicasMap class
  */
-public class TestReplicasMap {
-  private final ReplicasMap map = new ReplicasMap(TestReplicasMap.class);
+public class TestReplicaMap {
+  private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
   private final String bpid = "BP-TEST";
   private final  Block block = new Block(1234, 1234, 1234);
   

Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java (from r1308436, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java&r1=1308436&r2=1308437&rev=1308437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java Mon Apr  2 17:38:56 2012
@@ -15,14 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.datanode;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -45,7 +54,7 @@ public class TestWriteToReplica {
     try {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
-      FSDataset dataSet = (FSDataset)dn.data;
+      FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
 
       // set up replicasMap
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -66,7 +75,7 @@ public class TestWriteToReplica {
     try {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
-      FSDataset dataSet = (FSDataset)dn.data;
+      FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
 
       // set up replicasMap
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -86,7 +95,7 @@ public class TestWriteToReplica {
     try {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
-      FSDataset dataSet = (FSDataset)dn.data;
+      FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
 
       // set up replicasMap
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -106,7 +115,7 @@ public class TestWriteToReplica {
     try {
       cluster.waitActive();
       DataNode dn = cluster.getDataNodes().get(0);
-      FSDataset dataSet = (FSDataset)dn.data;
+      FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
 
       // set up replicasMap
       String bpid = cluster.getNamesystem().getBlockPoolId();
@@ -128,7 +137,7 @@ public class TestWriteToReplica {
    * @return Contrived blocks for further testing.
    * @throws IOException
    */
-  private ExtendedBlock[] setup(String bpid, FSDataset dataSet) throws IOException {
+  private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOException {
     // setup replicas map
     
     ExtendedBlock[] blocks = new ExtendedBlock[] {
@@ -137,8 +146,8 @@ public class TestWriteToReplica {
         new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
     };
     
-    ReplicasMap replicasMap = dataSet.volumeMap;
-    FSVolume vol = dataSet.volumes.getNextVolume(0);
+    ReplicaMap replicasMap = dataSet.volumeMap;
+    FsVolumeImpl vol = dataSet.volumes.getNextVolume(0);
     ReplicaInfo replicaInfo = new FinalizedReplica(
         blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
     replicasMap.add(bpid, replicaInfo);
@@ -165,9 +174,9 @@ public class TestWriteToReplica {
     return blocks;
   }
   
-  private void testAppend(String bpid, FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
+  private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     long newGS = blocks[FINALIZED].getGenerationStamp()+1;
-    final FSVolume v = (FSVolume)dataSet.volumeMap.get(
+    final FsVolumeImpl v = (FsVolumeImpl)dataSet.volumeMap.get(
         bpid, blocks[FINALIZED].getLocalBlock()).getVolume();
     long available = v.getCapacity()-v.getDfsUsed();
     long expectedLen = blocks[FINALIZED].getNumBytes();
@@ -285,7 +294,7 @@ public class TestWriteToReplica {
     }
   }
 
-  private void testClose(FSDataset dataSet, ExtendedBlock [] blocks) throws IOException {
+  private void testClose(FsDatasetImpl dataSet, ExtendedBlock [] blocks) throws IOException {
     long newGS = blocks[FINALIZED].getGenerationStamp()+1;
     dataSet.recoverClose(blocks[FINALIZED], newGS, 
         blocks[FINALIZED].getNumBytes());  // successful
@@ -335,7 +344,7 @@ public class TestWriteToReplica {
     }
   }
   
-  private void testWriteToRbw(FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
+  private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
       dataSet.recoverRbw(blocks[FINALIZED],
           blocks[FINALIZED].getGenerationStamp()+1,
@@ -428,7 +437,7 @@ public class TestWriteToReplica {
     dataSet.createRbw(blocks[NON_EXISTENT]);
   }
   
-  private void testWriteToTemporary(FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
+  private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
     try {
       dataSet.createTemporary(blocks[FINALIZED]);
       Assert.fail("Should not have created a temporary replica that was " +